org/objectweb/proactive/core/body/ft/protocols/cic/managers/FTManagerCIC.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.ft.protocols.cic.managers;
00032 
00033 import java.io.IOException;
00034 import java.rmi.RemoteException;
00035 import java.util.Enumeration;
00036 import java.util.Hashtable;
00037 import java.util.Iterator;
00038 import java.util.List;
00039 import java.util.ListIterator;
00040 import java.util.Vector;
00041 
00042 import org.apache.log4j.Logger;
00043 import org.objectweb.proactive.Body;
00044 import org.objectweb.proactive.core.ProActiveException;
00045 import org.objectweb.proactive.core.UniqueID;
00046 import org.objectweb.proactive.core.body.AbstractBody;
00047 import org.objectweb.proactive.core.body.UniversalBody;
00048 import org.objectweb.proactive.core.body.ft.checkpointing.Checkpoint;
00049 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00050 import org.objectweb.proactive.core.body.ft.exception.ProtocolErrorException;
00051 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00052 import org.objectweb.proactive.core.body.ft.internalmsg.GlobalStateCompletion;
00053 import org.objectweb.proactive.core.body.ft.internalmsg.OutputCommit;
00054 import org.objectweb.proactive.core.body.ft.message.HistoryUpdater;
00055 import org.objectweb.proactive.core.body.ft.message.MessageInfo;
00056 import org.objectweb.proactive.core.body.ft.message.MessageLog;
00057 import org.objectweb.proactive.core.body.ft.message.ReplyLog;
00058 import org.objectweb.proactive.core.body.ft.message.RequestLog;
00059 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00060 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.CheckpointInfoCIC;
00061 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.MessageInfoCIC;
00062 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00063 import org.objectweb.proactive.core.body.future.FutureResult;
00064 import org.objectweb.proactive.core.body.message.Message;
00065 import org.objectweb.proactive.core.body.reply.Reply;
00066 import org.objectweb.proactive.core.body.reply.ReplyImpl;
00067 import org.objectweb.proactive.core.body.request.AwaitedRequest;
00068 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00069 import org.objectweb.proactive.core.body.request.BlockingRequestQueueImpl;
00070 import org.objectweb.proactive.core.body.request.Request;
00071 import org.objectweb.proactive.core.body.request.RequestImpl;
00072 import org.objectweb.proactive.core.mop.Utils;
00073 import org.objectweb.proactive.core.util.CircularArrayList;
00074 import org.objectweb.proactive.core.util.MutableLong;
00075 import org.objectweb.proactive.core.util.log.Loggers;
00076 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00077 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00078 
00079 
00086 public class FTManagerCIC
00087     extends org.objectweb.proactive.core.body.ft.protocols.FTManager {
00088 
    /**
     * Value returned by the incarnation test when the received message carries
     * a greater incarnation than this body's; the sender reacts to it by
     * resending the message.
     */
    public static final int RESEND_MESSAGE = -3;

    /**
     * Value returned by the incarnation test when the received message carries
     * a smaller incarnation than this body's, i.e. the sender has to recover.
     */
    public static final int RECOVER = -4;

    // logger
    protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_CIC);

    // local runtime
    // private static final Runtime runtime = Runtime.getRuntime();
    // FT values
    private int incarnation;
    private int lastRecovery; // index of the last checkpoint used for recovery
    private int checkpointIndex; // index of the latest performed checkpoint
    private long checkpointTimer; // time (ms) of initialization or of the latest checkpoint
    private int nextMax; // greatest checkpoint index seen so far in exchanged messages

    // private int nextMin;
    private int historyIndex; // index of the latest closed history

    // logged messages, keyed by checkpoint index (Integer -> Vector of MessageLog)
    private Hashtable requestToResend;
    private int latestRequestLog;
    private Hashtable replyToResend;
    private int latestReplyLog;

    // awaited requests in the owner's request queue
    private Vector awaitedRequests;

    // pool of MessageInfo: a single instance is reused for every sent request / reply
    private MessageInfoCIC forSentRequest;
    private MessageInfoCIC forSentReply;

    // history (source body IDs of the delivered requests)
    private Vector history;

    // cannot lock the history itself, because it is reassigned inside synchronized blocks!
    private final Character historyLock = new Character('l');

    // protocol for output commit
    private long deliveredRequestsCounter;
    private MutableLong lastServedRequestIndex; // reference to localVectorClock[this.ownerID]
    private Hashtable localVectorClock; // ids <-> lastServedRequest (local view)
    private long historyBaseIndex;
    private long lastCommitedIndex;
    private boolean completingCheckpoint; // true if the latest checkpoint is still not completed with its minimal history

    // ****** static toggle for output commit (OC) [TEST] *******
    private final static boolean isOCEnable = false;
