00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.ft.protocols.cic.managers;
00032
00033 import java.net.MalformedURLException;
00034 import java.rmi.Naming;
00035 import java.rmi.NotBoundException;
00036 import java.rmi.RemoteException;
00037
00038 import org.apache.log4j.Logger;
00039 import org.objectweb.proactive.core.ProActiveException;
00040 import org.objectweb.proactive.core.ProActiveRuntimeException;
00041 import org.objectweb.proactive.core.body.AbstractBody;
00042 import org.objectweb.proactive.core.body.UniversalBody;
00043 import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
00044 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00045 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00046 import org.objectweb.proactive.core.body.ft.protocols.cic.infos.MessageInfoCIC;
00047 import org.objectweb.proactive.core.body.ft.servers.location.LocationServer;
00048 import org.objectweb.proactive.core.body.reply.Reply;
00049 import org.objectweb.proactive.core.body.request.Request;
00050 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00051 import org.objectweb.proactive.core.util.log.Loggers;
00052 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00053 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00054
00055
00062 public class HalfFTManagerCIC extends FTManager {
00063
00064 private MessageInfoCIC forSentMessage;
00065
00066
00067 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_PML);
00068
00072 public int init(AbstractBody owner) throws ProActiveException {
00073
00074
00075 try {
00076 String urlGlobal = ProActiveConfiguration.getGlobalFTServer();
00077 if (urlGlobal != null) {
00078 this.location = (LocationServer) (Naming.lookup(urlGlobal));
00079 } else {
00080 String urlLocation = ProActiveConfiguration.getLocationServer();
00081 if (urlLocation != null) {
00082 this.location = (LocationServer) (Naming.lookup(urlLocation));
00083 } else {
00084 throw new ProActiveException(
00085 "Unable to init HalfFTManager : servers are not correctly set");
00086 }
00087 }
00088 this.storage = null;
00089 this.recovery = null;
00090 } catch (MalformedURLException e) {
00091 throw new ProActiveException("Unable to init HalfFTManager : FT is disable.",
00092 e);
00093 } catch (RemoteException e) {
00094 throw new ProActiveException("Unable to init HalfFTManager : FT is disable.",
00095 e);
00096 } catch (NotBoundException e) {
00097 throw new ProActiveException("Unable to init HalfFTManager : FT is disable.",
00098 e);
00099 }
00100 this.forSentMessage = new MessageInfoCIC();
00101 this.forSentMessage.fromHalfBody = true;
00102 logger.info(" CIC fault-tolerance is enabled for half body " +
00103 this.ownerID);
00104 return 0;
00105 }
00106
00107 public int onReceiveReply(Reply reply) {
00108 reply.setFTManager(this);
00109 return 0;
00110 }
00111
00112 public int onDeliverReply(Reply reply) {
00113 return FTManager.NON_FT;
00114 }
00115
00116 public int onSendRequestBefore(Request request) {
00117 request.setMessageInfo(this.forSentMessage);
00118 return 0;
00119 }
00120
00121 public int onSendReplyBefore(Reply reply) {
00122 reply.setMessageInfo(this.forSentMessage);
00123 return 0;
00124 }
00125
00126 public int onSendRequestAfter(Request request, int rdvValue,
00127 UniversalBody destination) throws RenegotiateSessionException {
00128 if (rdvValue == FTManagerCIC.RESEND_MESSAGE) {
00129 try {
00130 request.resetSendCounter();
00131 request.setIgnoreIt(false);
00132 Thread.sleep(FTManagerCIC.TIME_TO_RESEND);
00133 int rdvValueBis = sendRequest(request, destination);
00134 return this.onSendRequestAfter(request, rdvValueBis, destination);
00135 } catch (RenegotiateSessionException e1) {
00136 throw e1;
00137 } catch (InterruptedException e) {
00138 e.printStackTrace();
00139 }
00140 }
00141 return 0;
00142 }
00143
00144 public synchronized int onSendReplyAfter(Reply reply, int rdvValue,
00145 UniversalBody destination) {
00146 if (rdvValue == FTManagerCIC.RESEND_MESSAGE) {
00147 try {
00148 reply.setIgnoreIt(false);
00149 Thread.sleep(FTManagerCIC.TIME_TO_RESEND);
00150 int rdvValueBis = sendReply(reply, destination);
00151 return this.onSendReplyAfter(reply, rdvValueBis, destination);
00152 } catch (InterruptedException e) {
00153 e.printStackTrace();
00154 }
00155 }
00156 return 0;
00157 }
00158
00160
00162 public int onReceiveRequest(Request request) {
00163 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00164 }
00165
00166 public int onDeliverRequest(Request request) {
00167 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00168 }
00169
00170 public int onServeRequestBefore(Request request) {
00171 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00172 }
00173
00174 public int onServeRequestAfter(Request request) {
00175 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00176 }
00177
00178 public int beforeRestartAfterRecovery(CheckpointInfo ci, int inc) {
00179 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00180 }
00181
00182 public int getIncarnation() {
00183 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00184 }
00185
00186 public Object handleFTMessage(FTMessage fte) {
00187 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00188 }
00189 }