00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.ft.protocols.pmlrb.managers;
00032
00033 import java.io.IOException;
00034 import java.rmi.RemoteException;
00035 import java.util.Hashtable;
00036 import java.util.Iterator;
00037 import java.util.List;
00038
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.Body;
00041 import org.objectweb.proactive.core.ProActiveException;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.AbstractBody;
00044 import org.objectweb.proactive.core.body.UniversalBody;
00045 import org.objectweb.proactive.core.body.ft.checkpointing.Checkpoint;
00046 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00047 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00048 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00049 import org.objectweb.proactive.core.body.ft.protocols.pmlrb.infos.CheckpointInfoPMLRB;
00050 import org.objectweb.proactive.core.body.ft.protocols.pmlrb.infos.MessageInfoPMLRB;
00051 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00052 import org.objectweb.proactive.core.body.future.FuturePool;
00053 import org.objectweb.proactive.core.body.message.Message;
00054 import org.objectweb.proactive.core.body.reply.Reply;
00055 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00056 import org.objectweb.proactive.core.body.request.Request;
00057 import org.objectweb.proactive.core.util.MutableLong;
00058 import org.objectweb.proactive.core.util.log.Loggers;
00059 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00060 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00061
00062
00069 public class FTManagerPMLRB extends FTManager {
00070
00072 public static final int INC_VALUE = Integer.MAX_VALUE;
00073
00075 public static final int IGNORED_MSG = -1;
00076
00077
00078 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_PML);
00079
00080
00081
00082 private Hashtable latestReceivedIndex;
00083
00084
00085 private boolean isRecovering;
00086
00087
00088 private long checkpointTimer;
00089
00090
00091 private char sendNumber;
00092 private MessageInfoPMLRB replyInfos;
00093 private MessageInfoPMLRB requestInfos;
00094
00095
00096
00097
00098 private transient UniqueID potentialDuplicataSender;
00099 private transient long potentialDuplicataSequence;
00100
00105 public int init(AbstractBody owner) throws ProActiveException {
00106 super.init(owner);
00107 this.latestReceivedIndex = new Hashtable();
00108 this.isRecovering = false;
00109
00110 this.checkpointTimer = 0;
00111
00112 this.sendNumber = 0;
00113 this.replyInfos = new MessageInfoPMLRB();
00114 this.requestInfos = new MessageInfoPMLRB();
00115 this.potentialDuplicataSender = null;
00116 this.potentialDuplicataSequence = 0;
00117 logger.info(" PML fault-tolerance is enabled for body " + this.ownerID);
00118 return 0;
00119 }
00120
00125 public int onReceiveReply(Reply reply) {
00126
00127 if (reply.getMessageInfo() == null) {
00128 reply.setFTManager(this);
00129 return 0;
00130 }
00131
00132
00133
00134 if (!reply.isAutomaticContinuation()) {
00135 if (this.alreadyReceived(reply)) {
00136
00137 reply.setIgnoreIt(true);
00138 return IGNORED_MSG;
00139 }
00140 }
00141 reply.setFTManager(this);
00142 return 0;
00143 }
00144
00149 public int onReceiveRequest(Request request) {
00150
00151 if (request.getMessageInfo() == null) {
00152 request.setFTManager(this);
00153 return 0;
00154 }
00155 if (this.alreadyReceived(request)) {
00156
00157 request.setIgnoreIt(true);
00158 return IGNORED_MSG;
00159 }
00160 request.setFTManager(this);
00161 return 0;
00162 }
00163
00169 public int onDeliverReply(Reply reply) {
00170
00171 if (!this.isRecovering) {
00172 try {
00173
00174 this.storage.storeReply(this.ownerID, reply);
00175
00176 this.updateLatestRvdIndexTable(reply);
00177 } catch (RemoteException e) {
00178 e.printStackTrace();
00179 }
00180 }
00181 return 0;
00182 }
00183
00189 public int onDeliverRequest(Request request) {
00190
00191 if (!this.isRecovering) {
00192 try {
00193
00194 this.storage.storeRequest(this.ownerID, request);
00195
00196 this.updateLatestRvdIndexTable(request);
00197 } catch (RemoteException e) {
00198 e.printStackTrace();
00199 }
00200 }
00201 return 0;
00202 }
00203
00204
00205
00206
00207 private void updateLatestRvdIndexTable(Message m) {
00208
00209 MutableLong index = (MutableLong) (this.latestReceivedIndex.get(m.getSourceBodyID()));
00210 MessageInfoPMLRB mi = (MessageInfoPMLRB) (m.getMessageInfo());
00211 if (mi == null) {
00212
00213 return;
00214 }
00215 long msgIndex = mi.sentSequenceNumber;
00216 if (index != null) {
00217 index.setValue(msgIndex);
00218 } else {
00219
00220 this.latestReceivedIndex.put(m.getSourceBodyID(),
00221 new MutableLong(msgIndex));
00222 }
00223 }
00224
00225
00226
00227
00228 private boolean alreadyReceived(Message m) {
00229 if ((this.potentialDuplicataSender != null) &&
00230 (m.getSourceBodyID().equals(this.potentialDuplicataSender)) &&
00231 (m.getSequenceNumber() == this.potentialDuplicataSequence)) {
00232
00233
00234 this.potentialDuplicataSender = null;
00235 return true;
00236 } else {
00237 long msgIndex = ((MessageInfoPMLRB) (m.getMessageInfo())).sentSequenceNumber;
00238 MutableLong index = (MutableLong) (this.latestReceivedIndex.get(m.getSourceBodyID()));
00239 return (index != null) && (msgIndex <= index.getValue());
00240 }
00241 }
00242
00246 public synchronized int onSendReplyBefore(Reply reply) {
00247 this.replyInfos.sentSequenceNumber = this.getNextSendNumber();
00248 reply.setMessageInfo(this.replyInfos);
00249 return 0;
00250 }
00251
00255 public int onSendReplyAfter(Reply reply, int rdvValue,
00256 UniversalBody destination) {
00257 return 0;
00258 }
00259
00263 public synchronized int onSendRequestBefore(Request request) {
00264 this.requestInfos.sentSequenceNumber = this.getNextSendNumber();
00265 request.setMessageInfo(this.requestInfos);
00266 return 0;
00267 }
00268
00272 public int onSendRequestAfter(Request request, int rdvValue,
00273 UniversalBody destination) throws RenegotiateSessionException {
00274 return 0;
00275 }
00276
00280 public int onServeRequestBefore(Request request) {
00281 if (this.haveToCheckpoint()) {
00282 this.checkpoint(request);
00283 }
00284 return 0;
00285 }
00286
00290 public int onServeRequestAfter(Request request) {
00291 return 0;
00292 }
00293
00297 public int beforeRestartAfterRecovery(CheckpointInfo ci, int inc) {
00298
00299 this.isRecovering = true;
00300
00301
00302 this.owner.registerIncomingFutures();
00303
00304
00305 List replies = ((CheckpointInfoPMLRB) ci).getReplyLog();
00306 List request = ((CheckpointInfoPMLRB) ci).getRequestLog();
00307
00308
00309
00310 Request potentialDuplicata = (Request) (request.get(request.size() - 1));
00311 this.potentialDuplicataSender = potentialDuplicata.getSourceBodyID();
00312 this.potentialDuplicataSequence = potentialDuplicata.getSequenceNumber();
00313
00314
00315 Iterator itRequest = request.iterator();
00316 BlockingRequestQueue queue = owner.getRequestQueue();
00317
00318 while (itRequest.hasNext()) {
00319 queue.add((Request) (itRequest.next()));
00320 }
00321
00322
00323 Iterator itReplies = replies.iterator();
00324 FuturePool fp = owner.getFuturePool();
00325 try {
00326 while (itReplies.hasNext()) {
00327 Reply current = (Reply) (itReplies.next());
00328 fp.receiveFutureValue(current.getSequenceNumber(),
00329 current.getSourceBodyID(), current.getResult(), current);
00330 }
00331 } catch (IOException e) {
00332 e.printStackTrace();
00333 }
00334
00335
00336 Request pendingRequest = ((CheckpointInfoPMLRB) ci).getPendingRequest();
00337
00338
00339 if (pendingRequest != null) {
00340 queue.addToFront(pendingRequest);
00341 }
00342
00343
00344 this.isRecovering = false;
00345
00346
00347 this.owner.acceptCommunication();
00348
00349 try {
00350
00351 this.location.updateLocation(ownerID, owner.getRemoteAdapter());
00352 this.recovery.updateState(ownerID, RecoveryProcess.RUNNING);
00353 } catch (RemoteException e) {
00354 logger.error("Unable to connect with location server");
00355 e.printStackTrace();
00356 }
00357
00358 this.checkpointTimer = System.currentTimeMillis();
00359
00360 return 0;
00361 }
00362
00366 public Object handleFTMessage(FTMessage fte) {
00367 return fte.handleFTMessage(this);
00368 }
00369
00371
00373 private boolean haveToCheckpoint() {
00374 return ((this.checkpointTimer + this.ttc) < System.currentTimeMillis());
00375 }
00376
00377 private void checkpoint(Request pending) {
00378
00379 owner.blockCommunication();
00380
00381 try {
00382 this.setCheckpointTag(true);
00383
00384 Checkpoint c = new Checkpoint((Body) owner, this.additionalCodebase);
00385
00386
00387 CheckpointInfoPMLRB ci = new CheckpointInfoPMLRB(pending);
00388
00389
00390 c.setCheckpointInfo(ci);
00391
00392 this.storage.storeCheckpoint(c, FTManagerPMLRB.DEFAULT_TTC_VALUE);
00393
00394
00395 this.checkpointTimer = System.currentTimeMillis();
00396
00397 this.setCheckpointTag(false);
00398 } catch (RemoteException e) {
00399 logger.error("[PMLRB] Unable to send checkpoint to the server");
00400 e.printStackTrace();
00401 }
00402
00403 owner.acceptCommunication();
00404 }
00405
00406 private synchronized char getNextSendNumber() {
00407 return ++sendNumber;
00408 }
00409 }