org/objectweb/proactive/core/body/request/BlockingRequestQueueImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.request;
00032 
00033 import java.util.Iterator;
00034 import java.util.LinkedList;
00035 
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.UniqueID;
00038 import org.objectweb.proactive.core.body.AbstractBody;
00039 import org.objectweb.proactive.core.event.RequestQueueEvent;
00040 import org.objectweb.proactive.core.group.spmd.MethodBarrier;
00041 import org.objectweb.proactive.core.group.spmd.MethodCallBarrierWithMethodName;
00042 import org.objectweb.proactive.core.group.spmd.ProActiveSPMDGroupManager;
00043 import org.objectweb.proactive.core.mop.MethodCall;
00044 
00045 
00046 public class BlockingRequestQueueImpl extends RequestQueueImpl
00047     implements java.io.Serializable, BlockingRequestQueue {
00048     //
00049     // -- PROTECTED MEMBERS -----------------------------------------------
00050     //
00051     protected boolean shouldWait;
00052     private transient ProActiveSPMDGroupManager spmdManager = null;
00053     private boolean suspended = false;
00054     private boolean specialExecution = false;
00055     private String specialMethod = "";
00056     private LinkedList<MethodBarrier> methodBarriers = new LinkedList<MethodBarrier>();
00057 
00058     //
00059     // -- CONSTRUCTORS -----------------------------------------------
00060     //
00061     public BlockingRequestQueueImpl(UniqueID ownerID) {
00062         super(ownerID);
00063         shouldWait = true;
00064     }
00065 
00066     //
00067     // -- PUBLIC METHODS -----------------------------------------------
00068     //
00069     public synchronized void destroy() {
00070         super.clear();
00071         shouldWait = false;
00072         notifyAll();
00073     }
00074 
00075     public synchronized boolean isDestroyed() {
00076         return !shouldWait;
00077     }
00078 
00079     public synchronized int add(Request r) {
00080         int ftres = super.add(r);
00081         if (logger.isDebugEnabled()) {
00082             logger.debug("Adding request " + r.getMethodName());
00083         }
00084 
00085         // FAULT-TOLERANCE
00086         // STILL NOT OOSPMD COMPLIANT !
00087         if (r instanceof AwaitedRequest) {
00088             this.notifyAll();
00089             return ftres;
00090         }
00091 
00092         // if there is a "method based barrier"
00093         if (this.methodBarriers.size() != 0) {
00094             Iterator<MethodBarrier> it = this.methodBarriers.iterator();
00095             boolean methodFound = false;
00096             MethodBarrier mb;
00097             while (it.hasNext() && !methodFound) {
00098                 mb = it.next();
00099                 methodFound = mb.checkMethod(r.getMethodName());
00100                 if (methodFound) {
00101                     if (mb.barrierOver()) {
00102                         it.remove();
00103                         if (this.methodBarriers.size() == 0) {
00104                             this.resume();
00105                         }
00106                     }
00107                     this.specialMethod = r.getMethodName();
00108                     this.specialExecution = true;
00109                 }
00110             }
00111         }
00112 
00113         // a "method based barrier" => stop the activity of this AO
00114         if (r.getMethodCall() instanceof MethodCallBarrierWithMethodName) {
00115             MethodCallBarrierWithMethodName mcbwmn = (MethodCallBarrierWithMethodName) r.getMethodCall();
00116             this.methodBarriers.add(new MethodBarrier(mcbwmn.getMethodNames()));
00117             this.suspend();
00118         }
00119         this.notifyAll();
00120         return ftres;
00121     }
00122 
00123     public synchronized int addToFront(Request r) {
00124         int ftres = super.addToFront(r);
00125         this.notifyAll();
00126         return ftres;
00127     }
00128 
00129     public synchronized Request blockingRemoveOldest(
00130         RequestFilter requestFilter) {
00131         return blockingRemove(requestFilter, true, 0);
00132     }
00133 
00134     public synchronized Request blockingRemoveOldest(
00135         RequestFilter requestFilter, long timeout) {
00136         return blockingRemove(requestFilter, true, timeout);
00137     }
00138 
00139     public synchronized Request blockingRemoveOldest(String methodName) {
00140         requestFilterOnMethodName.setMethodName(methodName);
00141         return blockingRemove(requestFilterOnMethodName, true, 0);
00142     }
00143 
00144     public synchronized Request blockingRemoveOldest() {
00145         return blockingRemove(null, true, 0);
00146     }
00147 
00148     public synchronized Request blockingRemoveOldest(long timeout) {
00149         return blockingRemove(null, true, timeout);
00150     }
00151 
00152     public synchronized Request blockingRemoveYoungest(
00153         RequestFilter requestFilter) {
00154         return blockingRemove(requestFilter, false);
00155     }
00156 
00157     public synchronized Request blockingRemoveYoungest(
00158         RequestFilter requestFilter, long timeout) {
00159         return blockingRemove(requestFilter, false, timeout);
00160     }
00161 
00162     public synchronized Request blockingRemoveYoungest(String methodName) {
00163         return blockingRemove(methodName, false);
00164     }
00165 
00166     public synchronized Request blockingRemoveYoungest() {
00167         return blockingRemove(false);
00168     }
00169 
00170     public synchronized Request blockingRemoveYoungest(long timeout) {
00171         return blockingRemove(timeout, false);
00172     }
00173 
00174     public synchronized void waitForRequest() {
00175         while (isEmpty() && shouldWait) {
00176             if (hasListeners()) {
00177                 notifyAllListeners(new RequestQueueEvent(ownerID,
00178                         RequestQueueEvent.WAIT_FOR_REQUEST));
00179             }
00180             try {
00181                 this.wait();
00182             } catch (InterruptedException e) {
00183             }
00184         }
00185     }
00186 
00187     //
00188     // -- PRIVATE METHODS -----------------------------------------------
00189     //
00190     protected Request blockingRemove(RequestFilter requestFilter, boolean oldest) {
00191         return blockingRemove(requestFilter, oldest, 0);
00192     }
00193 
00194     protected Request blockingRemove(RequestFilter requestFilter,
00195         boolean oldest, long timeout) {
00196         if (oldest && (requestFilter == null) && (timeout == 0)) {
00197             if (this.spmdManager == null) {
00198                 this.spmdManager = ((AbstractBody) ProActive.getBodyOnThis()).getProActiveSPMDGroupManager();
00199             }
00200             if(!spmdManager.isCurrentBarriersEmpty()) {
00201             return this.barrierBlockingRemove(); // the oospmd way ...
00202             }
00203         }
00204 
00205         long timeStartWaiting = 0;
00206         if (timeout > 0) {
00207             timeStartWaiting = System.currentTimeMillis();
00208         }
00209         Request r = oldest
00210             ? ((requestFilter == null) ? removeOldest()
00211                                        : removeOldest(requestFilter))
00212             : ((requestFilter == null) ? removeYoungest()
00213                                        : removeYoungest(requestFilter));
00214         while ((r == null) && shouldWait) {
00215             if (hasListeners()) {
00216                 notifyAllListeners(new RequestQueueEvent(ownerID,
00217                         RequestQueueEvent.WAIT_FOR_REQUEST));
00218             }
00219             try {
00220                 this.wait(timeout);
00221             } catch (InterruptedException e) {
00222             }
00223             r = oldest
00224                 ? ((requestFilter == null) ? removeOldest()
00225                                            : removeOldest(requestFilter))
00226                 : ((requestFilter == null) ? removeYoungest()
00227                                            : removeYoungest(requestFilter));
00228             if ((timeout != 0) &&
00229                     ((System.currentTimeMillis() - timeStartWaiting) > timeout)) {
00230                 // force return when timeout exceeded
00231                 return r;
00232             }
00233         }
00234         return r;
00235     }
00236 
00245     protected Request blockingRemove(String methodName, boolean oldest) {
00246         requestFilterOnMethodName.setMethodName(methodName);
00247         return blockingRemove(requestFilterOnMethodName, oldest, 0);
00248     }
00249 
00257     protected Request blockingRemove(boolean oldest) {
00258         return blockingRemove(null, oldest, 0);
00259     }
00260 
00270     protected Request blockingRemove(long timeout, boolean oldest) {
00271         return blockingRemove(null, oldest, timeout);
00272     }
00273 
00280     protected Request barrierBlockingRemoveOldest(long timeout) {
00281         long timeStartWaiting = 0;
00282         if (timeout > 0) {
00283             timeStartWaiting = System.currentTimeMillis();
00284         }
00285         while (((this.isEmpty() && this.shouldWait) || this.suspended ||
00286                 (this.indexOfRequestToServe() == -1)) &&
00287                 !this.specialExecution) {
00288             if (hasListeners()) {
00289                 notifyAllListeners(new RequestQueueEvent(ownerID,
00290                         RequestQueueEvent.WAIT_FOR_REQUEST));
00291             }
00292             try {
00293                 wait(timeout);
00294             } catch (InterruptedException e) {
00295             }
00296             if ((System.currentTimeMillis() - timeStartWaiting) > timeout) {
00297                 return removeOldest();
00298             }
00299         }
00300         if (specialExecution) {
00301             specialExecution = false;
00302             return blockingRemoveOldest(specialMethod);
00303         }
00304         return barrierRemoveOldest();
00305     }
00306 
00307     protected Request barrierRemoveOldest() {
00308         Request r = (Request) requestQueue.remove(indexOfRequestToServe());
00309         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00310             notifyAllListeners(new RequestQueueEvent(ownerID,
00311                     RequestQueueEvent.REMOVE_REQUEST));
00312         }
00313         return r;
00314     }
00315 
00322     protected Request barrierBlockingRemove() {
00323         while (((this.isEmpty() && this.shouldWait) || this.suspended ||
00324                 (this.indexOfRequestToServe() == -1)) &&
00325                 !this.specialExecution) {
00326             if (this.hasListeners()) {
00327                 this.notifyAllListeners(new RequestQueueEvent(this.ownerID,
00328                         RequestQueueEvent.WAIT_FOR_REQUEST));
00329             }
00330             try {
00331                 this.wait();
00332             } catch (InterruptedException e) {
00333             }
00334         }
00335         if (this.specialExecution) {
00336             this.specialExecution = false;
00337             return this.blockingRemoveOldest(this.specialMethod);
00338         }
00339         return this.barrierRemoveOldest();
00340     }
00341 
00346     public void suspend() {
00347         this.suspended = true;
00348     }
00349 
00353     public void resume() {
00354         this.suspended = false;
00355     }
00356 
00361     private int indexOfRequestToServe() {
00362         // if there is no barrier currently active, avoid the iteration
00363         if (this.spmdManager.isCurrentBarriersEmpty()) {
00364             return 0;
00365         } else { // there is at least one active barrier
00366             int index = -1;
00367             boolean isServable = false;
00368             Iterator it = this.requestQueue.iterator();
00369 
00370             // look for the first request in the queue we can serve
00371             while (!isServable && it.hasNext()) {
00372                 index++;
00373                 MethodCall mc = ((Request) it.next()).getMethodCall();
00374 
00375                 // FT : mc could be an awaited request
00376                 if (mc == null) {
00377                     return -1;
00378                 }
00379                 isServable = this.spmdManager.checkExecution(mc.getBarrierTags());
00380             }
00381             return isServable ? index : (-1);
00382         }
00383     }
00384 }

Generated on Mon Jan 22 15:16:06 2007 for ProActive by  doxygen 1.5.1