00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.ft.protocols.cic.servers;
00032
00033 import java.io.IOException;
00034 import java.rmi.RemoteException;
00035 import java.util.ArrayList;
00036 import java.util.Enumeration;
00037 import java.util.Hashtable;
00038 import java.util.Iterator;
00039 import java.util.Vector;
00040
00041 import org.apache.log4j.Logger;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.UniversalBody;
00044 import org.objectweb.proactive.core.body.ft.checkpointing.Checkpoint;
00045 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00046 import org.objectweb.proactive.core.body.ft.exception.NotImplementedException;
00047 import org.objectweb.proactive.core.body.ft.internalmsg.GlobalStateCompletion;
00048 import org.objectweb.proactive.core.body.ft.internalmsg.OutputCommit;
00049 import org.objectweb.proactive.core.body.ft.message.HistoryUpdater;
00050 import org.objectweb.proactive.core.body.ft.message.MessageInfo;
00051 import org.objectweb.proactive.core.body.ft.message.ReceptionHistory;
00052 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.CheckpointInfoCIC;
00053 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.MessageInfoCIC;
00054 import org.objectweb.proactive.core.body.ft.servers.FTServer;
00055 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryJob;
00056 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00057 import org.objectweb.proactive.core.body.ft.servers.storage.CheckpointServerImpl;
00058 import org.objectweb.proactive.core.body.ft.servers.util.ActiveQueue;
00059 import org.objectweb.proactive.core.body.ft.servers.util.ActiveQueueJob;
00060 import org.objectweb.proactive.core.body.ft.servers.util.JobBarrier;
00061 import org.objectweb.proactive.core.body.reply.Reply;
00062 import org.objectweb.proactive.core.body.request.Request;
00063 import org.objectweb.proactive.core.node.Node;
00064 import org.objectweb.proactive.core.node.NodeException;
00065 import org.objectweb.proactive.core.node.NodeFactory;
00066 import org.objectweb.proactive.core.util.MutableInteger;
00067 import org.objectweb.proactive.core.util.MutableLong;
00068 import org.objectweb.proactive.core.util.log.Loggers;
00069 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00070
00071
00077 public class CheckpointServerCIC extends CheckpointServerImpl {
00078
00080 public static final int DEFAULT_GC_PERIOD = 40000;
00081
00082
00083 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_CIC);
00084
00085
00086 private Hashtable stateMonitor;
00087 private int lastGlobalState;
00088 private int lastRegisteredCkpt;
00089
00090
00091 private int globalIncarnation;
00092
00093
00094 private Hashtable greatestCommitedHistory;
00095 private Hashtable recoveryLineMonitor;
00096 private int recoveryLine;
00097
00098
00099 private Hashtable histories;
00100
00101
00102 private ActiveQueue gc;
00103
00104
00105 private boolean displayCkptSize;
00106
/**
 * Builds a CIC checkpoint server attached to the given fault-tolerance server.
 * Every counter starts from its initial value, every bookkeeping table starts
 * empty, and a garbage collection job is scheduled on a freshly started queue.
 *
 * @param server the fault-tolerance server, forwarded to the superclass constructor
 */
