org/objectweb/proactive/core/body/ft/protocols/pmlrb/managers/FTManagerPMLRB.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.ft.protocols.pmlrb.managers;
00032 
00033 import java.io.IOException;
00034 import java.rmi.RemoteException;
00035 import java.util.Hashtable;
00036 import java.util.Iterator;
00037 import java.util.List;
00038 
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.Body;
00041 import org.objectweb.proactive.core.ProActiveException;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.AbstractBody;
00044 import org.objectweb.proactive.core.body.UniversalBody;
00045 import org.objectweb.proactive.core.body.ft.checkpointing.Checkpoint;
00046 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00047 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00048 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00049 import org.objectweb.proactive.core.body.ft.protocols.pmlrb.infos.CheckpointInfoPMLRB;
00050 import org.objectweb.proactive.core.body.ft.protocols.pmlrb.infos.MessageInfoPMLRB;
00051 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00052 import org.objectweb.proactive.core.body.future.FuturePool;
00053 import org.objectweb.proactive.core.body.message.Message;
00054 import org.objectweb.proactive.core.body.reply.Reply;
00055 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00056 import org.objectweb.proactive.core.body.request.Request;
00057 import org.objectweb.proactive.core.util.MutableLong;
00058 import org.objectweb.proactive.core.util.log.Loggers;
00059 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00060 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00061 
00062 
00069 public class FTManagerPMLRB extends FTManager {
00070 
00072     public static final int INC_VALUE = Integer.MAX_VALUE;
00073 
00075     public static final int IGNORED_MSG = -1;
00076 
00077     //logger
00078     protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_PML);
00079 
00080     // index of the latest received messae per senderID
00081     // UniqueID <-> MutableLong
00082     private Hashtable latestReceivedIndex;
00083 
00084     // true if this oa is recovering
00085     private boolean isRecovering;
00086 
00087     // timer
00088     private long checkpointTimer;
00089 
00090     //sequence number of sending for any messages
00091     private char sendNumber;
00092     private MessageInfoPMLRB replyInfos;
00093     private MessageInfoPMLRB requestInfos;
00094 
00095     // After a recovery, the last message of the log could be resend by its sender:
00096     // if the failure has occured between the logging and the end of the RDV.
00097     // Identify possible duplicatas
00098     private transient UniqueID potentialDuplicataSender;
00099     private transient long potentialDuplicataSequence;
00100 
00105     public int init(AbstractBody owner) throws ProActiveException {
00106         super.init(owner);
00107         this.latestReceivedIndex = new Hashtable();
00108         this.isRecovering = false;
00109         //checkpoint timer init: a checkpoint must be taken before any request service
00110         this.checkpointTimer = 0;
00111         // this.checkpointTimer = System.currentTimeMillis();
00112         this.sendNumber = 0;
00113         this.replyInfos = new MessageInfoPMLRB();
00114         this.requestInfos = new MessageInfoPMLRB();
00115         this.potentialDuplicataSender = null;
00116         this.potentialDuplicataSequence = 0;
00117         logger.info(" PML fault-tolerance is enabled for body " + this.ownerID);
00118         return 0;
00119     }
00120 
00125     public int onReceiveReply(Reply reply) {
00126         // if the message is sent by a non ft object
00127         if (reply.getMessageInfo() == null) {
00128             reply.setFTManager(this);
00129             return 0;
00130         }
00131 
00132         // Automatic continuation ---> Replies are not in sequence
00133         // we thus cannot block ac replies
00134         if (!reply.isAutomaticContinuation()) {
00135             if (this.alreadyReceived(reply)) {
00136                 // this message has already been received. Ignore it
00137                 reply.setIgnoreIt(true);
00138                 return IGNORED_MSG;
00139             }
00140         }
00141         reply.setFTManager(this);
00142         return 0;
00143     }
00144 
00149     public int onReceiveRequest(Request request) {
00150         // if the message is sent from a non ft object
00151         if (request.getMessageInfo() == null) {
00152             request.setFTManager(this);
00153             return 0;
00154         }
00155         if (this.alreadyReceived(request)) {
00156             // this message has already been received. Ignore it
00157             request.setIgnoreIt(true);
00158             return IGNORED_MSG;
00159         }
00160         request.setFTManager(this);
00161         return 0;
00162     }
00163 
00169     public int onDeliverReply(Reply reply) {
00170         // if the ao is recovering, message are not logged
00171         if (!this.isRecovering) {
00172             try {
00173                 // log the message
00174                 this.storage.storeReply(this.ownerID, reply);
00175                 // update latestIndex table
00176                 this.updateLatestRvdIndexTable(reply);
00177             } catch (RemoteException e) {
00178                 e.printStackTrace();
00179             }
00180         }
00181         return 0;
00182     }
00183 
00189     public int onDeliverRequest(Request request) {
00190         // if the ao is recovering, message are not logged
00191         if (!this.isRecovering) {
00192             try {
00193                 // log the message
00194                 this.storage.storeRequest(this.ownerID, request);
00195                 // update latestIndex table
00196                 this.updateLatestRvdIndexTable(request);
00197             } catch (RemoteException e) {
00198                 e.printStackTrace();
00199             }
00200         }
00201         return 0;
00202     }
00203 
00204     /*
00205      * Set the value of m.sourceBody to m.seqNumber
00206      */
00207     private void updateLatestRvdIndexTable(Message m) {
00208         // the first message from this sender?
00209         MutableLong index = (MutableLong) (this.latestReceivedIndex.get(m.getSourceBodyID()));
00210         MessageInfoPMLRB mi = (MessageInfoPMLRB) (m.getMessageInfo());
00211         if (mi == null) {
00212             // from a not ft
00213             return;
00214         }
00215         long msgIndex = mi.sentSequenceNumber;
00216         if (index != null) {
00217             index.setValue(msgIndex);
00218         } else {
00219             //first message
00220             this.latestReceivedIndex.put(m.getSourceBodyID(),
00221                 new MutableLong(msgIndex));
00222         }
00223     }
00224 
00225     /*
00226      * Return true if this message has already been received
00227      */
00228     private boolean alreadyReceived(Message m) {
00229         if ((this.potentialDuplicataSender != null) &&
00230                 (m.getSourceBodyID().equals(this.potentialDuplicataSender)) &&
00231                 (m.getSequenceNumber() == this.potentialDuplicataSequence)) {
00232             // this message has been already logged just before the failure of this.
00233             // no more such message can appear...
00234             this.potentialDuplicataSender = null;
00235             return true;
00236         } else {
00237             long msgIndex = ((MessageInfoPMLRB) (m.getMessageInfo())).sentSequenceNumber;
00238             MutableLong index = (MutableLong) (this.latestReceivedIndex.get(m.getSourceBodyID()));
00239             return (index != null) && (msgIndex <= index.getValue());
00240         }
00241     }
00242 
00246     public synchronized int onSendReplyBefore(Reply reply) {
00247         this.replyInfos.sentSequenceNumber = this.getNextSendNumber();
00248         reply.setMessageInfo(this.replyInfos);
00249         return 0;
00250     }
00251 
00255     public int onSendReplyAfter(Reply reply, int rdvValue,
00256         UniversalBody destination) {
00257         return 0;
00258     }
00259 
00263     public synchronized int onSendRequestBefore(Request request) {
00264         this.requestInfos.sentSequenceNumber = this.getNextSendNumber();
00265         request.setMessageInfo(this.requestInfos);
00266         return 0;
00267     }
00268 
00272     public int onSendRequestAfter(Request request, int rdvValue,
00273         UniversalBody destination) throws RenegotiateSessionException {
00274         return 0;
00275     }
00276 
00280     public int onServeRequestBefore(Request request) {
00281         if (this.haveToCheckpoint()) {
00282             this.checkpoint(request);
00283         }
00284         return 0;
00285     }
00286 
00290     public int onServeRequestAfter(Request request) {
00291         return 0;
00292     }
00293 
00297     public int beforeRestartAfterRecovery(CheckpointInfo ci, int inc) {
00298         // recovery mode: received message no longer logged
00299         this.isRecovering = true;
00300 
00301         //first must register incoming futures deserialized by the recovery thread
00302         this.owner.registerIncomingFutures();
00303 
00304         //get messages
00305         List replies = ((CheckpointInfoPMLRB) ci).getReplyLog();
00306         List request = ((CheckpointInfoPMLRB) ci).getRequestLog();
00307 
00308         // deal with potential duplicata of request
00309         // duplicata of replies are not treated since they are automaticaly ignored.
00310         Request potentialDuplicata = (Request) (request.get(request.size() - 1));
00311         this.potentialDuplicataSender = potentialDuplicata.getSourceBodyID();
00312         this.potentialDuplicataSequence = potentialDuplicata.getSequenceNumber();
00313 
00314         // add messages in the body context
00315         Iterator itRequest = request.iterator();
00316         BlockingRequestQueue queue = owner.getRequestQueue();
00317 
00318         while (itRequest.hasNext()) {
00319             queue.add((Request) (itRequest.next()));
00320         }
00321 
00322         // replies
00323         Iterator itReplies = replies.iterator();
00324         FuturePool fp = owner.getFuturePool();
00325         try {
00326             while (itReplies.hasNext()) {
00327                 Reply current = (Reply) (itReplies.next());
00328                 fp.receiveFutureValue(current.getSequenceNumber(),
00329                     current.getSourceBodyID(), current.getResult(), current);
00330             }
00331         } catch (IOException e) {
00332             e.printStackTrace();
00333         }
00334 
00335         //      add pending request to reuqestQueue
00336         Request pendingRequest = ((CheckpointInfoPMLRB) ci).getPendingRequest();
00337 
00338         // pending request could be null 
00339         if (pendingRequest != null) {
00340             queue.addToFront(pendingRequest);
00341         }
00342 
00343         // normal mode
00344         this.isRecovering = false;
00345 
00346         // enable communication
00347         this.owner.acceptCommunication();
00348 
00349         try {
00350             // update servers
00351             this.location.updateLocation(ownerID, owner.getRemoteAdapter());
00352             this.recovery.updateState(ownerID, RecoveryProcess.RUNNING);
00353         } catch (RemoteException e) {
00354             logger.error("Unable to connect with location server");
00355             e.printStackTrace();
00356         }
00357 
00358         this.checkpointTimer = System.currentTimeMillis();
00359 
00360         return 0;
00361     }
00362 
00366     public Object handleFTMessage(FTMessage fte) {
00367         return fte.handleFTMessage(this);
00368     }
00369 
00371     // private methods //
00373     private boolean haveToCheckpoint() {
00374         return ((this.checkpointTimer + this.ttc) < System.currentTimeMillis());
00375     }
00376 
00377     private void checkpoint(Request pending) {
00378         //System.out.println("[PMLRB] Checkpointing...");
00379         owner.blockCommunication();
00380         // checkpoint the active object
00381         try {
00382             this.setCheckpointTag(true);
00383             // create a checkpoint
00384             Checkpoint c = new Checkpoint((Body) owner, this.additionalCodebase);
00385 
00386             // create checkpoint info with the pending request
00387             CheckpointInfoPMLRB ci = new CheckpointInfoPMLRB(pending);
00388 
00389             // attach infos
00390             c.setCheckpointInfo(ci);
00391             // send it to server
00392             this.storage.storeCheckpoint(c, FTManagerPMLRB.DEFAULT_TTC_VALUE); // SEE INC VALUE !
00393 
00394             // reninit checkpoint values
00395             this.checkpointTimer = System.currentTimeMillis();
00396 
00397             this.setCheckpointTag(false);
00398         } catch (RemoteException e) {
00399             logger.error("[PMLRB] Unable to send checkpoint to the server");
00400             e.printStackTrace();
00401         }
00402 
00403         owner.acceptCommunication();
00404     }
00405 
00406     private synchronized char getNextSendNumber() {
00407         return ++sendNumber;
00408     }
00409 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1