00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.request;
00032
00033 import org.objectweb.proactive.Body;
00034 import org.objectweb.proactive.core.UniqueID;
00035 import org.objectweb.proactive.core.body.LocalBodyStore;
00036 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00037 import org.objectweb.proactive.core.event.*;
00038 import org.objectweb.proactive.core.util.CircularArrayList;
00039
00040
00041 public class RequestQueueImpl extends AbstractEventProducer
00042 implements java.io.Serializable, RequestQueue {
00043
00044
00045
00046 protected CircularArrayList requestQueue;
00047 protected UniqueID ownerID;
00048 protected RequestFilterOnMethodName requestFilterOnMethodName;
00049 protected static final boolean SEND_ADD_REMOVE_EVENT = false;
00050 protected NonFunctionalRequestsProcessor nfRequestsProcessor;
00051
00052
00053
00054
00055 public RequestQueueImpl(UniqueID ownerID) {
00056 this.requestQueue = new CircularArrayList(20);
00057 this.ownerID = ownerID;
00058 this.requestFilterOnMethodName = new RequestFilterOnMethodName();
00059 this.nfRequestsProcessor = new NonFunctionalRequestsProcessor();
00060 }
00061
00062
00063
00064
00065 public java.util.Iterator iterator() {
00066 return requestQueue.iterator();
00067 }
00068
00069 public CircularArrayList getInternalQueue() {
00070 return this.requestQueue;
00071 }
00072
00073 public synchronized boolean isEmpty() {
00074 return requestQueue.isEmpty();
00075 }
00076
00077 public synchronized int size() {
00078 return requestQueue.size();
00079 }
00080
00081 public boolean hasRequest(String s) {
00082 return getOldest(s) != null;
00083 }
00084
00085 public synchronized void clear() {
00086 requestQueue.clear();
00087 }
00088
00089 public synchronized Request getOldest() {
00090 if (requestQueue.isEmpty()) {
00091 return null;
00092
00093 } else if(!nfRequestsProcessor.isEmpty()){
00094 return nfRequestsProcessor.getOldestPriorityNFRequest(false);
00095 }
00096 return (Request) requestQueue.get(0);
00097 }
00098
00099 public synchronized Request getOldest(String methodName) {
00100 requestFilterOnMethodName.setMethodName(methodName);
00101 return findOldest(requestFilterOnMethodName, false);
00102 }
00103
00104 public synchronized Request getOldest(RequestFilter requestFilter) {
00105 return findOldest(requestFilter, false);
00106 }
00107
00108 public synchronized Request removeOldest() {
00109 if (requestQueue.isEmpty()) {
00110 return null;
00111 } else if (!nfRequestsProcessor.isEmpty()) {
00112 Request r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00113 requestQueue.remove(r);
00114 return r;
00115 }
00116
00117 Request r = (Request) requestQueue.remove(0);
00118 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00119 notifyAllListeners(new RequestQueueEvent(ownerID,
00120 RequestQueueEvent.REMOVE_REQUEST));
00121 }
00122 return r;
00123 }
00124
00125 public synchronized Request removeOldest(String methodName) {
00126 requestFilterOnMethodName.setMethodName(methodName);
00127 return findOldest(requestFilterOnMethodName, true);
00128 }
00129
00130 public synchronized Request removeOldest(RequestFilter requestFilter) {
00131 return findOldest(requestFilter, true);
00132 }
00133
00134 public synchronized Request getYoungest() {
00135 if (requestQueue.isEmpty()) {
00136 return null;
00137 } else if (!nfRequestsProcessor.isEmpty()){
00138 return nfRequestsProcessor.getYoungestPriorityNFRequest(false);
00139 }
00140 return (Request) requestQueue.get(requestQueue.size() - 1);
00141 }
00142
00143 public synchronized Request getYoungest(String methodName) {
00144 requestFilterOnMethodName.setMethodName(methodName);
00145 return findYoungest(requestFilterOnMethodName, false);
00146 }
00147
00148 public synchronized Request getYoungest(RequestFilter requestFilter) {
00149 return findYoungest(requestFilter, false);
00150 }
00151
00152 public synchronized Request removeYoungest() {
00153 if (requestQueue.isEmpty()) {
00154 return null;
00155 }else if (!nfRequestsProcessor.isEmpty()) {
00156 Request r = nfRequestsProcessor.getYoungestPriorityNFRequest(true);
00157 requestQueue.remove(r);
00158 return r;
00159 }
00160 Request r = (Request) requestQueue.remove(requestQueue.size() - 1);
00161 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00162 notifyAllListeners(new RequestQueueEvent(ownerID,
00163 RequestQueueEvent.REMOVE_REQUEST));
00164 }
00165 return r;
00166 }
00167
00168 public synchronized Request removeYoungest(String methodName) {
00169 requestFilterOnMethodName.setMethodName(methodName);
00170 return findYoungest(requestFilterOnMethodName, true);
00171 }
00172
00173 public synchronized Request removeYoungest(RequestFilter requestFilter) {
00174 return findYoungest(requestFilter, true);
00175 }
00176
00177 public synchronized int add(Request request) {
00178
00179
00180 int ftres = FTManager.NON_FT;
00181 FTManager ftm = request.getFTManager();
00182 if (ftm != null) {
00183
00184 ftres = ftm.onDeliverRequest(request);
00185 if (request.ignoreIt()) {
00186 return ftres;
00187 }
00188 }
00189
00190
00191 int priority = request.getNFRequestPriority();
00192 if(priority == Request.NFREQUEST_IMMEDIATE_PRIORITY || priority == Request.NFREQUEST_PRIORITY) {
00193 nfRequestsProcessor.addToNFRequestsQueue(request);
00194 }
00195
00196 requestQueue.add(request);
00197 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00198 notifyAllListeners(new RequestQueueEvent(ownerID,
00199 RequestQueueEvent.ADD_REQUEST));
00200 }
00201
00202 return ftres;
00203 }
00204
00205 public synchronized int addToFront(Request request) {
00206 int ftres = 0;
00207
00208
00209 int priority = request.getNFRequestPriority();
00210 if(priority == Request.NFREQUEST_IMMEDIATE_PRIORITY || priority == Request.NFREQUEST_PRIORITY) {
00211 nfRequestsProcessor.addToNFRequestsQueue(request);
00212 }
00213
00214 requestQueue.add(0, request);
00215 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00216 notifyAllListeners(new RequestQueueEvent(ownerID,
00217 RequestQueueEvent.ADD_REQUEST));
00218 }
00219 return ftres;
00220 }
00221
00222 public synchronized void processRequests(RequestProcessor processor,
00223 Body body) {
00224 for (int i = 0; i < requestQueue.size(); i++) {
00225 Request r ;
00226
00227
00228 while (!nfRequestsProcessor.isEmpty()) {
00229 r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00230 LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00231 requestQueue.remove(r);
00232 }
00233 if(requestQueue.isEmpty()) {
00234 return;
00235 }
00236
00237
00238 r = (Request) requestQueue.get(i);
00239 int result = processor.processRequest(r);
00240 switch (result) {
00241 case RequestProcessor.REMOVE_AND_SERVE:
00242 requestQueue.remove(i);
00243 i--;
00244 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00245 notifyAllListeners(new RequestQueueEvent(ownerID,
00246 RequestQueueEvent.REMOVE_REQUEST));
00247 }
00248 body.serve(r);
00249 break;
00250 case RequestProcessor.REMOVE:
00251 requestQueue.remove(i);
00252 i--;
00253 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00254 notifyAllListeners(new RequestQueueEvent(ownerID,
00255 RequestQueueEvent.REMOVE_REQUEST));
00256 }
00257 break;
00258 case RequestProcessor.KEEP:
00259 break;
00260 }
00261 }
00262 }
00263
00264 public synchronized String toString() {
00265 StringBuffer sb = new StringBuffer();
00266 sb.append("--- RequestQueueImpl n=").append(requestQueue.size()).append(" requests --- ->\n");
00267 int count = 0;
00268 java.util.Iterator iterator = requestQueue.iterator();
00269 while (iterator.hasNext()) {
00270 Request currentrequest = (Request) iterator.next();
00271 sb.append(count).append("--> ")
00272 .append(currentrequest.getMethodName()).append("\n");
00273 count++;
00274 }
00275 sb.append("--- End RequestQueueImpl ---");
00276 sb.append("\n" + nfRequestsProcessor.toString());
00277 return sb.toString();
00278 }
00279
00280 public void addRequestQueueEventListener(RequestQueueEventListener listener) {
00281 addListener(listener);
00282 }
00283
00284 public void removeRequestQueueEventListener(
00285 RequestQueueEventListener listener) {
00286 removeListener(listener);
00287 }
00288
00289
00290
00291
00292 protected void notifyOneListener(ProActiveListener listener,
00293 ProActiveEvent event) {
00294 ((RequestQueueEventListener) listener).requestQueueModified((RequestQueueEvent) event);
00295 }
00296
00297
00298
00299
00300
00309 private Request findOldest(RequestFilter requestFilter, boolean shouldRemove) {
00310 java.util.Iterator iterator;
00311 Request r;
00312
00313
00314 if(shouldRemove) {
00315 while (!nfRequestsProcessor.isEmpty()) {
00316 r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00317 LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00318 requestQueue.remove(r);
00319 }
00320 }
00321
00322 iterator = requestQueue.iterator();
00323
00324 while (iterator.hasNext()) {
00325 r = (Request) iterator.next();
00326 if (requestFilter.acceptRequest(r)) {
00327 if (shouldRemove) {
00328 iterator.remove();
00329 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00330 notifyAllListeners(new RequestQueueEvent(ownerID,
00331 RequestQueueEvent.REMOVE_REQUEST));
00332 }
00333 }
00334 return r;
00335 }
00336 }
00337 return null;
00338 }
00339
00348 private Request findYoungest(RequestFilter requestFilter,
00349 boolean shouldRemove) {
00350 Request r;
00351
00352
00353 while (!nfRequestsProcessor.isEmpty()) {
00354 r = nfRequestsProcessor.getYoungestPriorityNFRequest(true);
00355 LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00356 requestQueue.remove(r);
00357 }
00358
00359
00360 java.util.ListIterator iterator = requestQueue.listIterator(requestQueue.size());
00361 while (iterator.hasPrevious()) {
00362 r = (Request) iterator.previous();
00363 if (requestFilter.acceptRequest(r)) {
00364 if (shouldRemove) {
00365 iterator.remove();
00366 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00367 notifyAllListeners(new RequestQueueEvent(ownerID,
00368 RequestQueueEvent.REMOVE_REQUEST));
00369 }
00370 }
00371 return r;
00372 }
00373 }
00374 return null;
00375 }
00376
00377 private void writeObject(java.io.ObjectOutputStream out)
00378 throws java.io.IOException {
00379
00380 org.objectweb.proactive.Body owner = LocalBodyStore.getInstance()
00381 .getLocalBody(ownerID);
00382 owner.getFuturePool().setMigrationTag();
00383 out.defaultWriteObject();
00384 }
00385
00386
00387
00388
00389
00390 protected class RequestFilterOnMethodName implements RequestFilter,
00391 java.io.Serializable {
00392 private String methodName;
00393
00394 public RequestFilterOnMethodName() {
00395 }
00396
00397 public boolean acceptRequest(Request request) {
00398 return methodName.equals(request.getMethodName());
00399 }
00400
00401 public void setMethodName(String methodName) {
00402 this.methodName = methodName;
00403 }
00404 }
00405 }