org/objectweb/proactive/core/body/request/RequestImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.request;
00032 
00033 import java.io.ByteArrayInputStream;
00034 import java.io.IOException;
00035 import java.io.StreamCorruptedException;
00036 import java.security.cert.X509Certificate;
00037 
00038 import org.apache.log4j.Logger;
00039 import org.objectweb.proactive.Body;
00040 import org.objectweb.proactive.ProActive;
00041 import org.objectweb.proactive.core.UniqueID;
00042 import org.objectweb.proactive.core.body.AbstractBody;
00043 import org.objectweb.proactive.core.body.LocalBodyStore;
00044 import org.objectweb.proactive.core.body.UniversalBody;
00045 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00046 import org.objectweb.proactive.core.body.future.FutureProxy;
00047 import org.objectweb.proactive.core.body.future.FutureResult;
00048 import org.objectweb.proactive.core.body.message.MessageImpl;
00049 import org.objectweb.proactive.core.body.reply.Reply;
00050 import org.objectweb.proactive.core.body.reply.ReplyImpl;
00051 import org.objectweb.proactive.core.exceptions.proxy.ProxyNonFunctionalException;
00052 import org.objectweb.proactive.core.mop.MethodCall;
00053 import org.objectweb.proactive.core.mop.MethodCallExecutionFailedException;
00054 import org.objectweb.proactive.core.mop.StubObject;
00055 import org.objectweb.proactive.core.util.log.Loggers;
00056 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00057 import org.objectweb.proactive.ext.locationserver.LocationServer;
00058 import org.objectweb.proactive.ext.locationserver.LocationServerFactory;
00059 import org.objectweb.proactive.ext.security.ProActiveSecurity;
00060 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00061 import org.objectweb.proactive.ext.security.crypto.Session;
00062 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00063 import org.objectweb.proactive.ext.security.exceptions.SecurityNotAvailableException;
00064 
00065 import sun.rmi.server.MarshalInputStream;
00066 
00067 
00068 public class RequestImpl extends MessageImpl implements Request,
00069     java.io.Serializable {
00070     public static Logger logger = ProActiveLogger.getLogger(Loggers.REQUESTS);
00071     public static Logger loggerNFE = ProActiveLogger.getLogger(Loggers.NFE);
00072     protected MethodCall methodCall;
00073     protected boolean ciphered;
00074 
00078     protected int sendCounter;
00079 
00082     protected transient UniversalBody sender;
00083     private byte[][] methodCallCiphered;
00084     public long sessionID;
00085     protected String codebase;
00086     private static Boolean enableStackTrace;
00087     private StackTraceElement[] stackTrace;
00088 
00089     //Non Functional requests
00090     protected boolean isNFRequest = false;
00091     protected int nfRequestPriority;
00092     
00096         private static final int MAX_TRIES = 15;
00097 
00098         transient protected LocationServer server;
00099 
00100     
00101     //
00102     // -- CONSTRUCTORS -----------------------------------------------
00103     //
00104     
00105     
00106     // Constructor of simple requests
00107     public RequestImpl(MethodCall methodCall, UniversalBody sender,
00108         boolean isOneWay, long nextSequenceID) {
00109         super(sender.getID(), nextSequenceID, isOneWay, methodCall.getName());
00110         this.methodCall = methodCall;
00111         this.sender = sender;
00112         this.isNFRequest = false;
00113         
00114         if (enableStackTrace == null) {
00115 
00116             /* First time */
00117             enableStackTrace = new Boolean(!"false".equals(System.getProperty(
00118                             "proactive.stack_trace")));
00119         }
00120         if (enableStackTrace.booleanValue()) {
00121             this.stackTrace = new Exception().getStackTrace();
00122         }
00123     }
00124 
00125     // Constructor of non functional requests without priority
00126     public RequestImpl(MethodCall methodCall, UniversalBody sender,
00127             boolean isOneWay, long nextSequenceID, boolean isNFRequest) {
00128         super(sender.getID(), nextSequenceID, isOneWay, methodCall.getName());
00129         this.methodCall = methodCall;
00130         this.sender = sender;
00131         this.isNFRequest = isNFRequest;
00132         this.nfRequestPriority = Request.NFREQUEST_NO_PRIORITY;
00133         
00134         if (enableStackTrace == null) {
00135 
00136             /* First time */
00137             enableStackTrace = new Boolean(!"false".equals(System.getProperty(
00138                             "proactive.stack_trace")));
00139         }
00140         if (enableStackTrace.booleanValue()) {
00141             this.stackTrace = new Exception().getStackTrace();
00142         }
00143     }
00144     
00145     // Constructor of non functional requests with priority
00146     public RequestImpl(MethodCall methodCall, UniversalBody sender,
00147             boolean isOneWay, long nextSequenceID, boolean isNFRequest, int nfRequestPriority) {
00148         super(sender.getID(), nextSequenceID, isOneWay, methodCall.getName());
00149         this.methodCall = methodCall;
00150         this.sender = sender;
00151         this.isNFRequest = isNFRequest;
00152         this.nfRequestPriority = nfRequestPriority;
00153         
00154         if (enableStackTrace == null) {
00155 
00156             /* First time */
00157             enableStackTrace = new Boolean(!"false".equals(System.getProperty(
00158                             "proactive.stack_trace")));
00159         }
00160         if (enableStackTrace.booleanValue()) {
00161             this.stackTrace = new Exception().getStackTrace();
00162         }
00163     }
00164     
00165     //
00166     // -- PUBLIC METHODS -----------------------------------------------
00167     //
00168     //
00169     // -- Implements Request -----------------------------------------------
00170     //
00171     public int send(UniversalBody destinationBody)
00172         throws java.io.IOException, RenegotiateSessionException {
00173         //System.out.println("RequestSender: sendRequest  " + methodName + " to destination");
00174         sendCounter++;
00175         return sendRequest(destinationBody);
00176     }
00177 
00178     public UniversalBody getSender() {
00179         return sender;
00180     }
00181 
00182     public Reply serve(Body targetBody) throws ServeException {
00183         if (logger.isDebugEnabled()) {
00184             logger.debug("Serving " + this.getMethodName());
00185         }
00186         FutureResult result = serveInternal(targetBody);
00187         if (logger.isDebugEnabled()) {
00188             logger.debug("result: " + result);
00189         }
00190         if (isOneWay) { // || (sender == null)) {
00191             return null;
00192         }
00193         result.augmentException(stackTrace);
00194         stackTrace = null;
00195         return createReply(targetBody, result);
00196     }
00197 
00198     public Reply serveAlternate(Body targetBody, ProxyNonFunctionalException nfe) {
00199         if (loggerNFE.isDebugEnabled()) {
00200             loggerNFE.debug("*** Serving an alternate version of " +
00201                 this.getMethodName());
00202             if (nfe != null) {
00203                 loggerNFE.debug("*** Result  " + nfe.getClass().getName());
00204             } else {
00205                 loggerNFE.debug("*** Result null");
00206             }
00207         }
00208         if (isOneWay) { // || (sender == null)) {
00209             return null;
00210         }
00211         return createReply(targetBody, new FutureResult(null, null, nfe));
00212     }
00213 
00214     public boolean hasBeenForwarded() {
00215         return sendCounter > 1;
00216     }
00217 
00218     public void resetSendCounter() {
00219         this.sendCounter = 0;
00220     }
00221 
00222     public Object getParameter(int index) {
00223         return methodCall.getParameter(index);
00224     }
00225 
00226     public MethodCall getMethodCall() {
00227         return methodCall;
00228     }
00229 
00230     public void notifyReception(UniversalBody bodyReceiver)
00231         throws java.io.IOException {
00232         if (!hasBeenForwarded()) {
00233             return;
00234         }
00235 
00236         //System.out.println("the request has been forwarded times");
00237         //we know c.res is a remoteBody since the call has been forwarded
00238         //if it is null, this is a one way call
00239         if (sender != null) {
00240             sender.updateLocation(bodyReceiver.getID(),
00241                 bodyReceiver.getRemoteAdapter());
00242         }
00243     }
00244 
00245     //
00246     // -- PROTECTED METHODS -----------------------------------------------
00247     //
00248     protected FutureResult serveInternal(Body targetBody)
00249         throws ServeException {
00250         Object result = null;
00251         Throwable exception = null;
00252         try {
00253             //loggerNFE.warn("CALL to " + targetBody);
00254             result = methodCall.execute(targetBody.getReifiedObject());
00255         } catch (MethodCallExecutionFailedException e) {
00256             // e.printStackTrace();
00257             throw new ServeException("serve method " +
00258                 methodCall.getReifiedMethod().toString() + " failed", e);
00259         } catch (java.lang.reflect.InvocationTargetException e) {
00260             exception = e.getTargetException();
00261             if (isOneWay) {
00262                 throw new ServeException("serve method " +
00263                     methodCall.getReifiedMethod().toString() + " failed",
00264                     exception);
00265             }
00266         }
00267 
00268         return new FutureResult(result, exception, null);
00269     }
00270 
00271     protected Reply createReply(Body targetBody, FutureResult result) {
00272         ProActiveSecurityManager psm = null;
00273         try {
00274             psm = ((AbstractBody) ProActive.getBodyOnThis()).getProActiveSecurityManager();
00275         } catch (java.io.IOException e) {
00276             e.printStackTrace();
00277         } catch (SecurityNotAvailableException e) {
00278             // do nothing
00279         }
00280 
00281         return new ReplyImpl(targetBody.getID(), sequenceNumber, methodName,
00282             result, psm);
00283     }
00284 
00285     public boolean crypt(ProActiveSecurityManager psm,
00286         UniversalBody destinationBody) throws RenegotiateSessionException {
00287         try {
00288             if (logger.isDebugEnabled()) {
00289                 ProActiveLogger.getLogger(Loggers.SECURITY_REQUEST).debug(" sending request " +
00290                     methodCall.getName());
00291             }
00292             if (!ciphered && !hasBeenForwarded()) {
00293                 sessionID = 0;
00294 
00295                 if (sender == null) {
00296                     logger.warn("sender is null but why ?");
00297                 }
00298 
00299                 byte[] certE = destinationBody.getCertificateEncoded();
00300                 X509Certificate cert = ProActiveSecurity.decodeCertificate(certE);
00301                 sessionID = psm.getSessionIDTo(cert);
00302                 if (sessionID != 0) {
00303                     methodCallCiphered = psm.encrypt(sessionID, methodCall,
00304                             Session.ACT_AS_CLIENT);
00305                     ciphered = true;
00306                     methodCall = null;
00307                     if (logger.isDebugEnabled()) {
00308                         ProActiveLogger.getLogger(Loggers.SECURITY_REQUEST)
00309                                        .debug("methodcallciphered " +
00310                             methodCallCiphered + ", ciphered " + ciphered +
00311                             ", methodCall " + methodCall);
00312                     }
00313                 }
00314             }
00315         } catch (SecurityNotAvailableException e) {
00316             // do nothing
00317             //  e.printStackTrace();
00318             logger.debug("Request : security disabled");
00319         } catch (IOException e) {
00320             e.printStackTrace();
00321         }
00322 
00323         return true;
00324     }
00325 
00326     protected int sendRequest(UniversalBody destinationBody)
00327         throws java.io.IOException, RenegotiateSessionException {
00328         try {
00329             this.crypt(((AbstractBody) ProActive.getBodyOnThis()).getProActiveSecurityManager(),
00330                 destinationBody);
00331         } catch (SecurityNotAvailableException e) {
00332             //todo remove SecurityNotAvalaible e.printStackTrace();
00333         }
00334 
00335         int ftres = FTManager.NON_FT;
00336         try {
00337                         ftres = destinationBody.receiveRequest(this);
00338                 } catch (Exception e) {
00339                         /*
00340                          * The exception is catched, we have to try 
00341                          * our backupSolution 
00342                          */
00343                         try {
00344                                 backupSolution(destinationBody);
00345                         }catch (IOException ioex)
00346                         {
00347                                 throw new IOException(e.getMessage()+ioex.getMessage());
00348                         }
00349                 }
00350 
00351                 if (logger.isDebugEnabled()) {
00352             logger.debug(" sending request finished");
00353         }
00354         return ftres;
00355     }
00356 
00360         protected void backupSolution(UniversalBody destinationBody)
00361                         throws java.io.IOException {
00362                 int tries = 0;
00363                 
00364                 UniqueID bodyID = destinationBody.getID();
00365                 while (tries < MAX_TRIES) {
00366                         UniversalBody remoteBody = null;
00367                         try {
00368                                 // get the new location from the server
00369                                 UniversalBody mobile = queryServer(bodyID);
00370 
00371                                 // we want to bypass the stub/proxy
00372                                 FutureProxy futureProxy = (FutureProxy) ((StubObject) mobile)
00373                                 .getProxy();
00374                                 remoteBody = (UniversalBody) futureProxy.getResult();
00375                                 
00376                                 if (remoteBody == null)
00377                                         throw new IOException("remoteBody is null");
00378                                 
00379                                 remoteBody.receiveRequest(this);
00380 
00381                                 // everything went fine, we have to update the current location
00382                                 // of the object
00383                                 // so that next requests don't go through the server
00384                                 if (sender != null) {
00385                                         sender.updateLocation(bodyID, remoteBody);
00386                                 } else {
00387                                         LocalBodyStore.getInstance()
00388                                                         .getLocalBody(getSourceBodyID()).updateLocation(
00389                                                                         bodyID, remoteBody);
00390                                 }
00391                                 return;
00392                         } catch (Exception e) {
00393                                 
00394                                 tries++;
00395 
00396                                 /*
00397                                  * The location server has been perform to block the request if
00398                                  * it has to send the same location of the object so we don't
00399                                  * have to wait between two requests 
00400                                  * try { Thread.sleep(500); }
00401                                  * catch (InterruptedException e1) { e1.printStackTrace(); }
00402                                  */
00403                                 if(tries == MAX_TRIES){
00404                                         logger.error("FAILED = " + " for method " + methodName
00405                                                         + " exception :" + e.getClass().getName());
00406                                 
00407                                         throw new IOException("FAILED = " + " for method " + methodName
00408                                                         + " exception :" + e.getClass().getName()+"("+e.getMessage()+")");
00409                                 }
00410                         }
00411                 }
00412         }
00413 
00414         protected UniversalBody queryServer(UniqueID bodyID) throws IOException {
00415                 if (server == null) {
00416                         server = LocationServerFactory.getLocationServer();
00417                 }
00418                 if (server == null){
00419                         throw new IOException("No server found");
00420                 }
00421                 UniversalBody mobile = (UniversalBody) server.searchObject(bodyID);
00422                 //logger.info("backupSolution() server has sent an answer");
00423                 ProActive.waitFor(mobile);
00424                 return mobile;
00425         }
00426 
00427     // security issue
00428     public boolean isCiphered() {
00429         return ciphered;
00430     }
00431 
00432     public boolean decrypt(ProActiveSecurityManager psm)
00433         throws RenegotiateSessionException {
00434         //  String localCodeBase = null;
00435         //     if (ciphered) {
00436         ProActiveLogger.getLogger(Loggers.SECURITY_REQUEST).debug(" RequestImpl " +
00437             sessionID + " decrypt : methodcallciphered " + methodCallCiphered +
00438             ", ciphered " + ciphered + ", methodCall " + methodCall);
00439 
00440         if ((ciphered) && (psm != null)) {
00441             try {
00442                 ProActiveLogger.getLogger(Loggers.SECURITY_REQUEST).debug("ReceiveRequest : this body is " +
00443                     psm.getCertificate().getSubjectDN() + " " +
00444                     psm.getCertificate().getPublicKey());
00445                 byte[] decryptedMethodCall = psm.decrypt(sessionID,
00446                         methodCallCiphered, Session.ACT_AS_SERVER);
00447 
00448                 //ProActiveLogger.getLogger("security.request").debug("ReceiveRequest :method call apres decryption : " +  ProActiveSecurityManager.displayByte(decryptedMethodCall));
00449                 ByteArrayInputStream bin = new ByteArrayInputStream(decryptedMethodCall);
00450                 MarshalInputStream in = new MarshalInputStream(bin);
00451 
00452                 // ObjectInputStream in = new ObjectInputStream(bin);
00453                 methodCall = (MethodCall) in.readObject();
00454                 in.close();
00455                 ciphered = false;
00456 
00457                 //  logger.info("After decoding method call  seq id " +sequenceNumber + ":" + ciphered + ":" + sessionID + "  "+ methodCall + ":" +methodCallCiphered);
00458                 return true;
00459             } catch (ClassNotFoundException e) {
00460                 int index = e.toString().indexOf(':');
00461                 String className = e.toString().substring(index).trim();
00462                 className = className.substring(2);
00463 
00464                 //                                      //              try {
00465                 //  MOPClassLoader currentClassLoader = org.objectweb.proactive.core.mop.MOPClassLoader.createMOPClassLoader();
00466                 // this.getClass().getClassLoader().loadClass(className);
00467                 //    currentClassLoader.loadClass(className);  
00468                 this.decrypt(psm);
00469 
00470                 //              } catch (ClassNotFoundException ex) {
00471                 //              e.printStackTrace();
00472                 //      }
00473             } catch (StreamCorruptedException e) {
00474                 e.printStackTrace();
00475             } catch (IOException e) {
00476                 // hum something wrong during decryption, trying with a new session
00477                 throw new RenegotiateSessionException("");
00478             }
00479 
00480             //    System.setProperty("java.rmi.server.codebase",localCodeBase);
00481         }
00482 
00483         return false;
00484     }
00485 
00486     /* (non-Javadoc)
00487      * @see org.objectweb.proactive.core.body.request.Request#getSessionId()
00488      */
00489     public long getSessionId() {
00490         return sessionID;
00491     }
00492 
00493     //
00494     // -- PRIVATE METHODS FOR SERIALIZATION -----------------------------------------------
00495     //
00496     private void writeObject(java.io.ObjectOutputStream out)
00497         throws java.io.IOException {
00498         out.defaultWriteObject();
00499         out.writeObject(sender.getRemoteAdapter());
00500     }
00501 
00502     private void readObject(java.io.ObjectInputStream in)
00503         throws java.io.IOException, ClassNotFoundException {
00504         in.defaultReadObject();
00505         sender = (UniversalBody) in.readObject(); // it is actually a UniversalBody
00506     }
00507     
00508     //
00509     // -- METHODS DEALING WITH NON FUNCTIONAL REQUESTS
00510     //
00511     
00512         public boolean isFunctionalRequest() {
00513                 return isNFRequest;
00514         }
00515 
00516         public void setFunctionalRequest(boolean isFunctionalRequest) {
00517            this.isNFRequest = isFunctionalRequest; 
00518         }
00519 
00520         public void setNFRequestPriority(int nfReqPriority) {
00521                 this.nfRequestPriority = nfReqPriority;
00522         }
00523 
00524         public int getNFRequestPriority() {
00525                 return nfRequestPriority;
00526         }
00527 }

Generated on Mon Jan 22 15:16:06 2007 for ProActive by  doxygen 1.5.1