org/objectweb/proactive/core/body/future/FuturePool.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.future;
00032 
00033 import java.util.ArrayList;
00034 import java.util.Collections;
00035 import java.util.HashMap;
00036 import java.util.Map;
00037 
00038 import org.objectweb.proactive.Body;
00039 import org.objectweb.proactive.ProActive;
00040 import org.objectweb.proactive.core.ProActiveException;
00041 import org.objectweb.proactive.core.ProActiveRuntimeException;
00042 import org.objectweb.proactive.core.UniqueID;
00043 import org.objectweb.proactive.core.body.AbstractBody;
00044 import org.objectweb.proactive.core.body.LocalBodyStore;
00045 import org.objectweb.proactive.core.body.UniversalBody;
00046 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00047 import org.objectweb.proactive.core.body.reply.Reply;
00048 import org.objectweb.proactive.core.body.reply.ReplyImpl;
00049 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00050 import org.objectweb.proactive.core.mop.Utils;
00051 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00052 import org.objectweb.proactive.ext.security.exceptions.SecurityNotAvailableException;
00053 
00054 
00055 public class FuturePool extends Object implements java.io.Serializable {
00056     protected boolean newState;
00057 
00058     // table of future and ACs
00059     private FutureMap futures;
00060 
00061     // ID of the body corresponding to this futurePool
00062     private UniqueID ownerBody;
00063 
00064     // Active queue of AC services
00065     private transient ActiveACQueue queueAC;
00066 
00067     // toggle for enabling or disabling automatic continuation 
00068     private boolean acEnabled;
00069 
00070     // table used for storing values which arrive in the futurePool BEFORE the registration
00071     // of its corresponding future.
00072     private java.util.HashMap<String,FutureResult> valuesForFutures;
00073 
00074     //
00075     // -- CONSTRUCTORS -----------------------------------------------
00076     //
00077     public FuturePool() {
00078         futures = new FutureMap();
00079         valuesForFutures = new java.util.HashMap<String,FutureResult>();
00080         this.newState = false;
00081         if ("enable".equals(ProActiveConfiguration.getACState())) {
00082             this.acEnabled = true;
00083         } else {
00084             this.acEnabled = false;
00085         }
00086     }
00087 
00088     //
00089     // -- STATIC ------------------------------------------------------
00090     //
00091     // this table is used to register destination before sending.
00092     // So, a future could retreive its destination during serialization
00093     // this table indexed by the thread which perform the registration.
00094     static private java.util.Hashtable<Thread, UniversalBody> bodyDestination;
00095 
00096     // map of threads that are running a body forwarder 
00097     static private Map<Thread,Object> forwarderThreads;
00098 
00099     // Add the current thread as a body forwarder
00100     static public void addMeAsBodyForwarder() {
00101         forwarderThreads.put(Thread.currentThread(), null);
00102     }
00103 
00104     // Remove the current thread from the list of body forwarders
00105     static public void removeMeFromBodyForwarders() {
00106         forwarderThreads.remove(Thread.currentThread());
00107     }
00108 
00109     // Return true if the current thread is executing a body forwarder 
00110     static public boolean isInsideABodyForwarder() {
00111         return forwarderThreads.containsKey(Thread.currentThread());
00112     }
00113 
00114     // to register in the table
00115     static public void registerBodyDestination(UniversalBody dest) {
00116         bodyDestination.put(Thread.currentThread(), dest);
00117     }
00118 
00119     // to clear an entry in the table
00120     static public void removeBodyDestination() {
00121         bodyDestination.remove(Thread.currentThread());
00122     }
00123 
00124     // to get a destination
00125     static public UniversalBody getBodyDestination() {
00126         return (bodyDestination.get(Thread.currentThread()));
00127     }
00128 
00129     // this table is used to register deserialized futures after receive
00130     // So, futures to add in the local futurePool could be retreived
00131     static private java.util.Hashtable<Thread, ArrayList<Future>> incomingFutures;
00132 
00133     // to register an incoming future in the table      
00134     public static void registerIncomingFuture(Future f) {
00135         java.util.ArrayList<Future> listOfFutures = incomingFutures.get(Thread.currentThread());
00136         if (listOfFutures != null) {
00137             listOfFutures.add(f);
00138         } else {
00139             java.util.ArrayList<Future> newListOfFutures = new java.util.ArrayList<Future>();
00140             newListOfFutures.add(f);
00141             incomingFutures.put(Thread.currentThread(), newListOfFutures);
00142         }
00143     }
00144 
00145     // to remove an entry from the table
00146     static public void removeIncomingFutures() {
00147         incomingFutures.remove(Thread.currentThread());
00148     }
00149 
00150     // to get a list of incomingFutures
00151     static public java.util.ArrayList getIncomingFutures() {
00152         return (incomingFutures.get(Thread.currentThread()));
00153     }
00154 
00155     // static init block
00156     static {
00157         bodyDestination = new java.util.Hashtable<Thread, UniversalBody>();
00158         incomingFutures = new java.util.Hashtable<Thread, ArrayList<Future>>();
00159         // A HashTable cannot contain null as value so we use a syncrhonized HashMap
00160         forwarderThreads = Collections.synchronizedMap(new HashMap<Thread,Object>());
00161     }
00162 
00163     //
00164     // -- PUBLIC METHODS -----------------------------------------------
00165     //
00166 
00171     public void setOwnerBody(UniqueID i) {
00172         ownerBody = i;
00173     }
00174 
00178     public UniqueID getOwnerBody() {
00179         return ownerBody;
00180     }
00181 
00186     public void enableAC() {
00187         // queueAC is created in a lazy manner (see receiveFutureValue)
00188         this.acEnabled = true;
00189     }
00190 
00195     public void disableAC() {
00196         this.acEnabled = false;
00197         if (this.queueAC!=null){
00198             this.queueAC.killMe();
00199             this.queueAC = null;
00200         }
00201     }
00202 
00210     public synchronized int receiveFutureValue(long id, UniqueID creatorID,
00211         FutureResult result, Reply reply) throws java.io.IOException {
00212         // get all aiwated futures
00213         java.util.ArrayList futuresToUpdate = futures.getFuturesToUpdate(id,
00214                 creatorID);
00215 
00216         if (futuresToUpdate != null) {
00217             // FAULT-TOLERANCE
00218             int ftres = FTManager.NON_FT;
00219             if ((reply != null) && (reply.getFTManager() != null)) {
00220                 ftres = reply.getFTManager().onDeliverReply(reply);
00221             }
00222 
00223             Future future = (Future) (futuresToUpdate.get(0));
00224             if (future != null) {
00225                 future.receiveReply(result);
00226             }
00227 
00228             // if there are more than one future to update, we "give" deep copy
00229             // of the result to the other futures to respect ProActive model
00230             // We use here the migration tag to perform a simple serialization (ie 
00231             // without continuation side-effects)
00232             setMigrationTag();
00233             for (int i = 1; i < futuresToUpdate.size(); i++) {
00234                 Future otherFuture = (Future) (futuresToUpdate.get(i));
00235                 otherFuture.receiveReply((FutureResult) Utils.makeDeepCopy(
00236                         result));
00237             }
00238             unsetMigrationTag();
00239             stateChange();
00240 
00241             // 2) create and put ACservices
00242             if (acEnabled) {
00243                 java.util.ArrayList bodiesToContinue = futures.getAutomaticContinuation(id,
00244                         creatorID);
00245                 if ((bodiesToContinue != null) &&
00246                         (bodiesToContinue.size() != 0)) {
00247                     ProActiveSecurityManager psm = null;
00248                     try {
00249                         psm = ((AbstractBody) ProActive.getBodyOnThis()).getProActiveSecurityManager();
00250                     } catch (SecurityNotAvailableException e) {
00251                         psm = null;
00252                     }
00253                     
00254                     // lazy creation of the AC thread
00255                     if (this.queueAC==null){
00256                         this.queueAC = new ActiveACQueue();
00257                         this.queueAC.start();
00258                     }
00259                     //the added reply is a deep copy with the isAC tag set to true
00260                     queueAC.addACRequest(new ACService(bodiesToContinue,
00261                             new ReplyImpl(creatorID, id, null, result, psm, true)));
00262                 }
00263             }
00264 
00265             // 3) Remove futures from the futureMap
00266             futures.removeFutures(id, creatorID);
00267 
00268             return ftres;
00269         } else {
00270             // we have to store the result received by AC until future arrive
00271             this.valuesForFutures.put("" + id + creatorID, result);
00272             // OR this reply might be an orphan reply (return value is ignored if not)
00273             return FTManager.ORPHAN_REPLY;
00274         }
00275     }
00276 
00281     public synchronized void receiveFuture(Future futureObject) {
00282         futureObject.setSenderID(ownerBody);
00283         futures.receiveFuture(futureObject);
00284         long id = futureObject.getID();
00285         UniqueID creatorID = futureObject.getCreatorID();
00286         if (valuesForFutures.get("" + id + creatorID) != null) {
00287             try {
00288                 this.receiveFutureValue(id, creatorID,
00289                     valuesForFutures.remove("" + id + creatorID),
00290                     null);
00291             } catch (java.io.IOException e) {
00292                 e.printStackTrace();
00293             }
00294         }
00295     }
00296 
00303     public void addAutomaticContinuation(long id, UniqueID creatorID,
00304         UniversalBody bodyDest) {
00305         futures.addAutomaticContinuation(id, creatorID, bodyDest);
00306     }
00307 
00308     public synchronized void waitForReply(long timeout)
00309         throws ProActiveException {
00310         this.newState = false;
00311         // variable used to know wether the timeout has expired or not
00312         int timeoutCounter = 1;
00313         while (!newState) {
00314             timeoutCounter--;
00315             // counter < 0 means that it is the second time we enter in the loop
00316             // while the state has not been changed, i.e timeout has expired
00317             if (timeoutCounter < 0) {
00318                 throw new ProActiveException(
00319                     "Timeout expired while waiting for future update");
00320             }
00321             try {
00322                 wait(timeout);
00323             } catch (InterruptedException e) {
00324                 e.printStackTrace();
00325             }
00326         }
00327     }
00328 
00333     public void registerDestination(UniversalBody dest) {
00334         if (acEnabled) {
00335             FuturePool.registerBodyDestination(dest);
00336         }
00337     }
00338 
00342     public void removeDestination() {
00343         if (acEnabled) {
00344             FuturePool.removeBodyDestination();
00345         }
00346     }
00347 
00348     public void setMigrationTag() {
00349         futures.setMigrationTag();
00350     }
00351 
00352     public void unsetMigrationTag() {
00353         futures.unsetMigrationTag();
00354     }
00355 
00356 
00357     //
00358     // -- PRIVATE METHODS -----------------------------------------------
00359     //
00360     private void stateChange() {
00361         this.newState = true;
00362         notifyAll();
00363     }
00364 
00365     //
00366     // -- PRIVATE METHODS FOR SERIALIZATION -----------------------------------------------
00367     //
00368     private void writeObject(java.io.ObjectOutputStream out)
00369         throws java.io.IOException {
00370         setMigrationTag();
00371         out.defaultWriteObject();
00372         if (acEnabled) {
00373             // queue could not be created because of lazy creation
00374             if (this.queueAC==null){
00375                 // notify the reader that queueAC is null
00376                 out.writeBoolean(false);
00377             } else {
00378                 // notify the reader that queueAC has been created
00379                 out.writeBoolean(true);
00380                 // send the queue of AC requests
00381                 out.writeObject(queueAC.getQueue());
00382                 // stop the ActiveQueue thread if this is not a checkpointing serialization
00383                 FTManager ftm = ((AbstractBody) (LocalBodyStore.getInstance()
00384                         .getLocalBody(this.ownerBody))).getFTManager();
00385                 if (ftm != null) {
00386                     if (!ftm.isACheckpoint()) {
00387                         queueAC.killMe();
00388                     }
00389                 } else {
00390                     queueAC.killMe();
00391                 }
00392             }
00393         }
00394     }
00395 
00396     private void readObject(java.io.ObjectInputStream in)
00397         throws java.io.IOException, ClassNotFoundException {
00398         in.defaultReadObject();
00399         unsetMigrationTag();
00400         if (acEnabled) {
00401             // if queueExists is true, ACqueue has been created
00402             boolean queueExists = in.readBoolean();
00403             if (queueExists){
00404                 // create a new ActiveACQueue
00405                 java.util.ArrayList<ACService> queue = (java.util.ArrayList<ACService>) (in.readObject());
00406                 queueAC = new ActiveACQueue(queue);
00407                 queueAC.start(); 
00408             }
00409         }
00410     }
00411 
00412     //--------------------------------INNER CLASS------------------------------------//
00413 
00421     private class ActiveACQueue extends Thread {
00422         private java.util.ArrayList<ACService> queue;
00423         private int counter;
00424         private boolean kill;
00425 
00426         //
00427         // -- CONSTRUCTORS -----------------------------------------------
00428         //
00429         public ActiveACQueue() {
00430             queue = new java.util.ArrayList<ACService>();
00431             counter = 0;
00432             kill = false;
00433             this.setName("Thread for AC");
00434         }
00435 
00436         public ActiveACQueue(java.util.ArrayList<ACService> queue) {
00437             this.queue = queue;
00438             counter = queue.size();
00439             kill = false;
00440             this.setName("Thread for AC");
00441         }
00442 
00443         //
00444         // -- PUBLIC METHODS -----------------------------------------------
00445         //
00446 
00450         public java.util.ArrayList<ACService> getQueue() {
00451             return queue;
00452         }
00453 
00457         public synchronized void addACRequest(ACService r) {
00458             queue.add(r);
00459             counter++;
00460             notifyAll();
00461         }
00462 
00466         public synchronized ACService removeACRequest() {
00467             counter--;
00468             return (queue.remove(0));
00469         }
00470 
00474         public synchronized void killMe() {
00475             kill = true;
00476             notifyAll();
00477         }
00478 
00479         public void run() {
00480             // get a reference on the owner body
00481             // try until it's not null because deserialization of the body 
00482             // may be not finished when we restart the thread.
00483             Body owner = null;
00484 
00485             while (true) {
00486                 // if there is no AC to do, wait...
00487                 waitForAC();
00488                 // if body is dead, kill the thread
00489                 if (kill) {
00490                     break;
00491                 }
00492                 while (owner == null) {
00493                     owner = LocalBodyStore.getInstance().getLocalBody(ownerBody);
00494                     // Associating the thred with the body
00495                     LocalBodyStore.getInstance().setCurrentThreadBody(owner);
00496                     // it's a halfbody...
00497                     if (owner == null) {
00498                         owner = LocalBodyStore.getInstance().getLocalHalfBody(ownerBody);
00499                         LocalBodyStore.getInstance().setCurrentThreadBody(owner);
00500                     }
00501                 }
00502 
00503                 // there are ACs to do !
00504                 try {
00505                     // enter in the threadStore 
00506                     owner.enterInThreadStore();
00507 
00508                     // if body has migrated, kill the thread
00509                     if (kill) {
00510                         break;
00511                     }
00512 
00513                     ACService toDo = this.removeACRequest();
00514                     if (toDo != null) {
00515                         toDo.doAutomaticContinuation();
00516                     }
00517 
00518                     // exit from the threadStore
00519                     owner.exitFromThreadStore();
00520                 } catch (Exception e2) {
00521                     // to unblock active object
00522                     owner.exitFromThreadStore();
00523                     throw new ProActiveRuntimeException("Error while sending reply for AC ",
00524                         e2);
00525                 }
00526             }
00527         }
00528 
00529         // synchronized wait on ACRequest queue
00530         private synchronized void waitForAC() {
00531             try {
00532                 while ((counter == 0) && !kill) {
00533                     wait();
00534                 }
00535             } catch (InterruptedException e) {
00536                 e.printStackTrace();
00537             }
00538         }
00539     }
00540 
00545     private class ACService implements java.io.Serializable {
00546         // bodies that have to be updated       
00547         private java.util.ArrayList dests;
00548 
00549         // reply to send
00550         private Reply reply;
00551 
00552         //
00553         // -- CONSTRUCTORS -----------------------------------------------
00554         //
00555         public ACService(java.util.ArrayList dests, Reply reply) {
00556             this.dests = dests;
00557             this.reply = reply;
00558         }
00559 
00560         //
00561         // -- PUBLIC METHODS -----------------------------------------------
00562         //
00563         public void doAutomaticContinuation() throws java.io.IOException {
00564             if (dests != null) {
00565                 for (int i = 0; i < dests.size(); i++) {
00566                     UniversalBody dest = (UniversalBody) (dests.get(i));
00567                     registerDestination(dest);
00568                     // FAULT-TOLERANCE
00569                     AbstractBody ownerBody = (AbstractBody) (LocalBodyStore.getInstance()
00570                                                                            .getLocalBody(FuturePool.this.ownerBody));
00571                     if (ownerBody == null) {
00572                         //this might be a halfbody !
00573                         ownerBody = (AbstractBody) (LocalBodyStore.getInstance()
00574                                                                   .getLocalHalfBody(FuturePool.this.ownerBody));
00575                     }
00576                     FTManager ftm = ownerBody.getFTManager();
00577                     if (ftm != null) {
00578                         ftm.sendReply(reply, dest);
00579                     } else {
00580                         //System.out.println("ACService.doAutomaticContinuation() : sending reply");
00581                         reply.send(dest);
00582                     }
00583                     removeDestination();
00584                 }
00585             }
00586         }
00587     } //ACService
00588 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1