00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body;
00032
00033 import java.io.IOException;
00034 import java.io.Serializable;
00035
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.ProActiveInternalObject;
00038 import org.objectweb.proactive.core.ProActiveException;
00039 import org.objectweb.proactive.core.ProActiveRuntimeException;
00040 import org.objectweb.proactive.core.UniqueID;
00041 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00042 import org.objectweb.proactive.core.body.future.Future;
00043 import org.objectweb.proactive.core.body.future.FuturePool;
00044 import org.objectweb.proactive.core.body.message.MessageEventProducerImpl;
00045 import org.objectweb.proactive.core.body.reply.Reply;
00046 import org.objectweb.proactive.core.body.reply.ReplyReceiver;
00047 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00048 import org.objectweb.proactive.core.body.request.Request;
00049 import org.objectweb.proactive.core.body.request.RequestFactory;
00050 import org.objectweb.proactive.core.body.request.RequestImpl;
00051 import org.objectweb.proactive.core.body.request.RequestQueue;
00052 import org.objectweb.proactive.core.body.request.RequestReceiver;
00053 import org.objectweb.proactive.core.body.request.ServeException;
00054 import org.objectweb.proactive.core.component.request.ComponentRequestImpl;
00055 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00056 import org.objectweb.proactive.core.event.MessageEvent;
00057 import org.objectweb.proactive.core.event.MessageEventListener;
00058 import org.objectweb.proactive.core.exceptions.body.BodyNonFunctionalException;
00059 import org.objectweb.proactive.core.exceptions.body.SendReplyCommunicationException;
00060 import org.objectweb.proactive.core.exceptions.body.ServiceFailedCalleeNFE;
00061 import org.objectweb.proactive.core.exceptions.manager.NFEManager;
00062 import org.objectweb.proactive.core.exceptions.proxy.ProxyNonFunctionalException;
00063 import org.objectweb.proactive.core.exceptions.proxy.ServiceFailedCallerNFE;
00064 import org.objectweb.proactive.core.mop.MethodCall;
00065 import org.objectweb.proactive.core.util.profiling.PAProfilerEngine;
00066 import org.objectweb.proactive.core.util.profiling.Profiling;
00067 import org.objectweb.proactive.core.util.timer.CompositeAverageMicroTimer;
00068 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00069
00070
00102 public abstract class BodyImpl extends AbstractBody
00103 implements java.io.Serializable {
00104
00105
00106
00107 private static final String INACTIVE_BODY_EXCEPTION_MESSAGE = "Cannot perform this call because this body is inactive";
00108
00109
00110
00111
00112
00114 protected ReplyReceiver replyReceiver;
00115
00117 protected RequestReceiver requestReceiver;
00118 protected MessageEventProducerImpl messageEventProducer;
00119
00121 private CompositeAverageMicroTimer timer;
00122
00123
00124
00125
00126
00131 public BodyImpl() {
00132 }
00133
00141 public BodyImpl(Object reifiedObject, String nodeURL,
00142 MetaObjectFactory factory, String jobId) {
00143 super(reifiedObject, nodeURL, factory, jobId);
00144 this.requestReceiver = factory.newRequestReceiverFactory()
00145 .newRequestReceiver();
00146 this.replyReceiver = factory.newReplyReceiverFactory().newReplyReceiver();
00147 this.messageEventProducer = new MessageEventProducerImpl();
00148 setLocalBodyImpl(new ActiveLocalBodyStrategy(reifiedObject,
00149 factory.newRequestQueueFactory().newRequestQueue(bodyID),
00150 factory.newRequestFactory()));
00151 this.localBodyStrategy.getFuturePool().setOwnerBody(this.getID());
00152 if (Profiling.SERVICE) {
00153 timer = new CompositeAverageMicroTimer("Service");
00154 PAProfilerEngine.registerTimer(timer);
00155 }
00156
00157
00158 String ftstate = ProActiveConfiguration.getFTState();
00159 if ("enable".equals(ftstate)) {
00160
00161 if (!(this.localBodyStrategy.getReifiedObject() instanceof ProActiveInternalObject)) {
00162
00163 if (this.localBodyStrategy.getReifiedObject() instanceof Serializable) {
00164 try {
00165
00166 int protocolSelector = FTManager.getProtoSelector(ProActiveConfiguration.getFTProtocol());
00167 this.ftmanager = factory.newFTManagerFactory()
00168 .newFTManager(protocolSelector);
00169 this.ftmanager.init(this);
00170 if (bodyLogger.isDebugEnabled()) {
00171 bodyLogger.debug("Init FTManager on " +
00172 this.getNodeURL());
00173 }
00174 } catch (ProActiveException e) {
00175 bodyLogger.error(
00176 "**ERROR** Unable to init FTManager. Fault-tolerance is disabled " +
00177 e);
00178 this.ftmanager = null;
00179 }
00180 } else {
00181
00182 bodyLogger.error(
00183 "**ERROR** Activated object is not serializable. Fault-tolerance is disabled");
00184 this.ftmanager = null;
00185 }
00186 }
00187 } else {
00188 this.ftmanager = null;
00189 }
00190 }
00191
00192
00193
00194
00195
00196
00197
00198 public void addMessageEventListener(MessageEventListener listener) {
00199 if (messageEventProducer != null) {
00200 messageEventProducer.addMessageEventListener(listener);
00201 }
00202 }
00203
00204 public void removeMessageEventListener(MessageEventListener listener) {
00205 if (messageEventProducer != null) {
00206 messageEventProducer.removeMessageEventListener(listener);
00207 }
00208 }
00209
00210
00211
00212
00213
00220 protected int internalReceiveRequest(Request request)
00221 throws java.io.IOException, RenegotiateSessionException {
00222 if (messageEventProducer != null) {
00223 messageEventProducer.notifyListeners(request,
00224 MessageEvent.REQUEST_RECEIVED, bodyID,
00225 getRequestQueue().size() + 1);
00226 }
00227
00228
00229
00230 return requestReceiver.receiveRequest(request, this);
00231 }
00232
00238 protected int internalReceiveReply(Reply reply) throws java.io.IOException {
00239
00240 if (messageEventProducer != null) {
00241 messageEventProducer.notifyListeners(reply,
00242 MessageEvent.REPLY_RECEIVED, bodyID);
00243 }
00244
00245
00246
00247
00248
00249
00250 return replyReceiver.receiveReply(reply, this, getFuturePool());
00251 }
00252
00256 protected void activityStopped() {
00257 super.activityStopped();
00258 messageEventProducer = null;
00259 setLocalBodyImpl(new InactiveLocalBodyStrategy());
00260 }
00261
00262
00263
00264
00265
00266 public void setImmediateService(String methodName)
00267 throws java.io.IOException {
00268 this.requestReceiver.setImmediateService(methodName);
00269 }
00270
00271 public void setImmediateService(String methodName, Class[] parametersTypes)
00272 throws IOException {
00273 this.requestReceiver.setImmediateService(methodName, parametersTypes);
00274 }
00275
00276 public void removeImmediateService(String methodName,
00277 Class[] parametersTypes) throws IOException {
00278 this.requestReceiver.removeImmediateService(methodName, parametersTypes);
00279 }
00280
00281 public void updateNodeURL(String newNodeURL) {
00282 this.nodeURL = newNodeURL;
00283 }
00284
00285
00286
00287
00288
00289
00290
00291 private class ActiveLocalBodyStrategy implements LocalBodyStrategy,
00292 java.io.Serializable {
00293
00295 protected FuturePool futures;
00296
00298 protected Object reifiedObject;
00299 protected BlockingRequestQueue requestQueue;
00300 protected RequestFactory internalRequestFactory;
00301 private long absoluteSequenceID;
00302
00303
00304
00305
00306 public ActiveLocalBodyStrategy(Object reifiedObject,
00307 BlockingRequestQueue requestQueue, RequestFactory requestFactory) {
00308 this.reifiedObject = reifiedObject;
00309 this.futures = new FuturePool();
00310 this.requestQueue = requestQueue;
00311 this.internalRequestFactory = requestFactory;
00312 }
00313
00314
00315
00316
00317
00318
00319
00320 public FuturePool getFuturePool() {
00321 return futures;
00322 }
00323
00324 public BlockingRequestQueue getRequestQueue() {
00325 return requestQueue;
00326 }
00327
00328 public Object getReifiedObject() {
00329 return reifiedObject;
00330 }
00331
00332 public String getName() {
00333 return reifiedObject.getClass().getName();
00334 }
00335
00339 public void serve(Request request) {
00340 if (request == null) {
00341 return;
00342 }
00343 try {
00344 messageEventProducer.notifyListeners(request,
00345 MessageEvent.SERVING_STARTED, bodyID,
00346 getRequestQueue().size());
00347 Reply reply = null;
00348 try {
00349 if (Profiling.SERVICE) {
00350 timer.setTimer("serve." + request.getMethodName());
00351 timer.start();
00352 }
00353
00354
00355
00356 if(!isTerminateAORequest(request)) {
00357 reply = request.serve(BodyImpl.this);
00358 }
00359
00360 if (Profiling.SERVICE) {
00361
00362 timer.stop();
00363 }
00364 } catch (ServeException e) {
00365
00366 BodyNonFunctionalException calleeNFE = new ServiceFailedCalleeNFE(
00367 "Exception occured while serving pending request = " +
00368 request.getMethodName(), e, this,
00369 ProActive.getBodyOnThis());
00370 NFEManager.fireNFE(calleeNFE, BodyImpl.this);
00371
00372
00373 ProxyNonFunctionalException callerNFE = new ServiceFailedCallerNFE(
00374 "Exception occured while serving pending request = " +
00375 request.getMethodName(), e);
00376
00377
00378 Reply replyAlternate = null;
00379 replyAlternate = request.serveAlternate(BodyImpl.this,
00380 callerNFE);
00381
00382
00383 if (replyAlternate == null) {
00384 if (!isActive()) {
00385 return;
00386 }
00387 messageEventProducer.notifyListeners(request,
00388 MessageEvent.VOID_REQUEST_SERVED, bodyID,
00389 getRequestQueue().size());
00390 return;
00391 }
00392 UniqueID destinationBodyId = request.getSourceBodyID();
00393 if ((destinationBodyId != null) &&
00394 (messageEventProducer != null)) {
00395 messageEventProducer.notifyListeners(reply,
00396 MessageEvent.REPLY_SENT, destinationBodyId,
00397 getRequestQueue().size());
00398 }
00399 this.getFuturePool().registerDestination(request.getSender());
00400
00401
00402 if (BodyImpl.this.ftmanager != null) {
00403 BodyImpl.this.ftmanager.sendReply(replyAlternate,
00404 request.getSender());
00405 } else {
00406 replyAlternate.send(request.getSender());
00407 }
00408
00409 this.getFuturePool().removeDestination();
00410 return;
00411 }
00412
00413 if (reply == null) {
00414 if (!isActive()) {
00415 return;
00416 }
00417 messageEventProducer.notifyListeners(request,
00418 MessageEvent.VOID_REQUEST_SERVED, bodyID,
00419 getRequestQueue().size());
00420 return;
00421 }
00422 UniqueID destinationBodyId = request.getSourceBodyID();
00423 if ((destinationBodyId != null) &&
00424 (messageEventProducer != null)) {
00425 messageEventProducer.notifyListeners(reply,
00426 MessageEvent.REPLY_SENT, destinationBodyId,
00427 getRequestQueue().size());
00428 }
00429 this.getFuturePool().registerDestination(request.getSender());
00430
00431
00432 if (BodyImpl.this.ftmanager != null) {
00433 BodyImpl.this.ftmanager.sendReply(reply, request.getSender());
00434 } else {
00435 reply.send(request.getSender());
00436 }
00437
00438 this.getFuturePool().removeDestination();
00439 } catch (java.io.IOException e) {
00440
00441 BodyNonFunctionalException nfe = new SendReplyCommunicationException(
00442 "Exception occured in while sending reply to request = " +
00443 request.getMethodName(), e);
00444
00445 NFEManager.fireNFE(nfe, BodyImpl.this);
00446 }
00447 }
00448
00449 public void sendRequest(MethodCall methodCall, Future future,
00450 UniversalBody destinationBody)
00451 throws java.io.IOException, RenegotiateSessionException {
00452 long sequenceID = getNextSequenceID();
00453 Request request = internalRequestFactory.newRequest(methodCall,
00454 BodyImpl.this, future == null, sequenceID);
00455
00456
00457 if (methodCall.getComponentMetadata()!=null) {
00458 request = new ComponentRequestImpl((RequestImpl) request);
00459 }
00460 if (future != null) {
00461 future.setID(sequenceID);
00462 futures.receiveFuture(future);
00463 }
00464 messageEventProducer.notifyListeners(request,
00465 MessageEvent.REQUEST_SENT, destinationBody.getID());
00466
00467
00468 if (BodyImpl.this.ftmanager != null) {
00469 BodyImpl.this.ftmanager.sendRequest(request, destinationBody);
00470 } else {
00471 request.send(destinationBody);
00472 }
00473 }
00474
00479 public synchronized long getNextSequenceID() {
00480 return bodyID.toString().hashCode() + ++absoluteSequenceID;
00481 }
00482
00483
00484
00485
00486
00487
00494 private boolean isTerminateAORequest(Request request) {
00495 boolean terminateRequest = (request.getMethodName()).startsWith("_terminateAO");
00496 if (terminateRequest) {
00497 terminate();
00498 }
00499 return terminateRequest;
00500 }
00501 }
00502
00503
00504 private class InactiveLocalBodyStrategy implements LocalBodyStrategy,
00505 java.io.Serializable {
00506
00507
00508
00509 public InactiveLocalBodyStrategy() {
00510 }
00511
00512
00513
00514
00515
00516
00517
00518 public FuturePool getFuturePool() {
00519
00520 return null;
00521 }
00522
00523 public BlockingRequestQueue getRequestQueue() {
00524 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00525 }
00526
00527 public RequestQueue getHighPriorityRequestQueue() {
00528 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00529 }
00530
00531 public Object getReifiedObject() {
00532 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00533 }
00534
00535 public String getName() {
00536 return "inactive body";
00537 }
00538
00539 public void serve(Request request) {
00540 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00541 }
00542
00543 public void sendRequest(MethodCall methodCall, Future future,
00544 UniversalBody destinationBody) throws java.io.IOException {
00545 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00546 }
00547
00548
00549
00550
00551 public long getNextSequenceID() {
00552 return 0;
00553 }
00554
00555
00556 }
00557
00558
00559 }