00139 
00140     public int init(AbstractBody owner) throws ProActiveException {
00141         super.init(owner);
00142         this.incarnation = 1;
00143         this.checkpointIndex = 0;
00144         this.historyIndex = 0;
00145         this.nextMax = 1;
00146         //this.nextMin = 0;
00147         this.lastRecovery = 0;
00148         this.checkpointTimer = System.currentTimeMillis();
00149         this.requestToResend = new Hashtable();
00150         this.latestRequestLog = 0;
00151         this.replyToResend = new Hashtable();
00152         this.latestReplyLog = 0;
00153         this.history = new Vector();
00154         this.awaitedRequests = new Vector();
00155         this.forSentRequest = new MessageInfoCIC();
00156         this.forSentReply = new MessageInfoCIC();
00157         this.deliveredRequestsCounter = -1;
00158         this.lastCommitedIndex = -1;
00159         this.lastServedRequestIndex = new MutableLong(0);
00160         this.historyBaseIndex = 0;
00161         this.completingCheckpoint = false;
00162         this.localVectorClock = new Hashtable();
00163         this.localVectorClock.put(this.ownerID, this.lastServedRequestIndex);
00164         logger.info(" CIC fault-tolerance is enabled for body " + this.ownerID);
00165         return 0;
00166     }
00167 
00168     public int onReceiveReply(Reply reply) {
00169         reply.setFTManager(this);
00170         return this.incarnationTest(reply);
00171     }
00172 
00173     public int onReceiveRequest(Request request) {
00174         request.setFTManager(this);
00175         return this.incarnationTest(request);
00176     }
00177 
00178     /*
00179      * This method test if the message m has to be take into account by the receiver;
00180      * If not, set the ignore tag in m.
00181      */
00182     private int incarnationTest(Message m) {
00183         if (this.isSignificant(m)) {
00184             MessageInfoCIC mi = (MessageInfoCIC) m.getMessageInfo();
00185             int localInt = this.incarnation;
00186             int inc = mi.incarnation;
00187             if (inc > localInt) {
00188                 // this body will recover
00189                 m.setIgnoreIt(true);
00190                 return FTManagerCIC.RESEND_MESSAGE;
00191             } else if (inc < localInt) {
00192                 // force the sender to recover and ignore this message
00193                 m.setIgnoreIt(true);
00194                 return FTManagerCIC.RECOVER;
00195             }
00196         }
00197         return 0; //This value is not returned to the sender
00198     }
00199 
00200     public synchronized int onDeliverReply(Reply reply) {
00201         int currentCheckpointIndex = this.checkpointIndex;
00202         if (this.isSignificant(reply)) {
00203             MessageInfoCIC mi = (MessageInfoCIC) reply.getMessageInfo();
00204 
00205             // history closure
00206             this.updateHistory(mi.historyIndex);
00207             // udpate checkpoint index
00208             if (mi.checkpointIndex > currentCheckpointIndex) {
00209                 this.nextMax = Math.max(this.nextMax, mi.checkpointIndex);
00210             }
00211         }
00212         return currentCheckpointIndex;
00213     }
00214 
00215     public synchronized int onDeliverRequest(Request request) {
00216         int currentCheckpointIndex = this.checkpointIndex;
00217 
00218         //System.out.println(""+ this.ownerID + " receive " + request);
00219         if (this.isSignificant(request)) {
00220             //System.out.println(""+ this.ownerID + " receive significant " + request);
00221             MessageInfoCIC mi = (MessageInfoCIC) request.getMessageInfo();
00222 
00223             // history closure
00224             this.updateHistory(mi.historyIndex);
00225             //is there any corresponding awaited request ?      
00226             if (!(this.updateAwaitedRequests(request))) {
00227                 if (FTManagerCIC.isOCEnable || this.completingCheckpoint) {
00228                     synchronized (historyLock) {
00229                         // if no, an awaited request is added to history
00230                         history.add(request.getSourceBodyID());
00231                     }
00232 
00233                     // inc the historized index
00234                     this.deliveredRequestsCounter++;
00235                     // manage local vector clock only if needed
00236                     if (FTManagerCIC.isOCEnable) {
00237                         // set the position in history of the request
00238                         mi.positionInHistory = this.deliveredRequestsCounter;
00239                         // update local vector clock 
00240                         this.updateLocalVectorClock(mi.vectorClock);
00241                     }
00242                 }
00243             } else {
00244                 //this request must be then ignored...
00245                 request.setIgnoreIt(true);
00246             }
00247 
00248             // udpate checkpoint index
00249             int ckptIndex = mi.checkpointIndex;
00250             if (ckptIndex > currentCheckpointIndex) {
00251                 this.nextMax = Math.max(this.nextMax, ckptIndex);
00252                 // mark the request that is orphan; it will be changed in awaited req in next ckpt
00253                 // oprhan du indexCkpt+1 a mi.ckptIndex compris
00254                 mi.isOrphanFor = (char) ckptIndex;
00255                 //System.out.println("" + this.ownerID + " will have orphans in checkpoint " + ckptIndex);
00256             }
00257         }
00258         return currentCheckpointIndex;
00259     }
00260 
00261     /*
00262      * Close and commit the current history if needed.
00263      */
00264     private void updateHistory(int index) {
00265         if (index > this.historyIndex) {
00266             // commit minimal history to the server
00267             this.commitHistories(this.checkpointIndex,
00268                 this.deliveredRequestsCounter, true, true);
00269             if (this.completingCheckpoint) {
00270                 this.completingCheckpoint = false;
00271             }
00272         }
00273     }
00274 
00275     /*
00276      * Return true if the message m is significant for the protocol, i.e
00277      * if it's not sent by a non_ft body nor a half body
00278      */
00279     private boolean isSignificant(Message m) {
00280         return ((m.getMessageInfo() != null) &&
00281         (!m.getMessageInfo().isFromHalfBody()));
00282     }
00283 
00284     /*
00285      * Update the local vector clock regarding the paramater.
00286      * if local[i]<param[i] or local[i] doesn't exist, then local[i]=param[i];
00287      */
00288     private void updateLocalVectorClock(Hashtable vectorClock) {
00289         Enumeration ids = vectorClock.keys();
00290         MutableLong localClock;
00291         MutableLong senderClock = null;
00292         UniqueID id = null;
00293         while (ids.hasMoreElements()) {
00294             id = (UniqueID) (ids.nextElement());
00295             localClock = (MutableLong) (this.localVectorClock.get(id));
00296             senderClock = (MutableLong) (vectorClock.get(id));
00297             if (localClock == null) {
00298                 // there is no clock for the AO id
00299                 this.localVectorClock.put(id,
00300                     new MutableLong(senderClock.getValue()));
00301             } else if (localClock.isLessThan(senderClock)) {
00302                 // local clock is not uptodate
00303                 localClock.setValue(senderClock.getValue());
00304             }
00305         }
00306     }
00307 
00308     public synchronized int onSendReplyBefore(Reply reply) {
00309         // set message info values
00310         this.forSentReply.checkpointIndex = (char) this.checkpointIndex;
00311         this.forSentReply.historyIndex = (char) this.historyIndex;
00312         this.forSentReply.incarnation = (char) this.incarnation;
00313         this.forSentReply.lastRecovery = (char) this.lastRecovery;
00314         this.forSentReply.isOrphanFor = Character.MAX_VALUE;
00315         this.forSentReply.fromHalfBody = false;
00316         this.forSentReply.vectorClock = null;
00317         reply.setMessageInfo(this.forSentReply);
00318 
00319         // output commit
00320         if (FTManagerCIC.isOCEnable && this.isOutputCommit(reply)) {
00321             try {
00322                 if (logger.isDebugEnabled()) {
00323                     logger.debug(this.ownerID +
00324                         " is output commiting for reply " + reply);
00325                 }
00326                 this.storage.outputCommit(this.forSentReply);
00327             } catch (RemoteException e) {
00328                 logger.error("**ERROR** Cannot perform output commit");
00329                 e.printStackTrace();
00330             }
00331         }
00332 
00333         return 0;
00334     }
00335 
00336     public synchronized int onSendRequestBefore(Request request) {
00337         // set message info values
00338         this.forSentRequest.checkpointIndex = (char) this.checkpointIndex;
00339         this.forSentRequest.historyIndex = (char) this.historyIndex;
00340         this.forSentRequest.incarnation = (char) this.incarnation;
00341         this.forSentRequest.lastRecovery = (char) this.lastRecovery;
00342         this.forSentRequest.isOrphanFor = Character.MAX_VALUE;
00343         this.forSentRequest.fromHalfBody = false;
00344         if (FTManagerCIC.isOCEnable) {
00345             this.forSentRequest.vectorClock = this.localVectorClock;
00346         }
00347         request.setMessageInfo(this.forSentRequest);
00348 
00349         // output commit
00350         if (FTManagerCIC.isOCEnable && this.isOutputCommit(request)) {
00351             try {
00352                 if (logger.isDebugEnabled()) {
00353                     logger.debug(this.ownerID +
00354                         " is output commiting for request " + request);
00355                 }
00356                 this.storage.outputCommit(this.forSentRequest);
00357             } catch (RemoteException e) {
00358                 logger.error("**ERROR** Cannot perform output commit");
00359                 e.printStackTrace();
00360             }
00361         }
00362         return 0;
00363     }
00364 
00365     /*
00366      * Return true if the sending of the paramter message is an output commit
00367      * ** TEST IMPLEMENTATION **
00368      */
00369     private boolean isOutputCommit(Message m) {
00370         if (FTManagerCIC.isOCEnable) {
00371             Request r = (Request) m;
00372             if (r.getMethodName().equals("logEvent")) {
00373                 return true;
00374             } else {
00375                 return false;
00376             }
00377         } else {
00378             return false;
00379         }
00380     }
00381 
00382     public synchronized int onSendReplyAfter(Reply reply, int rdvValue,
00383         UniversalBody destination) {
00384         // if return value is RESEND, receiver have to recover --> resend the message
00385         if (rdvValue == FTManagerCIC.RESEND_MESSAGE) {
00386             try {
00387                 reply.setIgnoreIt(false);
00388                 Thread.sleep(FTManager.TIME_TO_RESEND);
00389                 int rdvValueBis = sendReply(reply, destination);
00390                 return this.onSendReplyAfter(reply, rdvValueBis, destination);
00391             } catch (InterruptedException e) {
00392                 e.printStackTrace();
00393             }
00394         }
00395         int currentCheckpointIndex = this.checkpointIndex;
00396 
00397         // update checkpoint index
00398         if (rdvValue > currentCheckpointIndex) {
00399             this.nextMax = Math.max(this.nextMax, rdvValue);
00400             // log this in-transit message
00401             this.extendReplyLog(rdvValue);
00402             // must make a deep copy of result !
00403             try {
00404                 Reply toLog = null;
00405 
00406                 //try {
00407                 //    toLog = new ReplyImpl(reply.getSourceBodyID(),
00408                 //            reply.getSequenceNumber(), reply.getMethodName(),
00409                 //            (FutureResult) Utils.makeDeepCopy(reply.getResult()),
00410                 //            owner.getProActiveSecurityManager());
00411                 //} catch (SecurityNotAvailableException e1) {
00412                 toLog = new ReplyImpl(reply.getSourceBodyID(),
00413                         reply.getSequenceNumber(), reply.getMethodName(),
00414                         (FutureResult) Utils.makeDeepCopy(reply.getResult()),
00415                         null);
00416                 //}
00417                 MessageLog log = new ReplyLog(toLog,
00418                         destination.getRemoteAdapter());
00419                 for (int i = currentCheckpointIndex + 1; i <= rdvValue; i++) {
00420                     ((Vector) (this.replyToResend.get(new Integer(i)))).add(log);
00421                 }
00422             } catch (IOException e) {
00423                 e.printStackTrace();
00424             }
00425         }
00426         return 0;
00427     }
00428 
00429     public synchronized int onSendRequestAfter(Request request, int rdvValue,
00430         UniversalBody destination) throws RenegotiateSessionException {
00431         //      if return value is RESEDN, receiver have to recover --> resend the message
00432         if (rdvValue == FTManagerCIC.RESEND_MESSAGE) {
00433             try {
00434                 request.resetSendCounter();
00435                 request.setIgnoreIt(false);
00436                 Thread.sleep(FTManager.TIME_TO_RESEND);
00437                 int rdvValueBis = sendRequest(request, destination);
00438                 return this.onSendRequestAfter(request, rdvValueBis, destination);
00439             } catch (RenegotiateSessionException e1) {
00440                 throw e1;
00441             } catch (InterruptedException e) {
00442                 e.printStackTrace();
00443             }
00444         }
00445 
00446         int currentCheckpointIndex = this.checkpointIndex;
00447 
00448         // update checkpoint index
00449         if (rdvValue > currentCheckpointIndex) {
00450             this.nextMax = Math.max(this.nextMax, rdvValue);
00451             // log this in-transit message in the rdvValue-currentIndex next checkpoints
00452             this.extendRequestLog(rdvValue);
00453             try {
00454                 //must make deep copy of paramteres
00455                 request.getMethodCall().makeDeepCopyOfArguments();
00456                 //must reset the send counter (this request has not been forwarded)
00457                 request.resetSendCounter();
00458                 MessageLog log = new RequestLog(request,
00459                         destination.getRemoteAdapter());
00460                 for (int i = currentCheckpointIndex + 1; i <= rdvValue; i++) {
00461                     //System.out.println(""+this.ownerID + " logs a request for " + destination.getID());
00462                     ((Vector) (this.requestToResend.get(new Integer(i)))).add(log);
00463                 }
00464             } catch (IOException e) {
00465                 e.printStackTrace();
00466             }
00467         }
00468         return 0;
00469     }
00470 
00471     public int onServeRequestBefore(Request request) {
00472         // checkpoint if needed
00473         while (this.haveToCheckpoint()) {
00474             this.checkpoint(request);
00475         }
00476 
00477         // update the last served request index only if needed
00478         if (FTManagerCIC.isOCEnable) {
00479             MessageInfo mi = request.getMessageInfo();
00480             if (mi != null) {
00481                 long requestIndex = ((MessageInfoCIC) (mi)).positionInHistory;
00482                 if (this.lastServedRequestIndex.getValue() < requestIndex) {
00483                     this.lastServedRequestIndex.setValue(requestIndex);
00484                 }
00485             }
00486         }
00487         return 0;
00488     }
00489 
    /**
     * Called after a request has been served. Nothing has to be done at this
     * point in this implementation.
     *
     * @param request the request that has just been served
     * @return always 0
     */
    public int onServeRequestAfter(Request request) {
        return 0;
    }
