org/objectweb/proactive/core/component/collectiveitfs/GatherRequestsQueues.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.component.collectiveitfs;
00032 
00033 import java.io.Serializable;
00034 import java.lang.reflect.Method;
00035 import java.util.ArrayList;
00036 import java.util.HashMap;
00037 import java.util.Iterator;
00038 import java.util.List;
00039 import java.util.Map;
00040 import java.util.Set;
00041 
00042 import org.apache.log4j.Logger;
00043 import org.objectweb.fractal.api.NoSuchInterfaceException;
00044 import org.objectweb.fractal.api.type.ComponentType;
00045 import org.objectweb.fractal.api.type.InterfaceType;
00046 import org.objectweb.proactive.ProActive;
00047 import org.objectweb.proactive.core.ProActiveRuntimeException;
00048 import org.objectweb.proactive.core.body.migration.MigrationException;
00049 import org.objectweb.proactive.core.body.reply.Reply;
00050 import org.objectweb.proactive.core.body.request.ServeException;
00051 import org.objectweb.proactive.core.component.Fractive;
00052 import org.objectweb.proactive.core.component.ProActiveInterface;
00053 import org.objectweb.proactive.core.component.body.ComponentBodyImpl;
00054 import org.objectweb.proactive.core.component.identity.ProActiveComponent;
00055 import org.objectweb.proactive.core.component.representative.ItfID;
00056 import org.objectweb.proactive.core.component.request.ComponentRequest;
00057 import org.objectweb.proactive.core.component.request.ComponentRequestImpl;
00058 import org.objectweb.proactive.core.component.type.ProActiveInterfaceType;
00059 import org.objectweb.proactive.core.component.type.ProActiveTypeFactory;
00060 import org.objectweb.proactive.core.mop.MethodCall;
00061 import org.objectweb.proactive.core.node.Node;
00062 import org.objectweb.proactive.core.util.SerializableMethod;
00063 import org.objectweb.proactive.core.util.log.Loggers;
00064 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00065 
00082 public class GatherRequestsQueues implements Serializable {
00083     private static Logger logger = ProActiveLogger.getLogger(Loggers.COMPONENTS_GATHERCAST);
00084     Map<String, Map<SerializableMethod, List<GatherRequestsQueue>>> queues = new HashMap<String, Map<SerializableMethod, List<GatherRequestsQueue>>>();
00085     ProActiveComponent owner;
00086     List<ItfID> gatherItfs = new ArrayList<ItfID>();
00087     ProActiveInterfaceType[] itfTypes;
00088     
00089     GatherFuturesHandlerPool gatherFuturesHandlerPool;
00090     
00096     public void migrateFuturesHandlersTo(Node node) throws MigrationException {
00097         Set<String> itfNames = queues.keySet();
00098         for (Iterator iter = itfNames.iterator(); iter.hasNext();) {
00099                         String itfName = (String) iter.next();
00100                         Map<SerializableMethod, List<GatherRequestsQueue>> queuesPerNamedItf = queues.get(itfName);
00101                         Set<SerializableMethod> invokedMethods = queuesPerNamedItf.keySet();
00102                         for (Iterator iterator = invokedMethods.iterator(); iterator
00103                                         .hasNext();) {
00104                                 SerializableMethod method = (SerializableMethod) iterator.next();
00105                                 List<GatherRequestsQueue> listOfQueues = queuesPerNamedItf.get(method);
00106                                 for (Iterator iterator2 = listOfQueues.iterator(); iterator
00107                                                 .hasNext();) {
00108                                         GatherRequestsQueue queue = (GatherRequestsQueue) iterator.next();
00109                                         queue.migrateFuturesHandlerTo(node);
00110                                 }
00111                                 
00112                         }
00113                 }
00114     }
00115 
00116     public GatherRequestsQueues(ProActiveComponent owner) {
00117         this.owner = owner;
00118         InterfaceType[] untypedItfs = ((ComponentType)owner.getFcType()).getFcInterfaceTypes();
00119         itfTypes = new ProActiveInterfaceType[untypedItfs.length];
00120         for (int i = 0; i < itfTypes.length; i++) {
00121             itfTypes[i] = (ProActiveInterfaceType)untypedItfs[i];
00122         }
00123 
00124         for (int i = 0; i < itfTypes.length; i++) {
00125             if (ProActiveTypeFactory.GATHER_CARDINALITY.equals(
00126                         itfTypes[i].getFcCardinality())) {
00127                 // add a queue for each gather itf
00128                 Map<SerializableMethod, List<GatherRequestsQueue>> map = new HashMap<SerializableMethod, List<GatherRequestsQueue>>();
00129                 queues.put(itfTypes[i].getFcItfName(), map);
00130                 gatherItfs.add(new ItfID(itfTypes[i].getFcItfName(),
00131                         owner.getID()));
00132             }
00133         }
00134     }
00135 
00139     public Object addRequest(ComponentRequest r) throws ServeException {
00140         Object result = null;
00141         String serverItfName = r.getMethodCall().getComponentMetadata()
00142                                 .getComponentInterfaceName();
00143         ItfID senderItfID = r.getMethodCall().getComponentMetadata()
00144                              .getSenderItfID();
00145 
00146         Method reifiedMethod = r.getMethodCall().getReifiedMethod();
00147         Method itfTypeMethod;
00148         try {
00149             itfTypeMethod = GatherBindingChecker.searchMatchingMethod(reifiedMethod,
00150                     Class.forName(getItfType(serverItfName).getFcItfSignature())
00151                          .getMethods(), false);
00152         } catch (Exception e1) {
00153             e1.printStackTrace();
00154             throw new ServeException("problem when analysing gather request", e1);
00155         }
00156 
00157         List<ItfID> connectedClientItfs;
00158         try {
00159             connectedClientItfs = Fractive.getGathercastController(owner)
00160                                           .getConnectedClientItfs(serverItfName);
00161         } catch (NoSuchInterfaceException e) {
00162             throw new ServeException("this component has no binding controller");
00163         }
00164         if (!connectedClientItfs.contains(senderItfID)) {
00165             throw new ServeException(
00166                 "cannot handle gather invocation : this invocation orginates from a client interface which is not bound ");
00167         }
00168 
00169         if (!queues.containsKey(serverItfName)) {
00170             throw new ProActiveRuntimeException(
00171                 "there is no gathercast interface named " + serverItfName);
00172         }
00173 
00174         Map<SerializableMethod, List<GatherRequestsQueue>> map = queues.get(serverItfName);
00175 
00176         // SerializableMethod objects are used as keys
00177         List<GatherRequestsQueue> list = map.get(new SerializableMethod(itfTypeMethod));
00178 
00179         if (list == null) {
00180             list = new ArrayList<GatherRequestsQueue>();
00181             // new queue, and add current request
00182             GatherRequestsQueue queue = new GatherRequestsQueue(owner,
00183                     serverItfName, itfTypeMethod, connectedClientItfs, gatherFuturesHandlerPool);
00184             list.add(queue);
00185             map.put(new SerializableMethod(itfTypeMethod), list);
00186         }
00187 
00188         if (list.isEmpty()) {
00189             GatherRequestsQueue queue = new GatherRequestsQueue(owner,
00190                     serverItfName, itfTypeMethod, connectedClientItfs, gatherFuturesHandlerPool);
00191             map.get(new SerializableMethod(itfTypeMethod)).add(queue);
00192         } 
00193         
00194 
00195         for (Iterator iter = list.iterator(); iter.hasNext();) {
00196             GatherRequestsQueue queue = (GatherRequestsQueue) iter.next();
00197             if (queue.containsRequestFrom(senderItfID)) {
00198                 // there is already a request from this comp/itf
00199                 if (!iter.hasNext()) {
00200                     // no other queue to receive this request. create one
00201                     // concurrent access exception?
00202                     queue = new GatherRequestsQueue(owner, serverItfName,
00203                             itfTypeMethod, connectedClientItfs, gatherFuturesHandlerPool);
00204                     // add the request
00205                     result = queue.put(senderItfID, r);
00206                     list.add(queue);
00207                     break;
00208                 }
00209                 continue;
00210             }
00211             // TODO if request is synchronous : put in threaded queue, notify, then put the thread on sleep and serve next request 
00212 
00213             // add this request
00214             result = queue.put(senderItfID, r);
00215             break;
00216         }
00217         if (logger.isDebugEnabled()) {
00218             logger.debug("added request [" + r.getMethodName() +
00219                 "] in gather queue");
00220         }
00221         // check if needs to do something!
00222         notifyUpdate(serverItfName, list);
00223 
00224         return result;
00225     }
00226 
00227     private void notifyUpdate(String serverItfName,
00228         List<GatherRequestsQueue> requestQueues) throws ServeException {
00229 
00230         // default: if all connected itfs have sent a request, then process it
00231         try {
00232             List<ItfID> connectedClientItfs = Fractive.getGathercastController(owner)
00233                                                       .getConnectedClientItfs(serverItfName);
00234             GatherRequestsQueue firstRequestsInLine = requestQueues.get(0); // need to ensure this
00235             if (firstRequestsInLine.isFull()) {
00236                 // ok, condition met, proceed with request
00237 
00238                 // create a new gather request by gathering parameters
00239                 Method clientMethod = firstRequestsInLine.getInvokedMethod();
00240                 String methodName = clientMethod.getName();
00241                 if (logger.isDebugEnabled()) {
00242                     logger.debug(
00243                         "conditions reached, processing gather request [" +
00244                         methodName + "]");
00245                 }
00246 
00247                 Class[] clientMethodParamTypes = clientMethod.getParameterTypes();
00248                 Class[] gatherMethodParamTypes = new Class[clientMethodParamTypes.length];
00249 
00250                 for (int i = 0; i < clientMethodParamTypes.length; i++) {
00251                     gatherMethodParamTypes[i] = List.class;
00252                 }
00253 
00254                 Class gatherItfClass = Class.forName(((ProActiveInterfaceType) ((ProActiveInterface) owner.getFcInterface(
00255                             serverItfName)).getFcItfType()).getFcItfSignature());
00256 
00257                 Method gatherMethod = gatherItfClass.getMethod(clientMethod.getName(),
00258                         gatherMethodParamTypes);
00259                 Object[] gatherEffectiveArguments = new Object[gatherMethodParamTypes.length];
00260 
00261                 // build the list of parameters
00262                 for (int i = 0; i < gatherEffectiveArguments.length; i++) {
00263                     List<Object> l = new ArrayList<Object>(connectedClientItfs.size());
00264                     for (Iterator iter = connectedClientItfs.iterator();
00265                             iter.hasNext();) {
00266                         ItfID id = (ItfID) iter.next();
00267                         // keep same ordering as connected client itfs
00268                         l.add(firstRequestsInLine.get(id).getMethodCall()
00269                                                  .getEffectiveArguments()[i]);
00270                     }
00271                     // parameters from a given client have the same order than this client in the list of connected clients 
00272                     gatherEffectiveArguments[i] = l;
00273                 }
00274 
00275                 // create the request
00276                 MethodCall gatherMC = MethodCall.getComponentMethodCall(gatherMethod,
00277                         gatherEffectiveArguments, null, serverItfName,
00278                         new ItfID(serverItfName, owner.getID()));
00279                 long sequenceID = ((ComponentBodyImpl) ProActive.getBodyOnThis()).getNextSequenceID();
00280                 
00281                 ComponentRequest gatherRequest = new ComponentRequestImpl(gatherMC,
00282                         ProActive.getBodyOnThis(),
00283                         firstRequestsInLine.oneWayMethods(),
00284                         sequenceID);
00285 
00286                 // serve the request (do not reenqueue it)
00287                 if (logger.isDebugEnabled()) {
00288                                         logger.debug("gather request queues .serving request [" + gatherRequest.getMethodName()+ "]");
00289                                 }
00290                 Reply reply = gatherRequest.serve(ProActive.getBodyOnThis());
00291 
00292                 // handle the future for async invocations
00293                 if (reply != null) {
00294                         reply.getResult().getResult();
00295                     firstRequestsInLine.addFutureForGatheredRequest(reply.getResult());
00296                 }
00297 
00298                 // remove the list that was just used
00299                 requestQueues.remove(0);
00300             }
00301         } catch (NoSuchInterfaceException e) {
00302             e.printStackTrace();
00303         } catch (ClassNotFoundException e) {
00304             e.printStackTrace();
00305         } catch (SecurityException e) {
00306             e.printStackTrace();
00307         } catch (NoSuchMethodException e) {
00308             e.printStackTrace();
00309         }
00310     }
00311 
00312     private ProActiveInterfaceType getItfType(String name) {
00313         for (int i = 0; i < itfTypes.length; i++) {
00314             if (name.equals(itfTypes[i].getFcItfName())) {
00315                 return itfTypes[i];
00316             }
00317         }
00318         return null;
00319     }
00320     
00321 }

Generated on Mon Jan 22 15:16:06 2007 for ProActive by  doxygen 1.5.1