00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.future;
00032
00033 import java.lang.reflect.InvocationTargetException;
00034 import java.util.ArrayList;
00035 import java.util.HashMap;
00036
00037 import org.objectweb.proactive.Body;
00038 import org.objectweb.proactive.ProActive;
00039 import org.objectweb.proactive.core.Constants;
00040 import org.objectweb.proactive.core.ProActiveException;
00041 import org.objectweb.proactive.core.ProActiveRuntimeException;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.LocalBodyStore;
00044 import org.objectweb.proactive.core.body.UniversalBody;
00045 import org.objectweb.proactive.core.body.proxy.AbstractProxy;
00046 import org.objectweb.proactive.core.event.FutureEvent;
00047 import org.objectweb.proactive.core.exceptions.manager.ExceptionHandler;
00048 import org.objectweb.proactive.core.exceptions.manager.ExceptionMaskLevel;
00049 import org.objectweb.proactive.core.exceptions.manager.NFEManager;
00050 import org.objectweb.proactive.core.exceptions.proxy.FutureTimeoutException;
00051 import org.objectweb.proactive.core.exceptions.proxy.ProxyNonFunctionalException;
00052 import org.objectweb.proactive.core.mop.ConstructionOfReifiedObjectFailedException;
00053 import org.objectweb.proactive.core.mop.ConstructorCall;
00054 import org.objectweb.proactive.core.mop.MOP;
00055 import org.objectweb.proactive.core.mop.MethodCall;
00056 import org.objectweb.proactive.core.mop.MethodCallExecutionFailedException;
00057 import org.objectweb.proactive.core.mop.Proxy;
00058 import org.objectweb.proactive.core.mop.StubObject;
00059
00060
00068 public class FutureProxy implements Future, Proxy, java.io.Serializable {
00069
00070
00071
00072
/** Capacity of the recycle pool array allocated when pooling is switched on. */
00076 public static final int RECYCLE_POOL_SIZE = 1000;
/**
 * Proxies handed back through returnFutureProxy and waiting to be reused by
 * getFutureProxy. Null until pooling is enabled and again after it is disabled.
 */
00077 private static FutureProxy[] recyclePool;
00078
/**
 * Whether proxies are recycled. False by default; only changed through
 * setShouldPoolFutureProxyObjects.
 */
00082 private static boolean shouldPoolFutureProxyObjects;
/**
 * Slot of recyclePool that receives the next returned proxy; getFutureProxy
 * takes its proxy from the slot just below it.
 */
00083 private static int index;
00084
/** Event producer shared by all futures; created lazily by getFutureEventProducer. */
00087 private static FutureEventProducerImpl futureEventProducer;
00088
00089
00090
00091
00092
/** Result carried by this future; null for as long as the future is awaited. */
00096 protected FutureResult target;
00097
/**
 * When true, writeObject does not block until the result is available.
 * Reset to false by readObject.
 */
00103 protected boolean migration;
00104
/**
 * When true, writeObject does not block and may register an automatic
 * continuation instead. Written to the stream, then reset to false.
 */
00108 protected boolean continuation;
00109
/** Written to the stream; compared together with ID to match incoming futures in writeObject. */
00113 protected UniqueID creatorID;
00114
/** Written to the stream; compared together with creatorID to match incoming futures in writeObject. */
00119 protected long ID;
00120
/**
 * ID used by writeObject to look up the local (half) body whose future pool
 * registers the automatic continuation. Not written by writeObject.
 */
00124 protected UniqueID senderID;
00125
/** NOTE(review): not referenced by any method visible in this class - confirm usage in subclasses. */
00129 protected HashMap futureLevel = null;
00130
/** Exception mask level attached to this future; cleared when the proxy is recycled. */
00135 private ExceptionMaskLevel exceptionLevel;
00136
/**
 * Proxy handed to NFEManager.fireNFE when the reply carries a non functional
 * exception. Transient, and cleared once a reply has been received.
 */
00142 private transient AbstractProxy originatingProxy;
00143
/**
 * Delay in milliseconds passed to waitFor(long) by waitFor(). -1 means "not
 * read yet": the first waitFor() loads it from the system property
 * proactive.future.maxdelay and falls back to 0 when parsing fails.
 */
00148 protected static long futureMaxDelay = -1;
00149
00150
00151
00152
00153
/**
 * Creates a future proxy that has no result yet.
 *
 * @throws ConstructionOfReifiedObjectFailedException declared for callers;
 *         nothing in this constructor raises it
 */
public FutureProxy() throws ConstructionOfReifiedObjectFailedException {
    // Nothing to initialise: every field keeps its declared default.
}

/**
 * Constructor with the (ConstructorCall, Object[]) signature; both arguments
 * are ignored and the call is forwarded to the no-arg constructor.
 *
 * @param c ignored
 * @param p ignored
 * @throws ConstructionOfReifiedObjectFailedException declared for callers
 */
public FutureProxy(ConstructorCall c, Object[] p)
    throws ConstructionOfReifiedObjectFailedException {
    this();
}
00172
00173
00174
00175
00176
00181 public static boolean isAwaited(Object obj) {
00182 return ProActive.isAwaited(obj);
00183 }
00184
00185 public synchronized static FutureProxy getFutureProxy() {
00186 FutureProxy result;
00187 if (shouldPoolFutureProxyObjects && (index > 0)) {
00188
00189 index--;
00190 result = recyclePool[index];
00191 recyclePool[index] = null;
00192 } else {
00193 try {
00194 result = new FutureProxy();
00195 } catch (ConstructionOfReifiedObjectFailedException e) {
00196 result = null;
00197 }
00198 }
00199 return result;
00200 }
00201
00204 public static FutureEventProducer getFutureEventProducer() {
00205 if (futureEventProducer == null) {
00206 futureEventProducer = new FutureEventProducerImpl();
00207 }
00208 return futureEventProducer;
00209 }
00210
00211
00212
00213
00214 public boolean equals(Object obj) {
00215
00216 if (isFutureObject(obj)) {
00217 return (((StubObject) obj).getProxy().hashCode() == this.hashCode());
00218 }
00219 return false;
00220 }
00221
00222
00223
00224
00225
00234 public synchronized void receiveReply(FutureResult obj)
00235 throws java.io.IOException {
00236 if (isAvailable()) {
00237 throw new java.io.IOException(
00238 "FutureProxy receives a reply and this target field is not null");
00239 }
00240 target = obj;
00241 ExceptionHandler.addResult(this);
00242 ProxyNonFunctionalException nfe = target.getNFE();
00243 if (nfe != null) {
00244 NFEManager.fireNFE(nfe, originatingProxy);
00245 }
00246
00247 originatingProxy = null;
00248 this.notifyAll();
00249 }
00250
00256 public synchronized Throwable getRaisedException() {
00257 waitFor();
00258 return target.getExceptionToRaise();
00259 }
00260
00264 public boolean isAvailable() {
00265 return target != null;
00266 }
00267
00268 public FutureResult getFutureResult() {
00269 return target;
00270 }
00271
00276 public synchronized Object getResult() {
00277 waitFor();
00278 return target.getResult();
00279 }
00280
00281 public synchronized void setResult(Object o) {
00282 target = new FutureResult(o, null, null);
00283 }
00284
00289 public synchronized boolean isAwaited() {
00290 return !isAvailable();
00291 }
00292
00296 public synchronized void waitFor() {
00297 if (futureMaxDelay == -1) {
00298
00299
00300 try {
00301 futureMaxDelay = Long.parseLong(System.getProperty(
00302 "proactive.future.maxdelay"));
00303 } catch (IllegalArgumentException iea) {
00304
00305
00306 futureMaxDelay = 0;
00307 }
00308 }
00309
00310 try {
00311 waitFor(futureMaxDelay);
00312 } catch (ProActiveException e) {
00313 ProxyNonFunctionalException nfe = new FutureTimeoutException(
00314 "Exception after waiting for " + futureMaxDelay + "ms", e);
00315
00316 target = new FutureResult(null, null, nfe);
00317 notifyAll();
00318 }
00319 }
00320
00326 public synchronized void waitFor(long timeout) throws ProActiveException {
00327 if (isAvailable()) {
00328 return;
00329 }
00330
00331 UniqueID id = null;
00332
00333
00334 if (futureEventProducer != null) {
00335 id = ProActive.getBodyOnThis().getID();
00336 if (LocalBodyStore.getInstance().getLocalBody(id) != null) {
00337
00338 futureEventProducer.notifyListeners(id, getCreatorID(),
00339 FutureEvent.WAIT_BY_NECESSITY);
00340 } else {
00341 id = null;
00342 }
00343 }
00344 int timeoutCounter = 1;
00345 while (!isAvailable()) {
00346 timeoutCounter--;
00347
00348
00349 if (timeoutCounter < 0) {
00350 throw new ProActiveException(
00351 "Timeout expired while waiting for the future update");
00352 }
00353 try {
00354 this.wait(timeout);
00355 } catch (InterruptedException e) {
00356 e.printStackTrace();
00357 }
00358 }
00359
00360
00361 if (id != null) {
00362 futureEventProducer.notifyListeners(id, getCreatorID(),
00363 FutureEvent.RECEIVED_FUTURE_RESULT);
00364 }
00365 }
00366
00367 public long getID() {
00368 return ID;
00369 }
00370
00371 public void setID(long l) {
00372 ID = l;
00373 }
00374
00375 public void setCreatorID(UniqueID i) {
00376 creatorID = i;
00377 }
00378
00379 public UniqueID getCreatorID() {
00380 return creatorID;
00381 }
00382
00383 public void setSenderID(UniqueID i) {
00384 senderID = i;
00385 }
00386
00387 public void setOriginatingProxy(AbstractProxy p) {
00388 originatingProxy = p;
00389 }
00390
00391
00392
00393
00394
00409 public Object reify(MethodCall c) throws InvocationTargetException {
00410 Object result = null;
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422 waitFor();
00423
00424
00425 Object resultObject = target.getResult();
00426 try {
00427 result = c.execute(resultObject);
00428 } catch (MethodCallExecutionFailedException e) {
00429 throw new ProActiveRuntimeException(
00430 "FutureProxy: Illegal arguments in call " + c.getName());
00431 }
00432
00433
00434 if (resultObject instanceof StubObject) {
00435 Proxy p = ((StubObject) resultObject).getProxy();
00436 if (p instanceof FutureProxy) {
00437 target = ((FutureProxy) p).target;
00438 }
00439 }
00440
00441 return result;
00442 }
00443
00444
00445
00446 protected void finalize() {
00447 returnFutureProxy(this);
00448 }
00449
00450 protected void setMigrationTag() {
00451 migration = true;
00452 }
00453
00454 protected void unsetMigrationTag() {
00455 migration = false;
00456 }
00457
00458 public synchronized void setContinuationTag() {
00459 continuation = true;
00460 }
00461
00462 public synchronized void unsetContinuationTag() {
00463 continuation = false;
00464 }
00465
00466
00467
00468
/**
 * Custom serialization. Decides whether to block for the result or to set up
 * a continuation, then writes target, continuation, ID and creatorID in that
 * order (readObject reads them back in the same order). No other field is
 * written.
 *
 * @param out the stream to write to
 * @throws java.io.IOException if writing to the stream fails
 */
00469 private synchronized void writeObject(java.io.ObjectOutputStream out)
00470 throws java.io.IOException {
00471 if (!FuturePool.isInsideABodyForwarder()) {
00472
00473
00474
00475
// Unless already tagged, treat this as a continuation whenever FuturePool
// reports a destination body.
00476 if (!continuation) {
00477 continuation = (FuturePool.getBodyDestination() != null);
00478 }
00479
00480
// Neither migrating nor continuing: block until the result is available
// so that it is part of what gets written.
00481 if ((!migration) && (!continuation)) {
00482 waitFor();
00483 }
00484
00485
// Continuation of a still-awaited future: register an automatic
// continuation with the sender's future pool.
00486 if (continuation && isAwaited()) {
00487
00488 Body sender = LocalBodyStore.getInstance().getLocalBody(senderID);
00489
00490
// Not a local body: fall back to the local half bodies.
00491 if (sender == null) {
00492 sender = LocalBodyStore.getInstance().getLocalHalfBody(senderID);
00493 }
00494 if (sender != null) {
00495 UniversalBody dest = FuturePool.getBodyDestination();
00496 if (dest != null) {
00497 sender.getFuturePool().addAutomaticContinuation(ID,
00498 creatorID, dest);
00499 }
00500 }
00501
00502
00503 }
00504 } else {
00505
00506
// Inside a body forwarder: if this future (same creatorID and ID) is among
// the incoming futures, drop the incoming futures and tag this write as a
// continuation. The scan does not stop at the first match.
00507 ArrayList futures = FuturePool.getIncomingFutures();
00508 if (futures != null) {
00509 for (int i = 0; i < futures.size(); i++) {
00510 FutureProxy fp = (FutureProxy) futures.get(i);
00511 if (fp.creatorID.equals(creatorID) && (fp.ID == ID)) {
00512 FuturePool.removeIncomingFutures();
00513 continuation = true;
00514 }
00515 }
00516 }
00517 }
00518
00519
// Wire format: target, continuation, ID, creatorID.
00520 out.writeObject(target);
00521
00522 out.writeBoolean(continuation);
00523
00524 out.writeLong(ID);
00525
00526 out.writeObject(creatorID);
00527
00528
// The tag only applies to this one write.
00529 this.continuation = false;
00530 }
00531
00532 private synchronized void readObject(java.io.ObjectInputStream in)
00533 throws java.io.IOException, ClassNotFoundException {
00534 target = (FutureResult) in.readObject();
00535 continuation = (boolean) in.readBoolean();
00536 ID = (long) in.readLong();
00537 creatorID = (UniqueID) in.readObject();
00538
00539
00540 if (continuation && isAwaited()) {
00541
00542
00543
00544 continuation = false;
00545 FuturePool.registerIncomingFuture(this);
00546 }
00547
00548
00549 migration = false;
00550 }
00551
00552
00553
00554
00555 private static boolean isFutureObject(Object obj) {
00556
00557 if (!(MOP.isReifiedObject(obj))) {
00558 return false;
00559 }
00560
00561
00562
00563
00564
00565 Class proxyclass = ((StubObject) obj).getProxy().getClass();
00566 Class[] ints = proxyclass.getInterfaces();
00567 for (int i = 0; i < ints.length; i++) {
00568 if (Constants.FUTURE_PROXY_INTERFACE.isAssignableFrom(ints[i])) {
00569 return true;
00570 }
00571 }
00572 return false;
00573 }
00574
00575 private static synchronized void setShouldPoolFutureProxyObjects(
00576 boolean value) {
00577 if (shouldPoolFutureProxyObjects == value) {
00578 return;
00579 }
00580 shouldPoolFutureProxyObjects = value;
00581 if (shouldPoolFutureProxyObjects) {
00582
00583 recyclePool = new FutureProxy[RECYCLE_POOL_SIZE];
00584 index = 0;
00585 } else {
00586
00587
00588
00589 recyclePool = null;
00590 }
00591 }
00592
00593 private static synchronized void returnFutureProxy(FutureProxy futureProxy) {
00594 if (!shouldPoolFutureProxyObjects) {
00595 return;
00596 }
00597
00598
00599 if (recyclePool[index] == null) {
00600
00601
00602
00603
00604 futureProxy.target = null;
00605 futureProxy.exceptionLevel = null;
00606
00607
00608 recyclePool[index] = futureProxy;
00609 index++;
00610 if (index == RECYCLE_POOL_SIZE) {
00611 index = RECYCLE_POOL_SIZE - 1;
00612 }
00613 }
00614 }
00615
00619 public ExceptionMaskLevel getExceptionLevel() {
00620 return exceptionLevel;
00621 }
00622
00626 public void setExceptionLevel(ExceptionMaskLevel exceptionLevel) {
00627 this.exceptionLevel = exceptionLevel;
00628 }
00629
00635 public synchronized static int futureLength(Object future) {
00636 int res = 0;
00637 if ((MOP.isReifiedObject(future)) &&
00638 ((((StubObject) future).getProxy()) instanceof Future)) {
00639 res++;
00640 Future f = (Future) (((StubObject) future).getProxy());
00641 Object gna = f.getResult();
00642 while ((MOP.isReifiedObject(gna)) &&
00643 ((((StubObject) gna).getProxy()) instanceof Future)) {
00644 f = (Future) (((StubObject) gna).getProxy());
00645 gna = f.getResult();
00646 res++;
00647 }
00648 }
00649 return res;
00650 }
00651 }