00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.ft.protocols.cic.managers;
00032
00033 import java.io.IOException;
00034 import java.rmi.RemoteException;
00035 import java.util.Enumeration;
00036 import java.util.Hashtable;
00037 import java.util.Iterator;
00038 import java.util.List;
00039 import java.util.ListIterator;
00040 import java.util.Vector;
00041
00042 import org.apache.log4j.Logger;
00043 import org.objectweb.proactive.Body;
00044 import org.objectweb.proactive.core.ProActiveException;
00045 import org.objectweb.proactive.core.UniqueID;
00046 import org.objectweb.proactive.core.body.AbstractBody;
00047 import org.objectweb.proactive.core.body.UniversalBody;
00048 import org.objectweb.proactive.core.body.ft.checkpointing.Checkpoint;
00049 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00050 import org.objectweb.proactive.core.body.ft.exception.ProtocolErrorException;
00051 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00052 import org.objectweb.proactive.core.body.ft.internalmsg.GlobalStateCompletion;
00053 import org.objectweb.proactive.core.body.ft.internalmsg.OutputCommit;
00054 import org.objectweb.proactive.core.body.ft.message.HistoryUpdater;
00055 import org.objectweb.proactive.core.body.ft.message.MessageInfo;
00056 import org.objectweb.proactive.core.body.ft.message.MessageLog;
00057 import org.objectweb.proactive.core.body.ft.message.ReplyLog;
00058 import org.objectweb.proactive.core.body.ft.message.RequestLog;
00059 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00060 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.CheckpointInfoCIC;
00061 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.MessageInfoCIC;
00062 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00063 import org.objectweb.proactive.core.body.future.FutureResult;
00064 import org.objectweb.proactive.core.body.message.Message;
00065 import org.objectweb.proactive.core.body.reply.Reply;
00066 import org.objectweb.proactive.core.body.reply.ReplyImpl;
00067 import org.objectweb.proactive.core.body.request.AwaitedRequest;
00068 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00069 import org.objectweb.proactive.core.body.request.BlockingRequestQueueImpl;
00070 import org.objectweb.proactive.core.body.request.Request;
00071 import org.objectweb.proactive.core.body.request.RequestImpl;
00072 import org.objectweb.proactive.core.mop.Utils;
00073 import org.objectweb.proactive.core.util.CircularArrayList;
00074 import org.objectweb.proactive.core.util.MutableLong;
00075 import org.objectweb.proactive.core.util.log.Loggers;
00076 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00077 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00078
00079
/**
 * Fault-tolerance manager for the CIC protocol (see the enclosing
 * <code>protocols.cic</code> package). It tags outgoing messages with the
 * local checkpoint/history/incarnation indexes, inspects the tags of incoming
 * messages, takes checkpoints and keeps the logs needed to resend messages
 * after a recovery.
 */
