00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.ft.protocols;
00032
00033 import java.io.IOException;
00034 import java.net.MalformedURLException;
00035 import java.rmi.Naming;
00036 import java.rmi.NotBoundException;
00037 import java.rmi.RemoteException;
00038
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.core.ProActiveException;
00041 import org.objectweb.proactive.core.UniqueID;
00042 import org.objectweb.proactive.core.body.AbstractBody;
00043 import org.objectweb.proactive.core.body.UniversalBody;
00044 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00045 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00046 import org.objectweb.proactive.core.body.ft.internalmsg.Heartbeat;
00047 import org.objectweb.proactive.core.body.ft.servers.faultdetection.FaultDetector;
00048 import org.objectweb.proactive.core.body.ft.servers.location.LocationServer;
00049 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00050 import org.objectweb.proactive.core.body.ft.servers.storage.CheckpointServer;
00051 import org.objectweb.proactive.core.body.reply.Reply;
00052 import org.objectweb.proactive.core.body.request.Request;
00053 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00054 import org.objectweb.proactive.core.util.log.Loggers;
00055 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00056 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00057
00058
00064 public abstract class FTManager implements java.io.Serializable {
00065
00066 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE);
00067
00069 public static final int NON_FT = -30;
00070
00072 public static final int DEFAULT_TTC_VALUE = 30000;
00073
00075 public static final int IMMEDIATE_SERVICE = -1;
00076
00078 public static final int ORPHAN_REPLY = -2;
00079
00081 public static final long TIME_TO_RESEND = 3000;
00082
00084 public static final String HALF_BODY_EXCEPTION_MESSAGE = "Cannot perform this call on a FTManager of a HalfBody";
00085
00086
00087 private boolean isACheckpoint;
00088
00089
00090 protected AbstractBody owner;
00091 protected UniqueID ownerID;
00092
00093
00094 protected CheckpointServer storage;
00095 protected LocationServer location;
00096 protected RecoveryProcess recovery;
00097
00098
00099 protected String additionalCodebase;
00100
00101
00102 protected int ttc;
00103
00109 public static int getProtoSelector(String protoName) {
00110 if ("cic".equals(protoName)) {
00111 return FTManagerFactory.PROTO_CIC;
00112 } else if ("pml".equals(protoName)) {
00113 return FTManagerFactory.PROTO_PML;
00114 }
00115 return 0;
00116 }
00117
00125 public int init(AbstractBody owner) throws ProActiveException {
00126 this.owner = owner;
00127 this.ownerID = owner.getID();
00128 try {
00129 String ttcValue = ProActiveConfiguration.getTTCValue();
00130 if (ttcValue != null) {
00131 this.ttc = Integer.parseInt(ttcValue) * 1000;
00132 } else {
00133 this.ttc = FTManager.DEFAULT_TTC_VALUE;
00134 }
00135 String urlGlobal = ProActiveConfiguration.getGlobalFTServer();
00136 if (urlGlobal != null) {
00137 this.storage = (CheckpointServer) (Naming.lookup(urlGlobal));
00138 this.location = (LocationServer) (Naming.lookup(urlGlobal));
00139 this.recovery = (RecoveryProcess) (Naming.lookup(urlGlobal));
00140 } else {
00141 String urlCheckpoint = ProActiveConfiguration.getCheckpointServer();
00142 String urlRecovery = ProActiveConfiguration.getRecoveryServer();
00143 String urlLocation = ProActiveConfiguration.getLocationServer();
00144 if ((urlCheckpoint != null) && (urlRecovery != null) &&
00145 (urlLocation != null)) {
00146 this.storage = (CheckpointServer) (Naming.lookup(urlCheckpoint));
00147 this.location = (LocationServer) (Naming.lookup(urlLocation));
00148 this.recovery = (RecoveryProcess) (Naming.lookup(urlRecovery));
00149 } else {
00150 throw new ProActiveException(
00151 "Unable to init FTManager : servers are not correctly set");
00152 }
00153 }
00154
00155
00156
00157 this.additionalCodebase = this.storage.getServerCodebase();
00158
00159
00160 try {
00161 this.recovery.register(ownerID);
00162 this.location.updateLocation(ownerID, owner.getRemoteAdapter());
00163 } catch (RemoteException e) {
00164 logger.error("**ERROR** Unable to register in location server");
00165 throw new ProActiveException("Unable to register in location server",
00166 e);
00167 }
00168 } catch (MalformedURLException e) {
00169 throw new ProActiveException("Unable to init FTManager : FT is disable.",
00170 e);
00171 } catch (RemoteException e) {
00172 throw new ProActiveException("Unable to init FTManager : FT is disable.",
00173 e);
00174 } catch (NotBoundException e) {
00175 throw new ProActiveException("Unable to init FTManager : FT is disable.",
00176 e);
00177 }
00178 return 0;
00179 }
00180
00185 public void termination() throws ProActiveException {
00186 try {
00187 this.recovery.unregister(this.ownerID);
00188 } catch (RemoteException e) {
00189 logger.error("**ERROR** Unable to register in location server");
00190 throw new ProActiveException("Unable to unregister in location server",
00191 e);
00192 }
00193 }
00194
00201 public boolean isACheckpoint() {
00202 return isACheckpoint;
00203 }
00204
00209 public void setCheckpointTag(boolean tag) {
00210 this.isACheckpoint = tag;
00211 }
00212
00221 public UniversalBody communicationFailed(UniqueID suspect,
00222 UniversalBody suspectLocation, Exception e) {
00223 try {
00224
00225 UniversalBody newLocation = this.location.searchObject(suspect,
00226 suspectLocation.getRemoteAdapter(), this.ownerID);
00227 if (newLocation == null) {
00228 while (newLocation == null) {
00229 try {
00230
00231 if (logger.isDebugEnabled()) {
00232 logger.debug("[CIC] Waiting for recovery of " +
00233 suspect);
00234 }
00235 Thread.sleep(TIME_TO_RESEND);
00236 } catch (InterruptedException e2) {
00237 e2.printStackTrace();
00238 }
00239 newLocation = this.location.searchObject(suspect,
00240 suspectLocation.getRemoteAdapter(), this.ownerID);
00241 }
00242 return newLocation;
00243 } else {
00244
00245 return newLocation;
00246 }
00247 } catch (RemoteException e1) {
00248 logger.error("**ERROR** Location server unreachable");
00249 e1.printStackTrace();
00250 return null;
00251 }
00252 }
00253
00261 public int sendReply(Reply r, UniversalBody destination) {
00262 try {
00263 this.onSendReplyBefore(r);
00264 int res = r.send(destination);
00265 this.onSendReplyAfter(r, res, destination);
00266 return res;
00267 } catch (IOException e) {
00268 logger.info("[FAULT] " + this.ownerID + " : FAILURE OF " +
00269 destination.getID() + " SUSPECTED ON REPLY SENDING : " +
00270 e.getMessage());
00271 UniversalBody newDestination = this.communicationFailed(destination.getID(),
00272 destination, e);
00273 return this.sendReply(r, newDestination);
00274 }
00275 }
00276
00285 public int sendRequest(Request r, UniversalBody destination)
00286 throws RenegotiateSessionException {
00287 try {
00288 this.onSendRequestBefore(r);
00289 int res = r.send(destination);
00290 this.onSendRequestAfter(r, res, destination);
00291 return res;
00292 } catch (IOException e) {
00293 logger.info("[FAULT] " + this.ownerID + " : FAILURE OF " +
00294 destination.getID() + " SUSPECTED ON REQUEST SENDING : " +
00295 e.getMessage());
00296 UniversalBody newDestination = this.communicationFailed(destination.getID(),
00297 destination, e);
00298 return this.sendRequest(r, newDestination);
00299 } catch (RenegotiateSessionException e1) {
00300 throw e1;
00301 }
00302 }
00303
00309 public Object handleHBEvent(Heartbeat fte) {
00310 if (this.owner.isAlive()) {
00311 return FaultDetector.OK;
00312 } else {
00313 return FaultDetector.IS_DEAD;
00314 }
00315 }
00316
00318
00320
00325 public abstract int onReceiveReply(Reply reply);
00326
00331 public abstract int onReceiveRequest(Request request);
00332
00337 public abstract int onDeliverReply(Reply reply);
00338
00343 public abstract int onDeliverRequest(Request request);
00344
00349 public abstract int onSendReplyBefore(Reply reply);
00350
00358 public abstract int onSendReplyAfter(Reply reply, int rdvValue,
00359 UniversalBody destination);
00360
00366 public abstract int onSendRequestBefore(Request request);
00367
00376 public abstract int onSendRequestAfter(Request request, int rdvValue,
00377 UniversalBody destination) throws RenegotiateSessionException;
00378
00384 public abstract int onServeRequestBefore(Request request);
00385
00391 public abstract int onServeRequestAfter(Request request);
00392
00400 public abstract int beforeRestartAfterRecovery(CheckpointInfo ci, int inc);
00401
00407 public abstract Object handleFTMessage(FTMessage fte);
00408
00414 public void updateLocationAtServer(UniqueID ownerID, UniversalBody remoteBodyAdapter) {}
00415 }