org/objectweb/proactive/core/body/future/FutureProxy.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.future;
00032 
00033 import java.lang.reflect.InvocationTargetException;
00034 import java.util.ArrayList;
00035 import java.util.HashMap;
00036 
00037 import org.objectweb.proactive.Body;
00038 import org.objectweb.proactive.ProActive;
00039 import org.objectweb.proactive.core.Constants;
00040 import org.objectweb.proactive.core.ProActiveException;
00041 import org.objectweb.proactive.core.ProActiveRuntimeException;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.LocalBodyStore;
00044 import org.objectweb.proactive.core.body.UniversalBody;
00045 import org.objectweb.proactive.core.body.proxy.AbstractProxy;
00046 import org.objectweb.proactive.core.event.FutureEvent;
00047 import org.objectweb.proactive.core.exceptions.manager.ExceptionHandler;
00048 import org.objectweb.proactive.core.exceptions.manager.ExceptionMaskLevel;
00049 import org.objectweb.proactive.core.exceptions.manager.NFEManager;
00050 import org.objectweb.proactive.core.exceptions.proxy.FutureTimeoutException;
00051 import org.objectweb.proactive.core.exceptions.proxy.ProxyNonFunctionalException;
00052 import org.objectweb.proactive.core.mop.ConstructionOfReifiedObjectFailedException;
00053 import org.objectweb.proactive.core.mop.ConstructorCall;
00054 import org.objectweb.proactive.core.mop.MOP;
00055 import org.objectweb.proactive.core.mop.MethodCall;
00056 import org.objectweb.proactive.core.mop.MethodCallExecutionFailedException;
00057 import org.objectweb.proactive.core.mop.Proxy;
00058 import org.objectweb.proactive.core.mop.StubObject;
00059 
00060 
00068 public class FutureProxy implements Future, Proxy, java.io.Serializable {
00069     //
00070     // -- STATIC MEMBERS -----------------------------------------------
00071     //
00072 
00076     public static final int RECYCLE_POOL_SIZE = 1000;
00077     private static FutureProxy[] recyclePool;
00078 
00082     private static boolean shouldPoolFutureProxyObjects;
00083     private static int index;
00084 
00087     private static FutureEventProducerImpl futureEventProducer;
00088 
00089     //
00090     // -- PROTECTED MEMBERS -----------------------------------------------
00091     //
00092 
00096     protected FutureResult target;
00097 
00103     protected boolean migration;
00104 
00108     protected boolean continuation;
00109 
00113     protected UniqueID creatorID;
00114 
00119     protected long ID;
00120 
00124     protected UniqueID senderID;
00125 
00129     protected HashMap futureLevel = null;
00130 
00135     private ExceptionMaskLevel exceptionLevel;
00136 
00142     private transient AbstractProxy originatingProxy;
00143 
00148     protected static long futureMaxDelay = -1;
00149 
00150     //
00151     // -- CONSTRUCTORS -----------------------------------------------
00152     //
00153 
00159     public FutureProxy() throws ConstructionOfReifiedObjectFailedException {
00160     }
00161 
00167     public FutureProxy(ConstructorCall c, Object[] p)
00168         throws ConstructionOfReifiedObjectFailedException {
00169         // we don't care what the arguments are
00170         this();
00171     }
00172 
00173     //
00174     // -- PUBLIC STATIC METHODS -----------------------------------------------
00175     //
00176 
00181     public static boolean isAwaited(Object obj) {
00182         return ProActive.isAwaited(obj);
00183     }
00184 
00185     public synchronized static FutureProxy getFutureProxy() {
00186         FutureProxy result;
00187         if (shouldPoolFutureProxyObjects && (index > 0)) {
00188             // gets the object from the pool
00189             index--;
00190             result = recyclePool[index];
00191             recyclePool[index] = null;
00192         } else {
00193             try {
00194                 result = new FutureProxy();
00195             } catch (ConstructionOfReifiedObjectFailedException e) {
00196                 result = null;
00197             }
00198         }
00199         return result;
00200     }
00201 
00204     public static FutureEventProducer getFutureEventProducer() {
00205         if (futureEventProducer == null) {
00206             futureEventProducer = new FutureEventProducerImpl();
00207         }
00208         return futureEventProducer;
00209     }
00210 
00211     //
00212     // -- PUBLIC METHODS -----------------------------------------------
00213     //
00214     public boolean equals(Object obj) {
00215         //we test if we have a future object
00216         if (isFutureObject(obj)) {
00217             return (((StubObject) obj).getProxy().hashCode() == this.hashCode());
00218         }
00219         return false;
00220     }
00221 
00222     //
00223     // -- Implements Future -----------------------------------------------
00224     //
00225 
00234     public synchronized void receiveReply(FutureResult obj)
00235         throws java.io.IOException {
00236         if (isAvailable()) {
00237             throw new java.io.IOException(
00238                 "FutureProxy receives a reply and this target field is not null");
00239         }
00240         target = obj;
00241         ExceptionHandler.addResult(this);
00242         ProxyNonFunctionalException nfe = target.getNFE();
00243         if (nfe != null) {
00244             NFEManager.fireNFE(nfe, originatingProxy);
00245         }
00246 
00247         originatingProxy = null;
00248         this.notifyAll();
00249     }
00250 
00256     public synchronized Throwable getRaisedException() {
00257         waitFor();
00258         return target.getExceptionToRaise();
00259     }
00260 
00264     public boolean isAvailable() {
00265         return target != null;
00266     }
00267 
00268     public FutureResult getFutureResult() {
00269         return target;
00270     }
00271 
00276     public synchronized Object getResult() {
00277         waitFor();
00278         return target.getResult();
00279     }
00280 
00281     public synchronized void setResult(Object o) {
00282         target = new FutureResult(o, null, null);
00283     }
00284 
00289     public synchronized boolean isAwaited() {
00290         return !isAvailable();
00291     }
00292 
00296     public synchronized void waitFor() {
00297         if (futureMaxDelay == -1) {
00298 
00299             /* First time, hopefully the configuration file has been read */
00300             try {
00301                 futureMaxDelay = Long.parseLong(System.getProperty(
00302                             "proactive.future.maxdelay"));
00303             } catch (IllegalArgumentException iea) {
00304 
00305                 /* The property is not set, that's not a problem */
00306                 futureMaxDelay = 0;
00307             }
00308         }
00309 
00310         try {
00311             waitFor(futureMaxDelay);
00312         } catch (ProActiveException e) {
00313             ProxyNonFunctionalException nfe = new FutureTimeoutException(
00314                     "Exception after waiting for " + futureMaxDelay + "ms", e);
00315 
00316             target = new FutureResult(null, null, nfe);
00317             notifyAll();
00318         }
00319     }
00320 
00326     public synchronized void waitFor(long timeout) throws ProActiveException {
00327         if (isAvailable()) {
00328             return;
00329         }
00330 
00331         UniqueID id = null;
00332 
00333         // send WAIT_BY_NECESSITY event to listeners if there are any
00334         if (futureEventProducer != null) {
00335             id = ProActive.getBodyOnThis().getID();
00336             if (LocalBodyStore.getInstance().getLocalBody(id) != null) {
00337                 // send event only if ActiveObject, not for HalfBodies
00338                 futureEventProducer.notifyListeners(id, getCreatorID(),
00339                     FutureEvent.WAIT_BY_NECESSITY);
00340             } else {
00341                 id = null;
00342             }
00343         }
00344         int timeoutCounter = 1;
00345         while (!isAvailable()) {
00346             timeoutCounter--;
00347             // counter < 0 means that it is the second time we enter in the loop
00348             // while still not available, i.e timeout has expired
00349             if (timeoutCounter < 0) {
00350                 throw new ProActiveException(
00351                     "Timeout expired while waiting for the future update");
00352             }
00353             try {
00354                 this.wait(timeout);
00355             } catch (InterruptedException e) {
00356                 e.printStackTrace();
00357             }
00358         }
00359 
00360         // send RECEIVED_FUTURE_RESULT event to listeners if there are any
00361         if (id != null) {
00362             futureEventProducer.notifyListeners(id, getCreatorID(),
00363                 FutureEvent.RECEIVED_FUTURE_RESULT);
00364         }
00365     }
00366 
00367     public long getID() {
00368         return ID;
00369     }
00370 
00371     public void setID(long l) {
00372         ID = l;
00373     }
00374 
00375     public void setCreatorID(UniqueID i) {
00376         creatorID = i;
00377     }
00378 
00379     public UniqueID getCreatorID() {
00380         return creatorID;
00381     }
00382 
00383     public void setSenderID(UniqueID i) {
00384         senderID = i;
00385     }
00386 
00387     public void setOriginatingProxy(AbstractProxy p) {
00388         originatingProxy = p;
00389     }
00390 
00391     //
00392     // -- Implements Proxy -----------------------------------------------
00393     //
00394 
00409     public Object reify(MethodCall c) throws InvocationTargetException {
00410         Object result = null;
00411 
00412         //stem.out.println("FutureProxy: c.getName() = " +c.getName());
00413         //              if ((c.getName()).equals("equals") || (c.getName()).equals("hashCode")) {
00414         //                      //System.out.println("FutureProxy: now executing " + c.getName());
00415         //                      try {
00416         //                              result = c.execute(this);
00417         //                      } catch (MethodCallExecutionFailedException e) {
00418         //                              throw new ProActiveRuntimeException("FutureProxy: Illegal arguments in call " + c.getName());
00419         //                      }
00420         //                      return result;
00421         //              }
00422         waitFor();
00423 
00424         // Now that the object is available, execute the call
00425         Object resultObject = target.getResult();
00426         try {
00427             result = c.execute(resultObject);
00428         } catch (MethodCallExecutionFailedException e) {
00429             throw new ProActiveRuntimeException(
00430                 "FutureProxy: Illegal arguments in call " + c.getName());
00431         }
00432 
00433         // If target of this future is another future, make a shortcut !
00434         if (resultObject instanceof StubObject) {
00435             Proxy p = ((StubObject) resultObject).getProxy();
00436             if (p instanceof FutureProxy) {
00437                 target = ((FutureProxy) p).target;
00438             }
00439         }
00440 
00441         return result;
00442     }
00443 
00444     // -- PROTECTED METHODS -----------------------------------------------
00445     //
00446     protected void finalize() {
00447         returnFutureProxy(this);
00448     }
00449 
00450     protected void setMigrationTag() {
00451         migration = true;
00452     }
00453 
00454     protected void unsetMigrationTag() {
00455         migration = false;
00456     }
00457 
00458     public synchronized void setContinuationTag() {
00459         continuation = true;
00460     }
00461 
00462     public synchronized void unsetContinuationTag() {
00463         continuation = false;
00464     }
00465 
00466     //
00467     // -- PRIVATE METHODS FOR SERIALIZATION -----------------------------------------------
00468     //
00469     private synchronized void writeObject(java.io.ObjectOutputStream out)
00470         throws java.io.IOException {
00471         if (!FuturePool.isInsideABodyForwarder()) {
00472             // If we are on a forwarder we want to forward the call, not wait the 
00473             // futur result or whatever
00474             //if continuation is already set, we are in a forwarder
00475             //else if a destination is available in destTable, set the continuation tag
00476             if (!continuation) {
00477                 continuation = (FuturePool.getBodyDestination() != null);
00478             }
00479 
00480             // We wait until the result is available
00481             if ((!migration) && (!continuation)) {
00482                 waitFor();
00483             }
00484 
00485             // Registration in case of continuation
00486             if (continuation && isAwaited()) {
00487                 // get the sender body
00488                 Body sender = LocalBodyStore.getInstance().getLocalBody(senderID);
00489 
00490                 // it's a halfbody...
00491                 if (sender == null) {
00492                     sender = LocalBodyStore.getInstance().getLocalHalfBody(senderID);
00493                 }
00494                 if (sender != null) {
00495                     UniversalBody dest = FuturePool.getBodyDestination();
00496                     if (dest != null) {
00497                         sender.getFuturePool().addAutomaticContinuation(ID,
00498                             creatorID, dest);
00499                     }
00500                 }
00501 
00502                 // if sender is still null, it's a forwarder !!
00503             }
00504         } else {
00505             // Maybe this FutureProxy has been added into FuturePool by readObject
00506             // Remove it and restore continuation
00507             ArrayList futures = FuturePool.getIncomingFutures();
00508             if (futures != null) {
00509                 for (int i = 0; i < futures.size(); i++) {
00510                     FutureProxy fp = (FutureProxy) futures.get(i);
00511                     if (fp.creatorID.equals(creatorID) && (fp.ID == ID)) {
00512                         FuturePool.removeIncomingFutures();
00513                         continuation = true;
00514                     }
00515                 }
00516             }
00517         }
00518 
00519         // Pass the result
00520         out.writeObject(target);
00521         // Pass the continuation flag
00522         out.writeBoolean(continuation);
00523         // Pass the id
00524         out.writeLong(ID);
00525         //Pass the creatorID
00526         out.writeObject(creatorID);
00527 
00528         //unset the current continuation tag
00529         this.continuation = false;
00530     }
00531 
00532     private synchronized void readObject(java.io.ObjectInputStream in)
00533         throws java.io.IOException, ClassNotFoundException {
00534         target = (FutureResult) in.readObject();
00535         continuation = (boolean) in.readBoolean();
00536         ID = (long) in.readLong();
00537         creatorID = (UniqueID) in.readObject();
00538 
00539         // THIS MUST BE DONE WHEN THE SERVER SEND LOGS !!
00540         if (continuation && isAwaited()) {
00541             // If we are on a BodyForwarder we DO NOT want to perform the 
00542             // following operations but can't avoid them.
00543             // There is a hack in writeObject to restore the continuation state         
00544             continuation = false;
00545             FuturePool.registerIncomingFuture(this);
00546         }
00547 
00548         //now we restore migration to its normal value
00549         migration = false;
00550     }
00551 
00552     //
00553     // -- PRIVATE STATIC METHODS -----------------------------------------------
00554     //
00555     private static boolean isFutureObject(Object obj) {
00556         // If obj is not reified, it cannot be a future
00557         if (!(MOP.isReifiedObject(obj))) {
00558             return false;
00559         }
00560 
00561         // Being a future object is equivalent to have a stub/proxy pair
00562         // where the proxy object implements the interface FUTURE_PROXY_INTERFACE
00563         // if the proxy does not inherit from FUTURE_PROXY_ROOT_CLASS
00564         // it is not a future
00565         Class proxyclass = ((StubObject) obj).getProxy().getClass();
00566         Class[] ints = proxyclass.getInterfaces();
00567         for (int i = 0; i < ints.length; i++) {
00568             if (Constants.FUTURE_PROXY_INTERFACE.isAssignableFrom(ints[i])) {
00569                 return true;
00570             }
00571         }
00572         return false;
00573     }
00574 
00575     private static synchronized void setShouldPoolFutureProxyObjects(
00576         boolean value) {
00577         if (shouldPoolFutureProxyObjects == value) {
00578             return;
00579         }
00580         shouldPoolFutureProxyObjects = value;
00581         if (shouldPoolFutureProxyObjects) {
00582             // Creates the recycle poll for FutureProxy objects
00583             recyclePool = new FutureProxy[RECYCLE_POOL_SIZE];
00584             index = 0;
00585         } else {
00586             // If we do not want to recycle FutureProxy objects anymore,
00587             // let's free some memory by permitting the reyclePool to be
00588             // garbage-collecting
00589             recyclePool = null;
00590         }
00591     }
00592 
00593     private static synchronized void returnFutureProxy(FutureProxy futureProxy) {
00594         if (!shouldPoolFutureProxyObjects) {
00595             return;
00596         }
00597 
00598         // If there's still one slot left in the pool
00599         if (recyclePool[index] == null) {
00600             // Cleans up a FutureProxy object
00601             // It is prefereable to do it here rather than at the moment
00602             // the object is picked out of the pool, because it allows
00603             // garbage-collecting the objects referenced in here
00604             futureProxy.target = null;
00605             futureProxy.exceptionLevel = null;
00606 
00607             // Inserts the object in the pool
00608             recyclePool[index] = futureProxy;
00609             index++;
00610             if (index == RECYCLE_POOL_SIZE) {
00611                 index = RECYCLE_POOL_SIZE - 1;
00612             }
00613         }
00614     }
00615 
00619     public ExceptionMaskLevel getExceptionLevel() {
00620         return exceptionLevel;
00621     }
00622 
00626     public void setExceptionLevel(ExceptionMaskLevel exceptionLevel) {
00627         this.exceptionLevel = exceptionLevel;
00628     }
00629 
00635     public synchronized static int futureLength(Object future) {
00636         int res = 0;
00637         if ((MOP.isReifiedObject(future)) &&
00638                 ((((StubObject) future).getProxy()) instanceof Future)) {
00639             res++;
00640             Future f = (Future) (((StubObject) future).getProxy());
00641             Object gna = f.getResult();
00642             while ((MOP.isReifiedObject(gna)) &&
00643                     ((((StubObject) gna).getProxy()) instanceof Future)) {
00644                 f = (Future) (((StubObject) gna).getProxy());
00645                 gna = f.getResult();
00646                 res++;
00647             }
00648         }
00649         return res;
00650     }
00651 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1