public CheckpointServerCIC(FTServer server) {
    super(server);

    // counters and flags
    this.globalIncarnation = 1;
    this.lastGlobalState = 0;
    this.lastRegisteredCkpt = 0;
    this.recoveryLine = 0;
    this.displayCkptSize = false;

    // bookkeeping tables
    this.histories = new Hashtable();
    this.stateMonitor = new Hashtable();
    this.recoveryLineMonitor = new Hashtable();
    this.greatestCommitedHistory = new Hashtable();

    // garbage collection is started last, once all the state above is in place
    this.gc = new ActiveQueue("ActiveQueue: GC");
    this.gc.start();
    this.gc.addJob(new GarbageCollectionJob(this, DEFAULT_GC_PERIOD));
}
00126
00130 public synchronized int storeCheckpoint(Checkpoint c, int incarnation)
00131 throws RemoteException {
00132 if (incarnation < this.globalIncarnation) {
00133 logger.warn("** WARNING ** : Object with incarnation " +
00134 incarnation + " is trying to store checkpoint");
00135 return 0;
00136 }
00137
00138 ArrayList ckptList = (ArrayList) checkpointStorage.get(c.getBodyID());
00139
00140
00141 if (ckptList == null) {
00142
00143 ArrayList checkpoints = new ArrayList();
00144
00145
00146 checkpoints.add(new Object());
00147 UniqueID id = c.getBodyID();
00148 checkpointStorage.put(id, checkpoints);
00149 checkpoints.add(c);
00150
00151 this.histories.put(c.getBodyID(), new ReceptionHistory());
00152
00153 this.greatestCommitedHistory.put(c.getBodyID(),
00154 new MutableInteger(0));
00155 } else {
00156
00157 ckptList.add(c);
00158 }
00159
00160
00161 int index = ((CheckpointInfoCIC) (c.getCheckpointInfo())).checkpointIndex;
00162 if (index > this.lastRegisteredCkpt) {
00163 this.lastRegisteredCkpt = index;
00164 }
00165 MutableInteger currentGlobalState = (MutableInteger) (this.stateMonitor.get(new MutableInteger(
00166 index)));
00167 if (currentGlobalState == null) {
00168
00169 this.stateMonitor.put(new MutableInteger(index),
00170 new MutableInteger(1));
00171 } else {
00172 currentGlobalState.add(1);
00173 }
00174
00175
00176 logger.info("[CKPT] Receive checkpoint indexed " + index +
00177 " from body " + c.getBodyID() + " (used memory = " +
00178 this.getUsedMem() + " Kb)");
00179
00180 if (displayCkptSize) {
00181 logger.info("[CKPT] Size of ckpt " + index + " before addInfo : " +
00182 this.getSize(c) + " bytes");
00183 }
00184
00185
00186 if (this.checkLastGlobalState()) {
00187
00188 Enumeration all = this.checkpointStorage.keys();
00189 while (all.hasMoreElements()) {
00190 UniqueID callee = (UniqueID) (all.nextElement());
00191 this.server.submitJob(new GSCESender(this.server, callee,
00192 new GlobalStateCompletion(this.lastGlobalState)));
00193 }
00194 }
00195 return this.lastGlobalState;
00196 }
00197
00201 public Checkpoint getCheckpoint(UniqueID id, int sequenceNumber)
00202 throws RemoteException {
00203
00204 return (Checkpoint) ((ArrayList) (checkpointStorage.get(id))).get(sequenceNumber);
00205 }
00206
00210 public Checkpoint getLastCheckpoint(UniqueID id) throws RemoteException {
00211 ArrayList checkpoints = (java.util.ArrayList) (checkpointStorage.get(id));
00212 int size = checkpoints.size();
00213 return (Checkpoint) (checkpoints.get(size - 1));
00214 }
00215
00219 public synchronized void addInfoToCheckpoint(CheckpointInfo ci,
00220 UniqueID id, int sequenceNumber, int incarnation)
00221 throws RemoteException {
00222 }
00223
00227 public synchronized void commitHistory(HistoryUpdater rh)
00228 throws RemoteException {
00229 if (rh.incarnation < this.globalIncarnation) {
00230 logger.warn("** WARNING ** : Object with incarnation " +
00231 rh.incarnation +
00232 " is trying to store checkpoint infos (Current inc = " +
00233 this.globalIncarnation + ")");
00234 return;
00235 }
00236
00237
00238 if (rh.elements != null) {
00239 ReceptionHistory ih = (ReceptionHistory) this.histories.get(rh.owner);
00240 ih.updateHistory(rh);
00241 }
00242
00243
00244 MutableInteger greatestIndexSent = ((MutableInteger) (this.greatestCommitedHistory.get(rh.owner)));
00245 if (greatestIndexSent.getValue() < rh.checkpointIndex) {
00246
00247 greatestIndexSent.setValue(rh.checkpointIndex);
00248
00249 MutableInteger counter = (MutableInteger) (this.recoveryLineMonitor.get(greatestIndexSent));
00250 if (counter == null) {
00251
00252 this.recoveryLineMonitor.put(new MutableInteger(
00253 rh.checkpointIndex), new MutableInteger(1));
00254 } else {
00255 counter.add(1);
00256 }
00257
00258
00259
00260 this.checkRecoveryLine();
00261 }
00262 }
00263
00267 public CheckpointInfo getInfoFromCheckpoint(UniqueID id, int sequenceNumber)
00268 throws RemoteException {
00269 throw new NotImplementedException();
00270 }
00271
00276 public void storeRequest(UniqueID receiverId, Request request)
00277 throws RemoteException {
00278 throw new NotImplementedException();
00279 }
00280
00285 public void storeReply(UniqueID receiverID, Reply reply)
00286 throws RemoteException {
00287 throw new NotImplementedException();
00288 }
00289
00290
00291 private boolean checkLastGlobalState() {
00292 try {
00293
00294 int systemSize = this.server.getSystemSize();
00295 int lastGB = this.lastGlobalState;
00296 int lastCkpt = this.lastRegisteredCkpt;
00297 MutableInteger mi = new MutableInteger(lastCkpt);
00298 for (int i = lastCkpt; i > lastGB; i--, mi.add(-1)) {
00299 int numRegistered = ((MutableInteger) (this.stateMonitor.get(mi))).getValue();
00300 if (numRegistered == systemSize) {
00301 this.lastGlobalState = i;
00302 return true;
00303 }
00304 }
00305 return false;
00306 } catch (RemoteException e) {
00307 logger.error("**ERROR** Cannot contact recoveryProcess");
00308 e.printStackTrace();
00309 return false;
00310 }
00311 }
00312
00313
00314
00315 private boolean checkRecoveryLine() {
00316 try {
00317 int systemSize = this.server.getSystemSize();
00318 MutableInteger nextPossible = (MutableInteger) (this.recoveryLineMonitor.get(new MutableInteger(this.recoveryLine +
00319 1)));
00320
00321
00322 if ((nextPossible != null) &&
00323 (nextPossible.getValue() == systemSize)) {
00324
00325
00326 Enumeration itKey = this.histories.keys();
00327 while (itKey.hasMoreElements()) {
00328 UniqueID key = (UniqueID) (itKey.nextElement());
00329 ReceptionHistory cur = (ReceptionHistory) (this.histories.get(key));
00330 long nextBase = ((CheckpointInfoCIC) (this.getCheckpoint(key,
00331 this.recoveryLine + 1).getCheckpointInfo())).lastRcvdRequestIndex +
00332 1;
00333 cur.goToNextBase(nextBase);
00334 cur.confirmLastUpdate();
00335 }
00336
00337
00338 this.recoveryLine = this.recoveryLine + 1;
00339 logger.info("[CKPT] Recovery line is " + this.recoveryLine);
00340 return true;
00341 }
00342 } catch (RemoteException e) {
00343 logger.error("[ERROR] The FT server is not reachable");
00344 e.printStackTrace();
00345 }
00346 return false;
00347 }
00348
00349
00350 protected void internalRecover(UniqueID failed) {
00351 try {
00352 Enumeration itBodies = null;
00353 int globalState = 0;
00354 synchronized (this) {
00355 globalState = this.recoveryLine;
00356 this.globalIncarnation++;
00357 logger.info("[RECOVERY] Recovering system from " + globalState +
00358 " with incarnation " + this.globalIncarnation);
00359
00360 itBodies = this.checkpointStorage.keys();
00361 this.lastGlobalState = globalState;
00362 this.lastRegisteredCkpt = globalState;
00363 this.recoveryLine = globalState;
00364 this.stateMonitor = new Hashtable();
00365 this.recoveryLineMonitor = new Hashtable();
00366
00367
00368 Iterator it = this.checkpointStorage.values().iterator();
00369 while (it.hasNext()) {
00370 ArrayList ckpts = ((ArrayList) (it.next()));
00371 while (ckpts.size() > (globalState + 1)) {
00372 ckpts.remove(globalState + 1);
00373 }
00374 }
00375
00376
00377 while (itBodies.hasMoreElements()) {
00378 UniqueID current = (UniqueID) (itBodies.nextElement());
00379 this.server.updateState(current, RecoveryProcess.RECOVERING);
00380 }
00381
00382
00383 itBodies = this.checkpointStorage.keys();
00384
00385
00386 Enumeration itHistories = this.histories.elements();
00387 while (itHistories.hasMoreElements()) {
00388 ((ReceptionHistory) (itHistories.nextElement())).compactHistory();
00389 }
00390 }
00391
00392
00393 Vector barriers = new Vector();
00394
00395
00396 while (itBodies.hasMoreElements()) {
00397 UniqueID current = (UniqueID) (itBodies.nextElement());
00398
00399
00400 Checkpoint toSend = this.getCheckpoint(current, globalState);
00401
00402
00403 CheckpointInfoCIC cic = (CheckpointInfoCIC) (toSend.getCheckpointInfo());
00404 ReceptionHistory histo = ((ReceptionHistory) (this.histories.get(current)));
00405 cic.history = (Vector) histo.getRecoverableHistory();
00406
00407 cic.lastCommitedIndex = histo.getLastRecoverable();
00408
00409 if (current.equals(failed)) {
00410
00411 Node node = this.server.getFreeNode();
00412
00413
00414 barriers.add(this.server.submitJobWithBarrier(
00415 new RecoveryJob(toSend, this.globalIncarnation, node)));
00416 } else {
00417 UniversalBody toRecover = (UniversalBody) (this.server.getLocation(current));
00418
00419
00420 boolean isDead = false;
00421 try {
00422 isDead = this.server.isUnreachable(toRecover);
00423 } catch (Exception e) {
00424 }
00425 if (isDead) {
00426 Node node = this.server.getFreeNode();
00427
00428
00429 barriers.add(this.server.submitJobWithBarrier(
00430 new RecoveryJob(toSend, this.globalIncarnation,
00431 node)));
00432 } else {
00433 String nodeURL = toRecover.getNodeURL();
00434 Node node = NodeFactory.getNode(nodeURL);
00435 barriers.add(this.server.submitJobWithBarrier(
00436 new RecoveryJob(toSend, this.globalIncarnation,
00437 node)));
00438 }
00439 }
00440 }
00441
00442
00443
00444
00445 Iterator itBarriers = barriers.iterator();
00446 while (itBarriers.hasNext()) {
00447 ((JobBarrier) (itBarriers.next())).waitForJobCompletion();
00448 }
00449 } catch (NodeException e) {
00450 logger.error(
00451 "[RECOVERY] **ERROR** Unable to send checkpoint for recovery");
00452 e.printStackTrace();
00453 } catch (RemoteException e) {
00454 logger.error(
00455 "[RECOVERY] **ERROR** Cannot contact checkpoint server");
00456 e.printStackTrace();
00457 }
00458 }
00459
00463 public synchronized void outputCommit(MessageInfo mi)
00464 throws RemoteException {
00465 Hashtable vectorClock = ((MessageInfoCIC) mi).vectorClock;
00466
00467
00468 Enumeration enumClocks = vectorClock.keys();
00469
00470
00471 while (enumClocks.hasMoreElements()) {
00472 UniqueID id = (UniqueID) (enumClocks.nextElement());
00473 MutableLong ml = (MutableLong) vectorClock.get(id);
00474 ReceptionHistory ih = (ReceptionHistory) (this.histories.get(id));
00475
00476
00477
00478 long lastCommited = ih.getLastCommited();
00479 long index = ml.getValue();
00480 if (lastCommited < index) {
00481 try {
00482 UniversalBody target = this.server.getLocation(id);
00483 HistoryUpdater rh = (HistoryUpdater) (target.receiveFTMessage(new OutputCommit(lastCommited +
00484 1, index)));
00485 ih.updateHistory(rh);
00486 } catch (RemoteException e) {
00487 logger.error("**ERROR** Unable to retreive history of " +
00488 id);
00489 e.printStackTrace();
00490 } catch (IOException e) {
00491 logger.error("**ERROR** Unable to retreive history of " +
00492 id);
00493 e.printStackTrace();
00494 }
00495 }
00496 }
00497
00498
00499
00500
00501 Enumeration allHisto = this.histories.elements();
00502 while (allHisto.hasMoreElements()) {
00503 ReceptionHistory element = (ReceptionHistory) allHisto.nextElement();
00504 element.confirmLastUpdate();
00505 }
00506 }
00507
00511 public void initialize() throws RemoteException {
00512 super.initialize();
00513 this.stateMonitor = new Hashtable();
00514 this.lastGlobalState = 0;
00515 this.greatestCommitedHistory = new Hashtable();
00516 this.recoveryLineMonitor = new Hashtable();
00517 this.recoveryLine = 0;
00518 this.lastRegisteredCkpt = 0;
00519 this.globalIncarnation = 1;
00520 this.histories = new Hashtable();
00521
00522 gc.killMe();
00523 gc = new ActiveQueue("ActiveQueue: GC");
00524 gc.start();
00525 gc.addJob(new GarbageCollectionJob(this, DEFAULT_GC_PERIOD));
00526 }
00527
00531 private static class GarbageCollectionJob implements ActiveQueueJob {
00532
00533 private CheckpointServerCIC server;
00534
00535
00536 private int period;
00537
00538
00539 protected GarbageCollectionJob(CheckpointServerCIC server, int period) {
00540 this.server = server;
00541 this.period = period;
00542 }
00543
00549 public void doTheJob() {
00550 while (true) {
00551 try {
00552 Thread.sleep(period);
00553 CheckpointServerCIC.logger.info(
00554 "[CKPT] Performing Garbage Collection...");
00555 this.garbageCollection();
00556 CheckpointServerCIC.logger.info(
00557 "[CKPT] Garbage Collection done.");
00558 } catch (InterruptedException e) {
00559 e.printStackTrace();
00560 }
00561 }
00562 }
00563
00564
00565 protected void garbageCollection() {
00566 boolean hasGarbaged = false;
00567 synchronized (server) {
00568 int recLine = server.recoveryLine;
00569 Iterator it = server.checkpointStorage.values().iterator();
00570 while (it.hasNext()) {
00571 ArrayList ckpts = ((ArrayList) (it.next()));
00572 for (int i = 0; i < recLine; i++) {
00573 if (ckpts.get(i) != null) {
00574 hasGarbaged = true;
00575 ckpts.remove(i);
00576 ckpts.add(i, null);
00577 }
00578 }
00579 }
00580 }
00581 if (hasGarbaged) {
00582 System.gc();
00583 }
00584 }
00585 }
00586
00587
00588
00589
00590 private static class GSCESender implements ActiveQueueJob {
00591 private FTServer server;
00592 private UniqueID callee;
00593 private GlobalStateCompletion toSend;
00594
00595 public GSCESender(FTServer s, UniqueID c, GlobalStateCompletion ts) {
00596 this.server = s;
00597 this.callee = c;
00598 this.toSend = ts;
00599 }
00600
00601 public void doTheJob() {
00602 try {
00603 UniversalBody destination = server.getLocation(callee);
00604
00605
00606 HistoryUpdater histo = (HistoryUpdater) (destination.receiveFTMessage(toSend));
00607
00608
00609 if (histo != null) {
00610 server.commitHistory(histo);
00611 }
00612 } catch (IOException e) {
00613 try {
00614 server.forceDetection();
00615 } catch (RemoteException e1) {
00616 e1.printStackTrace();
00617 }
00618 }
00619 }
00620 }
00621 }