00493 
00494     // Active Object is created but not started 
00495     public int beforeRestartAfterRecovery(CheckpointInfo ci, int inc) {
00496         CheckpointInfoCIC cic = (CheckpointInfoCIC) ci;
00497         BlockingRequestQueue queue = ((AbstractBody) owner).getRequestQueue();
00498         int index = cic.checkpointIndex;
00499 
00500         //      reinit ft values
00501         this.history = new Vector();
00502         this.completingCheckpoint = false;
00503         this.lastCommitedIndex = cic.lastCommitedIndex;
00504         // historized requests are supposed to be "already received"
00505         this.deliveredRequestsCounter = cic.lastCommitedIndex; //cic.lastRcvdRequestIndex;
00506         // new history then begin at the end of the history of the checkpoint
00507         this.historyBaseIndex = cic.lastCommitedIndex + 1; //;cic.lastRcvdRequestIndex+1;
00508 
00509         // HERE, we need a proof that running in "histo mode" is equivalent that 
00510         // running in normal mode from the end of the histo.
00511         this.awaitedRequests = new Vector();
00512         this.replyToResend = new Hashtable();
00513         this.requestToResend = new Hashtable();
00514         this.checkpointIndex = index;
00515         this.nextMax = index;
00516         this.checkpointTimer = System.currentTimeMillis();
00517         this.historyIndex = index;
00518         this.lastRecovery = index;
00519         this.incarnation = inc;
00520 
00521         //add pending request to reuqestQueue
00522         Request pendingRequest = cic.pendingRequest;
00523 
00524         //pending request could be null with OOSPMD synchronization        
00525         if (pendingRequest != null) {
00526             queue.addToFront(pendingRequest);
00527         }
00528 
00529         //add orphan-tagged requests in request queue
00530         //this requests are also added to this.awaitedRequests
00531         this.filterQueue(queue, cic);
00532 
00533         // building history
00534         // System.out.println(""+ this.ownerID + " History size : " + cic.history.size());
00535         Iterator itHistory = cic.history.iterator();
00536         while (itHistory.hasNext()) {
00537             UniqueID cur = (UniqueID) (itHistory.next());
00538             Request currentAwaitedRequest = new AwaitedRequest(cur);
00539             queue.add(currentAwaitedRequest);
00540             this.awaitedRequests.add(currentAwaitedRequest);
00541         }
00542 
00543         //enable communication
00544         //System.out.println("[CIC] enable communication");
00545         ((AbstractBody) owner).acceptCommunication();
00546 
00547         try {
00548             // update servers
00549             this.location.updateLocation(ownerID, owner.getRemoteAdapter());
00550             this.recovery.updateState(ownerID, RecoveryProcess.RUNNING);
00551         } catch (RemoteException e) {
00552             logger.error("Unable to connect with location server");
00553             e.printStackTrace();
00554         }
00555 
00556         // resend all in-transit message
00557         this.sendLogs((CheckpointInfoCIC) ci);
00558 
00559         return 0;
00560     }
00561 
00562     public void updateLocationAtServer(UniqueID ownerID, UniversalBody remoteBodyAdapter) {
00563         try {
00564             // update servers
00565             this.location.updateLocation(ownerID, remoteBodyAdapter);
00566 //            this.recovery.updateState(ownerID, RecoveryProcess.RUNNING);
00567         } catch (RemoteException e) {
00568             logger.error("Unable to connect with location server");
00569             e.printStackTrace();
00570         }
00571     }
00572     /*
00573      * search for an awaited request from r.source.
00574      * if any, unfreeze ar and remove it from awaitedRequests list.
00575      * WARNING : this.awaitedRequests must be ordered. Do not use a map !
00576      */
00577     private boolean updateAwaitedRequests(Request r) {
00578         AwaitedRequest ar = null;
00579         Iterator it = this.awaitedRequests.iterator();
00580         while (it.hasNext()) {
00581             AwaitedRequest arq = (AwaitedRequest) (it.next());
00582             if ((arq.getAwaitedSender()).equals(r.getSourceBodyID())) {
00583                 ar = arq;
00584                 break;
00585             }
00586         }
00587         if (ar != null) {
00588             //System.err.println(""+ this.ownerID + " Request is updated by " + r.getSourceBodyID());
00589             ar.setAwaitedRequest(r);
00590             this.awaitedRequests.remove(ar);
00591             return true;
00592         } else {
00593             return false;
00594         }
00595     }
00596 
00597     /*
00598      * return true if this ao have to checkpoint
00599      */
00600     private boolean haveToCheckpoint() {
00601         int currentCheckpointIndex = this.checkpointIndex;
00602         int currentNextMax = this.nextMax;
00603 
00604         // checkpoint if next is greater than index
00605         if (currentNextMax > currentCheckpointIndex) {
00606             return true;
00607         }
00608         // checkpoint if TTC is elapsed
00609         else if ((this.checkpointTimer + this.ttc) < System.currentTimeMillis()) {
00610             return true;
00611         } else {
00612             return false;
00613         }
00614     }
00615 
00616     /*
00617      * Perform a checkpoint with index = current + 1
00618      */
00619     private Checkpoint checkpoint(Request pendingRequest) {
00620         //stop accepting communication
00621         ((AbstractBody) owner).blockCommunication();
00622         // synchronized on hisotry to avoid hisot commit during checkpoint
00623         synchronized (this.historyLock) {
00624             Checkpoint c;
00625 
00626             //long start;
00627             //long end;
00628             try {
00629                 //start = System.currentTimeMillis();
00630                 //System.out.println("BEGIN CHECKPOINT : used mem = " + this.getUsedMem() );
00631                 synchronized (this) {
00632                     if (logger.isDebugEnabled()) {
00633                         logger.debug("[CIC] Checkpointing with index = " +
00634                             (this.checkpointIndex + 1));
00635                     }
00636 
00637                     // create infos for checkpoint
00638                     CheckpointInfoCIC ci = new CheckpointInfoCIC();
00639                     this.extendReplyLog(this.checkpointIndex + 1);
00640                     this.extendRequestLog(this.checkpointIndex + 1);
00641                     ci.replyToResend = (Vector) (this.replyToResend.get(new Integer(this.checkpointIndex +
00642                                 1)));
00643                     ci.requestToResend = (Vector) (this.requestToResend.get(new Integer(this.checkpointIndex +
00644                                 1)));
00645                     ci.pendingRequest = pendingRequest;
00646                     ci.checkpointIndex = this.checkpointIndex + 1;
00647 
00648                     // delete logs
00649                     this.replyToResend.remove(new Integer(this.checkpointIndex +
00650                             1));
00651                     this.requestToResend.remove(new Integer(this.checkpointIndex +
00652                             1));
00653 
00654                     // inc checkpoint index
00655                     this.checkpointIndex++;
00656 
00657                     // Reset history only if OC is not possible
00658                     if (!FTManagerCIC.isOCEnable) {
00659                         this.history = new Vector();
00660                         this.historyBaseIndex = this.deliveredRequestsCounter +
00661                             1;
00662                         this.lastCommitedIndex = this.deliveredRequestsCounter;
00663                     }
00664 
00665                     // current informations must not be stored in the checkpoint 
00666                     Hashtable requestToSendTMP = this.requestToResend;
00667                     this.requestToResend = null;
00668                     Hashtable replyToSendTMP = this.replyToResend;
00669                     this.replyToResend = null;
00670                     Vector historyTMP = this.history;
00671                     this.history = null;
00672                     Vector awaitedRequestTMP = this.awaitedRequests;
00673                     this.awaitedRequests = null;
00674 
00675                     // record the next history base index
00676                     ci.lastRcvdRequestIndex = this.deliveredRequestsCounter;
00677                     // checkpoint the active object
00678                     this.setCheckpointTag(true);
00679                     c = new Checkpoint((Body) owner, this.additionalCodebase);
00680                     // add info to checkpoint
00681                     c.setCheckpointInfo(ci);
00682 
00683                     // send it to server
00684                     this.storage.storeCheckpoint(c, this.incarnation);
00685                     this.setCheckpointTag(false);
00686 
00687                     // restore current informations               
00688                     this.replyToResend = replyToSendTMP;
00689                     this.requestToResend = requestToSendTMP;
00690                     this.history = historyTMP;
00691                     this.awaitedRequests = awaitedRequestTMP;
00692 
00693                     // this checkpoint has to be completed with its minimal hisotry
00694                     this.completingCheckpoint = true;
00695 
00696                     // reninit checkpoint values
00697                     this.checkpointTimer = System.currentTimeMillis();
00698                 }
00699 
00700                 //end = System.currentTimeMillis();
00701                 //System.out.println("[BENCH] Cumulated Ckpt time at " + this.checkpointIndex + " : " + this.cumulatedCheckpointTime + " ms");// + System.currentTimeMillis() + "]");
00702                 //System.out.println("END CHECKPOINTING : used mem = " + this.getUsedMem());
00703                 return c;
00704             } catch (RemoteException e) {
00705                 logger.error("[CIC] Unable to send checkpoint to the server");
00706                 e.printStackTrace();
00707             } finally {
00708                 // allow communication
00709                 ((AbstractBody) owner).acceptCommunication();
00710             }
00711             return null;
00712         }
00713     }
00714 
00715     /*
00716      * Return, and send to the server if sendToServer is set, the current history from the last committed
00717      * index up to upTo. indexOfCkpt is the index of the checkpoint that must be completed with this history.
00718      * isMinimal is true if this commit corresponds to an update of a checkpoint with its minimal history.
00719      */
00720     private HistoryUpdater commitHistories(int indexOfCkpt, long upTo,
00721         boolean sendToServer, boolean isMinimal) {
00722         synchronized (this.historyLock) {
00723             if (isMinimal && (this.historyIndex >= indexOfCkpt)) {
00724                 // this minimal commit has already be performed
00725                 // by a message reception : nothing to do
00726                 return null;
00727             }
00728 
00729             // HISTO COMMIT
00730             List histoToCommit = this.getHistoryToCommit(this.lastCommitedIndex +
00731                     1, upTo);
00732 
00733             // histo to commit could be null (ckpting during histo mode)
00734             HistoryUpdater toSend = null;
00735             if (histoToCommit == null) {
00736                 // must send an empty histo to the server to commit the new recovery line !
00737                 toSend = new HistoryUpdater(histoToCommit, 0, 0, this.ownerID,
00738                         indexOfCkpt, this.incarnation);
00739                 this.historyIndex = this.checkpointIndex;
00740                 // last commited does not change
00741             } else {
00742                 toSend = new HistoryUpdater(histoToCommit,
00743                         this.lastCommitedIndex + 1, upTo, this.ownerID,
00744                         indexOfCkpt, this.incarnation);
00745                 this.historyIndex = this.checkpointIndex;
00746                 this.lastCommitedIndex = upTo;
00747                 // delete commited history
00748                 this.deleteCommitedHistory(toSend.base, toSend.last);
00749             }
00750 
00751             // send to the server if asked
00752             if (sendToServer) {
00753                 try {
00754                     this.storage.commitHistory(toSend);
00755                 } catch (RemoteException e) {
00756                     logger.error("[ERROR] Storage server is not reachable !");
00757                     e.printStackTrace();
00758                 }
00759             }
00760             return toSend;
00761         }
00762     }
00763 
00764     /*
00765      * Return the current history from index from up to index upTo (both included),
00766      * or null if from == upTo + 1 (nothing to commit)
00767      */
00767     private List getHistoryToCommit(long from, long upTo) {
00768         if (from == (upTo + 1)) {
00769             // Activity is still in histo mode: no commit is needed
00770             return null;
00771         } else {
00772             int translatedFrom = (int) (from - this.historyBaseIndex);
00773             int translatedUpTo = (int) (upTo - this.historyBaseIndex);
00774             Vector toRet = new Vector(translatedUpTo - translatedFrom);
00775             Iterator itHisto = this.history.iterator();
00776             for (int i = 0; i < translatedFrom; i++) {
00777                 itHisto.next();
00778             }
00779             for (int i = translatedFrom; i <= translatedUpTo; i++) {
00780                 toRet.add(itHisto.next());
00781             }
00782             return toRet;
00783         }
00784     }
00785 
00786     /*
00787      * Delete the history entries from index from up to index upTo (both included)
00788      */
00789     private void deleteCommitedHistory(long from, long upTo) {
00790         if ((upTo < this.historyBaseIndex) || (from < this.historyBaseIndex)) {
00791             throw new ProtocolErrorException("Deleting from " + from +
00792                 " up to " + upTo + " while local is from " +
00793                 this.historyBaseIndex + " up to " +
00794                 this.deliveredRequestsCounter);
00795         } else {
00796             if (!this.history.isEmpty()) {
00797                 this.history.subList((int) (from - this.historyBaseIndex),
00798                     (int) ((upTo - this.historyBaseIndex) + 1)).clear();
00799                 this.historyBaseIndex = upTo + 1;
00800             }
00801         }
00802     }
00803 
00804     /*
00805      * Send logged messages before recovery
00806      * !!! ALL FT-STATE VARIABLES MUST BE SET !!!
00807      */
00808     private void sendLogs(CheckpointInfoCIC ci) {
00809         //send replies
00810         //System.out.println("[CIC] Sending logged messages...");
00811         Vector replies = ci.replyToResend;
00812         Iterator itReplies = replies.iterator();
00813         while (itReplies.hasNext()) {
00814             //System.out.println( this.owner.getID() + "      SEND REPLY");
00815             UniversalBody destination = null;
00816             Reply r = null;
00817             ReplyLog rl = (ReplyLog) (itReplies.next());
00818             r = rl.getReply();
00819             destination = rl.getDestination();
00820             this.sendReply(r, destination);
00821         }
00822 
00823         //send requests
00824         Vector requests = ci.requestToResend;
00825         Iterator itRequests = requests.iterator();
00826         while (itRequests.hasNext()) {
00827             try {
00828                 //System.out.println( this.owner.getID() + "      SEND REQUEST");
00829                 UniversalBody destination = null;
00830                 RequestLog lr = (RequestLog) (itRequests.next());
00831                 Request loggedRequest = lr.getRequest();
00832                 destination = lr.getDestination();
00833                 // must create a new req : the sender must be this.owner
00834                 Request r = new RequestImpl(loggedRequest.getMethodCall(),
00835                         this.owner.getRemoteAdapter(),
00836                         loggedRequest.isOneWay(),
00837                         loggedRequest.getSequenceNumber());
00838                 this.sendRequest(r, destination);
00839             } catch (RenegotiateSessionException e) {
00840                 e.printStackTrace();
00841             }
00842         }
00843     }
00844 
00845     // replace the requests that are orphans for cic.checkpointIndex by awaited requests
00846     // and identify the awaited requests already present in the request queue
00847     private void filterQueue(BlockingRequestQueue queue, CheckpointInfoCIC cic) {
00848         CircularArrayList internalQueue = ((BlockingRequestQueueImpl) queue).getInternalQueue();
00849         ListIterator itQueue = internalQueue.listIterator();
00850         while (itQueue.hasNext()) {
00851             Request current = (Request) (itQueue.next());
00852             MessageInfoCIC mi = (MessageInfoCIC) current.getMessageInfo();
00853             if (mi == null) {
00854                 // is request an awaited or a non ft ?
00855                 if (current instanceof AwaitedRequest) {
00856                     // current is an awaited request that is not updated
00857                     this.awaitedRequests.add(current);
00858                 }
00859             } else if (mi.isOrphanFor <= cic.checkpointIndex) {
00860                 // current is an orpahn request 
00861                 // System.out.println("" + this.ownerID + " is filtering some orphan requests ...");
00862                 AwaitedRequest ar = new AwaitedRequest(current.getSourceBodyID());
00863                 itQueue.set(ar);
00864                 this.awaitedRequests.add(ar);
00865             }
00866         }
00867     }
00868 
00869     /*
00870      * Adapt the log size
00871      */
00872     private void extendRequestLog(int size) {
00873         if (this.latestRequestLog < size) {
00874             //the log vector must grow
00875             for (int j = this.latestRequestLog + 1; j <= size; j++) {
00876                 this.requestToResend.put(new Integer(j), new Vector());
00877             }
00878             this.latestRequestLog = size;
00879         }
00880     }
00881 
00882     /*
00883      * Adapt the log size
00884      */
00885     private void extendReplyLog(int size) {
00886         if (this.latestReplyLog < size) {
00887             //the log vector must grow
00888             for (int j = this.latestReplyLog + 1; j <= size; j++) {
00889                 this.replyToResend.put(new Integer(j), new Vector());
00890             }
00891             this.latestReplyLog = size;
00892         }
00893     }
00894 
00895     public String toString() {
00896         String ret = " Incarnation = ";
00897         ret += this.incarnation;
00898         return ret;
00899     }
00900 
00901     /*
00902      * Return the memory actually used
00903      * For debugging stuff.
00904      */
00905 
00906     //private long getUsedMem() {
00907     //    return (FTManagerCIC.runtime.totalMemory() -
00908     //    FTManagerCIC.runtime.freeMemory()) / 1024;
00909     //}
00913     // Double Dispatch pattern
00914     public Object handleFTMessage(FTMessage fte) {
00915         return fte.handleFTMessage(this);
00916     }
00917 
00923     public HistoryUpdater handlingGSCEEvent(GlobalStateCompletion fte) {
00924         // we commit the ENTIRE history, upto the last element
00925         // the hisotry is NOT sent by this.commitHitories
00926         HistoryUpdater rh = this.commitHistories(this.checkpointIndex,
00927                 this.deliveredRequestsCounter, false, true);
00928 
00929         // this commit close the completingCheckpoint state, indeed the 
00930         // minimal history for the latest checkpoint has been sent.
00931         if (this.completingCheckpoint) {
00932             this.completingCheckpoint = false;
00933         }
00934         return rh;
00935     }
00936 
00943     public HistoryUpdater handlingOCEvent(OutputCommit fte) {
00944         // commit history up to upTo
00945         long upTo = fte.getLastIndexToRetreive();
00946 
00947         // this hisotry must be attached to the current checkpoint, except if the current 
00948         // checkpoint is not completed with its minimal history
00949         int attachedIndex = (this.completingCheckpoint)
00950             ? (this.checkpointIndex - 1) : (this.checkpointIndex);
00951         HistoryUpdater toSend = this.commitHistories(attachedIndex, upTo,
00952                 false, false);
00953         return toSend;
00954     }
00955 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1