org/objectweb/proactive/core/body/BodyImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body;
00032 
00033 import java.io.IOException;
00034 import java.io.Serializable;
00035 
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.ProActiveInternalObject;
00038 import org.objectweb.proactive.core.ProActiveException;
00039 import org.objectweb.proactive.core.ProActiveRuntimeException;
00040 import org.objectweb.proactive.core.UniqueID;
00041 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00042 import org.objectweb.proactive.core.body.future.Future;
00043 import org.objectweb.proactive.core.body.future.FuturePool;
00044 import org.objectweb.proactive.core.body.message.MessageEventProducerImpl;
00045 import org.objectweb.proactive.core.body.reply.Reply;
00046 import org.objectweb.proactive.core.body.reply.ReplyReceiver;
00047 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00048 import org.objectweb.proactive.core.body.request.Request;
00049 import org.objectweb.proactive.core.body.request.RequestFactory;
00050 import org.objectweb.proactive.core.body.request.RequestImpl;
00051 import org.objectweb.proactive.core.body.request.RequestQueue;
00052 import org.objectweb.proactive.core.body.request.RequestReceiver;
00053 import org.objectweb.proactive.core.body.request.ServeException;
00054 import org.objectweb.proactive.core.component.request.ComponentRequestImpl;
00055 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00056 import org.objectweb.proactive.core.event.MessageEvent;
00057 import org.objectweb.proactive.core.event.MessageEventListener;
00058 import org.objectweb.proactive.core.exceptions.body.BodyNonFunctionalException;
00059 import org.objectweb.proactive.core.exceptions.body.SendReplyCommunicationException;
00060 import org.objectweb.proactive.core.exceptions.body.ServiceFailedCalleeNFE;
00061 import org.objectweb.proactive.core.exceptions.manager.NFEManager;
00062 import org.objectweb.proactive.core.exceptions.proxy.ProxyNonFunctionalException;
00063 import org.objectweb.proactive.core.exceptions.proxy.ServiceFailedCallerNFE;
00064 import org.objectweb.proactive.core.mop.MethodCall;
00065 import org.objectweb.proactive.core.util.profiling.PAProfilerEngine;
00066 import org.objectweb.proactive.core.util.profiling.Profiling;
00067 import org.objectweb.proactive.core.util.timer.CompositeAverageMicroTimer;
00068 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00069 
00070 
00102 public abstract class BodyImpl extends AbstractBody
00103     implements java.io.Serializable {
00104     //  
00105     // -- STATIC MEMBERS -----------------------------------------------
00106     //
00107     private static final String INACTIVE_BODY_EXCEPTION_MESSAGE = "Cannot perform this call because this body is inactive";
00108 
00109     //
00110     // -- PROTECTED MEMBERS -----------------------------------------------
00111     //
00112 
00114     protected ReplyReceiver replyReceiver;
00115 
00117     protected RequestReceiver requestReceiver;
00118     protected MessageEventProducerImpl messageEventProducer;
00119 
00121     private CompositeAverageMicroTimer timer;
00122 
00123     //
00124     // -- CONSTRUCTORS -----------------------------------------------
00125     //
00126 
00131     public BodyImpl() {
00132     }
00133 
00141     public BodyImpl(Object reifiedObject, String nodeURL,
00142         MetaObjectFactory factory, String jobId) {
00143         super(reifiedObject, nodeURL, factory, jobId);
00144         this.requestReceiver = factory.newRequestReceiverFactory()
00145                                       .newRequestReceiver();
00146         this.replyReceiver = factory.newReplyReceiverFactory().newReplyReceiver();
00147         this.messageEventProducer = new MessageEventProducerImpl();
00148         setLocalBodyImpl(new ActiveLocalBodyStrategy(reifiedObject,
00149                 factory.newRequestQueueFactory().newRequestQueue(bodyID),
00150                 factory.newRequestFactory()));
00151         this.localBodyStrategy.getFuturePool().setOwnerBody(this.getID());
00152         if (Profiling.SERVICE) {
00153             timer = new CompositeAverageMicroTimer("Service");
00154             PAProfilerEngine.registerTimer(timer);
00155         }
00156 
00157         // FAULT TOLERANCE
00158         String ftstate = ProActiveConfiguration.getFTState();
00159         if ("enable".equals(ftstate)) {
00160             // if the object is a ProActive internal object, FT is disabled
00161             if (!(this.localBodyStrategy.getReifiedObject() instanceof ProActiveInternalObject)) {
00162                 // if the object is not serilizable, FT is disabled
00163                 if (this.localBodyStrategy.getReifiedObject() instanceof Serializable) {
00164                     try {
00165                         // create the fault tolerance manager
00166                         int protocolSelector = FTManager.getProtoSelector(ProActiveConfiguration.getFTProtocol());
00167                         this.ftmanager = factory.newFTManagerFactory()
00168                                                 .newFTManager(protocolSelector);
00169                         this.ftmanager.init(this);
00170                         if (bodyLogger.isDebugEnabled()) {
00171                             bodyLogger.debug("Init FTManager on " +
00172                                 this.getNodeURL());
00173                         }
00174                     } catch (ProActiveException e) {
00175                         bodyLogger.error(
00176                             "**ERROR** Unable to init FTManager. Fault-tolerance is disabled " +
00177                             e);
00178                         this.ftmanager = null;
00179                     }
00180                 } else {
00181                     // target body is not serilizable
00182                     bodyLogger.error(
00183                         "**ERROR** Activated object is not serializable. Fault-tolerance is disabled");
00184                     this.ftmanager = null;
00185                 }
00186             }
00187         } else {
00188             this.ftmanager = null;
00189         }
00190     }
00191 
00192     //
00193     // -- PUBLIC METHODS -----------------------------------------------
00194     //
00195     //
00196     // -- implements MessageEventProducer -----------------------------------------------
00197     //
00198     public void addMessageEventListener(MessageEventListener listener) {
00199         if (messageEventProducer != null) {
00200             messageEventProducer.addMessageEventListener(listener);
00201         }
00202     }
00203 
00204     public void removeMessageEventListener(MessageEventListener listener) {
00205         if (messageEventProducer != null) {
00206             messageEventProducer.removeMessageEventListener(listener);
00207         }
00208     }
00209 
00210     //
00211     // -- PROTECTED METHODS -----------------------------------------------
00212     //
00213 
00220     protected int internalReceiveRequest(Request request)
00221         throws java.io.IOException, RenegotiateSessionException {
00222         if (messageEventProducer != null) {
00223             messageEventProducer.notifyListeners(request,
00224                 MessageEvent.REQUEST_RECEIVED, bodyID,
00225                 getRequestQueue().size() + 1);
00226         }
00227 
00228         // request queue length = number of requests in queue
00229         //                                                      + the one to add now 
00230         return requestReceiver.receiveRequest(request, this);
00231     }
00232 
00238     protected int internalReceiveReply(Reply reply) throws java.io.IOException {
00239         //System.out.print("Body receives Reply -> ");
00240         if (messageEventProducer != null) {
00241             messageEventProducer.notifyListeners(reply,
00242                 MessageEvent.REPLY_RECEIVED, bodyID);
00243         }
00244 
00245         /*if (reply.getResult() != null) {
00246            System.out.println("Result contains in Reply is : " + reply.getResult().getClass());
00247            } else {
00248                    System.out.println("Reply is : " + reply);
00249            }*/
00250         return replyReceiver.receiveReply(reply, this, getFuturePool());
00251     }
00252 
00256     protected void activityStopped() {
00257         super.activityStopped();
00258         messageEventProducer = null;
00259         setLocalBodyImpl(new InactiveLocalBodyStrategy());
00260     }
00261 
00262     //protected void activityStopped2() {
00263     //  super.activityStopped2();
00264     //  
00265     //}
00266     public void setImmediateService(String methodName)
00267         throws java.io.IOException {
00268         this.requestReceiver.setImmediateService(methodName);
00269     }
00270 
00271     public void setImmediateService(String methodName, Class[] parametersTypes)
00272         throws IOException {
00273         this.requestReceiver.setImmediateService(methodName, parametersTypes);
00274     }
00275 
00276     public void removeImmediateService(String methodName,
00277         Class[] parametersTypes) throws IOException {
00278         this.requestReceiver.removeImmediateService(methodName, parametersTypes);
00279     }
00280 
00281     public void updateNodeURL(String newNodeURL) {
00282         this.nodeURL = newNodeURL;
00283     }
00284 
00285     //
00286     // -- PRIVATE METHODS -----------------------------------------------
00287     //
00288     //
00289     // -- inner classes -----------------------------------------------
00290     //
00291     private class ActiveLocalBodyStrategy implements LocalBodyStrategy,
00292         java.io.Serializable {
00293 
00295         protected FuturePool futures;
00296 
00298         protected Object reifiedObject;
00299         protected BlockingRequestQueue requestQueue;
00300         protected RequestFactory internalRequestFactory;
00301         private long absoluteSequenceID;
00302 
00303         //
00304         // -- CONSTRUCTORS -----------------------------------------------
00305         //
00306         public ActiveLocalBodyStrategy(Object reifiedObject,
00307             BlockingRequestQueue requestQueue, RequestFactory requestFactory) {
00308             this.reifiedObject = reifiedObject;
00309             this.futures = new FuturePool();
00310             this.requestQueue = requestQueue;
00311             this.internalRequestFactory = requestFactory;
00312         }
00313 
00314         //
00315         // -- PUBLIC METHODS -----------------------------------------------
00316         //
00317         //
00318         // -- implements LocalBody -----------------------------------------------
00319         //
00320         public FuturePool getFuturePool() {
00321             return futures;
00322         }
00323 
00324         public BlockingRequestQueue getRequestQueue() {
00325             return requestQueue;
00326         }
00327 
00328         public Object getReifiedObject() {
00329             return reifiedObject;
00330         }
00331 
00332         public String getName() {
00333             return reifiedObject.getClass().getName();
00334         }
00335 
00339         public void serve(Request request) {
00340             if (request == null) {
00341                 return;
00342             }
00343             try {
00344                 messageEventProducer.notifyListeners(request,
00345                     MessageEvent.SERVING_STARTED, bodyID,
00346                     getRequestQueue().size());
00347                 Reply reply = null;
00348                 try {
00349                     if (Profiling.SERVICE) {
00350                         timer.setTimer("serve." + request.getMethodName());
00351                         timer.start();
00352                     }
00353                     
00354                     //If the request is not a "terminate Active Object" request, 
00355                     //it is served normally.
00356                     if(!isTerminateAORequest(request)) {
00357                         reply = request.serve(BodyImpl.this);
00358                     }
00359                     
00360                     if (Profiling.SERVICE) {
00361                         //timer.setTimer("serve."+this.getMethodName());
00362                         timer.stop();
00363                     }
00364                 } catch (ServeException e) {
00365                     // Create a non functional exception encapsulating the service exception
00366                     BodyNonFunctionalException calleeNFE = new ServiceFailedCalleeNFE(
00367                             "Exception occured while serving pending request = " +
00368                             request.getMethodName(), e, this,
00369                             ProActive.getBodyOnThis());
00370                     NFEManager.fireNFE(calleeNFE, BodyImpl.this);
00371 
00372                     // Create a non functional exception encapsulating the service exception
00373                     ProxyNonFunctionalException callerNFE = new ServiceFailedCallerNFE(
00374                             "Exception occured while serving pending request = " +
00375                             request.getMethodName(), e);
00376 
00377                     // Create a new reply that contains this NFE instead of the result
00378                     Reply replyAlternate = null;
00379                     replyAlternate = request.serveAlternate(BodyImpl.this,
00380                             callerNFE);
00381 
00382                     // Send reply and stop local node if desired
00383                     if (replyAlternate == null) {
00384                         if (!isActive()) {
00385                             return; //test if active in case of terminate() method otherwise eventProducer would be null
00386                         }
00387                         messageEventProducer.notifyListeners(request,
00388                             MessageEvent.VOID_REQUEST_SERVED, bodyID,
00389                             getRequestQueue().size());
00390                         return;
00391                     }
00392                     UniqueID destinationBodyId = request.getSourceBodyID();
00393                     if ((destinationBodyId != null) &&
00394                             (messageEventProducer != null)) {
00395                         messageEventProducer.notifyListeners(reply,
00396                             MessageEvent.REPLY_SENT, destinationBodyId,
00397                             getRequestQueue().size());
00398                     }
00399                     this.getFuturePool().registerDestination(request.getSender());
00400 
00401                     // FAULT-TOLERANCE
00402                     if (BodyImpl.this.ftmanager != null) {
00403                         BodyImpl.this.ftmanager.sendReply(replyAlternate,
00404                             request.getSender());
00405                     } else {
00406                         replyAlternate.send(request.getSender());
00407                     }
00408 
00409                     this.getFuturePool().removeDestination();
00410                     return;
00411                 }
00412 
00413                 if (reply == null) {
00414                     if (!isActive()) {
00415                         return; //test if active in case of terminate() method otherwise eventProducer would be null
00416                     }
00417                     messageEventProducer.notifyListeners(request,
00418                         MessageEvent.VOID_REQUEST_SERVED, bodyID,
00419                         getRequestQueue().size());
00420                     return;
00421                 }
00422                 UniqueID destinationBodyId = request.getSourceBodyID();
00423                 if ((destinationBodyId != null) &&
00424                         (messageEventProducer != null)) {
00425                     messageEventProducer.notifyListeners(reply,
00426                         MessageEvent.REPLY_SENT, destinationBodyId,
00427                         getRequestQueue().size());
00428                 }
00429                 this.getFuturePool().registerDestination(request.getSender());
00430 
00431                 // FAULT-TOLERANCE
00432                 if (BodyImpl.this.ftmanager != null) {
00433                     BodyImpl.this.ftmanager.sendReply(reply, request.getSender());
00434                 } else {
00435                     reply.send(request.getSender());
00436                 }
00437 
00438                 this.getFuturePool().removeDestination();
00439             } catch (java.io.IOException e) {
00440                 // Create a non functional exception encapsulating the network exception
00441                 BodyNonFunctionalException nfe = new SendReplyCommunicationException(
00442                         "Exception occured in while sending reply to request = " +
00443                         request.getMethodName(), e);
00444 
00445                 NFEManager.fireNFE(nfe, BodyImpl.this);
00446             }
00447         }
00448 
00449         public void sendRequest(MethodCall methodCall, Future future,
00450             UniversalBody destinationBody)
00451             throws java.io.IOException, RenegotiateSessionException {
00452             long sequenceID = getNextSequenceID();
00453             Request request = internalRequestFactory.newRequest(methodCall,
00454                     BodyImpl.this, future == null, sequenceID);
00455 
00456             // COMPONENTS : generate ComponentRequest for component messages
00457             if (methodCall.getComponentMetadata()!=null) {
00458                 request = new ComponentRequestImpl((RequestImpl) request);
00459             }
00460             if (future != null) {
00461                 future.setID(sequenceID);
00462                 futures.receiveFuture(future);
00463             }
00464             messageEventProducer.notifyListeners(request,
00465                 MessageEvent.REQUEST_SENT, destinationBody.getID());
00466 
00467             // FAULT TOLERANCE
00468             if (BodyImpl.this.ftmanager != null) {
00469                 BodyImpl.this.ftmanager.sendRequest(request, destinationBody);
00470             } else {
00471                 request.send(destinationBody);
00472             }
00473         }
00474 
00479         public synchronized long getNextSequenceID() {
00480             return bodyID.toString().hashCode() + ++absoluteSequenceID;
00481         }
00482 
00483         //
00484         // -- PROTECTED METHODS -----------------------------------------------
00485         //
00486 
00487         
00494         private boolean isTerminateAORequest(Request request) {
00495                 boolean terminateRequest = (request.getMethodName()).startsWith("_terminateAO");
00496                 if (terminateRequest) {
00497                         terminate();
00498                 }
00499                 return terminateRequest;
00500         }
00501     }
00502 
00503     // end inner class ActiveLocalBodyStrategy
00504     private class InactiveLocalBodyStrategy implements LocalBodyStrategy,
00505         java.io.Serializable {
00506         //
00507         // -- CONSTRUCTORS -----------------------------------------------
00508         //
00509         public InactiveLocalBodyStrategy() {
00510         }
00511 
00512         //
00513         // -- PUBLIC METHODS -----------------------------------------------
00514         //
00515         //
00516         // -- implements LocalBody -----------------------------------------------
00517         //
00518         public FuturePool getFuturePool() {
00519             //throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00520             return null;
00521         }
00522 
00523         public BlockingRequestQueue getRequestQueue() {
00524             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00525         }
00526 
00527         public RequestQueue getHighPriorityRequestQueue() {
00528             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00529         }
00530 
00531         public Object getReifiedObject() {
00532             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00533         }
00534 
00535         public String getName() {
00536             return "inactive body";
00537         }
00538 
00539         public void serve(Request request) {
00540             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00541         }
00542 
00543         public void sendRequest(MethodCall methodCall, Future future,
00544             UniversalBody destinationBody) throws java.io.IOException {
00545             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
00546         }
00547 
00548         /*
00549          * @see org.objectweb.proactive.core.body.LocalBodyStrategy#getNextSequenceID()
00550          */
00551         public long getNextSequenceID() {
00552             return 0;
00553         }
00554 
00555         
00556     }
00557 
00558     // end inner class InactiveLocalBodyStrategy
00559 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1