org/objectweb/proactive/core/body/AbstractBody.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body;
00032 
00033 import java.io.IOException;
00034 import java.security.PublicKey;
00035 import java.security.cert.CertificateEncodingException;
00036 import java.security.cert.X509Certificate;
00037 import java.util.ArrayList;
00038 import java.util.Collection;
00039 import java.util.Hashtable;
00040 
00041 import org.apache.log4j.Logger;
00042 import org.objectweb.proactive.Body;
00043 import org.objectweb.proactive.core.UniqueID;
00044 import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
00045 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00046 import org.objectweb.proactive.core.body.future.Future;
00047 import org.objectweb.proactive.core.body.future.FuturePool;
00048 import org.objectweb.proactive.core.body.future.FutureProxy;
00049 import org.objectweb.proactive.core.body.proxy.UniversalBodyProxy;
00050 import org.objectweb.proactive.core.body.reply.Reply;
00051 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
00052 import org.objectweb.proactive.core.body.request.Request;
00053 import org.objectweb.proactive.core.component.representative.ItfID;
00054 import org.objectweb.proactive.core.component.request.Shortcut;
00055 import org.objectweb.proactive.core.gc.GarbageCollector;
00056 import org.objectweb.proactive.core.group.ProActiveGroup;
00057 import org.objectweb.proactive.core.group.spmd.ProActiveSPMDGroupManager;
00058 import org.objectweb.proactive.core.mop.MethodCall;
00059 import org.objectweb.proactive.core.util.ThreadStore;
00060 import org.objectweb.proactive.core.util.log.Loggers;
00061 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00062 import org.objectweb.proactive.ext.security.Communication;
00063 import org.objectweb.proactive.ext.security.DefaultProActiveSecurityManager;
00064 import org.objectweb.proactive.ext.security.InternalBodySecurity;
00065 import org.objectweb.proactive.ext.security.PolicyServer;
00066 import org.objectweb.proactive.ext.security.ProActiveSecurity;
00067 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00068 import org.objectweb.proactive.ext.security.Secure;
00069 import org.objectweb.proactive.ext.security.SecurityContext;
00070 import org.objectweb.proactive.ext.security.crypto.AuthenticationException;
00071 import org.objectweb.proactive.ext.security.crypto.KeyExchangeException;
00072 import org.objectweb.proactive.ext.security.exceptions.CommunicationForbiddenException;
00073 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00074 import org.objectweb.proactive.ext.security.exceptions.SecurityNotAvailableException;
00075 import org.objectweb.proactive.ext.security.securityentity.Entity;
00076 
00077 
00103 public abstract class AbstractBody extends AbstractUniversalBody implements Body,
00104     java.io.Serializable {
00105     //
00106     // -- STATIC MEMBERS -----------------------------------------------
00107     //
00108     private static final String TERMINATED_BODY_EXCEPTION_MESSAGE = "The body has been Terminated";
00109     
00110     private static Logger logger = ProActiveLogger.getLogger(Loggers.BODY);
00111 
00112     //
00113     // -- PROTECTED MEMBERS -----------------------------------------------
00114     //
00115     protected ThreadStore threadStore;
00116 
00117     // the current implementation of the local view of this body
00118     protected LocalBodyStrategy localBodyStrategy;
00119 
00120     // SECURITY
00121     protected ProActiveSecurityManager psm;
00122     protected boolean isSecurityOn = false;
00123     protected transient InternalBodySecurity internalBodySecurity;
00124     protected Hashtable openedSessions;
00125     protected boolean isInterfaceSecureImplemented = false;
00126 
00127     // SPMD GROUP
00128     protected ProActiveSPMDGroupManager spmdManager;
00129 
00130     // FAULT TOLERANCE
00131     protected FTManager ftmanager;
00132 
00133     //
00134     // -- PRIVATE MEMBERS -----------------------------------------------
00135     //
00136 
00138     private transient boolean isActive;
00139 
00142     private transient boolean isDead;
00143     
00144     // GC
00145     private transient GarbageCollector gc;
00146 
00147     //
00148     // -- CONSTRUCTORS -----------------------------------------------
00149     //
00150 
/**
 * Creates a body without initializing any of the fields declared in this class.
 */
public AbstractBody() {
    super();
}
00157 
/**
 * Creates a body for the given reified object.
 * The thread store, the SPMD group manager and the security manager are all
 * obtained from the given factory; a garbage collector is attached last.
 *
 * @param reifiedObject the object this body is created for; it is only tested
 *        here for the <code>Secure</code> interface
 * @param nodeURL the node URL forwarded to the superclass constructor
 * @param factory the factory providing the meta objects of this body
 * @param jobId the job identifier forwarded to the superclass constructor
 */
public AbstractBody(Object reifiedObject, String nodeURL,
    MetaObjectFactory factory, String jobId) {
    super(nodeURL, factory.newRemoteBodyFactory(), jobId);
    this.threadStore = factory.newThreadStoreFactory().newThreadStore();

    // GROUP
    this.spmdManager = factory.newProActiveSPMDGroupManagerFactory()
                              .newProActiveSPMDGroupManager();

    ProActiveSecurity.loadProvider();

    // SECURITY
    if (reifiedObject instanceof Secure) {
        this.isInterfaceSecureImplemented = true;
    }

    Logger securityLogger = ProActiveLogger.getLogger(Loggers.SECURITY_BODY);
    this.psm = factory.getProActiveSecurityManager();
    if (this.psm != null) {
        this.isSecurityOn = true;
        securityLogger.debug("Active Object security On application is " +
            this.psm.getPolicyServer().getApplicationName());
        securityLogger.debug("current thread is " +
            Thread.currentThread().getName());

        this.psm.setBody(this);
        // security stays on only if the manager actually holds a certificate
        this.isSecurityOn = this.psm.getCertificate() != null;
        this.internalBodySecurity = new InternalBodySecurity(null); // SECURITY
    } else {
        this.isSecurityOn = false;
        securityLogger.debug("Active Object security Off");
    }

    this.gc = new GarbageCollector(this);
}
00201     
00202     public void updateReference(UniversalBodyProxy ref) {
00203         this.gc.addProxy(ref);
00204     }
00205     
00206     public void updateReferences(Collection<UniversalBodyProxy> newReferences) {
00207                 for (UniversalBodyProxy ubp : newReferences) {
00208                         this.gc.addProxy(ubp);
00209         }
00210     }
00211 
00212     public Collection<UniversalBodyProxy> getReferences() {
00213         return this.gc.getReferences();
00214     }
00215 
00216     //
00217     // -- PUBLIC METHODS -----------------------------------------------
00218     //
00219 
00224     public String toString() {
00225         // get the incarnation number if ft is enable
00226         String inc = (this.ftmanager != null) ? ("" + this.ftmanager) : ("");
00227         return "Body for " + localBodyStrategy.getName() + " node=" + nodeURL +
00228         " id=" + bodyID + inc;
00229     }
00230 
00231     //
00232     // -- implements UniversalBody -----------------------------------------------
00233     //
00234     public int receiveRequest(Request request)
00235         throws java.io.IOException, RenegotiateSessionException {
00236         //  System.out.println("" + this + "  --> receiveRequest m="+request.getMethodName());
00237         // NON_FT is returned if this object is not fault tolerant
00238         int ftres = FTManager.NON_FT;
00239         if (this.ftmanager != null) {
00240             if (this.isDead) {
00241                 throw new java.io.IOException(TERMINATED_BODY_EXCEPTION_MESSAGE);
00242             } else {
00243                 ftres = this.ftmanager.onReceiveRequest(request);
00244                 if (request.ignoreIt()) {
00245                     return ftres;
00246                 }
00247             }
00248         }
00249         try {
00250             this.enterInThreadStore();
00251             if (this.isDead) {
00252                 throw new java.io.IOException(TERMINATED_BODY_EXCEPTION_MESSAGE);
00253             }
00254             if (this.isSecurityOn) {
00255 
00256                 /*
00257                    if (isInterfaceSecureImplemented) {
00258                    Session session = psm.getSession(request.getSessionId());
00259                    ((Secure) getReifiedObject()).receiveRequest(session.getSecurityContext());
00260                    }
00261                  */
00262                 try {
00263                     this.renegociateSessionIfNeeded(request.getSessionId());
00264                     if ((this.internalBodySecurity.isLocalBody()) &&
00265                             request.isCiphered()) {
00266                         request.decrypt(psm);
00267                     }
00268                 } catch (SecurityNotAvailableException e) {
00269                     // do nothing
00270                     e.printStackTrace();
00271                 }
00272             }
00273             this.registerIncomingFutures();
00274             ftres = this.internalReceiveRequest(request);
00275             if (GarbageCollector.isBuildingTopology()) {
00276                 updateReferences(UniversalBodyProxy.getIncomingReferences());
00277             }
00278         } finally {
00279             this.exitFromThreadStore();
00280         }
00281         return ftres;
00282     }
00283 
00284     public int receiveReply(Reply reply) throws java.io.IOException {
00285         //System.out.println("  --> receiveReply m="+reply.getMethodName());
00286         // NON_FT is returned if this object is not fault tolerant
00287         int ftres = FTManager.NON_FT;
00288         if (this.ftmanager != null) {
00289             if (this.isDead) {
00290                 throw new java.io.IOException(TERMINATED_BODY_EXCEPTION_MESSAGE);
00291             } else {
00292                 ftres = this.ftmanager.onReceiveReply(reply);
00293                 if (reply.ignoreIt()) {
00294                     return ftres;
00295                 }
00296             }
00297         }
00298 
00299         try {
00300             enterInThreadStore();
00301             if (isDead) {
00302                 throw new java.io.IOException(TERMINATED_BODY_EXCEPTION_MESSAGE);
00303             }
00304 
00305             //System.out.println("Body receives Reply on NODE : " + this.nodeURL); 
00306             if (isSecurityOn) {
00307                 try {
00308                     if ((internalBodySecurity.isLocalBody()) &&
00309                             reply.isCiphered()) {
00310                         reply.decrypt(psm);
00311                     }
00312                 } catch (Exception e) {
00313                     e.printStackTrace();
00314                 }
00315             }
00316             this.registerIncomingFutures();
00317             ftres = internalReceiveReply(reply);
00318             if (GarbageCollector.isBuildingTopology()) {
00319                 updateReferences(UniversalBodyProxy.getIncomingReferences());
00320             }
00321         } finally {
00322             exitFromThreadStore();
00323         }
00324         return ftres;
00325     }
00326 
00333     public void registerIncomingFutures() {
00334         // get list of futures that should be deserialized and registred "behind the ThreadStore"
00335         java.util.ArrayList incomingFutures = FuturePool.getIncomingFutures();
00336 
00337         if (incomingFutures != null) {
00338             // if futurePool is not null, we are in an Active Body
00339             if (getFuturePool() != null) {
00340                 // some futures have to be registred in the local futurePool
00341                 java.util.Iterator it = incomingFutures.iterator();
00342                 while (it.hasNext()) {
00343                     Future current = (Future) (it.next());
00344                     getFuturePool().receiveFuture(current);
00345                 }
00346                 FuturePool.removeIncomingFutures();
00347             } else {
00348                 // we are in a forwarder
00349                 // some futures have to set their continuation tag
00350                 java.util.Iterator it = incomingFutures.iterator();
00351                 while (it.hasNext()) {
00352                     FutureProxy current = (FutureProxy) (it.next());
00353                     current.setContinuationTag();
00354                 }
00355                 FuturePool.removeIncomingFutures();
00356             }
00357         }
00358     }
00359 
00360     public void enableAC() {
00361         localBodyStrategy.getFuturePool().enableAC();
00362     }
00363 
00364     public void disableAC() {
00365         localBodyStrategy.getFuturePool().disableAC();
00366     }
00367 
00368     public void renegociateSessionIfNeeded(long sID)
00369         throws RenegotiateSessionException, SecurityNotAvailableException,
00370             IOException {
00371         try {
00372             enterInThreadStore();
00373             if (!internalBodySecurity.isLocalBody() &&
00374                     (openedSessions != null)) {
00375                 // inside a forwarder
00376                 Long sessionID;
00377 
00378                 //long sID = request.getSessionId();
00379                 if (sID != 0) {
00380                     sessionID = new Long(sID);
00381                     if (openedSessions.containsKey(sessionID)) {
00382                         openedSessions.remove(sessionID);
00383                         internalBodySecurity.terminateSession(sID);
00384                         //System.out.println("Object has migrated : Renegotiate Session");
00385                         throw new RenegotiateSessionException(internalBodySecurity.getDistantBody());
00386                     }
00387                 }
00388             }
00389         } finally {
00390             exitFromThreadStore();
00391         }
00392     }
00393 
00394     public void terminateSession(long sessionID)
00395         throws SecurityNotAvailableException, IOException {
00396         try {
00397             enterInThreadStore();
00398             if (isSecurityOn) {
00399                 if (internalBodySecurity.isLocalBody()) {
00400                     psm.terminateSession(sessionID);
00401                 } else {
00402                     internalBodySecurity.terminateSession(sessionID);
00403                 }
00404             }
00405         } finally {
00406             exitFromThreadStore();
00407         }
00408     }
00409 
00410     public X509Certificate getCertificate()
00411         throws SecurityNotAvailableException, IOException {
00412         try {
00413             enterInThreadStore();
00414             if (isSecurityOn) {
00415                 if (internalBodySecurity.isLocalBody()) {
00416                     //  System.out.println(" getCertificate on demande un security manager a " + ProActive.getBodyOnThis());
00417                     //  if (psm == null) {
00418                     //  startDefaultProActiveSecurityManager();
00419                     //}
00420                     return psm.getCertificate();
00421                 } else {
00422                     return internalBodySecurity.getCertificate();
00423                 }
00424             }
00425             throw new SecurityNotAvailableException();
00426         } finally {
00427             exitFromThreadStore();
00428         }
00429     }
00430 
00431     public ProActiveSecurityManager getProActiveSecurityManager()
00432         throws java.io.IOException, SecurityNotAvailableException {
00433         try {
00434             enterInThreadStore();
00435             if (isSecurityOn) {
00436                 if (internalBodySecurity.isLocalBody()) {
00437                     //  System.out.println("getProActiveSecurityManager on demande un security manager a " + ProActive.getBodyOnThis());
00438                     // if (psm == null) {
00439                     //    startDefaultProActiveSecurityManager();
00440                     // }
00441                     return psm;
00442                 } else {
00443                     //ProActiveSecurityManager plop = internalBodySecurity.getProActiveSecurityManager();
00444                     //return plop;
00445                     return null;
00446                 }
00447             } else {
00448                 throw new SecurityNotAvailableException();
00449             }
00450         } finally {
00451             exitFromThreadStore();
00452         }
00453     }
00454 
00455     public ProActiveSPMDGroupManager getProActiveSPMDGroupManager() {
00456         return this.spmdManager;
00457     }
00458 
00459     public long startNewSession(Communication policy)
00460         throws RenegotiateSessionException, SecurityNotAvailableException,
00461             IOException {
00462         try {
00463             enterInThreadStore();
00464             if (isSecurityOn) {
00465                 long sessionID;
00466 
00467                 if (internalBodySecurity.isLocalBody()) {
00468                     //System.out.println("startNewSession on demande un security manager a " + ProActive.getBodyOnThis());
00469                     //if (psm == null) {
00470                     //startDefaultProActiveSecurityManager();
00471                     //}
00472                     sessionID = psm.startNewSession(policy);
00473 
00474                     return sessionID;
00475                 } else {
00476                     sessionID = internalBodySecurity.startNewSession(policy);
00477 
00478                     return sessionID;
00479                 }
00480             }
00481             throw new SecurityNotAvailableException();
00482         } finally {
00483             exitFromThreadStore();
00484         }
00485     }
00486 
00487     public PublicKey getPublicKey()
00488         throws SecurityNotAvailableException, IOException {
00489         try {
00490             enterInThreadStore();
00491             if (isSecurityOn) {
00492                 PublicKey pk;
00493 
00494                 if (internalBodySecurity.isLocalBody()) {
00495                     //  System.out.println("getPublicKey on demande un security manager a " + ProActive.getBodyOnThis());
00496                     //if (psm == null) {
00497                     //         startDefaultProActiveSecurityManager();
00498                     //        }
00499                     pk = psm.getPublicKey();
00500 
00501                     return pk;
00502                 } else {
00503                     pk = internalBodySecurity.getPublicKey();
00504 
00505                     return pk;
00506                 }
00507             }
00508             throw new SecurityNotAvailableException();
00509         } finally {
00510             exitFromThreadStore();
00511         }
00512     }
00513 
00514     public byte[] randomValue(long sessionID, byte[] clientRandomValue)
00515         throws SecurityNotAvailableException, RenegotiateSessionException,
00516             IOException {
00517         try {
00518             enterInThreadStore();
00519             if (isSecurityOn) {
00520                 byte[] plop;
00521 
00522                 if (internalBodySecurity.isLocalBody()) {
00523                     plop = psm.randomValue(sessionID, clientRandomValue);
00524 
00525                     return plop;
00526                 } else {
00527                     plop = internalBodySecurity.randomValue(sessionID,
00528                             clientRandomValue);
00529 
00530                     return plop;
00531                 }
00532             }
00533             throw new SecurityNotAvailableException();
00534         } finally {
00535             exitFromThreadStore();
00536         }
00537     }
00538 
00539     public byte[][] publicKeyExchange(long sessionID, byte[] myPublicKey,
00540         byte[] myCertificate, byte[] signature)
00541         throws SecurityNotAvailableException, RenegotiateSessionException,
00542             KeyExchangeException, IOException {
00543         try {
00544             enterInThreadStore();
00545             if (isSecurityOn) {
00546                 renegociateSessionIfNeeded(sessionID);
00547 
00548                 byte[][] pke;
00549 
00550                 if (internalBodySecurity.isLocalBody()) {
00551                     pke = psm.publicKeyExchange(sessionID, myPublicKey,
00552                             myCertificate, signature);
00553 
00554                     return pke;
00555                 } else {
00556                     pke = internalBodySecurity.publicKeyExchange(sessionID,
00557                             myPublicKey, myCertificate, signature);
00558 
00559                     return pke;
00560                 }
00561             }
00562             throw new SecurityNotAvailableException();
00563         } finally {
00564             exitFromThreadStore();
00565         }
00566     }
00567 
00568     public byte[][] secretKeyExchange(long sessionID, byte[] encodedAESKey,
00569         byte[] encodedIVParameters, byte[] encodedClientMacKey,
00570         byte[] encodedLockData, byte[] parametersSignature)
00571         throws SecurityNotAvailableException, RenegotiateSessionException,
00572             IOException {
00573         try {
00574             enterInThreadStore();
00575             if (!isSecurityOn) {
00576                 throw new SecurityNotAvailableException();
00577             }
00578 
00579             byte[][] ske;
00580 
00581             renegociateSessionIfNeeded(sessionID);
00582 
00583             if (internalBodySecurity.isLocalBody()) {
00584                 //      System.out.println("secretKeyExchange demande un security manager a " + ProActive.getBodyOnThis());
00585                 ske = psm.secretKeyExchange(sessionID, encodedAESKey,
00586                         encodedIVParameters, encodedClientMacKey,
00587                         encodedLockData, parametersSignature);
00588 
00589                 return ske;
00590             } else {
00591                 ske = internalBodySecurity.secretKeyExchange(sessionID,
00592                         encodedAESKey, encodedIVParameters,
00593                         encodedClientMacKey, encodedLockData,
00594                         parametersSignature);
00595 
00596                 return ske;
00597             }
00598         } finally {
00599             threadStore.exit();
00600         }
00601     }
00602 
00603     public SecurityContext getPolicy(SecurityContext securityContext)
00604         throws SecurityNotAvailableException, IOException {
00605         try {
00606             enterInThreadStore();
00607             if (!isSecurityOn) {
00608                 throw new SecurityNotAvailableException();
00609             }
00610 
00611             if (internalBodySecurity.isLocalBody()) {
00612                 return psm.getPolicy(securityContext);
00613             } else {
00614                 return internalBodySecurity.getPolicy(securityContext);
00615             }
00616         } finally {
00617             exitFromThreadStore();
00618         }
00619     }
00620 
00621     public byte[] getCertificateEncoded()
00622         throws SecurityNotAvailableException, IOException {
00623         try {
00624             enterInThreadStore();
00625 
00626             //if (psm == null) {
00627             //    startDefaultProActiveSecurityManager();
00628             // }
00629             if (!isSecurityOn || (psm == null)) {
00630                 throw new SecurityNotAvailableException();
00631             }
00632 
00633             if (internalBodySecurity.isLocalBody()) {
00634                 return psm.getCertificate().getEncoded();
00635             } else {
00636                 return internalBodySecurity.getCertificatEncoded();
00637             }
00638         } catch (CertificateEncodingException e) {
00639             e.printStackTrace();
00640         } finally {
00641             exitFromThreadStore();
00642         }
00643         return null;
00644     }
00645 
00646     protected void startDefaultProActiveSecurityManager() {
00647         try {
00648             //     logger.info("starting a new psm ");
00649             this.psm = new DefaultProActiveSecurityManager("vide ");
00650             isSecurityOn = true;
00651             psm.setBody(this);
00652             internalBodySecurity = new InternalBodySecurity(null);
00653         } catch (Exception e) {
00654                 
00655             logger.error(
00656                 "Error when contructing a DefaultProActiveManager");
00657             e.printStackTrace();
00658         }
00659     }
00660 
00661     public ArrayList<Entity> getEntities()
00662         throws SecurityNotAvailableException, IOException {
00663         try {
00664             enterInThreadStore();
00665             if (!isSecurityOn) {
00666                 throw new SecurityNotAvailableException();
00667             }
00668             if (internalBodySecurity.isLocalBody()) {
00669                 return psm.getEntities();
00670             } else {
00671                 return internalBodySecurity.getEntities();
00672             }
00673         } finally {
00674             exitFromThreadStore();
00675         }
00676     }
00677 
00678     //
00679     // -- implements Body -----------------------------------------------
00680     //
00681     public void terminate() {
00682         if (isDead) {
00683             return;
00684         }
00685         isDead = true;
00686         activityStopped();
00687         this.remoteBody = null;
00688         // unblock is thread was block
00689         acceptCommunication();
00690     }
00691 
00692     public void blockCommunication() {
00693         threadStore.close();
00694     }
00695 
00696     public void acceptCommunication() {
00697         threadStore.open();
00698     }
00699 
00700     public void enterInThreadStore() {
00701         threadStore.enter();
00702     }
00703 
00704     public void exitFromThreadStore() {
00705         threadStore.exit();
00706     }
00707 
00708     public boolean isAlive() {
00709         return !isDead;
00710     }
00711 
00712     public boolean isActive() {
00713         return isActive;
00714     }
00715 
00716     public UniversalBody checkNewLocation(UniqueID bodyID) {
00717         //we look in the location table of the current JVM
00718         Body body = LocalBodyStore.getInstance().getLocalBody(bodyID);
00719         if (body != null) {
00720             // we update our table to say that this body is local
00721             location.updateBody(bodyID, body);
00722             return body;
00723         } else {
00724             //it was not found in this vm let's try the location table
00725             return location.getBody(bodyID);
00726         }
00727     }
00728 
00729     /*
00730      *
00731      * @see org.objectweb.proactive.Body#getShortcutTargetBody(org.objectweb.proactive.core.component.representative.ItfID)
00732      */
00733     public UniversalBody getShortcutTargetBody(
00734         ItfID functionalItfID) {
00735         if (shortcuts == null) {
00736             return null;
00737         } else {
00738             if (shortcuts.containsKey(functionalItfID)) {
00739                 return ((Shortcut) shortcuts.get(functionalItfID)).getShortcutTargetBody();
00740             } else {
00741                 return null;
00742             }
00743         }
00744     }
00745 
00746     public void setPolicyServer(PolicyServer server) {
00747         if (server != null) {
00748             if ((psm != null) && (psm.getPolicyServer() == null)) {
00749                 psm = new ProActiveSecurityManager(server);
00750                 isSecurityOn = true;
00751                 logger.debug("Security is on " + isSecurityOn);
00752                 psm.setBody(this);
00753             }
00754         }
00755     }
00756 
00757     //
00758     // -- implements LocalBody -----------------------------------------------
00759     //
00760     public FuturePool getFuturePool() {
00761         return localBodyStrategy.getFuturePool();
00762     }
00763 
00764     public BlockingRequestQueue getRequestQueue() {
00765         return localBodyStrategy.getRequestQueue();
00766     }
00767 
00768     public Object getReifiedObject() {
00769         return localBodyStrategy.getReifiedObject();
00770     }
00771 
00772     public String getName() {
00773         return localBodyStrategy.getName();
00774     }
00775 
00779     public void serve(Request request) {
00780         if (this.ftmanager != null) {
00781             this.ftmanager.onServeRequestBefore(request);
00782             localBodyStrategy.serve(request);
00783             this.ftmanager.onServeRequestAfter(request);
00784         } else {
00785             localBodyStrategy.serve(request);
00786         }
00787     }
00788 
00789     public void sendRequest(MethodCall methodCall, Future future,
00790         UniversalBody destinationBody)
00791         throws java.io.IOException, RenegotiateSessionException {
00792         long sessionID = 0;
00793 
00794         // Tag the outgoing request with the barrier tags
00795         if (!this.spmdManager.isTagsListEmpty()) {
00796             methodCall.setBarrierTags(this.spmdManager.getBarrierTags());
00797         }
00798 
00799         try {
00800             if (!isSecurityOn) {
00801                 bodyLogger.debug("security is off");
00802             } else {
00803                 try {
00804                     if (internalBodySecurity.isLocalBody()) {
00805                         byte[] certE = destinationBody.getCertificateEncoded();
00806 
00807                         X509Certificate cert = ProActiveSecurity.decodeCertificate(certE);
00808                         ProActiveLogger.getLogger(Loggers.SECURITY_BODY)
00809                                        .debug("send Request AbstractBody " +
00810                             this + ", method " + methodCall.getName() +
00811                             " cert " + cert.getSubjectDN() + " " +
00812                             cert.getPublicKey());
00813                         sessionID = psm.getSessionIDTo(cert);
00814                         if (sessionID == 0) {
00815                             psm.initiateSession(SecurityContext.COMMUNICATION_SEND_REQUEST_TO,
00816                                 destinationBody);
00817                             sessionID = this.psm.getSessionIDTo(cert);
00818                         }
00819                     }
00820                 } catch (SecurityNotAvailableException e) {
00821                     // do nothing 
00822                     bodyLogger.debug("communication without security");
00823                     //e.printStackTrace();
00824                 }
00825             }
00826 
00827             localBodyStrategy.sendRequest(methodCall, future, destinationBody);
00828         } catch (RenegotiateSessionException e) {
00829             if (e.getUniversalBody() != null) {
00830                 ProActiveLogger.getLogger(Loggers.SECURITY_CRYPTO)
00831                                .debug("renegotiate session " + sessionID);
00832                 updateLocation(destinationBody.getID(), e.getUniversalBody());
00833                 psm.terminateSession(sessionID);
00834                 sendRequest(methodCall, future, e.getUniversalBody());
00835             } else {
00836                 psm.terminateSession(sessionID);
00837                 sendRequest(methodCall, future, destinationBody);
00838             }
00839         } catch (CommunicationForbiddenException e) {
00840             bodyLogger.warn(e);
00841             //e.printStackTrace();
00842         } catch (AuthenticationException e) {
00843             e.printStackTrace();
00844         }
00845     }
00846 
00847     //    /**
00848     //     * Get information about the handlerizable object
00849     //     * @return information about the handlerizable object
00850     //     */
00851     //    public String getHandlerizableInfo() throws java.io.IOException {
00852     //        return "BODY (URL=" + this.nodeURL + ") of CLASS [" + this.getClass() +
00853     //        "]";
00854     //    }
00855     //
00856     //    /** Give a reference to a local map of handlers
00857     //     * @return A reference to a map of handlers
00858     //     */
00859     //    public HashMap getHandlersLevel() throws java.io.IOException {
00860     //        return bodyLevel;
00861     //    }
00862     //
00863     //    /**
00864     //     * Clear the local map of handlers
00865     //     */
00866     //    public void clearHandlersLevel() throws java.io.IOException {
00867     //        bodyLevel.clear();
00868     //    }
00869     //
00870     //    /** Set a new handler within the table of the Handlerizable Object
00871     //     * @param handler A handler associated with a class of non functional exception.
00872     //     * @param exception A class of non functional exception. It is a subclass of <code>NonFunctionalException</code>.
00873     //     */
00874     //    public void setExceptionHandler(Handler handler, Class exception)
00875     //        throws java.io.IOException {
00876     //        // add handler to body level
00877     //        if (bodyLevel == null) {
00878     //            bodyLevel = new HashMap();
00879     //        }
00880     //        bodyLevel.put(exception, handler);
00881     //    }
00882     //
00883     //    /** Remove a handler from the table of the Handlerizable Object
00884     //     * @param exception A class of non functional exception. It is a subclass of <code>NonFunctionalException</code>.
00885     //     * @return The removed handler or null
00886     //     */
00887     //    public Handler unsetExceptionHandler(Class exception)
00888     //        throws java.io.IOException {
00889     //        // remove handler from body level
00890     //        if (bodyLevel != null) {
00891     //            Handler handler = (Handler) bodyLevel.remove(exception);
00892     //            return handler;
00893     //        } else {
00894     //            if (logger.isDebugEnabled()) {
00895     //                logger.debug("[NFE_WARNING] No handler for [" +
00896     //                    exception.getName() + "] can be removed from BODY level");
00897     //            }
00898     //            return null;
00899     //        }
00900     //    }
00901     public Object receiveFTMessage(FTMessage fte) {
00902         // delegate to the FTManger
00903         Object res = null;
00904         if (this.ftmanager != null) {
00905             res = this.ftmanager.handleFTMessage(fte);
00906         }
00907         return res;
00908     }
00909 
00910     //
00911     // -- PROTECTED METHODS -----------------------------------------------
00912     //
00913 
00920     protected abstract int internalReceiveRequest(Request request)
00921         throws java.io.IOException, RenegotiateSessionException;
00922 
00928     protected abstract int internalReceiveReply(Reply reply)
00929         throws java.io.IOException;
00930 
00931     protected void setLocalBodyImpl(LocalBodyStrategy localBody) {
00932         localBodyStrategy = localBody;
00933     }
00934 
00938     protected void activityStopped() {
00939         if (!isActive) {
00940             return;
00941         }
00942         isActive = false;
00943         //We are no longer an active body
00944         LocalBodyStore.getInstance().unregisterBody(this);
00945     }
00946 
00947     //protected void activityStopped2(){
00948     //  LocalBodyStore.getInstance().unregisterBody(this);
00949     //}
00950 
00954     protected void activityStarted() {
00955         if (isActive) {
00956             return;
00957         }
00958         isActive = true;
00959         // we associated this body to the thread running it
00960         LocalBodyStore.getInstance().setCurrentThreadBody(this);
00961         // we register in this JVM
00962         LocalBodyStore.getInstance().registerBody(this);
00963     }
00964 
00969     public void setSPMDGroup(Object o) {
00970         this.spmdManager.setSPMDGroup(o);
00971     }
00972 
00977     public Object getSPMDGroup() {
00978         return this.spmdManager.getSPMDGroup();
00979     }
00980 
00985     public int getSPMDGroupSize() {
00986         return ProActiveGroup.size(this.getSPMDGroup());
00987     }
00988 
00993     public FTManager getFTManager() {
00994         return ftmanager;
00995     }
00996 
01001     public void setFTManager(FTManager ftm) {
01002         this.ftmanager = ftm;
01003     }
01004 
01005     //
01006     // -- SERIALIZATION METHODS -----------------------------------------------
01007     //
01008     private void writeObject(java.io.ObjectOutputStream out)
01009         throws java.io.IOException {
01010         out.defaultWriteObject();
01011     }
01012 
01013     private void readObject(java.io.ObjectInputStream in)
01014         throws java.io.IOException, ClassNotFoundException {
01015         this.gc = new GarbageCollector(this);
01016         in.defaultReadObject();
01017 
01018         // FAULT TOLERANCE
01019         if (this.ftmanager != null) {
01020             if (this.ftmanager.isACheckpoint()) {
01021                 //re-use remote view of the old body if any                
01022                 Body toKill = LocalBodyStore.getInstance()
01023                                             .getLocalBody(this.bodyID);
01024                 if (toKill != null) {
01025                     //this body is still alive
01026                     toKill.blockCommunication();
01027                     BodyAdapter ba = (BodyAdapter) (toKill.getRemoteAdapter());
01028                     ba.changeProxiedBody(this);
01029                     this.remoteBody = ba;
01030                     toKill.terminate();
01031                     toKill.acceptCommunication();
01032                 }
01033             }
01034         }
01035     }
01036 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1