00001
package com.quadcap.sql.file;
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
import java.io.BufferedInputStream;
00042
import java.io.EOFException;
00043
import java.io.File;
00044
import java.io.IOException;
00045
import java.io.InputStream;
00046
import java.io.RandomAccessFile;
00047
00048
import java.util.Properties;
00049
00050
import com.quadcap.sql.io.ObjectOutputStream;
00051
import com.quadcap.sql.io.ObjectInputStream;
00052
00053
import com.quadcap.util.collections.LongMap;
00054
00055
import com.quadcap.util.Debug;
00056
import com.quadcap.util.Util;
00057
00058
00059
00060
00061
00062
00063 public class Logger1 implements Logger {
00064 LogBuffer cb;
00065 InputStream
cbIn = null;
00066 ObjectOutputStream
oos = null;
00067 ByteArrayRandomAccess bra =
new ByteArrayRandomAccess();
00068 RandomAccessInputStream ris =
new RandomAccessInputStream(bra);
00069 ObjectInputStream
ois =
new ObjectInputStream(ris);
00070 byte[]
tmp =
new byte[8];
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080 TransMap[]
trans =
new TransMap[16];
00081
00082
00083
00084
00085
00086
00087 int numTrans;
00088
00089
00090
00091
00092
00093 int liveTrans = 0;
00094
00095
00096
00097
00098 long prevOp;
00099
00100 int maxSize = 128 * 1024 * 1024;
00101 int minSize = 128 * 1024;
00102
00103 Log myLog;
00104 Datafile db;
00105
00106 Logger1() {}
00107
00108 public void init(
Log log,
boolean create, Properties props)
00109
throws IOException
00110 {
00111
this.myLog = log;
00112
this.db = log.getDatafile();
00113
this.cb =
new LogBuffer();
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
maxSize = Integer.parseInt(props.getProperty(
"maxLogSize",
"" +
maxSize));
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
minSize = Integer.parseInt(props.getProperty(
"minLogSize",
"" +
minSize));
00138
00139 File logfile =
new File(
db.
getDbRootDir(),
"logfile");
00140 RandomAccessFile raf =
new RandomAccessFile(logfile,
"rw");
00141
FileRandomAccess fra =
new FileRandomAccess(raf,
maxSize);
00142
RandomAccess ra = fra;
00143
if (create) {
00144
cb.
init(ra,
maxSize);
00145 }
else {
00146
lastSize = ra.
size();
00147
cb.
init(ra, props);
00148 }
00149 }
00150
00151 public void init(File file)
throws IOException {
00152 RandomAccessFile raf =
new RandomAccessFile(file,
"rw");
00153
FileRandomAccess fra =
new FileRandomAccess(raf, raf.length());
00154
RandomAccess ra = fra;
00155
this.cb =
new LogBuffer();
00156
lastSize = ra.
size();
00157
cb.
init(ra,
new Properties());
00158 }
00159
00160
00161
00162
00163
00164 class TransMap {
00165 long transId;
00166
00167
00168
00169
00170 int bufStart;
00171
00172
00173
00174
00175 int bufEnd;
00176 boolean complete;
00177
00178 public void init(
long t,
int p) {
00179
this.transId = t;
00180
this.bufStart = p;
00181
this.bufEnd = p;
00182
this.complete =
false;
00183 }
00184
00185 public void init(
TransMap t) {
00186
this.transId = t.
transId;
00187
this.bufStart = -1;
00188
this.bufEnd = -1;
00189
this.complete = t.
complete;
00190 }
00191
00192 public void copy(
TransMap t) {
00193
this.transId = t.
transId;
00194
this.bufStart = t.
bufStart;
00195
this.bufEnd = t.
bufEnd;
00196
this.complete = t.
complete;
00197 }
00198
00199 public String
toString() {
00200
return "TransMap[" +
transId +
": " +
bufStart +
"-" +
bufEnd +
00201 (
complete ?
" (COMPLETE)" :
"") +
"]";
00202 }
00203 }
00204
00205 public String
toString() {
00206 StringBuffer sb =
new StringBuffer(
"Logger1 {" +
cb.
getCheckpoint() +
00207
", " +
cb.
getEnd() +
"} ");
00208
for (
int i = 0; i <
trans.length; i++) {
00209
TransMap tm =
trans[i];
00210
if (tm != null && !tm.
complete) {
00211 sb.append(tm.
toString());
00212 }
00213 }
00214
return sb.
toString();
00215 }
00216
00217 public int getCheckpoint() {
00218
return cb.
getCheckpoint();
00219 }
00220
00221 public int getEnd() {
00222
return cb.
getEnd();
00223 }
00224
00225 public int rput(
LogEntry op)
throws IOException {
00226
int pos =
cb.
getEnd();
00227 op.setPosition(pos);
00228
00229
long id = op.getTransactionId();
00230
int tx =
findTransaction(
id);
00231
if (tx >= 0) {
00232
TransMap tm =
trans[tx];
00233 op.setPrev(tm.
bufEnd);
00234 tm.
bufEnd = pos;
00235 }
00236
00237
try {
00238
if (
oos == null) {
00239
oos =
new ObjectOutputStream(
cb.
getOutputStream());
00240 }
00241
oos.writeObject(op);
00242
oos.flush();
00243 }
catch (IOException ex) {
00244
if (tx >= 0) {
00245
trans[tx].
bufEnd = op.getPrev();
00246 }
00247
throw ex;
00248 }
00249
return pos;
00250 }
00251
00252 public void put(
LogEntry op)
throws IOException {
00253
int pos = rput(op);
00254
switch (op.getCode()) {
00255
case LogEntry.BEGIN_TRANSACTION:
00256
beginTransaction(op.getTransactionId(), pos);
00257
break;
00258
case LogEntry.COMMIT:
00259
endTransaction(op.getTransactionId());
00260
break;
00261 }
00262
00263
00264 }
00265
00266 public void setRedoState(
LogEntry op,
int state)
throws IOException {
00267
int pos = op.getPosition();
00268
00269
cb.
writeByte(pos+1, (byte)state);
00270 op.redoState = state;
00271 }
00272
00273 public LogEntry getLastOp(
long transId)
throws IOException {
00274
LogEntry ret = null;
00275
int tx =
findTransaction(transId);
00276
if (tx >= 0) {
00277
TransMap tm =
trans[tx];
00278 ret =
readEntry(tm.
bufEnd);
00279 }
00280
00281
if (ret == null) {
00282
Debug.println(
Util.stackTrace());
00283
Debug.println(
"getLastOp(" + transId +
") = null, trans:");
00284
for (
int i = 0; i <
numTrans; i++) {
00285
Debug.println(
"trans[" + i +
"] = " +
trans[i]);
00286 }
00287 }
00288
00289
return ret;
00290 }
00291
00292 public LogEntry getPrevOp(
LogEntry op)
throws IOException {
00293
int pos = op.
getPrev();
00294
int len = op.getPosition() - pos;
00295
LogEntry ret =
readEntry(pos, len);
00296
return ret;
00297 }
00298
00299 public LogEntry getFirstOp() throws IOException {
00300
LogEntry ret =
readEntry(
cb.
getBegin());
00301
return ret;
00302 }
00303
00304 public LogEntry getNextOp() throws IOException {
00305
return readEntry();
00306 }
00307
00308 public void sync() throws IOException {
00309
cb.
sync();
00310 }
00311
00312
00313 public void close() throws IOException {
00314
cb.
close();
00315 }
00316
00317 long lastSize = 0;
00318 public void reset() throws IOException {
00319
cb.
reset();
00320
if (
lastSize -
cb.
size() > 100 * 1000) {
00321
cb.
truncate();
00322 }
00323
lastSize =
cb.
size();
00324 }
00325
00326 public long getOldestTransaction() {
00327
if (
numTrans > 0) {
00328
return trans[0].
transId;
00329 }
else {
00330
return -1;
00331 }
00332 }
00333
00334 public int getActiveTransactionCount() {
00335
return liveTrans;
00336 }
00337
00338 public LongMap getActiveTransactions() {
00339
LongMap map =
new LongMap(16);
00340
for (
int i = 0; i <
numTrans; i++) {
00341
TransMap t =
trans[i];
00342
if (!t.
complete) {
00343 map.
put(t.
transId,
"");
00344 }
00345 }
00346
return map;
00347 }
00348
00349 public void checkpoint() throws IOException {
00350
int offset = 0;
00351
for (
int i = 0; i <
numTrans; i++) {
00352
TransMap t =
trans[i];
00353
if (t.
complete) {
00354 offset++;
00355 }
else if (offset > 0) {
00356 trans[i-offset].
init(t);
00357 }
00358 }
00359
00360
if (
Trace.bit(22)) {
00361
Debug.println(
"BEGIN checkpoint [" +
cb.
getBegin() +
"-" +
00362
cb.
getEnd() +
" (" + numTrans +
" transactions)]");
00363 }
00364
00365 numTrans -= offset;
00366
00367
LogEntry entry =
readEntry(0);
00368
cb.
reset();
00369
if (numTrans > 0) {
00370
while (entry != null) {
00371
boolean keep =
false;
00372
long id = entry.
getTransactionId();
00373
int tx =
findTransaction(
id);
00374
if (tx >= 0) {
00375
TransMap tm =
trans[tx];
00376 keep = !tm.
complete;
00377
if (keep) {
00378
if (tm.
bufStart < 0) tm.
bufStart =
cb.
getEnd();
00379 }
00380 }
00381
if (keep) {
00382
00383
if (
Trace.bit(23)) {
00384
Debug.println(
" [" +
cb.
getEnd() +
"] -> " + entry);
00385 }
00386
00387 rput(entry);
00388 }
else {
00389
00390
if (
Trace.bit(23)) {
00391
Debug.println(
" [" +
cb.
getEnd() +
"] -> DISCARD: " + entry);
00392 }
00393
00394 entry.
discard(
db);
00395 }
00396 entry =
readEntry();
00397 }
00398 }
00399
cb.
checkpoint();
00400
00401
if (
Trace.bit(22)) {
00402
Debug.println(
"END checkpoint [" +
cb.
getBegin() +
"-" +
cb.
getEnd() +
00403
" (" + numTrans +
" transactions)]");
00404 }
00405
00406 }
00407
00408
00409
00410 private final void beginTransaction(
long transId,
int pos) {
00411
if (
numTrans >=
trans.length) {
00412
int newcap =
numTrans + (
numTrans/2) + 2;
00413
trans = (
TransMap[])
Util.checkCapacity(
trans, newcap);
00414 }
00415
TransMap t =
trans[
numTrans];
00416
if (t == null) {
00417 t =
new TransMap();
00418 trans[
numTrans] = t;
00419 }
00420
00421
00422
00423
00424
00425
for (
int i =
numTrans-1; i >= 0; i--) {
00426
if (trans[i].
transId > transId) {
00427 trans[i+1].
copy(trans[i]);
00428 t = trans[i];
00429 }
else {
00430
break;
00431 }
00432 }
00433 numTrans++;
00434
00435
liveTrans++;
00436 t.
init(transId, pos);
00437
00438 }
00439
00440 private final void endTransaction(
final long transId)
throws IOException {
00441
00442
int pos =
findTransaction(transId);
00443
if (pos >= 0) {
00444
00445
00446
00447
00448
trans[pos].
complete =
true;
00449
liveTrans--;
00450
if (pos == 0 &&
cb.
size() >
minSize) {
00451
checkpoint();
00452 }
00453 }
00454 }
00455
00456
00457
00458
00459 int findTransaction(
long transId) {
00460
int lo = 0;
00461
int hi =
numTrans-1;
00462
while (hi >= lo) {
00463
int mid = (hi+lo) / 2;
00464
TransMap t =
trans[mid];
00465
if (t.
transId == transId) {
00466
return mid;
00467 }
00468
if (t.
transId < transId) {
00469 lo = mid+1;
00470 }
else {
00471 hi = mid-1;
00472 }
00473 }
00474
return -1;
00475 }
00476
00477 private final LogEntry readEntry(
int pos,
int len)
throws IOException {
00478
if (pos < 0)
return null;
00479
bra.
resize(len);
00480 byte[] buf =
bra.
getBytes();
00481
cb.
read(pos, buf, 0, len);
00482
ris.
setPosition(0);
00483
ois.setInputStream(
ris);
00484
ois.setPosition(pos);
00485
00486
LogEntry ret =
readEntry();
00487
return ret;
00488 }
00489
00490 private final LogEntry readEntry(
int pos)
throws IOException {
00491
if (pos < 0)
return null;
00492
cbIn =
new BufferedInputStream(
cb.
getInputStream(pos));
00493
ois.setInputStream(
cbIn);
00494
ois.setPosition(pos);
00495
00496
LogEntry ret =
readEntry();
00497
return ret;
00498 }
00499
00500 private final LogEntry readEntry() throws IOException {
00501 Object obj = null;
00502
try {
00503
int pos = (
int)
ois.getPosition();
00504
LogEntry ret = (
LogEntry)(obj =
ois.readObject());
00505
if (ret != null) {
00506 ret.
setPosition(pos);
00507 }
00508
return ret;
00509 }
catch (EOFException ex) {
00510
return null;
00511 }
catch (ClassCastException ez) {
00512
throw new DatafileException(ez);
00513 }
catch (ClassNotFoundException ex) {
00514
throw new DatafileException(ex);
00515 }
00516 }
00517 }
00518