00086 public class FTManagerCIC
00087 extends org.objectweb.proactive.core.body.ft.protocols.FTManager {
00088
/**
 * Value returned by the incarnation test when the received message carries an
 * incarnation greater than the local one; the onSend*After hooks resend the
 * message when they get this value back.
 */
00090 public static final int RESEND_MESSAGE = -3;
00091
/**
 * Value returned by the incarnation test when the received message carries an
 * incarnation lower than the local one.
 */
00093 public static final int RECOVER = -4;
00094
00095
/** Logger of the CIC fault-tolerance category. */
00096 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_CIC);
00097
00098
00099
00100
// Protocol counters.
// incarnation: starts at 1 in init(), replaced on recovery; stamped on every sent message.
00101 private int incarnation;
// lastRecovery: 0 in init(), set to the restored checkpoint index on recovery.
00102 private int lastRecovery;
// checkpointIndex: index of the latest local checkpoint; incremented by checkpoint().
00103 private int checkpointIndex;
// checkpointTimer: System.currentTimeMillis() of init / last checkpoint / recovery.
00104 private long checkpointTimer;
// nextMax: greatest checkpoint index observed on peers; a checkpoint is due while it exceeds checkpointIndex.
00105 private int nextMax;
00106
00107
// historyIndex: set to checkpointIndex each time a history is committed; stamped on sent messages.
00108 private int historyIndex;
00109
00110
// Message logs, keyed by Integer checkpoint index, each value being a Vector of MessageLog.
// latestRequestLog / latestReplyLog: highest key for which extend*Log has created a Vector.
00111 private Hashtable requestToResend;
00112 private int latestRequestLog;
00113 private Hashtable replyToResend;
00114 private int latestReplyLog;
00115
00116
// AwaitedRequest placeholders still waiting for the real request from their sender.
00117 private Vector awaitedRequests;
00118
00119
// Reused MessageInfoCIC instances attached to every outgoing request / reply.
00120 private MessageInfoCIC forSentRequest;
00121 private MessageInfoCIC forSentReply;
00122
00123
// Reception history: source body IDs of delivered requests, in delivery order.
00124 private Vector history;
00125
00126
// Monitor guarding accesses to the history.
00127 private final Character historyLock = new Character('l');
00128
00129
// deliveredRequestsCounter: incremented for each request appended to history (starts at -1).
00130 private long deliveredRequestsCounter;
// lastServedRequestIndex: also registered in localVectorClock under ownerID (see init()).
00131 private MutableLong lastServedRequestIndex;
// localVectorClock: UniqueID -> MutableLong.
00132 private Hashtable localVectorClock;
// historyBaseIndex: global index corresponding to position 0 of the history Vector.
00133 private long historyBaseIndex;
// lastCommitedIndex: global index of the last history element already committed (starts at -1).
00134 private long lastCommitedIndex;
// completingCheckpoint: set by checkpoint(), cleared once the matching history has been committed.
00135 private boolean completingCheckpoint;
00136
00137
// Compile-time switch for the output-commit code paths; currently disabled.
00138 private final static boolean isOCEnable = false;
00139
00140 public int init(AbstractBody owner) throws ProActiveException {
00141 super.init(owner);
00142 this.incarnation = 1;
00143 this.checkpointIndex = 0;
00144 this.historyIndex = 0;
00145 this.nextMax = 1;
00146
00147 this.lastRecovery = 0;
00148 this.checkpointTimer = System.currentTimeMillis();
00149 this.requestToResend = new Hashtable();
00150 this.latestRequestLog = 0;
00151 this.replyToResend = new Hashtable();
00152 this.latestReplyLog = 0;
00153 this.history = new Vector();
00154 this.awaitedRequests = new Vector();
00155 this.forSentRequest = new MessageInfoCIC();
00156 this.forSentReply = new MessageInfoCIC();
00157 this.deliveredRequestsCounter = -1;
00158 this.lastCommitedIndex = -1;
00159 this.lastServedRequestIndex = new MutableLong(0);
00160 this.historyBaseIndex = 0;
00161 this.completingCheckpoint = false;
00162 this.localVectorClock = new Hashtable();
00163 this.localVectorClock.put(this.ownerID, this.lastServedRequestIndex);
00164 logger.info(" CIC fault-tolerance is enabled for body " + this.ownerID);
00165 return 0;
00166 }
00167
00168 public int onReceiveReply(Reply reply) {
00169 reply.setFTManager(this);
00170 return this.incarnationTest(reply);
00171 }
00172
00173 public int onReceiveRequest(Request request) {
00174 request.setFTManager(this);
00175 return this.incarnationTest(request);
00176 }
00177
00178
00179
00180
00181
00182 private int incarnationTest(Message m) {
00183 if (this.isSignificant(m)) {
00184 MessageInfoCIC mi = (MessageInfoCIC) m.getMessageInfo();
00185 int localInt = this.incarnation;
00186 int inc = mi.incarnation;
00187 if (inc > localInt) {
00188
00189 m.setIgnoreIt(true);
00190 return FTManagerCIC.RESEND_MESSAGE;
00191 } else if (inc < localInt) {
00192
00193 m.setIgnoreIt(true);
00194 return FTManagerCIC.RECOVER;
00195 }
00196 }
00197 return 0;
00198 }
00199
00200 public synchronized int onDeliverReply(Reply reply) {
00201 int currentCheckpointIndex = this.checkpointIndex;
00202 if (this.isSignificant(reply)) {
00203 MessageInfoCIC mi = (MessageInfoCIC) reply.getMessageInfo();
00204
00205
00206 this.updateHistory(mi.historyIndex);
00207
00208 if (mi.checkpointIndex > currentCheckpointIndex) {
00209 this.nextMax = Math.max(this.nextMax, mi.checkpointIndex);
00210 }
00211 }
00212 return currentCheckpointIndex;
00213 }
00214
00215 public synchronized int onDeliverRequest(Request request) {
00216 int currentCheckpointIndex = this.checkpointIndex;
00217
00218
00219 if (this.isSignificant(request)) {
00220
00221 MessageInfoCIC mi = (MessageInfoCIC) request.getMessageInfo();
00222
00223
00224 this.updateHistory(mi.historyIndex);
00225
00226 if (!(this.updateAwaitedRequests(request))) {
00227 if (FTManagerCIC.isOCEnable || this.completingCheckpoint) {
00228 synchronized (historyLock) {
00229
00230 history.add(request.getSourceBodyID());
00231 }
00232
00233
00234 this.deliveredRequestsCounter++;
00235
00236 if (FTManagerCIC.isOCEnable) {
00237
00238 mi.positionInHistory = this.deliveredRequestsCounter;
00239
00240 this.updateLocalVectorClock(mi.vectorClock);
00241 }
00242 }
00243 } else {
00244
00245 request.setIgnoreIt(true);
00246 }
00247
00248
00249 int ckptIndex = mi.checkpointIndex;
00250 if (ckptIndex > currentCheckpointIndex) {
00251 this.nextMax = Math.max(this.nextMax, ckptIndex);
00252
00253
00254 mi.isOrphanFor = (char) ckptIndex;
00255
00256 }
00257 }
00258 return currentCheckpointIndex;
00259 }
00260
00261
00262
00263
00264 private void updateHistory(int index) {
00265 if (index > this.historyIndex) {
00266
00267 this.commitHistories(this.checkpointIndex,
00268 this.deliveredRequestsCounter, true, true);
00269 if (this.completingCheckpoint) {
00270 this.completingCheckpoint = false;
00271 }
00272 }
00273 }
00274
00275
00276
00277
00278
00279 private boolean isSignificant(Message m) {
00280 return ((m.getMessageInfo() != null) &&
00281 (!m.getMessageInfo().isFromHalfBody()));
00282 }
00283
00284
00285
00286
00287
00288 private void updateLocalVectorClock(Hashtable vectorClock) {
00289 Enumeration ids = vectorClock.keys();
00290 MutableLong localClock;
00291 MutableLong senderClock = null;
00292 UniqueID id = null;
00293 while (ids.hasMoreElements()) {
00294 id = (UniqueID) (ids.nextElement());
00295 localClock = (MutableLong) (this.localVectorClock.get(id));
00296 senderClock = (MutableLong) (vectorClock.get(id));
00297 if (localClock == null) {
00298
00299 this.localVectorClock.put(id,
00300 new MutableLong(senderClock.getValue()));
00301 } else if (localClock.isLessThan(senderClock)) {
00302
00303 localClock.setValue(senderClock.getValue());
00304 }
00305 }
00306 }
00307
00308 public synchronized int onSendReplyBefore(Reply reply) {
00309
00310 this.forSentReply.checkpointIndex = (char) this.checkpointIndex;
00311 this.forSentReply.historyIndex = (char) this.historyIndex;
00312 this.forSentReply.incarnation = (char) this.incarnation;
00313 this.forSentReply.lastRecovery = (char) this.lastRecovery;
00314 this.forSentReply.isOrphanFor = Character.MAX_VALUE;
00315 this.forSentReply.fromHalfBody = false;
00316 this.forSentReply.vectorClock = null;
00317 reply.setMessageInfo(this.forSentReply);
00318
00319
00320 if (FTManagerCIC.isOCEnable && this.isOutputCommit(reply)) {
00321 try {
00322 if (logger.isDebugEnabled()) {
00323 logger.debug(this.ownerID +
00324 " is output commiting for reply " + reply);
00325 }
00326 this.storage.outputCommit(this.forSentReply);
00327 } catch (RemoteException e) {
00328 logger.error("**ERROR** Cannot perform output commit");
00329 e.printStackTrace();
00330 }
00331 }
00332
00333 return 0;
00334 }
00335
00336 public synchronized int onSendRequestBefore(Request request) {
00337
00338 this.forSentRequest.checkpointIndex = (char) this.checkpointIndex;
00339 this.forSentRequest.historyIndex = (char) this.historyIndex;
00340 this.forSentRequest.incarnation = (char) this.incarnation;
00341 this.forSentRequest.lastRecovery = (char) this.lastRecovery;
00342 this.forSentRequest.isOrphanFor = Character.MAX_VALUE;
00343 this.forSentRequest.fromHalfBody = false;
00344 if (FTManagerCIC.isOCEnable) {
00345 this.forSentRequest.vectorClock = this.localVectorClock;
00346 }
00347 request.setMessageInfo(this.forSentRequest);
00348
00349
00350 if (FTManagerCIC.isOCEnable && this.isOutputCommit(request)) {
00351 try {
00352 if (logger.isDebugEnabled()) {
00353 logger.debug(this.ownerID +
00354 " is output commiting for request " + request);
00355 }
00356 this.storage.outputCommit(this.forSentRequest);
00357 } catch (RemoteException e) {
00358 logger.error("**ERROR** Cannot perform output commit");
00359 e.printStackTrace();
00360 }
00361 }
00362 return 0;
00363 }
00364
00365
00366
00367
00368
00369 private boolean isOutputCommit(Message m) {
00370 if (FTManagerCIC.isOCEnable) {
00371 Request r = (Request) m;
00372 if (r.getMethodName().equals("logEvent")) {
00373 return true;
00374 } else {
00375 return false;
00376 }
00377 } else {
00378 return false;
00379 }
00380 }
00381
00382 public synchronized int onSendReplyAfter(Reply reply, int rdvValue,
00383 UniversalBody destination) {
00384
00385 if (rdvValue == FTManagerCIC.RESEND_MESSAGE) {
00386 try {
00387 reply.setIgnoreIt(false);
00388 Thread.sleep(FTManager.TIME_TO_RESEND);
00389 int rdvValueBis = sendReply(reply, destination);
00390 return this.onSendReplyAfter(reply, rdvValueBis, destination);
00391 } catch (InterruptedException e) {
00392 e.printStackTrace();
00393 }
00394 }
00395 int currentCheckpointIndex = this.checkpointIndex;
00396
00397
00398 if (rdvValue > currentCheckpointIndex) {
00399 this.nextMax = Math.max(this.nextMax, rdvValue);
00400
00401 this.extendReplyLog(rdvValue);
00402
00403 try {
00404 Reply toLog = null;
00405
00406
00407
00408
00409
00410
00411
00412 toLog = new ReplyImpl(reply.getSourceBodyID(),
00413 reply.getSequenceNumber(), reply.getMethodName(),
00414 (FutureResult) Utils.makeDeepCopy(reply.getResult()),
00415 null);
00416
00417 MessageLog log = new ReplyLog(toLog,
00418 destination.getRemoteAdapter());
00419 for (int i = currentCheckpointIndex + 1; i <= rdvValue; i++) {
00420 ((Vector) (this.replyToResend.get(new Integer(i)))).add(log);
00421 }
00422 } catch (IOException e) {
00423 e.printStackTrace();
00424 }
00425 }
00426 return 0;
00427 }
00428
00429 public synchronized int onSendRequestAfter(Request request, int rdvValue,
00430 UniversalBody destination) throws RenegotiateSessionException {
00431
00432 if (rdvValue == FTManagerCIC.RESEND_MESSAGE) {
00433 try {
00434 request.resetSendCounter();
00435 request.setIgnoreIt(false);
00436 Thread.sleep(FTManager.TIME_TO_RESEND);
00437 int rdvValueBis = sendRequest(request, destination);
00438 return this.onSendRequestAfter(request, rdvValueBis, destination);
00439 } catch (RenegotiateSessionException e1) {
00440 throw e1;
00441 } catch (InterruptedException e) {
00442 e.printStackTrace();
00443 }
00444 }
00445
00446 int currentCheckpointIndex = this.checkpointIndex;
00447
00448
00449 if (rdvValue > currentCheckpointIndex) {
00450 this.nextMax = Math.max(this.nextMax, rdvValue);
00451
00452 this.extendRequestLog(rdvValue);
00453 try {
00454
00455 request.getMethodCall().makeDeepCopyOfArguments();
00456
00457 request.resetSendCounter();
00458 MessageLog log = new RequestLog(request,
00459 destination.getRemoteAdapter());
00460 for (int i = currentCheckpointIndex + 1; i <= rdvValue; i++) {
00461
00462 ((Vector) (this.requestToResend.get(new Integer(i)))).add(log);
00463 }
00464 } catch (IOException e) {
00465 e.printStackTrace();
00466 }
00467 }
00468 return 0;
00469 }
00470
00471 public int onServeRequestBefore(Request request) {
00472
00473 while (this.haveToCheckpoint()) {
00474 this.checkpoint(request);
00475 }
00476
00477
00478 if (FTManagerCIC.isOCEnable) {
00479 MessageInfo mi = request.getMessageInfo();
00480 if (mi != null) {
00481 long requestIndex = ((MessageInfoCIC) (mi)).positionInHistory;
00482 if (this.lastServedRequestIndex.getValue() < requestIndex) {
00483 this.lastServedRequestIndex.setValue(requestIndex);
00484 }
00485 }
00486 }
00487 return 0;
00488 }
00489
00490 public int onServeRequestAfter(Request request) {
00491 return 0;
00492 }
00493
00494
00495 public int beforeRestartAfterRecovery(CheckpointInfo ci, int inc) {
00496 CheckpointInfoCIC cic = (CheckpointInfoCIC) ci;
00497 BlockingRequestQueue queue = ((AbstractBody) owner).getRequestQueue();
00498 int index = cic.checkpointIndex;
00499
00500
00501 this.history = new Vector();
00502 this.completingCheckpoint = false;
00503 this.lastCommitedIndex = cic.lastCommitedIndex;
00504
00505 this.deliveredRequestsCounter = cic.lastCommitedIndex;
00506
00507 this.historyBaseIndex = cic.lastCommitedIndex + 1;
00508
00509
00510
00511 this.awaitedRequests = new Vector();
00512 this.replyToResend = new Hashtable();
00513 this.requestToResend = new Hashtable();
00514 this.checkpointIndex = index;
00515 this.nextMax = index;
00516 this.checkpointTimer = System.currentTimeMillis();
00517 this.historyIndex = index;
00518 this.lastRecovery = index;
00519 this.incarnation = inc;
00520
00521
00522 Request pendingRequest = cic.pendingRequest;
00523
00524
00525 if (pendingRequest != null) {
00526 queue.addToFront(pendingRequest);
00527 }
00528
00529
00530
00531 this.filterQueue(queue, cic);
00532
00533
00534
00535 Iterator itHistory = cic.history.iterator();
00536 while (itHistory.hasNext()) {
00537 UniqueID cur = (UniqueID) (itHistory.next());
00538 Request currentAwaitedRequest = new AwaitedRequest(cur);
00539 queue.add(currentAwaitedRequest);
00540 this.awaitedRequests.add(currentAwaitedRequest);
00541 }
00542
00543
00544
00545 ((AbstractBody) owner).acceptCommunication();
00546
00547 try {
00548
00549 this.location.updateLocation(ownerID, owner.getRemoteAdapter());
00550 this.recovery.updateState(ownerID, RecoveryProcess.RUNNING);
00551 } catch (RemoteException e) {
00552 logger.error("Unable to connect with location server");
00553 e.printStackTrace();
00554 }
00555
00556
00557 this.sendLogs((CheckpointInfoCIC) ci);
00558
00559 return 0;
00560 }
00561
00562 public void updateLocationAtServer(UniqueID ownerID, UniversalBody remoteBodyAdapter) {
00563 try {
00564
00565 this.location.updateLocation(ownerID, remoteBodyAdapter);
00566
00567 } catch (RemoteException e) {
00568 logger.error("Unable to connect with location server");
00569 e.printStackTrace();
00570 }
00571 }
00572
00573
00574
00575
00576
00577 private boolean updateAwaitedRequests(Request r) {
00578 AwaitedRequest ar = null;
00579 Iterator it = this.awaitedRequests.iterator();
00580 while (it.hasNext()) {
00581 AwaitedRequest arq = (AwaitedRequest) (it.next());
00582 if ((arq.getAwaitedSender()).equals(r.getSourceBodyID())) {
00583 ar = arq;
00584 break;
00585 }
00586 }
00587 if (ar != null) {
00588
00589 ar.setAwaitedRequest(r);
00590 this.awaitedRequests.remove(ar);
00591 return true;
00592 } else {
00593 return false;
00594 }
00595 }
00596
00597
00598
00599
00600 private boolean haveToCheckpoint() {
00601 int currentCheckpointIndex = this.checkpointIndex;
00602 int currentNextMax = this.nextMax;
00603
00604
00605 if (currentNextMax > currentCheckpointIndex) {
00606 return true;
00607 }
00608
00609 else if ((this.checkpointTimer + this.ttc) < System.currentTimeMillis()) {
00610 return true;
00611 } else {
00612 return false;
00613 }
00614 }
00615
00616
00617
00618
00619 private Checkpoint checkpoint(Request pendingRequest) {
00620
00621 ((AbstractBody) owner).blockCommunication();
00622
00623 synchronized (this.historyLock) {
00624 Checkpoint c;
00625
00626
00627
00628 try {
00629
00630
00631 synchronized (this) {
00632 if (logger.isDebugEnabled()) {
00633 logger.debug("[CIC] Checkpointing with index = " +
00634 (this.checkpointIndex + 1));
00635 }
00636
00637
00638 CheckpointInfoCIC ci = new CheckpointInfoCIC();
00639 this.extendReplyLog(this.checkpointIndex + 1);
00640 this.extendRequestLog(this.checkpointIndex + 1);
00641 ci.replyToResend = (Vector) (this.replyToResend.get(new Integer(this.checkpointIndex +
00642 1)));
00643 ci.requestToResend = (Vector) (this.requestToResend.get(new Integer(this.checkpointIndex +
00644 1)));
00645 ci.pendingRequest = pendingRequest;
00646 ci.checkpointIndex = this.checkpointIndex + 1;
00647
00648
00649 this.replyToResend.remove(new Integer(this.checkpointIndex +
00650 1));
00651 this.requestToResend.remove(new Integer(this.checkpointIndex +
00652 1));
00653
00654
00655 this.checkpointIndex++;
00656
00657
00658 if (!FTManagerCIC.isOCEnable) {
00659 this.history = new Vector();
00660 this.historyBaseIndex = this.deliveredRequestsCounter +
00661 1;
00662 this.lastCommitedIndex = this.deliveredRequestsCounter;
00663 }
00664
00665
00666 Hashtable requestToSendTMP = this.requestToResend;
00667 this.requestToResend = null;
00668 Hashtable replyToSendTMP = this.replyToResend;
00669 this.replyToResend = null;
00670 Vector historyTMP = this.history;
00671 this.history = null;
00672 Vector awaitedRequestTMP = this.awaitedRequests;
00673 this.awaitedRequests = null;
00674
00675
00676 ci.lastRcvdRequestIndex = this.deliveredRequestsCounter;
00677
00678 this.setCheckpointTag(true);
00679 c = new Checkpoint((Body) owner, this.additionalCodebase);
00680
00681 c.setCheckpointInfo(ci);
00682
00683
00684 this.storage.storeCheckpoint(c, this.incarnation);
00685 this.setCheckpointTag(false);
00686
00687
00688 this.replyToResend = replyToSendTMP;
00689 this.requestToResend = requestToSendTMP;
00690 this.history = historyTMP;
00691 this.awaitedRequests = awaitedRequestTMP;
00692
00693
00694 this.completingCheckpoint = true;
00695
00696
00697 this.checkpointTimer = System.currentTimeMillis();
00698 }
00699
00700
00701
00702
00703 return c;
00704 } catch (RemoteException e) {
00705 logger.error("[CIC] Unable to send checkpoint to the server");
00706 e.printStackTrace();
00707 } finally {
00708
00709 ((AbstractBody) owner).acceptCommunication();
00710 }
00711 return null;
00712 }
00713 }
00714
00715
00716
00717
00718
00719
00720 private HistoryUpdater commitHistories(int indexOfCkpt, long upTo,
00721 boolean sendToServer, boolean isMinimal) {
00722 synchronized (this.historyLock) {
00723 if (isMinimal && (this.historyIndex >= indexOfCkpt)) {
00724
00725
00726 return null;
00727 }
00728
00729
00730 List histoToCommit = this.getHistoryToCommit(this.lastCommitedIndex +
00731 1, upTo);
00732
00733
00734 HistoryUpdater toSend = null;
00735 if (histoToCommit == null) {
00736
00737 toSend = new HistoryUpdater(histoToCommit, 0, 0, this.ownerID,
00738 indexOfCkpt, this.incarnation);
00739 this.historyIndex = this.checkpointIndex;
00740
00741 } else {
00742 toSend = new HistoryUpdater(histoToCommit,
00743 this.lastCommitedIndex + 1, upTo, this.ownerID,
00744 indexOfCkpt, this.incarnation);
00745 this.historyIndex = this.checkpointIndex;
00746 this.lastCommitedIndex = upTo;
00747
00748 this.deleteCommitedHistory(toSend.base, toSend.last);
00749 }
00750
00751
00752 if (sendToServer) {
00753 try {
00754 this.storage.commitHistory(toSend);
00755 } catch (RemoteException e) {
00756 logger.error("[ERROR] Storage server is not reachable !");
00757 e.printStackTrace();
00758 }
00759 }
00760 return toSend;
00761 }
00762 }
00763
00764
00765
00766
00767 private List getHistoryToCommit(long from, long upTo) {
00768 if (from == (upTo + 1)) {
00769
00770 return null;
00771 } else {
00772 int translatedFrom = (int) (from - this.historyBaseIndex);
00773 int translatedUpTo = (int) (upTo - this.historyBaseIndex);
00774 Vector toRet = new Vector(translatedUpTo - translatedFrom);
00775 Iterator itHisto = this.history.iterator();
00776 for (int i = 0; i < translatedFrom; i++) {
00777 itHisto.next();
00778 }
00779 for (int i = translatedFrom; i <= translatedUpTo; i++) {
00780 toRet.add(itHisto.next());
00781 }
00782 return toRet;
00783 }
00784 }
00785
00786
00787
00788
00789 private void deleteCommitedHistory(long from, long upTo) {
00790 if ((upTo < this.historyBaseIndex) || (from < this.historyBaseIndex)) {
00791 throw new ProtocolErrorException("Deleting from " + from +
00792 " up to " + upTo + " while local is from " +
00793 this.historyBaseIndex + " up to " +
00794 this.deliveredRequestsCounter);
00795 } else {
00796 if (!this.history.isEmpty()) {
00797 this.history.subList((int) (from - this.historyBaseIndex),
00798 (int) ((upTo - this.historyBaseIndex) + 1)).clear();
00799 this.historyBaseIndex = upTo + 1;
00800 }
00801 }
00802 }
00803
00804
00805
00806
00807
00808 private void sendLogs(CheckpointInfoCIC ci) {
00809
00810
00811 Vector replies = ci.replyToResend;
00812 Iterator itReplies = replies.iterator();
00813 while (itReplies.hasNext()) {
00814
00815 UniversalBody destination = null;
00816 Reply r = null;
00817 ReplyLog rl = (ReplyLog) (itReplies.next());
00818 r = rl.getReply();
00819 destination = rl.getDestination();
00820 this.sendReply(r, destination);
00821 }
00822
00823
00824 Vector requests = ci.requestToResend;
00825 Iterator itRequests = requests.iterator();
00826 while (itRequests.hasNext()) {
00827 try {
00828
00829 UniversalBody destination = null;
00830 RequestLog lr = (RequestLog) (itRequests.next());
00831 Request loggedRequest = lr.getRequest();
00832 destination = lr.getDestination();
00833
00834 Request r = new RequestImpl(loggedRequest.getMethodCall(),
00835 this.owner.getRemoteAdapter(),
00836 loggedRequest.isOneWay(),
00837 loggedRequest.getSequenceNumber());
00838 this.sendRequest(r, destination);
00839 } catch (RenegotiateSessionException e) {
00840 e.printStackTrace();
00841 }
00842 }
00843 }
00844
00845
00846
00847 private void filterQueue(BlockingRequestQueue queue, CheckpointInfoCIC cic) {
00848 CircularArrayList internalQueue = ((BlockingRequestQueueImpl) queue).getInternalQueue();
00849 ListIterator itQueue = internalQueue.listIterator();
00850 while (itQueue.hasNext()) {
00851 Request current = (Request) (itQueue.next());
00852 MessageInfoCIC mi = (MessageInfoCIC) current.getMessageInfo();
00853 if (mi == null) {
00854
00855 if (current instanceof AwaitedRequest) {
00856
00857 this.awaitedRequests.add(current);
00858 }
00859 } else if (mi.isOrphanFor <= cic.checkpointIndex) {
00860
00861
00862 AwaitedRequest ar = new AwaitedRequest(current.getSourceBodyID());
00863 itQueue.set(ar);
00864 this.awaitedRequests.add(ar);
00865 }
00866 }
00867 }
00868
00869
00870
00871
00872 private void extendRequestLog(int size) {
00873 if (this.latestRequestLog < size) {
00874
00875 for (int j = this.latestRequestLog + 1; j <= size; j++) {
00876 this.requestToResend.put(new Integer(j), new Vector());
00877 }
00878 this.latestRequestLog = size;
00879 }
00880 }
00881
00882
00883
00884
00885 private void extendReplyLog(int size) {
00886 if (this.latestReplyLog < size) {
00887
00888 for (int j = this.latestReplyLog + 1; j <= size; j++) {
00889 this.replyToResend.put(new Integer(j), new Vector());
00890 }
00891 this.latestReplyLog = size;
00892 }
00893 }
00894
00895 public String toString() {
00896 String ret = " Incarnation = ";
00897 ret += this.incarnation;
00898 return ret;
00899 }
00900
00901
00902
00903
00904
00905
00906
00907
00908
00909
00913
00914 public Object handleFTMessage(FTMessage fte) {
00915 return fte.handleFTMessage(this);
00916 }
00917
00923 public HistoryUpdater handlingGSCEEvent(GlobalStateCompletion fte) {
00924
00925
00926 HistoryUpdater rh = this.commitHistories(this.checkpointIndex,
00927 this.deliveredRequestsCounter, false, true);
00928
00929
00930
00931 if (this.completingCheckpoint) {
00932 this.completingCheckpoint = false;
00933 }
00934 return rh;
00935 }
00936
00943 public HistoryUpdater handlingOCEvent(OutputCommit fte) {
00944
00945 long upTo = fte.getLastIndexToRetreive();
00946
00947
00948
00949 int attachedIndex = (this.completingCheckpoint)
00950 ? (this.checkpointIndex - 1) : (this.checkpointIndex);
00951 HistoryUpdater toSend = this.commitHistories(attachedIndex, upTo,
00952 false, false);
00953 return toSend;
00954 }
00955 }