00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.core.body.request;
00032 
00033 import org.objectweb.proactive.Body;
00034 import org.objectweb.proactive.core.UniqueID;
00035 import org.objectweb.proactive.core.body.LocalBodyStore;
00036 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00037 import org.objectweb.proactive.core.event.*;
00038 import org.objectweb.proactive.core.util.CircularArrayList;
00039 
00040 
00041 public class RequestQueueImpl extends AbstractEventProducer
00042     implements java.io.Serializable, RequestQueue {
00043     
00044     
00045     
00046     protected CircularArrayList requestQueue;
00047     protected UniqueID ownerID;
00048     protected RequestFilterOnMethodName requestFilterOnMethodName;
00049     protected static final boolean SEND_ADD_REMOVE_EVENT = false;
00050     protected NonFunctionalRequestsProcessor nfRequestsProcessor;
00051     
00052     
00053     
00054     
00055     public RequestQueueImpl(UniqueID ownerID) {
00056         this.requestQueue = new CircularArrayList(20);
00057         this.ownerID = ownerID;
00058         this.requestFilterOnMethodName = new RequestFilterOnMethodName();
00059         this.nfRequestsProcessor = new NonFunctionalRequestsProcessor();
00060     }
00061 
00062     
00063     
00064     
00065     public java.util.Iterator iterator() {
00066         return requestQueue.iterator();
00067     }
00068 
00069     public CircularArrayList getInternalQueue() {
00070         return this.requestQueue;
00071     }
00072 
00073     public synchronized boolean isEmpty() {
00074         return requestQueue.isEmpty();
00075     }
00076 
00077     public synchronized int size() {
00078         return requestQueue.size();
00079     }
00080 
00081     public boolean hasRequest(String s) {
00082         return getOldest(s) != null;
00083     }
00084 
00085     public synchronized void clear() {
00086         requestQueue.clear();
00087     }
00088 
00089     public synchronized Request getOldest() {
00090         if (requestQueue.isEmpty()) {
00091             return null;
00092             
00093         } else if(!nfRequestsProcessor.isEmpty()){
00094                 return nfRequestsProcessor.getOldestPriorityNFRequest(false);
00095         } 
00096         return (Request) requestQueue.get(0);
00097     }
00098 
00099     public synchronized Request getOldest(String methodName) {
00100         requestFilterOnMethodName.setMethodName(methodName);
00101         return findOldest(requestFilterOnMethodName, false);
00102     }
00103 
00104     public synchronized Request getOldest(RequestFilter requestFilter) {
00105         return findOldest(requestFilter, false);
00106     }
00107 
00108     public synchronized Request removeOldest() {
00109         if (requestQueue.isEmpty()) {
00110             return null;
00111         } else if (!nfRequestsProcessor.isEmpty()) {
00112                 Request r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00113                 requestQueue.remove(r);
00114                 return r;
00115         }
00116         
00117         Request r = (Request) requestQueue.remove(0);
00118         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00119             notifyAllListeners(new RequestQueueEvent(ownerID,
00120                     RequestQueueEvent.REMOVE_REQUEST));
00121         }
00122         return r;
00123     }
00124 
00125     public synchronized Request removeOldest(String methodName) {
00126         requestFilterOnMethodName.setMethodName(methodName);
00127         return findOldest(requestFilterOnMethodName, true);
00128     }
00129 
00130     public synchronized Request removeOldest(RequestFilter requestFilter) {
00131         return findOldest(requestFilter, true);
00132     }
00133 
00134     public synchronized Request getYoungest() {
00135         if (requestQueue.isEmpty()) {
00136             return null;
00137         } else if (!nfRequestsProcessor.isEmpty()){
00138                 return nfRequestsProcessor.getYoungestPriorityNFRequest(false);
00139         }
00140         return (Request) requestQueue.get(requestQueue.size() - 1);
00141     }
00142 
00143     public synchronized Request getYoungest(String methodName) {
00144         requestFilterOnMethodName.setMethodName(methodName);
00145         return findYoungest(requestFilterOnMethodName, false);
00146     }
00147 
00148     public synchronized Request getYoungest(RequestFilter requestFilter) {
00149         return findYoungest(requestFilter, false);
00150     }
00151 
00152     public synchronized Request removeYoungest() {
00153         if (requestQueue.isEmpty()) {
00154             return null;
00155         }else if (!nfRequestsProcessor.isEmpty()) {
00156                 Request r = nfRequestsProcessor.getYoungestPriorityNFRequest(true);
00157                 requestQueue.remove(r);
00158                 return r;
00159         }
00160         Request r = (Request) requestQueue.remove(requestQueue.size() - 1);
00161         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00162             notifyAllListeners(new RequestQueueEvent(ownerID,
00163                     RequestQueueEvent.REMOVE_REQUEST));
00164         }
00165         return r;
00166     }
00167 
00168     public synchronized Request removeYoungest(String methodName) {
00169         requestFilterOnMethodName.setMethodName(methodName);
00170         return findYoungest(requestFilterOnMethodName, true);
00171     }
00172 
00173     public synchronized Request removeYoungest(RequestFilter requestFilter) {
00174         return findYoungest(requestFilter, true);
00175     }
00176 
00177     public synchronized int add(Request request) {
00178         
00179         
00180         int ftres = FTManager.NON_FT;
00181         FTManager ftm = request.getFTManager();
00182         if (ftm != null) {
00183             
00184             ftres = ftm.onDeliverRequest(request);
00185             if (request.ignoreIt()) {
00186                 return ftres;
00187             }
00188         }
00189         
00190         
00191         int priority = request.getNFRequestPriority();
00192         if(priority == Request.NFREQUEST_IMMEDIATE_PRIORITY || priority == Request.NFREQUEST_PRIORITY) {
00193                 nfRequestsProcessor.addToNFRequestsQueue(request);
00194         }
00195        
00196         requestQueue.add(request);
00197         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00198             notifyAllListeners(new RequestQueueEvent(ownerID,
00199                     RequestQueueEvent.ADD_REQUEST));
00200         }
00201 
00202         return ftres;
00203     }
00204 
00205     public synchronized int addToFront(Request request) {
00206         int ftres = 0;
00207         
00208         
00209         int priority = request.getNFRequestPriority();
00210         if(priority == Request.NFREQUEST_IMMEDIATE_PRIORITY || priority == Request.NFREQUEST_PRIORITY) {
00211                 nfRequestsProcessor.addToNFRequestsQueue(request);
00212         }
00213         
00214         requestQueue.add(0, request);
00215         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00216             notifyAllListeners(new RequestQueueEvent(ownerID,
00217                     RequestQueueEvent.ADD_REQUEST));
00218         }
00219         return ftres;
00220     }
00221 
00222     public synchronized void processRequests(RequestProcessor processor,
00223         Body body) {
00224         for (int i = 0; i < requestQueue.size(); i++) {
00225             Request r ;
00226             
00227             
00228             while (!nfRequestsProcessor.isEmpty()) {
00229                 r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00230                 LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00231                 requestQueue.remove(r); 
00232             }
00233             if(requestQueue.isEmpty()) {
00234                 return;
00235             }
00236             
00237             
00238             r = (Request) requestQueue.get(i);
00239             int result = processor.processRequest(r);
00240             switch (result) {
00241             case RequestProcessor.REMOVE_AND_SERVE:
00242                 requestQueue.remove(i);
00243                 i--;
00244                 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00245                     notifyAllListeners(new RequestQueueEvent(ownerID,
00246                             RequestQueueEvent.REMOVE_REQUEST));
00247                 }
00248                 body.serve(r);
00249                 break;
00250             case RequestProcessor.REMOVE:
00251                 requestQueue.remove(i);
00252                 i--;
00253                 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00254                     notifyAllListeners(new RequestQueueEvent(ownerID,
00255                             RequestQueueEvent.REMOVE_REQUEST));
00256                 }
00257                 break;
00258             case RequestProcessor.KEEP:
00259                 break;
00260             }
00261         }
00262     }
00263 
00264     public synchronized String toString() {
00265         StringBuffer sb = new StringBuffer();
00266         sb.append("--- RequestQueueImpl n=").append(requestQueue.size()).append("   requests --- ->\n");
00267         int count = 0;
00268         java.util.Iterator iterator = requestQueue.iterator();
00269         while (iterator.hasNext()) {
00270             Request currentrequest = (Request) iterator.next();
00271             sb.append(count).append("--> ")
00272               .append(currentrequest.getMethodName()).append("\n");
00273             count++;
00274         }
00275         sb.append("--- End RequestQueueImpl ---");
00276         sb.append("\n" + nfRequestsProcessor.toString());
00277         return sb.toString(); 
00278     }
00279 
00280     public void addRequestQueueEventListener(RequestQueueEventListener listener) {
00281         addListener(listener);
00282     }
00283 
00284     public void removeRequestQueueEventListener(
00285         RequestQueueEventListener listener) {
00286         removeListener(listener);
00287     }
00288 
00289     
00290     
00291     
00292     protected void notifyOneListener(ProActiveListener listener,
00293         ProActiveEvent event) {
00294         ((RequestQueueEventListener) listener).requestQueueModified((RequestQueueEvent) event);
00295     }
00296 
00297     
00298     
00299     
00300 
00309     private Request findOldest(RequestFilter requestFilter, boolean shouldRemove) {
00310         java.util.Iterator iterator;
00311         Request r;
00312         
00313         
00314         if(shouldRemove) {
00315                 while (!nfRequestsProcessor.isEmpty()) {
00316                         r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00317                         LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00318                         requestQueue.remove(r); 
00319                 }
00320         }
00321         
00322          iterator = requestQueue.iterator();
00323         
00324         while (iterator.hasNext()) {
00325              r = (Request) iterator.next();
00326             if (requestFilter.acceptRequest(r)) {
00327                 if (shouldRemove) {
00328                     iterator.remove();
00329                     if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00330                         notifyAllListeners(new RequestQueueEvent(ownerID,
00331                                 RequestQueueEvent.REMOVE_REQUEST));
00332                     }
00333                 }
00334                 return r;
00335             }
00336         }
00337         return null;
00338     }
00339 
00348     private Request findYoungest(RequestFilter requestFilter,
00349         boolean shouldRemove) {
00350         Request r;
00351         
00352         
00353         while (!nfRequestsProcessor.isEmpty()) {
00354                 r = nfRequestsProcessor.getYoungestPriorityNFRequest(true);
00355                 LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00356                 requestQueue.remove(r); 
00357         }
00358         
00359         
00360         java.util.ListIterator iterator = requestQueue.listIterator(requestQueue.size());
00361         while (iterator.hasPrevious()) {
00362              r = (Request) iterator.previous();
00363             if (requestFilter.acceptRequest(r)) {
00364                 if (shouldRemove) {
00365                     iterator.remove();
00366                     if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00367                         notifyAllListeners(new RequestQueueEvent(ownerID,
00368                                 RequestQueueEvent.REMOVE_REQUEST));
00369                     }
00370                 }
00371                 return r;
00372             }
00373         }
00374         return null;
00375     }
00376 
00377     private void writeObject(java.io.ObjectOutputStream out)
00378         throws java.io.IOException {
00379         
00380         org.objectweb.proactive.Body owner = LocalBodyStore.getInstance()
00381                                                            .getLocalBody(ownerID);
00382         owner.getFuturePool().setMigrationTag();
00383         out.defaultWriteObject();
00384     }
00385 
00386       
00387     
00388     
00389     
00390     protected class RequestFilterOnMethodName implements RequestFilter,
00391         java.io.Serializable {
00392         private String methodName;
00393 
00394         public RequestFilterOnMethodName() {
00395         }
00396 
00397         public boolean acceptRequest(Request request) {
00398             return methodName.equals(request.getMethodName());
00399         }
00400 
00401         public void setMethodName(String methodName) {
00402             this.methodName = methodName;
00403         }
00404     }
00405 }