org/objectweb/proactive/core/body/ft/protocols/FTManager.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.ft.protocols;
00032 
00033 import java.io.IOException;
00034 import java.net.MalformedURLException;
00035 import java.rmi.Naming;
00036 import java.rmi.NotBoundException;
00037 import java.rmi.RemoteException;
00038 
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.core.ProActiveException;
00041 import org.objectweb.proactive.core.UniqueID;
00042 import org.objectweb.proactive.core.body.AbstractBody;
00043 import org.objectweb.proactive.core.body.UniversalBody;
00044 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00045 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00046 import org.objectweb.proactive.core.body.ft.internalmsg.Heartbeat;
00047 import org.objectweb.proactive.core.body.ft.servers.faultdetection.FaultDetector;
00048 import org.objectweb.proactive.core.body.ft.servers.location.LocationServer;
00049 import org.objectweb.proactive.core.body.ft.servers.recovery.RecoveryProcess;
00050 import org.objectweb.proactive.core.body.ft.servers.storage.CheckpointServer;
00051 import org.objectweb.proactive.core.body.reply.Reply;
00052 import org.objectweb.proactive.core.body.request.Request;
00053 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00054 import org.objectweb.proactive.core.util.log.Loggers;
00055 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00056 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00057 
00058 
00064 public abstract class FTManager implements java.io.Serializable {
00065     //logger
00066     protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE);
00067 
00069     public static final int NON_FT = -30;
00070 
00072     public static final int DEFAULT_TTC_VALUE = 30000;
00073 
00075     public static final int IMMEDIATE_SERVICE = -1;
00076 
00078     public static final int ORPHAN_REPLY = -2;
00079 
00081     public static final long TIME_TO_RESEND = 3000;
00082 
00084     public static final String HALF_BODY_EXCEPTION_MESSAGE = "Cannot perform this call on a FTManager of a HalfBody";
00085 
00086     // true is this is a checkpoint
00087     private boolean isACheckpoint;
00088 
00089     // body attached to this manager
00090     protected AbstractBody owner;
00091     protected UniqueID ownerID;
00092 
00093     // server adresses
00094     protected CheckpointServer storage;
00095     protected LocationServer location;
00096     protected RecoveryProcess recovery;
00097 
00098     // additional codebase for checkpoints
00099     protected String additionalCodebase;
00100 
00101     // checkpoint interval (ms)
00102     protected int ttc;
00103 
00109     public static int getProtoSelector(String protoName) {
00110         if ("cic".equals(protoName)) {
00111             return FTManagerFactory.PROTO_CIC;
00112         } else if ("pml".equals(protoName)) {
00113             return FTManagerFactory.PROTO_PML;
00114         }
00115         return 0;
00116     }
00117 
00125     public int init(AbstractBody owner) throws ProActiveException {
00126         this.owner = owner;
00127         this.ownerID = owner.getID();
00128         try {
00129             String ttcValue = ProActiveConfiguration.getTTCValue();
00130             if (ttcValue != null) {
00131                 this.ttc = Integer.parseInt(ttcValue) * 1000;
00132             } else {
00133                 this.ttc = FTManager.DEFAULT_TTC_VALUE;
00134             }
00135             String urlGlobal = ProActiveConfiguration.getGlobalFTServer();
00136             if (urlGlobal != null) {
00137                 this.storage = (CheckpointServer) (Naming.lookup(urlGlobal));
00138                 this.location = (LocationServer) (Naming.lookup(urlGlobal));
00139                 this.recovery = (RecoveryProcess) (Naming.lookup(urlGlobal));
00140             } else {
00141                 String urlCheckpoint = ProActiveConfiguration.getCheckpointServer();
00142                 String urlRecovery = ProActiveConfiguration.getRecoveryServer();
00143                 String urlLocation = ProActiveConfiguration.getLocationServer();
00144                 if ((urlCheckpoint != null) && (urlRecovery != null) &&
00145                         (urlLocation != null)) {
00146                     this.storage = (CheckpointServer) (Naming.lookup(urlCheckpoint));
00147                     this.location = (LocationServer) (Naming.lookup(urlLocation));
00148                     this.recovery = (RecoveryProcess) (Naming.lookup(urlRecovery));
00149                 } else {
00150                     throw new ProActiveException(
00151                         "Unable to init FTManager : servers are not correctly set");
00152                 }
00153             }
00154 
00155             // the additional codebase is added to normal codebase 
00156             // ONLY during serialization for checkpoint !
00157             this.additionalCodebase = this.storage.getServerCodebase();
00158 
00159             // registration in the recovery process and in the localisation server
00160             try {
00161                 this.recovery.register(ownerID);
00162                 this.location.updateLocation(ownerID, owner.getRemoteAdapter());
00163             } catch (RemoteException e) {
00164                 logger.error("**ERROR** Unable to register in location server");
00165                 throw new ProActiveException("Unable to register in location server",
00166                     e);
00167             }
00168         } catch (MalformedURLException e) {
00169             throw new ProActiveException("Unable to init FTManager : FT is disable.",
00170                 e);
00171         } catch (RemoteException e) {
00172             throw new ProActiveException("Unable to init FTManager : FT is disable.",
00173                 e);
00174         } catch (NotBoundException e) {
00175             throw new ProActiveException("Unable to init FTManager : FT is disable.",
00176                 e);
00177         }
00178         return 0;
00179     }
00180 
00185     public void termination() throws ProActiveException {
00186         try {
00187             this.recovery.unregister(this.ownerID);
00188         } catch (RemoteException e) {
00189             logger.error("**ERROR** Unable to register in location server");
00190             throw new ProActiveException("Unable to unregister in location server",
00191                 e);
00192         }
00193     }
00194 
00201     public boolean isACheckpoint() {
00202         return isACheckpoint;
00203     }
00204 
00209     public void setCheckpointTag(boolean tag) {
00210         this.isACheckpoint = tag;
00211     }
00212 
00221     public UniversalBody communicationFailed(UniqueID suspect,
00222         UniversalBody suspectLocation, Exception e) {
00223         try {
00224             // send an adapter to suspectLocation: the suspected body could be local
00225             UniversalBody newLocation = this.location.searchObject(suspect,
00226                     suspectLocation.getRemoteAdapter(), this.ownerID);
00227             if (newLocation == null) {
00228                 while (newLocation == null) {
00229                     try {
00230                         // suspected is failed or is recovering
00231                         if (logger.isDebugEnabled()) {
00232                             logger.debug("[CIC] Waiting for recovery of " +
00233                                 suspect);
00234                         }
00235                         Thread.sleep(TIME_TO_RESEND);
00236                     } catch (InterruptedException e2) {
00237                         e2.printStackTrace();
00238                     }
00239                     newLocation = this.location.searchObject(suspect,
00240                             suspectLocation.getRemoteAdapter(), this.ownerID);
00241                 }
00242                 return newLocation;
00243             } else {
00244                 // newLocation is the new location of suspect
00245                 return newLocation;
00246             }
00247         } catch (RemoteException e1) {
00248             logger.error("**ERROR** Location server unreachable");
00249             e1.printStackTrace();
00250             return null;
00251         }
00252     }
00253 
00261     public int sendReply(Reply r, UniversalBody destination) {
00262         try {
00263             this.onSendReplyBefore(r);
00264             int res = r.send(destination);
00265             this.onSendReplyAfter(r, res, destination);
00266             return res;
00267         } catch (IOException e) {
00268             logger.info("[FAULT] " + this.ownerID + " : FAILURE OF " +
00269                 destination.getID() + " SUSPECTED ON REPLY SENDING : " +
00270                 e.getMessage());
00271             UniversalBody newDestination = this.communicationFailed(destination.getID(),
00272                     destination, e);
00273             return this.sendReply(r, newDestination);
00274         }
00275     }
00276 
00285     public int sendRequest(Request r, UniversalBody destination)
00286         throws RenegotiateSessionException {
00287         try {
00288             this.onSendRequestBefore(r);
00289             int res = r.send(destination);
00290             this.onSendRequestAfter(r, res, destination);
00291             return res;
00292         } catch (IOException e) {
00293             logger.info("[FAULT] " + this.ownerID + " : FAILURE OF " +
00294                 destination.getID() + " SUSPECTED ON REQUEST SENDING : " +
00295                 e.getMessage());
00296             UniversalBody newDestination = this.communicationFailed(destination.getID(),
00297                     destination, e);
00298             return this.sendRequest(r, newDestination);
00299         } catch (RenegotiateSessionException e1) {
00300             throw e1;
00301         }
00302     }
00303 
00309     public Object handleHBEvent(Heartbeat fte) {
00310         if (this.owner.isAlive()) {
00311             return FaultDetector.OK;
00312         } else {
00313             return FaultDetector.IS_DEAD;
00314         }
00315     }
00316 
00318     // ABSTRACT METHODS //
00320 
00325     public abstract int onReceiveReply(Reply reply);
00326 
00331     public abstract int onReceiveRequest(Request request);
00332 
00337     public abstract int onDeliverReply(Reply reply);
00338 
00343     public abstract int onDeliverRequest(Request request);
00344 
00349     public abstract int onSendReplyBefore(Reply reply);
00350 
00358     public abstract int onSendReplyAfter(Reply reply, int rdvValue,
00359         UniversalBody destination);
00360 
00366     public abstract int onSendRequestBefore(Request request);
00367 
00376     public abstract int onSendRequestAfter(Request request, int rdvValue,
00377         UniversalBody destination) throws RenegotiateSessionException;
00378 
00384     public abstract int onServeRequestBefore(Request request);
00385 
00391     public abstract int onServeRequestAfter(Request request);
00392 
00400     public abstract int beforeRestartAfterRecovery(CheckpointInfo ci, int inc);
00401 
00407     public abstract Object handleFTMessage(FTMessage fte);
00408     
00414     public void updateLocationAtServer(UniqueID ownerID, UniversalBody remoteBodyAdapter) {}
00415 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1