org/objectweb/proactive/core/body/ft/protocols/cic/servers/CheckpointServerCIC.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.ft.protocols.cic.servers;
00032 
00033 import java.io.IOException;
00034 import java.rmi.RemoteException;
00035 import java.util.ArrayList;
00036 import java.util.Enumeration;
00037 import java.util.Hashtable;
00038 import java.util.Iterator;
00039 import java.util.Vector;
00040 
00041 import org.apache.log4j.Logger;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.UniversalBody;
00044 import org.objectweb.proactive.core.body.ft.checkpointing.Checkpoint;
00045 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00046 import org.objectweb.proactive.core.body.ft.exception.NotImplementedException;
00047 import org.objectweb.proactive.core.body.ft.internalmsg.GlobalStateCompletion;
00048 import org.objectweb.proactive.core.body.ft.internalmsg.OutputCommit;
00049 import org.objectweb.proactive.core.body.ft.message.HistoryUpdater;
00050 import org.objectweb.proactive.core.body.ft.message.MessageInfo;
00051 import org.objectweb.proactive.core.body.ft.message.ReceptionHistory;
00052 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.CheckpointInfoCIC;
00053 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.MessageInfoCIC;
00054 import org.objectweb.proactive.core.body.ft.servers.FTServer;
00055 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryJob;
00056 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00057 import org.objectweb.proactive.core.body.ft.servers.storage.CheckpointServerImpl;
00058 import org.objectweb.proactive.core.body.ft.servers.util.ActiveQueue;
00059 import org.objectweb.proactive.core.body.ft.servers.util.ActiveQueueJob;
00060 import org.objectweb.proactive.core.body.ft.servers.util.JobBarrier;
00061 import org.objectweb.proactive.core.body.reply.Reply;
00062 import org.objectweb.proactive.core.body.request.Request;
00063 import org.objectweb.proactive.core.node.Node;
00064 import org.objectweb.proactive.core.node.NodeException;
00065 import org.objectweb.proactive.core.node.NodeFactory;
00066 import org.objectweb.proactive.core.util.MutableInteger;
00067 import org.objectweb.proactive.core.util.MutableLong;
00068 import org.objectweb.proactive.core.util.log.Loggers;
00069 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00070 
00071 
00077 public class CheckpointServerCIC extends CheckpointServerImpl {
00078 
00080     public static final int DEFAULT_GC_PERIOD = 40000;
00081 
00082     //logger
00083     protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_CIC);
00084 
00085     //monitoring latest global state
00086     private Hashtable stateMonitor; //ckpt index -> number of stored checkpoint
00087     private int lastGlobalState;
00088     private int lastRegisteredCkpt;
00089 
00090     // current incarnation 
00091     private int globalIncarnation;
00092 
00093     // monitoring recovery line
00094     private Hashtable greatestCommitedHistory; // ids <-> index of the greatest commited histo
00095     private Hashtable recoveryLineMonitor; // ckpt index <-> number of completed checkpoints
00096     private int recoveryLine;
00097 
00098     // handling histories
00099     private Hashtable histories;
00100 
00101     // garbage collection
00102     private ActiveQueue gc;
00103 
00104     // profiling
00105     private boolean displayCkptSize;
00106 
00107     public CheckpointServerCIC(FTServer server) {
00108         super(server);
00109 
00110         this.stateMonitor = new Hashtable();
00111         this.lastGlobalState = 0;
00112         this.greatestCommitedHistory = new Hashtable();
00113         this.recoveryLineMonitor = new Hashtable();
00114         this.recoveryLine = 0;
00115         this.lastRegisteredCkpt = 0;
00116         this.globalIncarnation = 1;
00117         this.histories = new Hashtable();
00118 
00119         // garbage collection
00120         this.gc = new ActiveQueue("ActiveQueue: GC");
00121         gc.start();
00122         gc.addJob(new GarbageCollectionJob(this, DEFAULT_GC_PERIOD));
00123 
00124         this.displayCkptSize = false; // debugging stuff
00125     }
00126 
00130     public synchronized int storeCheckpoint(Checkpoint c, int incarnation)
00131         throws RemoteException {
00132         if (incarnation < this.globalIncarnation) {
00133             logger.warn("** WARNING ** : Object with incarnation " +
00134                 incarnation + " is trying to store checkpoint");
00135             return 0;
00136         }
00137 
00138         ArrayList ckptList = (ArrayList) checkpointStorage.get(c.getBodyID());
00139 
00140         // the first checkpoint ...
00141         if (ckptList == null) {
00142             // new storage slot
00143             ArrayList checkpoints = new ArrayList();
00144 
00145             //dummy first checkpoint
00146             checkpoints.add(new Object());
00147             UniqueID id = c.getBodyID();
00148             checkpointStorage.put(id, checkpoints);
00149             checkpoints.add(c);
00150             // new history slot
00151             this.histories.put(c.getBodyID(), new ReceptionHistory());
00152             // new greatestHisto slot
00153             this.greatestCommitedHistory.put(c.getBodyID(),
00154                 new MutableInteger(0));
00155         } else {
00156             //add checkpoint
00157             ckptList.add(c);
00158         }
00159 
00160         // updating monitoring
00161         int index = ((CheckpointInfoCIC) (c.getCheckpointInfo())).checkpointIndex;
00162         if (index > this.lastRegisteredCkpt) {
00163             this.lastRegisteredCkpt = index;
00164         }
00165         MutableInteger currentGlobalState = (MutableInteger) (this.stateMonitor.get(new MutableInteger(
00166                     index)));
00167         if (currentGlobalState == null) {
00168             // this is the first checkpoint store for the global state index
00169             this.stateMonitor.put(new MutableInteger(index),
00170                 new MutableInteger(1));
00171         } else {
00172             currentGlobalState.add(1);
00173         }
00174 
00175         //this.checkLastGlobalState();
00176         logger.info("[CKPT] Receive checkpoint indexed " + index +
00177             " from body " + c.getBodyID() + " (used memory = " +
00178             this.getUsedMem() + " Kb)"); // + "[" + System.currentTimeMillis() + "]");
00179 
00180         if (displayCkptSize) {
00181             logger.info("[CKPT] Size of ckpt " + index + " before addInfo : " +
00182                 this.getSize(c) + " bytes");
00183         }
00184 
00185         // broadcast history closure if a new globalState is built
00186         if (this.checkLastGlobalState()) {
00187             // send a GSC message to all
00188             Enumeration all = this.checkpointStorage.keys();
00189             while (all.hasMoreElements()) {
00190                 UniqueID callee = (UniqueID) (all.nextElement());
00191                 this.server.submitJob(new GSCESender(this.server, callee,
00192                         new GlobalStateCompletion(this.lastGlobalState)));
00193             }
00194         }
00195         return this.lastGlobalState;
00196     }
00197 
00201     public Checkpoint getCheckpoint(UniqueID id, int sequenceNumber)
00202         throws RemoteException {
00203         // TODO : checkpoints with multiple index ??
00204         return (Checkpoint) ((ArrayList) (checkpointStorage.get(id))).get(sequenceNumber);
00205     }
00206 
00210     public Checkpoint getLastCheckpoint(UniqueID id) throws RemoteException {
00211         ArrayList checkpoints = (java.util.ArrayList) (checkpointStorage.get(id));
00212         int size = checkpoints.size();
00213         return (Checkpoint) (checkpoints.get(size - 1));
00214     }
00215 
00219     public synchronized void addInfoToCheckpoint(CheckpointInfo ci,
00220         UniqueID id, int sequenceNumber, int incarnation)
00221         throws RemoteException {
00222     }
00223 
00227     public synchronized void commitHistory(HistoryUpdater rh)
00228         throws RemoteException {
00229         if (rh.incarnation < this.globalIncarnation) {
00230             logger.warn("** WARNING ** : Object with incarnation " +
00231                 rh.incarnation +
00232                 " is trying to store checkpoint infos (Current inc = " +
00233                 this.globalIncarnation + ")");
00234             return;
00235         }
00236 
00237         // update the histo if needed
00238         if (rh.elements != null) {
00239             ReceptionHistory ih = (ReceptionHistory) this.histories.get(rh.owner);
00240             ih.updateHistory(rh);
00241         }
00242 
00243         // update the recovery line monitoring
00244         MutableInteger greatestIndexSent = ((MutableInteger) (this.greatestCommitedHistory.get(rh.owner)));
00245         if (greatestIndexSent.getValue() < rh.checkpointIndex) {
00246             // must update rc monitoring
00247             greatestIndexSent.setValue(rh.checkpointIndex);
00248             // inc the rcv counter for the index indexOfCheckpoint
00249             MutableInteger counter = (MutableInteger) (this.recoveryLineMonitor.get(greatestIndexSent));
00250             if (counter == null) {
00251                 // this is the first histo commit with index indexOfCkpt
00252                 this.recoveryLineMonitor.put(new MutableInteger(
00253                         rh.checkpointIndex), new MutableInteger(1));
00254             } else {
00255                 counter.add(1);
00256             }
00257 
00258             // test if a new recovery line has been created
00259             // update histories if any
00260             this.checkRecoveryLine();
00261         }
00262     }
00263 
00267     public CheckpointInfo getInfoFromCheckpoint(UniqueID id, int sequenceNumber)
00268         throws RemoteException {
00269         throw new NotImplementedException();
00270     }
00271 
00276     public void storeRequest(UniqueID receiverId, Request request)
00277         throws RemoteException {
00278         throw new NotImplementedException();
00279     }
00280 
00285     public void storeReply(UniqueID receiverID, Reply reply)
00286         throws RemoteException {
00287         throw new NotImplementedException();
00288     }
00289 
00290     // return true if a new globalState is found
00291     private boolean checkLastGlobalState() {
00292         try {
00293             //logger.info("[CKPT] Checking last global state...");
00294             int systemSize = this.server.getSystemSize();
00295             int lastGB = this.lastGlobalState;
00296             int lastCkpt = this.lastRegisteredCkpt;
00297             MutableInteger mi = new MutableInteger(lastCkpt);
00298             for (int i = lastCkpt; i > lastGB; i--, mi.add(-1)) {
00299                 int numRegistered = ((MutableInteger) (this.stateMonitor.get(mi))).getValue();
00300                 if (numRegistered == systemSize) {
00301                     this.lastGlobalState = i;
00302                     return true;
00303                 }
00304             }
00305             return false;
00306         } catch (RemoteException e) {
00307             logger.error("**ERROR** Cannot contact recoveryProcess");
00308             e.printStackTrace();
00309             return false;
00310         }
00311     }
00312 
00313     //return true if the recoveryline has changed
00314     // Recovery increase only 1 by 1 ... TO DO
00315     private boolean checkRecoveryLine() {
00316         try {
00317             int systemSize = this.server.getSystemSize();
00318             MutableInteger nextPossible = (MutableInteger) (this.recoveryLineMonitor.get(new MutableInteger(this.recoveryLine +
00319                         1)));
00320 
00321             // THIS PART MUST BE ATOMIC
00322             if ((nextPossible != null) &&
00323                     (nextPossible.getValue() == systemSize)) {
00324                 // a new recovery line has been created
00325                 // update histories
00326                 Enumeration itKey = this.histories.keys();
00327                 while (itKey.hasMoreElements()) {
00328                     UniqueID key = (UniqueID) (itKey.nextElement());
00329                     ReceptionHistory cur = (ReceptionHistory) (this.histories.get(key));
00330                     long nextBase = ((CheckpointInfoCIC) (this.getCheckpoint(key,
00331                             this.recoveryLine + 1).getCheckpointInfo())).lastRcvdRequestIndex +
00332                         1;
00333                     cur.goToNextBase(nextBase);
00334                     cur.confirmLastUpdate();
00335                 }
00336 
00337                 // a new rec line has been created
00338                 this.recoveryLine = this.recoveryLine + 1;
00339                 logger.info("[CKPT] Recovery line is " + this.recoveryLine);
00340                 return true;
00341             }
00342         } catch (RemoteException e) {
00343             logger.error("[ERROR] The FT server is not reachable");
00344             e.printStackTrace();
00345         }
00346         return false;
00347     }
00348 
00349     // protected accessors 
00350     protected void internalRecover(UniqueID failed) {
00351         try {
00352             Enumeration itBodies = null;
00353             int globalState = 0;
00354             synchronized (this) {
00355                 globalState = this.recoveryLine;
00356                 this.globalIncarnation++;
00357                 logger.info("[RECOVERY] Recovering system from " + globalState +
00358                     " with incarnation " + this.globalIncarnation);
00359 
00360                 itBodies = this.checkpointStorage.keys();
00361                 this.lastGlobalState = globalState;
00362                 this.lastRegisteredCkpt = globalState;
00363                 this.recoveryLine = globalState;
00364                 this.stateMonitor = new Hashtable();
00365                 this.recoveryLineMonitor = new Hashtable();
00366 
00367                 // delete unusable checkpoints
00368                 Iterator it = this.checkpointStorage.values().iterator();
00369                 while (it.hasNext()) {
00370                     ArrayList ckpts = ((ArrayList) (it.next()));
00371                     while (ckpts.size() > (globalState + 1)) {
00372                         ckpts.remove(globalState + 1);
00373                     }
00374                 }
00375 
00376                 // set all the system in recovery state
00377                 while (itBodies.hasMoreElements()) {
00378                     UniqueID current = (UniqueID) (itBodies.nextElement());
00379                     this.server.updateState(current, RecoveryProcess.RECOVERING);
00380                 }
00381 
00382                 //reinit the iterator
00383                 itBodies = this.checkpointStorage.keys();
00384 
00385                 // reinit hisotries; delete not recoverable parts of histories
00386                 Enumeration itHistories = this.histories.elements();
00387                 while (itHistories.hasMoreElements()) {
00388                     ((ReceptionHistory) (itHistories.nextElement())).compactHistory();
00389                 }
00390             } // end synchronize
00391 
00392             // for waiting the end of the recovery
00393             Vector barriers = new Vector();
00394 
00395             // send checkpoints
00396             while (itBodies.hasMoreElements()) {
00397                 UniqueID current = (UniqueID) (itBodies.nextElement());
00398 
00399                 //Checkpoint toSend = this.server.getCheckpoint(current,globalState); 
00400                 Checkpoint toSend = this.getCheckpoint(current, globalState);
00401 
00402                 // update history of toSend
00403                 CheckpointInfoCIC cic = (CheckpointInfoCIC) (toSend.getCheckpointInfo());
00404                 ReceptionHistory histo = ((ReceptionHistory) (this.histories.get(current)));
00405                 cic.history = (Vector) histo.getRecoverableHistory();
00406                 // set the last commited index
00407                 cic.lastCommitedIndex = histo.getLastRecoverable();
00408 
00409                 if (current.equals(failed)) {
00410                     //look for a new Runtime for this oa
00411                     Node node = this.server.getFreeNode();
00412 
00413                     //if (node==null)return;
00414                     barriers.add(this.server.submitJobWithBarrier(
00415                             new RecoveryJob(toSend, this.globalIncarnation, node)));
00416                 } else {
00417                     UniversalBody toRecover = (UniversalBody) (this.server.getLocation(current));
00418 
00419                     // test current OA so as to handle mutliple failures
00420                     boolean isDead = false;
00421                     try {
00422                         isDead = this.server.isUnreachable(toRecover);
00423                     } catch (Exception e) {
00424                     }
00425                     if (isDead) {
00426                         Node node = this.server.getFreeNode();
00427 
00428                         //if (node==null)return;
00429                         barriers.add(this.server.submitJobWithBarrier(
00430                                 new RecoveryJob(toSend, this.globalIncarnation,
00431                                     node)));
00432                     } else {
00433                         String nodeURL = toRecover.getNodeURL();
00434                         Node node = NodeFactory.getNode(nodeURL);
00435                         barriers.add(this.server.submitJobWithBarrier(
00436                                 new RecoveryJob(toSend, this.globalIncarnation,
00437                                     node)));
00438                     }
00439                 }
00440             }
00441 
00442             // MUST WAIT THE TERMINAISON OF THE RECOVERY !
00443             // FaultDetection thread wait for the completion of the recovery
00444             // If a failure occurs during rec, it will be detected by an active object
00445             Iterator itBarriers = barriers.iterator();
00446             while (itBarriers.hasNext()) {
00447                 ((JobBarrier) (itBarriers.next())).waitForJobCompletion();
00448             }
00449         } catch (NodeException e) {
00450             logger.error(
00451                 "[RECOVERY] **ERROR** Unable to send checkpoint for recovery");
00452             e.printStackTrace();
00453         } catch (RemoteException e) {
00454             logger.error(
00455                 "[RECOVERY] **ERROR** Cannot contact checkpoint server");
00456             e.printStackTrace();
00457         }
00458     }
00459 
00463     public synchronized void outputCommit(MessageInfo mi)
00464         throws RemoteException {
00465         Hashtable vectorClock = ((MessageInfoCIC) mi).vectorClock;
00466 
00467         // must store at least each histo up to vectorClock[id]       
00468         Enumeration enumClocks = vectorClock.keys();
00469 
00470         // <ATOMIC>
00471         while (enumClocks.hasMoreElements()) {
00472             UniqueID id = (UniqueID) (enumClocks.nextElement());
00473             MutableLong ml = (MutableLong) vectorClock.get(id);
00474             ReceptionHistory ih = (ReceptionHistory) (this.histories.get(id));
00475 
00476             // first test if a history retreiving is necessary
00477             // i.e. if vc[id]<=histories[id].lastCommited
00478             long lastCommited = ih.getLastCommited();
00479             long index = ml.getValue();
00480             if (lastCommited < index) {
00481                 try {
00482                     UniversalBody target = this.server.getLocation(id);
00483                     HistoryUpdater rh = (HistoryUpdater) (target.receiveFTMessage(new OutputCommit(lastCommited +
00484                                 1, index)));
00485                     ih.updateHistory(rh);
00486                 } catch (RemoteException e) {
00487                     logger.error("**ERROR** Unable to retreive history of " +
00488                         id);
00489                     e.printStackTrace();
00490                 } catch (IOException e) {
00491                     logger.error("**ERROR** Unable to retreive history of " +
00492                         id);
00493                     e.printStackTrace();
00494                 }
00495             }
00496         }
00497 
00498         // </ATOMIC>
00499         // wait for completion of histo retreiving
00500         // here we can commit alteration on histories
00501         Enumeration allHisto = this.histories.elements();
00502         while (allHisto.hasMoreElements()) {
00503             ReceptionHistory element = (ReceptionHistory) allHisto.nextElement();
00504             element.confirmLastUpdate();
00505         }
00506     }
00507 
00511     public void initialize() throws RemoteException {
00512         super.initialize();
00513         this.stateMonitor = new Hashtable();
00514         this.lastGlobalState = 0;
00515         this.greatestCommitedHistory = new Hashtable();
00516         this.recoveryLineMonitor = new Hashtable();
00517         this.recoveryLine = 0;
00518         this.lastRegisteredCkpt = 0;
00519         this.globalIncarnation = 1;
00520         this.histories = new Hashtable();
00521         // kill GC thread
00522         gc.killMe();
00523         gc = new ActiveQueue("ActiveQueue: GC");
00524         gc.start();
00525         gc.addJob(new GarbageCollectionJob(this, DEFAULT_GC_PERIOD));
00526     }
00527 
00531     private static class GarbageCollectionJob implements ActiveQueueJob {
00532         // this job is CIC specific
00533         private CheckpointServerCIC server;
00534 
00535         // period of garbage collection
00536         private int period;
00537 
00538         // constructor
00539         protected GarbageCollectionJob(CheckpointServerCIC server, int period) {
00540             this.server = server;
00541             this.period = period;
00542         }
00543 
00549         public void doTheJob() {
00550             while (true) {
00551                 try {
00552                     Thread.sleep(period);
00553                     CheckpointServerCIC.logger.info(
00554                         "[CKPT] Performing Garbage Collection...");
00555                     this.garbageCollection();
00556                     CheckpointServerCIC.logger.info(
00557                         "[CKPT] Garbage Collection done.");
00558                 } catch (InterruptedException e) {
00559                     e.printStackTrace();
00560                 }
00561             }
00562         }
00563 
00564         // Delete unsable checkpoints, i.e. index < currentRecoveryLine
00565         protected void garbageCollection() {
00566             boolean hasGarbaged = false;
00567             synchronized (server) {
00568                 int recLine = server.recoveryLine;
00569                 Iterator it = server.checkpointStorage.values().iterator();
00570                 while (it.hasNext()) {
00571                     ArrayList ckpts = ((ArrayList) (it.next()));
00572                     for (int i = 0; i < recLine; i++) {
00573                         if (ckpts.get(i) != null) {
00574                             hasGarbaged = true;
00575                             ckpts.remove(i);
00576                             ckpts.add(i, null);
00577                         }
00578                     }
00579                 }
00580             }
00581             if (hasGarbaged) {
00582                 System.gc();
00583             }
00584         }
00585     }
00586 
00587     /*
00588      * This class define a job for sending a global state completion notification.
00589      */
00590     private static class GSCESender implements ActiveQueueJob {
00591         private FTServer server;
00592         private UniqueID callee;
00593         private GlobalStateCompletion toSend;
00594 
00595         public GSCESender(FTServer s, UniqueID c, GlobalStateCompletion ts) {
00596             this.server = s;
00597             this.callee = c;
00598             this.toSend = ts;
00599         }
00600 
00601         public void doTheJob() {
00602             try {
00603                 UniversalBody destination = server.getLocation(callee);
00604 
00605                 // THIS CALL MUST BE FT !!!!
00606                 HistoryUpdater histo = (HistoryUpdater) (destination.receiveFTMessage(toSend));
00607 
00608                 // histo could be null : nothing to commit
00609                 if (histo != null) {
00610                     server.commitHistory(histo);
00611                 }
00612             } catch (IOException e) {
00613                 try {
00614                     server.forceDetection();
00615                 } catch (RemoteException e1) {
00616                     e1.printStackTrace();
00617                 }
00618             }
00619         }
00620     }
00621 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1