org/objectweb/proactive/core/body/reply/ReplyImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.reply;
00032 
00033 import java.io.ByteArrayInputStream;
00034 import java.io.IOException;
00035 import java.io.ObjectInputStream;
00036 
00037 import org.objectweb.proactive.ProActive;
00038 import org.objectweb.proactive.core.UniqueID;
00039 import org.objectweb.proactive.core.body.LocalBodyStore;
00040 import org.objectweb.proactive.core.body.UniversalBody;
00041 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00042 import org.objectweb.proactive.core.body.future.FutureProxy;
00043 import org.objectweb.proactive.core.body.future.FutureResult;
00044 import org.objectweb.proactive.core.body.message.MessageImpl;
00045 import org.objectweb.proactive.core.mop.StubObject;
00046 import org.objectweb.proactive.core.mop.Utils;
00047 import org.objectweb.proactive.ext.locationserver.LocationServer;
00048 import org.objectweb.proactive.ext.locationserver.LocationServerFactory;
00049 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00050 import org.objectweb.proactive.ext.security.SecurityContext;
00051 import org.objectweb.proactive.ext.security.crypto.AuthenticationException;
00052 import org.objectweb.proactive.ext.security.crypto.Session;
00053 import org.objectweb.proactive.ext.security.exceptions.CommunicationForbiddenException;
00054 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00055 import org.objectweb.proactive.ext.security.exceptions.SecurityNotAvailableException;
00056 
00057 public class ReplyImpl extends MessageImpl implements Reply,
00058                 java.io.Serializable {
00059 
00063         protected FutureResult result;
00064 
00065         // security features
00066 
00070         protected byte[][] encryptedResult;
00071 
00072         protected boolean ciphered;
00073 
00074         // true if this reply is sent by automatic continuation
00075         private boolean isAC;
00076 
00077         /*
00078          * the session ID used to find the key and decrypt the reply
00079          */
00080         protected long sessionID;
00081 
00082         protected transient ProActiveSecurityManager psm = null;
00083 
00087         private static final int MAX_TRIES = 15;
00088 
00089         transient protected LocationServer server;
00090 
00091         public ReplyImpl(UniqueID senderID, long sequenceNumber, String methodName,
00092                         FutureResult result, ProActiveSecurityManager psm) {
00093                 super(senderID, sequenceNumber, true, methodName);
00094                 this.result = result;
00095                 this.psm = psm;
00096                 this.isAC = false;
00097         }
00098 
00099         public ReplyImpl(UniqueID senderID, long sequenceNumber, String methodName,
00100                         FutureResult result, ProActiveSecurityManager psm,
00101                         boolean isAutomaticContinuation) {
00102                 this(senderID, sequenceNumber, methodName, result, psm);
00103                 this.isAC = isAutomaticContinuation;
00104         }
00105 
00106         public FutureResult getResult() {
00107                 return result;
00108         }
00109 
00110         public int send(UniversalBody destinationBody) throws IOException {
00111                 // if destination body is on the same VM that the sender, we must
00112                 // perform
00113                 // a deep copy of result in order to preserve ProActive model.
00114                 UniqueID destinationID = destinationBody.getID();
00115                 boolean isLocal = ((LocalBodyStore.getInstance().getLocalBody(
00116                                 destinationID) != null) || (LocalBodyStore.getInstance()
00117                                 .getLocalHalfBody(destinationID) != null));
00118 
00119                 if (isLocal) {
00120                         result = (FutureResult) Utils.makeDeepCopy(result);
00121                 }
00122 
00123                 // security
00124                 if (!ciphered && (psm != null)) {
00125                         long sessionID = 0;
00126 
00127                         try {
00128                                 sessionID = psm
00129                                                 .getSessionIDTo(destinationBody.getCertificate());
00130 
00131                                 if (sessionID == 0) {
00132                                         psm.initiateSession(
00133                                                         SecurityContext.COMMUNICATION_SEND_REPLY_TO,
00134                                                         destinationBody);
00135                                         sessionID = psm.getSessionIDTo(destinationBody
00136                                                         .getCertificate());
00137                                 }
00138 
00139                                 if (sessionID != 0) {
00140                                         encryptedResult = psm.encrypt(sessionID, result,
00141                                                         Session.ACT_AS_SERVER);
00142                                         ciphered = true;
00143                                         this.sessionID = sessionID;
00144                                 }
00145                         } catch (SecurityNotAvailableException e) {
00146                                 // do nothing
00147                         } catch (CommunicationForbiddenException e) {
00148                                 e.printStackTrace();
00149                         } catch (AuthenticationException e) {
00150                                 e.printStackTrace();
00151                         } catch (RenegotiateSessionException e) {
00152                                 psm.terminateSession(sessionID);
00153                                 try {
00154                                         destinationBody.terminateSession(sessionID);
00155                                 } catch (SecurityNotAvailableException e1) {
00156                                         e.printStackTrace();
00157                                 }
00158                                 this.send(destinationBody);
00159                         }
00160                 }
00161 
00162                 // end security
00163                 // fault-tolerance returned value
00164                 int ftres = FTManager.NON_FT;
00165                 try {
00166                         ftres = destinationBody.receiveReply(this);
00167                 } catch (Exception ex) {
00168                         this.backupSolution(destinationBody);
00169                 }
00170 
00171                 return ftres;
00172         }
00173 
00174         // security issue
00175         public boolean isCiphered() {
00176                 return ciphered;
00177         }
00178 
00179         public boolean decrypt(ProActiveSecurityManager psm)
00180                         throws RenegotiateSessionException {
00181                 if ((sessionID != 0) && ciphered) {
00182                         byte[] decryptedMethodCall = psm.decrypt(sessionID,
00183                                         encryptedResult, Session.ACT_AS_CLIENT);
00184                         try {
00185                                 ByteArrayInputStream bin = new ByteArrayInputStream(
00186                                                 decryptedMethodCall);
00187                                 ObjectInputStream in = new ObjectInputStream(bin);
00188                                 result = (FutureResult) in.readObject();
00189                                 in.close();
00190                                 return true;
00191                         } catch (Exception e) {
00192                                 e.printStackTrace();
00193                         }
00194                 }
00195 
00196                 return false;
00197         }
00198 
00199         /*
00200          * (non-Javadoc)
00201          * 
00202          * @see org.objectweb.proactive.core.body.reply.Reply#getSessionId()
00203          */
00204         public long getSessionId() {
00205                 return sessionID;
00206         }
00207 
00211         public boolean isAutomaticContinuation() {
00212                 return this.isAC;
00213         }
00214 
00222         protected void backupSolution(UniversalBody destinationBody)
00223                         throws java.io.IOException {
00224                 int tries = 0;
00225                 // get the new location from the server
00226                 UniqueID bodyID = destinationBody.getID();
00227                 while (tries < MAX_TRIES) {
00228                         UniversalBody remoteBody = null;
00229                         UniversalBody mobile = queryServer(bodyID);
00230 
00231                         // we want to bypass the stub/proxy
00232                         remoteBody = (UniversalBody) ((FutureProxy) ((StubObject) mobile)
00233                                         .getProxy()).getResult();
00234 
00235                         try {
00236                                 remoteBody.receiveReply(this);
00237 
00238                                 return;
00239                         } catch (Exception e) {
00240 
00241                                 tries++;
00242 
00243                                 if (tries == MAX_TRIES) {
00244                                         throw new IOException(e.getMessage());
00245                                 }
00246                         }
00247 
00248                 }
00249         }
00250 
00251         protected UniversalBody queryServer(UniqueID bodyID) {
00252                 if (server == null) {
00253                         server = LocationServerFactory.getLocationServer();
00254                 }
00255                 UniversalBody mobile = (UniversalBody) server.searchObject(bodyID);
00256                 ProActive.waitFor(mobile);
00257                 return mobile;
00258         }
00259 }

Generated on Mon Jan 22 15:16:06 2007 for ProActive by  doxygen 1.5.1