org/objectweb/proactive/core/body/HalfBody.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body;
00032 
00033 import org.objectweb.proactive.core.ProActiveException;
00034 import org.objectweb.proactive.core.ProActiveRuntimeException;
00035 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00036 import org.objectweb.proactive.core.body.future.Future;
00037 import org.objectweb.proactive.core.body.future.FuturePool;
00038 import org.objectweb.proactive.core.body.reply.Reply;
00039 import org.objectweb.proactive.core.body.reply.ReplyReceiver;
00040 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00041 import org.objectweb.proactive.core.body.request.Request;
00042 import org.objectweb.proactive.core.body.request.RequestFactory;
00043 import org.objectweb.proactive.core.body.request.RequestImpl;
00044 import org.objectweb.proactive.core.body.request.RequestQueue;
00045 import org.objectweb.proactive.core.component.request.ComponentRequestImpl;
00046 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00047 import org.objectweb.proactive.core.event.MessageEventListener;
00048 import org.objectweb.proactive.core.exceptions.NonFunctionalException;
00049 import org.objectweb.proactive.core.exceptions.manager.NFEListener;
00050 import org.objectweb.proactive.core.exceptions.manager.NFEListenerList;
00051 import org.objectweb.proactive.core.mop.MethodCall;
00052 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00053 import org.objectweb.proactive.core.util.log.Loggers;
00054 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00055 import org.objectweb.proactive.ext.security.InternalBodySecurity;
00056 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00057 
00058 
00059 public class HalfBody extends AbstractBody {
00060     //
00061     // -- PRIVATE MEMBERS -----------------------------------------------
00062     //
00063     private static final String HALF_BODY_EXCEPTION_MESSAGE = "This method is not implemented in class HalfBody.";
00064     private static final String NAME = "Other thread";
00065 
00067     private ReplyReceiver replyReceiver;
00068 
00069     public synchronized static HalfBody getHalfBody(MetaObjectFactory factory) {
00070         return new HalfBody(factory);
00071     }
00072 
00073     //
00074     // -- CONSTRUCTORS -----------------------------------------------
00075     //
00076     private HalfBody(MetaObjectFactory factory) {
00077         super(new Object(), "LOCAL", factory, getRuntimeJobID());
00078 
00079         //SECURITY 
00080         if (psm != null) {
00081             psm = psm.generateSiblingCertificate("HalfBody");
00082             psm.setBody(this);
00083             isSecurityOn = psm.getCertificate() != null;
00084             internalBodySecurity = new InternalBodySecurity(null); // SECURITY
00085             ProActiveLogger.getLogger(Loggers.SECURITY_MANAGER).debug("  ------> HalfBody Security is " +
00086                 isSecurityOn);
00087         }
00088 
00089         this.replyReceiver = factory.newReplyReceiverFactory().newReplyReceiver();
00090         setLocalBodyImpl(new HalfLocalBodyStrategy(factory.newRequestFactory()));
00091         this.localBodyStrategy.getFuturePool().setOwnerBody(this.getID());
00092 
00093         // FAULT TOLERANCE
00094         String ftstate = ProActiveConfiguration.getFTState();
00095         if ("enable".equals(ftstate)) {
00096             try {
00097                 // create the fault-tolerance manager
00098                 int protocolSelector = FTManager.getProtoSelector(ProActiveConfiguration.getFTProtocol());
00099                 this.ftmanager = factory.newFTManagerFactory().newHalfFTManager(protocolSelector);
00100                 this.ftmanager.init(this);
00101                 if (bodyLogger.isDebugEnabled()) {
00102                     bodyLogger.debug("Init FTManager on " + this.getNodeURL());
00103                 }
00104             } catch (ProActiveException e) {
00105                 bodyLogger.error(
00106                     "**ERROR** Unable to init FTManager. Fault-tolerance is disabled " +
00107                     e);
00108                 this.ftmanager = null;
00109             }
00110         } else {
00111             this.ftmanager = null;
00112         }
00113     }
00114 
00115     //
00116     // -- PUBLIC METHODS -----------------------------------------------
00117     //
00118     //
00119     // -- implements MessageEventProducer -----------------------------------------------
00120     //
00121     public void addMessageEventListener(MessageEventListener listener) {
00122     }
00123 
00124     public void removeMessageEventListener(MessageEventListener listener) {
00125     }
00126 
00127     //
00128     // -- PROTECTED METHODS -----------------------------------------------
00129     //
00130 
00137     protected int internalReceiveRequest(Request request)
00138         throws java.io.IOException {
00139         throw new ProActiveRuntimeException(
00140             "The method 'receiveRequest' is not implemented in class HalfBody.");
00141     }
00142 
00148     protected int internalReceiveReply(Reply reply) throws java.io.IOException {
00149         try {
00150             if (reply.isCiphered()) {
00151                 reply.decrypt(psm);
00152             }
00153         } catch (Exception e) {
00154             e.printStackTrace();
00155         }
00156 
00157         return replyReceiver.receiveReply(reply, this, getFuturePool());
00158     }
00159 
00160     public void setImmediateService(String methodName) {
00161         throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00162     }
00163 
00164     public void setImmediateService(String methodName, Class[] parametersTypes) {
00165         throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00166     }
00167 
00168     public void removeImmediateService(String methodName,
00169         Class[] parametersTypes) {
00170         throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00171     }
00172 
00176     public String getJobID() {
00177         return getRuntimeJobID();
00178     }
00179 
00180     private static String getRuntimeJobID() {
00181         return ProActiveRuntimeImpl.getProActiveRuntime().getJobID();
00182     }
00183 
00184     public void updateNodeURL(String newNodeURL) {
00185         throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00186     }
00187 
00188     //
00189     // -- inner classes -----------------------------------------------
00190     //
00191     private class HalfLocalBodyStrategy implements LocalBodyStrategy,
00192         java.io.Serializable {
00193 
00195         protected FuturePool futures;
00196         protected RequestFactory internalRequestFactory;
00197         private long absoluteSequenceID;
00198 
00199         //
00200         // -- CONSTRUCTORS -----------------------------------------------
00201         //
00202         public HalfLocalBodyStrategy(RequestFactory requestFactory) {
00203             this.futures = new FuturePool();
00204             this.internalRequestFactory = requestFactory;
00205         }
00206 
00207         //
00208         // -- PUBLIC METHODS -----------------------------------------------
00209         //
00210         //
00211         // -- implements LocalBody -----------------------------------------------
00212         //
00213         public FuturePool getFuturePool() {
00214             return futures;
00215         }
00216 
00217         public BlockingRequestQueue getRequestQueue() {
00218             throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00219         }
00220 
00221         public RequestQueue getHighPriorityRequestQueue() {
00222             throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00223         }
00224 
00225         public Object getReifiedObject() {
00226             throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00227         }
00228 
00229         public String getName() {
00230             return NAME;
00231         }
00232 
00233         public void serve(Request request) {
00234             throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE);
00235         }
00236 
00237         public void sendRequest(MethodCall methodCall, Future future,
00238             UniversalBody destinationBody)
00239             throws java.io.IOException, RenegotiateSessionException {
00240             long sequenceID = getNextSequenceID();
00241             Request request = internalRequestFactory.newRequest(methodCall,
00242                     HalfBody.this, future == null, sequenceID);
00243 
00244             // COMPONENTS : generate ComponentRequest for component messages
00245             if (methodCall.getComponentMetadata() != null ) {
00246                 request = new ComponentRequestImpl((RequestImpl) request);
00247             }
00248             if (future != null) {
00249                 future.setID(sequenceID);
00250                 futures.receiveFuture(future);
00251             }
00252 
00253             // FAULT TOLERANCE
00254             // System.out.println("a half body send a request: " + request.getMethodName());
00255             if (HalfBody.this.ftmanager != null) {
00256                 HalfBody.this.ftmanager.sendRequest(request, destinationBody);
00257             } else {
00258                 request.send(destinationBody);
00259             }
00260         }
00261 
00262         //
00263         // -- PROTECTED METHODS -----------------------------------------------
00264         //
00265 
00270         public synchronized long getNextSequenceID() {
00271             return bodyID.toString().hashCode() + ++absoluteSequenceID;
00272         }
00273     }
00274 
00275     // end inner class LocalHalfBody
00276     // NFEProducer implementation
00277     private NFEListenerList nfeListeners = null;
00278 
00279     public void addNFEListener(NFEListener listener) {
00280         if (nfeListeners == null) {
00281             nfeListeners = new NFEListenerList();
00282         }
00283         nfeListeners.addNFEListener(listener);
00284     }
00285 
00286     public void removeNFEListener(NFEListener listener) {
00287         if (nfeListeners != null) {
00288             nfeListeners.removeNFEListener(listener);
00289         }
00290     }
00291 
00292     public int fireNFE(NonFunctionalException e) {
00293         if (nfeListeners != null) {
00294             return nfeListeners.fireNFE(e);
00295         }
00296         return 0;
00297     }
00298 
00299     public long getNextSequenceID() {
00300         return localBodyStrategy.getNextSequenceID();
00301     }
00302 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1