00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.request;
00032
00033 import java.util.Iterator;
00034 import java.util.LinkedList;
00035
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.UniqueID;
00038 import org.objectweb.proactive.core.body.AbstractBody;
00039 import org.objectweb.proactive.core.event.RequestQueueEvent;
00040 import org.objectweb.proactive.core.group.spmd.MethodBarrier;
00041 import org.objectweb.proactive.core.group.spmd.MethodCallBarrierWithMethodName;
00042 import org.objectweb.proactive.core.group.spmd.ProActiveSPMDGroupManager;
00043 import org.objectweb.proactive.core.mop.MethodCall;
00044
00045
00046 public class BlockingRequestQueueImpl extends RequestQueueImpl
00047 implements java.io.Serializable, BlockingRequestQueue {
00048
00049
00050
00051 protected boolean shouldWait;
00052 private transient ProActiveSPMDGroupManager spmdManager = null;
00053 private boolean suspended = false;
00054 private boolean specialExecution = false;
00055 private String specialMethod = "";
00056 private LinkedList<MethodBarrier> methodBarriers = new LinkedList<MethodBarrier>();
00057
00058
00059
00060
00061 public BlockingRequestQueueImpl(UniqueID ownerID) {
00062 super(ownerID);
00063 shouldWait = true;
00064 }
00065
00066
00067
00068
00069 public synchronized void destroy() {
00070 super.clear();
00071 shouldWait = false;
00072 notifyAll();
00073 }
00074
00075 public synchronized boolean isDestroyed() {
00076 return !shouldWait;
00077 }
00078
00079 public synchronized int add(Request r) {
00080 int ftres = super.add(r);
00081 if (logger.isDebugEnabled()) {
00082 logger.debug("Adding request " + r.getMethodName());
00083 }
00084
00085
00086
00087 if (r instanceof AwaitedRequest) {
00088 this.notifyAll();
00089 return ftres;
00090 }
00091
00092
00093 if (this.methodBarriers.size() != 0) {
00094 Iterator<MethodBarrier> it = this.methodBarriers.iterator();
00095 boolean methodFound = false;
00096 MethodBarrier mb;
00097 while (it.hasNext() && !methodFound) {
00098 mb = it.next();
00099 methodFound = mb.checkMethod(r.getMethodName());
00100 if (methodFound) {
00101 if (mb.barrierOver()) {
00102 it.remove();
00103 if (this.methodBarriers.size() == 0) {
00104 this.resume();
00105 }
00106 }
00107 this.specialMethod = r.getMethodName();
00108 this.specialExecution = true;
00109 }
00110 }
00111 }
00112
00113
00114 if (r.getMethodCall() instanceof MethodCallBarrierWithMethodName) {
00115 MethodCallBarrierWithMethodName mcbwmn = (MethodCallBarrierWithMethodName) r.getMethodCall();
00116 this.methodBarriers.add(new MethodBarrier(mcbwmn.getMethodNames()));
00117 this.suspend();
00118 }
00119 this.notifyAll();
00120 return ftres;
00121 }
00122
00123 public synchronized int addToFront(Request r) {
00124 int ftres = super.addToFront(r);
00125 this.notifyAll();
00126 return ftres;
00127 }
00128
00129 public synchronized Request blockingRemoveOldest(
00130 RequestFilter requestFilter) {
00131 return blockingRemove(requestFilter, true, 0);
00132 }
00133
00134 public synchronized Request blockingRemoveOldest(
00135 RequestFilter requestFilter, long timeout) {
00136 return blockingRemove(requestFilter, true, timeout);
00137 }
00138
00139 public synchronized Request blockingRemoveOldest(String methodName) {
00140 requestFilterOnMethodName.setMethodName(methodName);
00141 return blockingRemove(requestFilterOnMethodName, true, 0);
00142 }
00143
00144 public synchronized Request blockingRemoveOldest() {
00145 return blockingRemove(null, true, 0);
00146 }
00147
00148 public synchronized Request blockingRemoveOldest(long timeout) {
00149 return blockingRemove(null, true, timeout);
00150 }
00151
00152 public synchronized Request blockingRemoveYoungest(
00153 RequestFilter requestFilter) {
00154 return blockingRemove(requestFilter, false);
00155 }
00156
00157 public synchronized Request blockingRemoveYoungest(
00158 RequestFilter requestFilter, long timeout) {
00159 return blockingRemove(requestFilter, false, timeout);
00160 }
00161
00162 public synchronized Request blockingRemoveYoungest(String methodName) {
00163 return blockingRemove(methodName, false);
00164 }
00165
00166 public synchronized Request blockingRemoveYoungest() {
00167 return blockingRemove(false);
00168 }
00169
00170 public synchronized Request blockingRemoveYoungest(long timeout) {
00171 return blockingRemove(timeout, false);
00172 }
00173
00174 public synchronized void waitForRequest() {
00175 while (isEmpty() && shouldWait) {
00176 if (hasListeners()) {
00177 notifyAllListeners(new RequestQueueEvent(ownerID,
00178 RequestQueueEvent.WAIT_FOR_REQUEST));
00179 }
00180 try {
00181 this.wait();
00182 } catch (InterruptedException e) {
00183 }
00184 }
00185 }
00186
00187
00188
00189
00190 protected Request blockingRemove(RequestFilter requestFilter, boolean oldest) {
00191 return blockingRemove(requestFilter, oldest, 0);
00192 }
00193
00194 protected Request blockingRemove(RequestFilter requestFilter,
00195 boolean oldest, long timeout) {
00196 if (oldest && (requestFilter == null) && (timeout == 0)) {
00197 if (this.spmdManager == null) {
00198 this.spmdManager = ((AbstractBody) ProActive.getBodyOnThis()).getProActiveSPMDGroupManager();
00199 }
00200 if(!spmdManager.isCurrentBarriersEmpty()) {
00201 return this.barrierBlockingRemove();
00202 }
00203 }
00204
00205 long timeStartWaiting = 0;
00206 if (timeout > 0) {
00207 timeStartWaiting = System.currentTimeMillis();
00208 }
00209 Request r = oldest
00210 ? ((requestFilter == null) ? removeOldest()
00211 : removeOldest(requestFilter))
00212 : ((requestFilter == null) ? removeYoungest()
00213 : removeYoungest(requestFilter));
00214 while ((r == null) && shouldWait) {
00215 if (hasListeners()) {
00216 notifyAllListeners(new RequestQueueEvent(ownerID,
00217 RequestQueueEvent.WAIT_FOR_REQUEST));
00218 }
00219 try {
00220 this.wait(timeout);
00221 } catch (InterruptedException e) {
00222 }
00223 r = oldest
00224 ? ((requestFilter == null) ? removeOldest()
00225 : removeOldest(requestFilter))
00226 : ((requestFilter == null) ? removeYoungest()
00227 : removeYoungest(requestFilter));
00228 if ((timeout != 0) &&
00229 ((System.currentTimeMillis() - timeStartWaiting) > timeout)) {
00230
00231 return r;
00232 }
00233 }
00234 return r;
00235 }
00236
00245 protected Request blockingRemove(String methodName, boolean oldest) {
00246 requestFilterOnMethodName.setMethodName(methodName);
00247 return blockingRemove(requestFilterOnMethodName, oldest, 0);
00248 }
00249
00257 protected Request blockingRemove(boolean oldest) {
00258 return blockingRemove(null, oldest, 0);
00259 }
00260
00270 protected Request blockingRemove(long timeout, boolean oldest) {
00271 return blockingRemove(null, oldest, timeout);
00272 }
00273
00280 protected Request barrierBlockingRemoveOldest(long timeout) {
00281 long timeStartWaiting = 0;
00282 if (timeout > 0) {
00283 timeStartWaiting = System.currentTimeMillis();
00284 }
00285 while (((this.isEmpty() && this.shouldWait) || this.suspended ||
00286 (this.indexOfRequestToServe() == -1)) &&
00287 !this.specialExecution) {
00288 if (hasListeners()) {
00289 notifyAllListeners(new RequestQueueEvent(ownerID,
00290 RequestQueueEvent.WAIT_FOR_REQUEST));
00291 }
00292 try {
00293 wait(timeout);
00294 } catch (InterruptedException e) {
00295 }
00296 if ((System.currentTimeMillis() - timeStartWaiting) > timeout) {
00297 return removeOldest();
00298 }
00299 }
00300 if (specialExecution) {
00301 specialExecution = false;
00302 return blockingRemoveOldest(specialMethod);
00303 }
00304 return barrierRemoveOldest();
00305 }
00306
00307 protected Request barrierRemoveOldest() {
00308 Request r = (Request) requestQueue.remove(indexOfRequestToServe());
00309 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00310 notifyAllListeners(new RequestQueueEvent(ownerID,
00311 RequestQueueEvent.REMOVE_REQUEST));
00312 }
00313 return r;
00314 }
00315
00322 protected Request barrierBlockingRemove() {
00323 while (((this.isEmpty() && this.shouldWait) || this.suspended ||
00324 (this.indexOfRequestToServe() == -1)) &&
00325 !this.specialExecution) {
00326 if (this.hasListeners()) {
00327 this.notifyAllListeners(new RequestQueueEvent(this.ownerID,
00328 RequestQueueEvent.WAIT_FOR_REQUEST));
00329 }
00330 try {
00331 this.wait();
00332 } catch (InterruptedException e) {
00333 }
00334 }
00335 if (this.specialExecution) {
00336 this.specialExecution = false;
00337 return this.blockingRemoveOldest(this.specialMethod);
00338 }
00339 return this.barrierRemoveOldest();
00340 }
00341
00346 public void suspend() {
00347 this.suspended = true;
00348 }
00349
00353 public void resume() {
00354 this.suspended = false;
00355 }
00356
00361 private int indexOfRequestToServe() {
00362
00363 if (this.spmdManager.isCurrentBarriersEmpty()) {
00364 return 0;
00365 } else {
00366 int index = -1;
00367 boolean isServable = false;
00368 Iterator it = this.requestQueue.iterator();
00369
00370
00371 while (!isServable && it.hasNext()) {
00372 index++;
00373 MethodCall mc = ((Request) it.next()).getMethodCall();
00374
00375
00376 if (mc == null) {
00377 return -1;
00378 }
00379 isServable = this.spmdManager.checkExecution(mc.getBarrierTags());
00380 }
00381 return isServable ? index : (-1);
00382 }
00383 }
00384 }