00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.future;
00032
00033 import java.util.ArrayList;
00034 import java.util.Collections;
00035 import java.util.HashMap;
00036 import java.util.Map;
00037
00038 import org.objectweb.proactive.Body;
00039 import org.objectweb.proactive.ProActive;
00040 import org.objectweb.proactive.core.ProActiveException;
00041 import org.objectweb.proactive.core.ProActiveRuntimeException;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.AbstractBody;
00044 import org.objectweb.proactive.core.body.LocalBodyStore;
00045 import org.objectweb.proactive.core.body.UniversalBody;
00046 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00047 import org.objectweb.proactive.core.body.reply.Reply;
00048 import org.objectweb.proactive.core.body.reply.ReplyImpl;
00049 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00050 import org.objectweb.proactive.core.mop.Utils;
00051 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00052 import org.objectweb.proactive.ext.security.exceptions.SecurityNotAvailableException;
00053
00054
00055 public class FuturePool extends Object implements java.io.Serializable {
00056 protected boolean newState;
00057
00058
00059 private FutureMap futures;
00060
00061
00062 private UniqueID ownerBody;
00063
00064
00065 private transient ActiveACQueue queueAC;
00066
00067
00068 private boolean acEnabled;
00069
00070
00071
00072 private java.util.HashMap<String,FutureResult> valuesForFutures;
00073
00074
00075
00076
/**
 * Creates an empty pool. Automatic continuation is switched on only when
 * {@code ProActiveConfiguration.getACState()} returns the string "enable".
 */
public FuturePool() {
    this.futures = new FutureMap();
    this.valuesForFutures = new HashMap<String, FutureResult>();
    this.newState = false;
    this.acEnabled = "enable".equals(ProActiveConfiguration.getACState());
}
00087
00088
00089
00090
00091
00092
00093
00094 static private java.util.Hashtable<Thread, UniversalBody> bodyDestination;
00095
00096
00097 static private Map<Thread,Object> forwarderThreads;
00098
00099
00100 static public void addMeAsBodyForwarder() {
00101 forwarderThreads.put(Thread.currentThread(), null);
00102 }
00103
00104
00105 static public void removeMeFromBodyForwarders() {
00106 forwarderThreads.remove(Thread.currentThread());
00107 }
00108
00109
00110 static public boolean isInsideABodyForwarder() {
00111 return forwarderThreads.containsKey(Thread.currentThread());
00112 }
00113
00114
00115 static public void registerBodyDestination(UniversalBody dest) {
00116 bodyDestination.put(Thread.currentThread(), dest);
00117 }
00118
00119
00120 static public void removeBodyDestination() {
00121 bodyDestination.remove(Thread.currentThread());
00122 }
00123
00124
00125 static public UniversalBody getBodyDestination() {
00126 return (bodyDestination.get(Thread.currentThread()));
00127 }
00128
00129
00130
00131 static private java.util.Hashtable<Thread, ArrayList<Future>> incomingFutures;
00132
00133
00134 public static void registerIncomingFuture(Future f) {
00135 java.util.ArrayList<Future> listOfFutures = incomingFutures.get(Thread.currentThread());
00136 if (listOfFutures != null) {
00137 listOfFutures.add(f);
00138 } else {
00139 java.util.ArrayList<Future> newListOfFutures = new java.util.ArrayList<Future>();
00140 newListOfFutures.add(f);
00141 incomingFutures.put(Thread.currentThread(), newListOfFutures);
00142 }
00143 }
00144
00145
00146 static public void removeIncomingFutures() {
00147 incomingFutures.remove(Thread.currentThread());
00148 }
00149
00150
00151 static public java.util.ArrayList getIncomingFutures() {
00152 return (incomingFutures.get(Thread.currentThread()));
00153 }
00154
00155
// Creates the per-thread registries shared by every FuturePool instance.
static {
    // A synchronized HashMap is used here because forwarder threads are
    // stored with a null value, which a Hashtable would reject.
    forwarderThreads = Collections.synchronizedMap(new HashMap<Thread, Object>());

    incomingFutures = new java.util.Hashtable<Thread, ArrayList<Future>>();
    bodyDestination = new java.util.Hashtable<Thread, UniversalBody>();
}
00162
00163
00164
00165
00166
00171 public void setOwnerBody(UniqueID i) {
00172 ownerBody = i;
00173 }
00174
00178 public UniqueID getOwnerBody() {
00179 return ownerBody;
00180 }
00181
00186 public void enableAC() {
00187
00188 this.acEnabled = true;
00189 }
00190
00195 public void disableAC() {
00196 this.acEnabled = false;
00197 if (this.queueAC!=null){
00198 this.queueAC.killMe();
00199 this.queueAC = null;
00200 }
00201 }
00202
00210 public synchronized int receiveFutureValue(long id, UniqueID creatorID,
00211 FutureResult result, Reply reply) throws java.io.IOException {
00212
00213 java.util.ArrayList futuresToUpdate = futures.getFuturesToUpdate(id,
00214 creatorID);
00215
00216 if (futuresToUpdate != null) {
00217
00218 int ftres = FTManager.NON_FT;
00219 if ((reply != null) && (reply.getFTManager() != null)) {
00220 ftres = reply.getFTManager().onDeliverReply(reply);
00221 }
00222
00223 Future future = (Future) (futuresToUpdate.get(0));
00224 if (future != null) {
00225 future.receiveReply(result);
00226 }
00227
00228
00229
00230
00231
00232 setMigrationTag();
00233 for (int i = 1; i < futuresToUpdate.size(); i++) {
00234 Future otherFuture = (Future) (futuresToUpdate.get(i));
00235 otherFuture.receiveReply((FutureResult) Utils.makeDeepCopy(
00236 result));
00237 }
00238 unsetMigrationTag();
00239 stateChange();
00240
00241
00242 if (acEnabled) {
00243 java.util.ArrayList bodiesToContinue = futures.getAutomaticContinuation(id,
00244 creatorID);
00245 if ((bodiesToContinue != null) &&
00246 (bodiesToContinue.size() != 0)) {
00247 ProActiveSecurityManager psm = null;
00248 try {
00249 psm = ((AbstractBody) ProActive.getBodyOnThis()).getProActiveSecurityManager();
00250 } catch (SecurityNotAvailableException e) {
00251 psm = null;
00252 }
00253
00254
00255 if (this.queueAC==null){
00256 this.queueAC = new ActiveACQueue();
00257 this.queueAC.start();
00258 }
00259
00260 queueAC.addACRequest(new ACService(bodiesToContinue,
00261 new ReplyImpl(creatorID, id, null, result, psm, true)));
00262 }
00263 }
00264
00265
00266 futures.removeFutures(id, creatorID);
00267
00268 return ftres;
00269 } else {
00270
00271 this.valuesForFutures.put("" + id + creatorID, result);
00272
00273 return FTManager.ORPHAN_REPLY;
00274 }
00275 }
00276
00281 public synchronized void receiveFuture(Future futureObject) {
00282 futureObject.setSenderID(ownerBody);
00283 futures.receiveFuture(futureObject);
00284 long id = futureObject.getID();
00285 UniqueID creatorID = futureObject.getCreatorID();
00286 if (valuesForFutures.get("" + id + creatorID) != null) {
00287 try {
00288 this.receiveFutureValue(id, creatorID,
00289 valuesForFutures.remove("" + id + creatorID),
00290 null);
00291 } catch (java.io.IOException e) {
00292 e.printStackTrace();
00293 }
00294 }
00295 }
00296
00303 public void addAutomaticContinuation(long id, UniqueID creatorID,
00304 UniversalBody bodyDest) {
00305 futures.addAutomaticContinuation(id, creatorID, bodyDest);
00306 }
00307
00308 public synchronized void waitForReply(long timeout)
00309 throws ProActiveException {
00310 this.newState = false;
00311
00312 int timeoutCounter = 1;
00313 while (!newState) {
00314 timeoutCounter--;
00315
00316
00317 if (timeoutCounter < 0) {
00318 throw new ProActiveException(
00319 "Timeout expired while waiting for future update");
00320 }
00321 try {
00322 wait(timeout);
00323 } catch (InterruptedException e) {
00324 e.printStackTrace();
00325 }
00326 }
00327 }
00328
00333 public void registerDestination(UniversalBody dest) {
00334 if (acEnabled) {
00335 FuturePool.registerBodyDestination(dest);
00336 }
00337 }
00338
00342 public void removeDestination() {
00343 if (acEnabled) {
00344 FuturePool.removeBodyDestination();
00345 }
00346 }
00347
00348 public void setMigrationTag() {
00349 futures.setMigrationTag();
00350 }
00351
00352 public void unsetMigrationTag() {
00353 futures.unsetMigrationTag();
00354 }
00355
00356
00357
00358
00359
00360 private void stateChange() {
00361 this.newState = true;
00362 notifyAll();
00363 }
00364
00365
00366
00367
00368 private void writeObject(java.io.ObjectOutputStream out)
00369 throws java.io.IOException {
00370 setMigrationTag();
00371 out.defaultWriteObject();
00372 if (acEnabled) {
00373
00374 if (this.queueAC==null){
00375
00376 out.writeBoolean(false);
00377 } else {
00378
00379 out.writeBoolean(true);
00380
00381 out.writeObject(queueAC.getQueue());
00382
00383 FTManager ftm = ((AbstractBody) (LocalBodyStore.getInstance()
00384 .getLocalBody(this.ownerBody))).getFTManager();
00385 if (ftm != null) {
00386 if (!ftm.isACheckpoint()) {
00387 queueAC.killMe();
00388 }
00389 } else {
00390 queueAC.killMe();
00391 }
00392 }
00393 }
00394 }
00395
00396 private void readObject(java.io.ObjectInputStream in)
00397 throws java.io.IOException, ClassNotFoundException {
00398 in.defaultReadObject();
00399 unsetMigrationTag();
00400 if (acEnabled) {
00401
00402 boolean queueExists = in.readBoolean();
00403 if (queueExists){
00404
00405 java.util.ArrayList<ACService> queue = (java.util.ArrayList<ACService>) (in.readObject());
00406 queueAC = new ActiveACQueue(queue);
00407 queueAC.start();
00408 }
00409 }
00410 }
00411
00412
00413
00421 private class ActiveACQueue extends Thread {
00422 private java.util.ArrayList<ACService> queue;
00423 private int counter;
00424 private boolean kill;
00425
00426
00427
00428
00429 public ActiveACQueue() {
00430 queue = new java.util.ArrayList<ACService>();
00431 counter = 0;
00432 kill = false;
00433 this.setName("Thread for AC");
00434 }
00435
00436 public ActiveACQueue(java.util.ArrayList<ACService> queue) {
00437 this.queue = queue;
00438 counter = queue.size();
00439 kill = false;
00440 this.setName("Thread for AC");
00441 }
00442
00443
00444
00445
00446
00450 public java.util.ArrayList<ACService> getQueue() {
00451 return queue;
00452 }
00453
00457 public synchronized void addACRequest(ACService r) {
00458 queue.add(r);
00459 counter++;
00460 notifyAll();
00461 }
00462
00466 public synchronized ACService removeACRequest() {
00467 counter--;
00468 return (queue.remove(0));
00469 }
00470
00474 public synchronized void killMe() {
00475 kill = true;
00476 notifyAll();
00477 }
00478
00479 public void run() {
00480
00481
00482
00483 Body owner = null;
00484
00485 while (true) {
00486
00487 waitForAC();
00488
00489 if (kill) {
00490 break;
00491 }
00492 while (owner == null) {
00493 owner = LocalBodyStore.getInstance().getLocalBody(ownerBody);
00494
00495 LocalBodyStore.getInstance().setCurrentThreadBody(owner);
00496
00497 if (owner == null) {
00498 owner = LocalBodyStore.getInstance().getLocalHalfBody(ownerBody);
00499 LocalBodyStore.getInstance().setCurrentThreadBody(owner);
00500 }
00501 }
00502
00503
00504 try {
00505
00506 owner.enterInThreadStore();
00507
00508
00509 if (kill) {
00510 break;
00511 }
00512
00513 ACService toDo = this.removeACRequest();
00514 if (toDo != null) {
00515 toDo.doAutomaticContinuation();
00516 }
00517
00518
00519 owner.exitFromThreadStore();
00520 } catch (Exception e2) {
00521
00522 owner.exitFromThreadStore();
00523 throw new ProActiveRuntimeException("Error while sending reply for AC ",
00524 e2);
00525 }
00526 }
00527 }
00528
00529
00530 private synchronized void waitForAC() {
00531 try {
00532 while ((counter == 0) && !kill) {
00533 wait();
00534 }
00535 } catch (InterruptedException e) {
00536 e.printStackTrace();
00537 }
00538 }
00539 }
00540
00545 private class ACService implements java.io.Serializable {
00546
00547 private java.util.ArrayList dests;
00548
00549
00550 private Reply reply;
00551
00552
00553
00554
00555 public ACService(java.util.ArrayList dests, Reply reply) {
00556 this.dests = dests;
00557 this.reply = reply;
00558 }
00559
00560
00561
00562
00563 public void doAutomaticContinuation() throws java.io.IOException {
00564 if (dests != null) {
00565 for (int i = 0; i < dests.size(); i++) {
00566 UniversalBody dest = (UniversalBody) (dests.get(i));
00567 registerDestination(dest);
00568
00569 AbstractBody ownerBody = (AbstractBody) (LocalBodyStore.getInstance()
00570 .getLocalBody(FuturePool.this.ownerBody));
00571 if (ownerBody == null) {
00572
00573 ownerBody = (AbstractBody) (LocalBodyStore.getInstance()
00574 .getLocalHalfBody(FuturePool.this.ownerBody));
00575 }
00576 FTManager ftm = ownerBody.getFTManager();
00577 if (ftm != null) {
00578 ftm.sendReply(reply, dest);
00579 } else {
00580
00581 reply.send(dest);
00582 }
00583 removeDestination();
00584 }
00585 }
00586 }
00587 }
00588 }