00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body;
00032
00033 import org.objectweb.proactive.core.ProActiveException;
00034 import org.objectweb.proactive.core.ProActiveRuntimeException;
00035 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00036 import org.objectweb.proactive.core.body.future.Future;
00037 import org.objectweb.proactive.core.body.future.FuturePool;
00038 import org.objectweb.proactive.core.body.reply.Reply;
00039 import org.objectweb.proactive.core.body.reply.ReplyReceiver;
00040 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00041 import org.objectweb.proactive.core.body.request.Request;
00042 import org.objectweb.proactive.core.body.request.RequestFactory;
00043 import org.objectweb.proactive.core.body.request.RequestImpl;
00044 import org.objectweb.proactive.core.body.request.RequestQueue;
00045 import org.objectweb.proactive.core.component.request.ComponentRequestImpl;
00046 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00047 import org.objectweb.proactive.core.event.MessageEventListener;
00048 import org.objectweb.proactive.core.exceptions.NonFunctionalException;
00049 import org.objectweb.proactive.core.exceptions.manager.NFEListener;
00050 import org.objectweb.proactive.core.exceptions.manager.NFEListenerList;
00051 import org.objectweb.proactive.core.mop.MethodCall;
00052 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00053 import org.objectweb.proactive.core.util.log.Loggers;
00054 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00055 import org.objectweb.proactive.ext.security.InternalBodySecurity;
00056 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00057
00058
00059 public class HalfBody extends AbstractBody {
00060
00061
00062
00063 private static final String HALF_BODY_EXCEPTION_MESSAGE = "This method is not implemented in class HalfBody.";
00064 private static final String NAME = "Other thread";
00065
00067 private ReplyReceiver replyReceiver;
00068
00069 public synchronized static HalfBody getHalfBody(MetaObjectFactory factory) {
00070 return new HalfBody(factory);
00071 }
00072
00073
00074
00075
00076 private HalfBody(MetaObjectFactory factory) {
00077 super(new Object(), "LOCAL", factory, getRuntimeJobID());
00078
00079
00080 if (psm != null) {
00081 psm = psm.generateSiblingCertificate("HalfBody");
00082 psm.setBody(this);
00083 isSecurityOn = psm.getCertificate() != null;
00084 internalBodySecurity = new InternalBodySecurity(null);
00085 ProActiveLogger.getLogger(Loggers.SECURITY_MANAGER).debug(" ------> HalfBody Security is " +
00086 isSecurityOn);
00087 }
00088
00089 this.replyReceiver = factory.newReplyReceiverFactory().newReplyReceiver();
00090 setLocalBodyImpl(new HalfLocalBodyStrategy(factory.newRequestFactory()));
00091 this.localBodyStrategy.getFuturePool().setOwnerBody(this.getID());
00092
00093
00094 String ftstate = ProActiveConfiguration.getFTState();
00095 if ("enable".equals(ftstate)) {
00096 try {
00097
00098 int protocolSelector = FTManager.getProtoSelector(ProActiveConfiguration.getFTProtocol());
00099 this.ftmanager = factory.newFTManagerFactory().newHalfFTManager(protocolSelector);
00100 this.ftmanager.init(this);
00101 if (bodyLogger.isDebugEnabled()) {
00102 bodyLogger.debug("Init FTManager on " + this.getNodeURL());
00103 }
00104 } catch (ProActiveException e) {
00105 bodyLogger.error(
00106 "**ERROR** Unable to init FTManager. Fault-tolerance is disabled " +
00107 e);
00108 this.ftmanager = null;
00109 }
00110 } else {
00111 this.ftmanager = null;
00112 }
00113 }
00114
00115
00116
00117
00118
00119
00120
00121 public void addMessageEventListener(MessageEventListener listener) {
00122 }
00123
00124 public void removeMessageEventListener(MessageEventListener listener) {
00125 }
00126
00127
00128
00129
00130
00137 protected int internalReceiveRequest(Request request)
00138 throws java.io.IOException {
00139 throw new ProActiveRuntimeException(
00140 "The method 'receiveRequest' is not implemented in class HalfBody.");
00141 }
00142
00148 protected int internalReceiveReply(Reply reply) throws java.io.IOException {
00149 try {
00150 if (reply.isCiphered()) {
00151 reply.decrypt(psm);
00152 }
00153 } catch (Exception e) {
00154 e.printStackTrace();
00155 }
00156
00157 return replyReceiver.receiveReply(reply, this, getFuturePool());
00158 }
00159
00160 public void setImmediateService(String methodName) {
00161 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00162 }
00163
00164 public void setImmediateService(String methodName, Class[] parametersTypes) {
00165 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00166 }
00167
00168 public void removeImmediateService(String methodName,
00169 Class[] parametersTypes) {
00170 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00171 }
00172
00176 public String getJobID() {
00177 return getRuntimeJobID();
00178 }
00179
00180 private static String getRuntimeJobID() {
00181 return ProActiveRuntimeImpl.getProActiveRuntime().getJobID();
00182 }
00183
00184 public void updateNodeURL(String newNodeURL) {
00185 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00186 }
00187
00188
00189
00190
00191 private class HalfLocalBodyStrategy implements LocalBodyStrategy,
00192 java.io.Serializable {
00193
00195 protected FuturePool futures;
00196 protected RequestFactory internalRequestFactory;
00197 private long absoluteSequenceID;
00198
00199
00200
00201
00202 public HalfLocalBodyStrategy(RequestFactory requestFactory) {
00203 this.futures = new FuturePool();
00204 this.internalRequestFactory = requestFactory;
00205 }
00206
00207
00208
00209
00210
00211
00212
00213 public FuturePool getFuturePool() {
00214 return futures;
00215 }
00216
00217 public BlockingRequestQueue getRequestQueue() {
00218 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00219 }
00220
00221 public RequestQueue getHighPriorityRequestQueue() {
00222 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00223 }
00224
00225 public Object getReifiedObject() {
00226 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00227 }
00228
00229 public String getName() {
00230 return NAME;
00231 }
00232
00233 public void serve(Request request) {
00234 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00235 }
00236
00237 public void sendRequest(MethodCall methodCall, Future future,
00238 UniversalBody destinationBody)
00239 throws java.io.IOException, RenegotiateSessionException {
00240 long sequenceID = getNextSequenceID();
00241 Request request = internalRequestFactory.newRequest(methodCall,
00242 HalfBody.this, future == null, sequenceID);
00243
00244
00245 if (methodCall.getComponentMetadata() != null ) {
00246 request = new ComponentRequestImpl((RequestImpl) request);
00247 }
00248 if (future != null) {
00249 future.setID(sequenceID);
00250 futures.receiveFuture(future);
00251 }
00252
00253
00254
00255 if (HalfBody.this.ftmanager != null) {
00256 HalfBody.this.ftmanager.sendRequest(request, destinationBody);
00257 } else {
00258 request.send(destinationBody);
00259 }
00260 }
00261
00262
00263
00264
00265
00270 public synchronized long getNextSequenceID() {
00271 return bodyID.toString().hashCode() + ++absoluteSequenceID;
00272 }
00273 }
00274
00275
00276
00277 private NFEListenerList nfeListeners = null;
00278
00279 public void addNFEListener(NFEListener listener) {
00280 if (nfeListeners == null) {
00281 nfeListeners = new NFEListenerList();
00282 }
00283 nfeListeners.addNFEListener(listener);
00284 }
00285
00286 public void removeNFEListener(NFEListener listener) {
00287 if (nfeListeners != null) {
00288 nfeListeners.removeNFEListener(listener);
00289 }
00290 }
00291
00292 public int fireNFE(NonFunctionalException e) {
00293 if (nfeListeners != null) {
00294 return nfeListeners.fireNFE(e);
00295 }
00296 return 0;
00297 }
00298
00299 public long getNextSequenceID() {
00300 return localBodyStrategy.getNextSequenceID();
00301 }
00302 }