org/objectweb/proactive/core/body/request/RequestQueueImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.request;
00032 
00033 import org.objectweb.proactive.Body;
00034 import org.objectweb.proactive.core.UniqueID;
00035 import org.objectweb.proactive.core.body.LocalBodyStore;
00036 import org.objectweb.proactive.core.body.ft.protocols.FTManager;
00037 import org.objectweb.proactive.core.event.*;
00038 import org.objectweb.proactive.core.util.CircularArrayList;
00039 
00040 
00041 public class RequestQueueImpl extends AbstractEventProducer
00042     implements java.io.Serializable, RequestQueue {
00043     //
00044     // -- PROTECTED MEMBERS -----------------------------------------------
00045     //
00046     protected CircularArrayList requestQueue;
00047     protected UniqueID ownerID;
00048     protected RequestFilterOnMethodName requestFilterOnMethodName;
00049     protected static final boolean SEND_ADD_REMOVE_EVENT = false;
00050     protected NonFunctionalRequestsProcessor nfRequestsProcessor;
00051     
00052     //
00053     // -- CONSTRUCTORS -----------------------------------------------
00054     //
00055     public RequestQueueImpl(UniqueID ownerID) {
00056         this.requestQueue = new CircularArrayList(20);
00057         this.ownerID = ownerID;
00058         this.requestFilterOnMethodName = new RequestFilterOnMethodName();
00059         this.nfRequestsProcessor = new NonFunctionalRequestsProcessor();
00060     }
00061 
00062     //
00063     // -- PUBLIC METHODS -----------------------------------------------
00064     //
00065     public java.util.Iterator iterator() {
00066         return requestQueue.iterator();
00067     }
00068 
00069     public CircularArrayList getInternalQueue() {
00070         return this.requestQueue;
00071     }
00072 
00073     public synchronized boolean isEmpty() {
00074         return requestQueue.isEmpty();
00075     }
00076 
00077     public synchronized int size() {
00078         return requestQueue.size();
00079     }
00080 
00081     public boolean hasRequest(String s) {
00082         return getOldest(s) != null;
00083     }
00084 
00085     public synchronized void clear() {
00086         requestQueue.clear();
00087     }
00088 
00089     public synchronized Request getOldest() {
00090         if (requestQueue.isEmpty()) {
00091             return null;
00092             //serves the non functional requests first.
00093         } else if(!nfRequestsProcessor.isEmpty()){
00094                 return nfRequestsProcessor.getOldestPriorityNFRequest(false);
00095         } 
00096         return (Request) requestQueue.get(0);
00097     }
00098 
00099     public synchronized Request getOldest(String methodName) {
00100         requestFilterOnMethodName.setMethodName(methodName);
00101         return findOldest(requestFilterOnMethodName, false);
00102     }
00103 
00104     public synchronized Request getOldest(RequestFilter requestFilter) {
00105         return findOldest(requestFilter, false);
00106     }
00107 
00108     public synchronized Request removeOldest() {
00109         if (requestQueue.isEmpty()) {
00110             return null;
00111         } else if (!nfRequestsProcessor.isEmpty()) {
00112                 Request r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00113                 requestQueue.remove(r);
00114                 return r;
00115         }
00116         
00117         Request r = (Request) requestQueue.remove(0);
00118         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00119             notifyAllListeners(new RequestQueueEvent(ownerID,
00120                     RequestQueueEvent.REMOVE_REQUEST));
00121         }
00122         return r;
00123     }
00124 
00125     public synchronized Request removeOldest(String methodName) {
00126         requestFilterOnMethodName.setMethodName(methodName);
00127         return findOldest(requestFilterOnMethodName, true);
00128     }
00129 
00130     public synchronized Request removeOldest(RequestFilter requestFilter) {
00131         return findOldest(requestFilter, true);
00132     }
00133 
00134     public synchronized Request getYoungest() {
00135         if (requestQueue.isEmpty()) {
00136             return null;
00137         } else if (!nfRequestsProcessor.isEmpty()){
00138                 return nfRequestsProcessor.getYoungestPriorityNFRequest(false);
00139         }
00140         return (Request) requestQueue.get(requestQueue.size() - 1);
00141     }
00142 
00143     public synchronized Request getYoungest(String methodName) {
00144         requestFilterOnMethodName.setMethodName(methodName);
00145         return findYoungest(requestFilterOnMethodName, false);
00146     }
00147 
00148     public synchronized Request getYoungest(RequestFilter requestFilter) {
00149         return findYoungest(requestFilter, false);
00150     }
00151 
00152     public synchronized Request removeYoungest() {
00153         if (requestQueue.isEmpty()) {
00154             return null;
00155         }else if (!nfRequestsProcessor.isEmpty()) {
00156                 Request r = nfRequestsProcessor.getYoungestPriorityNFRequest(true);
00157                 requestQueue.remove(r);
00158                 return r;
00159         }
00160         Request r = (Request) requestQueue.remove(requestQueue.size() - 1);
00161         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00162             notifyAllListeners(new RequestQueueEvent(ownerID,
00163                     RequestQueueEvent.REMOVE_REQUEST));
00164         }
00165         return r;
00166     }
00167 
00168     public synchronized Request removeYoungest(String methodName) {
00169         requestFilterOnMethodName.setMethodName(methodName);
00170         return findYoungest(requestFilterOnMethodName, true);
00171     }
00172 
00173     public synchronized Request removeYoungest(RequestFilter requestFilter) {
00174         return findYoungest(requestFilter, true);
00175     }
00176 
00177     public synchronized int add(Request request) {
00178         //System.out.println("  --> RequestQueue.add m="+request.getMethodName());
00179         // FAULT-TOLERANCE  
00180         int ftres = FTManager.NON_FT;
00181         FTManager ftm = request.getFTManager();
00182         if (ftm != null) {
00183             // null if FT is disable OR if request is an awaited request         
00184             ftres = ftm.onDeliverRequest(request);
00185             if (request.ignoreIt()) {
00186                 return ftres;
00187             }
00188         }
00189         
00190         //if the request is non functional and priority, a reference on it is added in a nonFunctionalRequestsQueue.
00191         int priority = request.getNFRequestPriority();
00192         if(priority == Request.NFREQUEST_IMMEDIATE_PRIORITY || priority == Request.NFREQUEST_PRIORITY) {
00193                 nfRequestsProcessor.addToNFRequestsQueue(request);
00194         }
00195        
00196         requestQueue.add(request);
00197         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00198             notifyAllListeners(new RequestQueueEvent(ownerID,
00199                     RequestQueueEvent.ADD_REQUEST));
00200         }
00201 
00202         return ftres;
00203     }
00204 
00205     public synchronized int addToFront(Request request) {
00206         int ftres = 0;
00207         
00208         //if the request is non functional and priority, a reference on it is added in a nonFunctionalRequestsQueue.
00209         int priority = request.getNFRequestPriority();
00210         if(priority == Request.NFREQUEST_IMMEDIATE_PRIORITY || priority == Request.NFREQUEST_PRIORITY) {
00211                 nfRequestsProcessor.addToNFRequestsQueue(request);
00212         }
00213         
00214         requestQueue.add(0, request);
00215         if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00216             notifyAllListeners(new RequestQueueEvent(ownerID,
00217                     RequestQueueEvent.ADD_REQUEST));
00218         }
00219         return ftres;
00220     }
00221 
00222     public synchronized void processRequests(RequestProcessor processor,
00223         Body body) {
00224         for (int i = 0; i < requestQueue.size(); i++) {
00225             Request r ;
00226             
00227             // First, we deal with priotity non functional requests
00228             while (!nfRequestsProcessor.isEmpty()) {
00229                 r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00230                 LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00231                 requestQueue.remove(r); 
00232             }
00233             if(requestQueue.isEmpty()) {
00234                 return;
00235             }
00236             
00237             
00238             r = (Request) requestQueue.get(i);
00239             int result = processor.processRequest(r);
00240             switch (result) {
00241             case RequestProcessor.REMOVE_AND_SERVE:
00242                 requestQueue.remove(i);
00243                 i--;
00244                 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00245                     notifyAllListeners(new RequestQueueEvent(ownerID,
00246                             RequestQueueEvent.REMOVE_REQUEST));
00247                 }
00248                 body.serve(r);
00249                 break;
00250             case RequestProcessor.REMOVE:
00251                 requestQueue.remove(i);
00252                 i--;
00253                 if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00254                     notifyAllListeners(new RequestQueueEvent(ownerID,
00255                             RequestQueueEvent.REMOVE_REQUEST));
00256                 }
00257                 break;
00258             case RequestProcessor.KEEP:
00259                 break;
00260             }
00261         }
00262     }
00263 
00264     public synchronized String toString() {
00265         StringBuffer sb = new StringBuffer();
00266         sb.append("--- RequestQueueImpl n=").append(requestQueue.size()).append("   requests --- ->\n");
00267         int count = 0;
00268         java.util.Iterator iterator = requestQueue.iterator();
00269         while (iterator.hasNext()) {
00270             Request currentrequest = (Request) iterator.next();
00271             sb.append(count).append("--> ")
00272               .append(currentrequest.getMethodName()).append("\n");
00273             count++;
00274         }
00275         sb.append("--- End RequestQueueImpl ---");
00276         sb.append("\n" + nfRequestsProcessor.toString());
00277         return sb.toString(); 
00278     }
00279 
00280     public void addRequestQueueEventListener(RequestQueueEventListener listener) {
00281         addListener(listener);
00282     }
00283 
00284     public void removeRequestQueueEventListener(
00285         RequestQueueEventListener listener) {
00286         removeListener(listener);
00287     }
00288 
00289     //
00290     // -- PROTECTED METHODS -----------------------------------------------
00291     //
00292     protected void notifyOneListener(ProActiveListener listener,
00293         ProActiveEvent event) {
00294         ((RequestQueueEventListener) listener).requestQueueModified((RequestQueueEvent) event);
00295     }
00296 
00297     //
00298     // -- PRIVATE METHODS -----------------------------------------------
00299     //
00300 
00309     private Request findOldest(RequestFilter requestFilter, boolean shouldRemove) {
00310         java.util.Iterator iterator;
00311         Request r;
00312         
00313         //First, we deal with priority non functional requests
00314         if(shouldRemove) {
00315                 while (!nfRequestsProcessor.isEmpty()) {
00316                         r = nfRequestsProcessor.getOldestPriorityNFRequest(true);
00317                         LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00318                         requestQueue.remove(r); 
00319                 }
00320         }
00321         
00322          iterator = requestQueue.iterator();
00323         //then we look for the oldest request fullfilling the criteria defined by the given filter
00324         while (iterator.hasNext()) {
00325              r = (Request) iterator.next();
00326             if (requestFilter.acceptRequest(r)) {
00327                 if (shouldRemove) {
00328                     iterator.remove();
00329                     if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00330                         notifyAllListeners(new RequestQueueEvent(ownerID,
00331                                 RequestQueueEvent.REMOVE_REQUEST));
00332                     }
00333                 }
00334                 return r;
00335             }
00336         }
00337         return null;
00338     }
00339 
00348     private Request findYoungest(RequestFilter requestFilter,
00349         boolean shouldRemove) {
00350         Request r;
00351         
00352         //First, we deal with priotity non functional requests
00353         while (!nfRequestsProcessor.isEmpty()) {
00354                 r = nfRequestsProcessor.getYoungestPriorityNFRequest(true);
00355                 LocalBodyStore.getInstance().getLocalBody(ownerID).serve(r);
00356                 requestQueue.remove(r); 
00357         }
00358         
00359         
00360         java.util.ListIterator iterator = requestQueue.listIterator(requestQueue.size());
00361         while (iterator.hasPrevious()) {
00362              r = (Request) iterator.previous();
00363             if (requestFilter.acceptRequest(r)) {
00364                 if (shouldRemove) {
00365                     iterator.remove();
00366                     if (SEND_ADD_REMOVE_EVENT && hasListeners()) {
00367                         notifyAllListeners(new RequestQueueEvent(ownerID,
00368                                 RequestQueueEvent.REMOVE_REQUEST));
00369                     }
00370                 }
00371                 return r;
00372             }
00373         }
00374         return null;
00375     }
00376 
00377     private void writeObject(java.io.ObjectOutputStream out)
00378         throws java.io.IOException {
00379         // we must set migration tag because requests could contain awaited future (parameters)
00380         org.objectweb.proactive.Body owner = LocalBodyStore.getInstance()
00381                                                            .getLocalBody(ownerID);
00382         owner.getFuturePool().setMigrationTag();
00383         out.defaultWriteObject();
00384     }
00385 
00386       
00387     //
00388     // -- INNER CLASSES -----------------------------------------------
00389     //
00390     protected class RequestFilterOnMethodName implements RequestFilter,
00391         java.io.Serializable {
00392         private String methodName;
00393 
00394         public RequestFilterOnMethodName() {
00395         }
00396 
00397         public boolean acceptRequest(Request request) {
00398             return methodName.equals(request.getMethodName());
00399         }
00400 
00401         public void setMethodName(String methodName) {
00402             this.methodName = methodName;
00403         }
00404     }
00405 }

Generated on Mon Jan 22 15:16:06 2007 for ProActive by  doxygen 1.5.1