00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.component.collectiveitfs;
00032
00033 import java.io.Serializable;
00034 import java.lang.reflect.Method;
00035 import java.util.ArrayList;
00036 import java.util.HashMap;
00037 import java.util.Iterator;
00038 import java.util.List;
00039 import java.util.Map;
00040 import java.util.Set;
00041
00042 import org.apache.log4j.Logger;
00043 import org.objectweb.fractal.api.NoSuchInterfaceException;
00044 import org.objectweb.fractal.api.type.ComponentType;
00045 import org.objectweb.fractal.api.type.InterfaceType;
00046 import org.objectweb.proactive.ProActive;
00047 import org.objectweb.proactive.core.ProActiveRuntimeException;
00048 import org.objectweb.proactive.core.body.migration.MigrationException;
00049 import org.objectweb.proactive.core.body.reply.Reply;
00050 import org.objectweb.proactive.core.body.request.ServeException;
00051 import org.objectweb.proactive.core.component.Fractive;
00052 import org.objectweb.proactive.core.component.ProActiveInterface;
00053 import org.objectweb.proactive.core.component.body.ComponentBodyImpl;
00054 import org.objectweb.proactive.core.component.identity.ProActiveComponent;
00055 import org.objectweb.proactive.core.component.representative.ItfID;
00056 import org.objectweb.proactive.core.component.request.ComponentRequest;
00057 import org.objectweb.proactive.core.component.request.ComponentRequestImpl;
00058 import org.objectweb.proactive.core.component.type.ProActiveInterfaceType;
00059 import org.objectweb.proactive.core.component.type.ProActiveTypeFactory;
00060 import org.objectweb.proactive.core.mop.MethodCall;
00061 import org.objectweb.proactive.core.node.Node;
00062 import org.objectweb.proactive.core.util.SerializableMethod;
00063 import org.objectweb.proactive.core.util.log.Loggers;
00064 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00065
00082 public class GatherRequestsQueues implements Serializable {
00083 private static Logger logger = ProActiveLogger.getLogger(Loggers.COMPONENTS_GATHERCAST);
00084 Map<String, Map<SerializableMethod, List<GatherRequestsQueue>>> queues = new HashMap<String, Map<SerializableMethod, List<GatherRequestsQueue>>>();
00085 ProActiveComponent owner;
00086 List<ItfID> gatherItfs = new ArrayList<ItfID>();
00087 ProActiveInterfaceType[] itfTypes;
00088
00089 GatherFuturesHandlerPool gatherFuturesHandlerPool;
00090
00096 public void migrateFuturesHandlersTo(Node node) throws MigrationException {
00097 Set<String> itfNames = queues.keySet();
00098 for (Iterator iter = itfNames.iterator(); iter.hasNext();) {
00099 String itfName = (String) iter.next();
00100 Map<SerializableMethod, List<GatherRequestsQueue>> queuesPerNamedItf = queues.get(itfName);
00101 Set<SerializableMethod> invokedMethods = queuesPerNamedItf.keySet();
00102 for (Iterator iterator = invokedMethods.iterator(); iterator
00103 .hasNext();) {
00104 SerializableMethod method = (SerializableMethod) iterator.next();
00105 List<GatherRequestsQueue> listOfQueues = queuesPerNamedItf.get(method);
00106 for (Iterator iterator2 = listOfQueues.iterator(); iterator
00107 .hasNext();) {
00108 GatherRequestsQueue queue = (GatherRequestsQueue) iterator.next();
00109 queue.migrateFuturesHandlerTo(node);
00110 }
00111
00112 }
00113 }
00114 }
00115
00116 public GatherRequestsQueues(ProActiveComponent owner) {
00117 this.owner = owner;
00118 InterfaceType[] untypedItfs = ((ComponentType)owner.getFcType()).getFcInterfaceTypes();
00119 itfTypes = new ProActiveInterfaceType[untypedItfs.length];
00120 for (int i = 0; i < itfTypes.length; i++) {
00121 itfTypes[i] = (ProActiveInterfaceType)untypedItfs[i];
00122 }
00123
00124 for (int i = 0; i < itfTypes.length; i++) {
00125 if (ProActiveTypeFactory.GATHER_CARDINALITY.equals(
00126 itfTypes[i].getFcCardinality())) {
00127
00128 Map<SerializableMethod, List<GatherRequestsQueue>> map = new HashMap<SerializableMethod, List<GatherRequestsQueue>>();
00129 queues.put(itfTypes[i].getFcItfName(), map);
00130 gatherItfs.add(new ItfID(itfTypes[i].getFcItfName(),
00131 owner.getID()));
00132 }
00133 }
00134 }
00135
00139 public Object addRequest(ComponentRequest r) throws ServeException {
00140 Object result = null;
00141 String serverItfName = r.getMethodCall().getComponentMetadata()
00142 .getComponentInterfaceName();
00143 ItfID senderItfID = r.getMethodCall().getComponentMetadata()
00144 .getSenderItfID();
00145
00146 Method reifiedMethod = r.getMethodCall().getReifiedMethod();
00147 Method itfTypeMethod;
00148 try {
00149 itfTypeMethod = GatherBindingChecker.searchMatchingMethod(reifiedMethod,
00150 Class.forName(getItfType(serverItfName).getFcItfSignature())
00151 .getMethods(), false);
00152 } catch (Exception e1) {
00153 e1.printStackTrace();
00154 throw new ServeException("problem when analysing gather request", e1);
00155 }
00156
00157 List<ItfID> connectedClientItfs;
00158 try {
00159 connectedClientItfs = Fractive.getGathercastController(owner)
00160 .getConnectedClientItfs(serverItfName);
00161 } catch (NoSuchInterfaceException e) {
00162 throw new ServeException("this component has no binding controller");
00163 }
00164 if (!connectedClientItfs.contains(senderItfID)) {
00165 throw new ServeException(
00166 "cannot handle gather invocation : this invocation orginates from a client interface which is not bound ");
00167 }
00168
00169 if (!queues.containsKey(serverItfName)) {
00170 throw new ProActiveRuntimeException(
00171 "there is no gathercast interface named " + serverItfName);
00172 }
00173
00174 Map<SerializableMethod, List<GatherRequestsQueue>> map = queues.get(serverItfName);
00175
00176
00177 List<GatherRequestsQueue> list = map.get(new SerializableMethod(itfTypeMethod));
00178
00179 if (list == null) {
00180 list = new ArrayList<GatherRequestsQueue>();
00181
00182 GatherRequestsQueue queue = new GatherRequestsQueue(owner,
00183 serverItfName, itfTypeMethod, connectedClientItfs, gatherFuturesHandlerPool);
00184 list.add(queue);
00185 map.put(new SerializableMethod(itfTypeMethod), list);
00186 }
00187
00188 if (list.isEmpty()) {
00189 GatherRequestsQueue queue = new GatherRequestsQueue(owner,
00190 serverItfName, itfTypeMethod, connectedClientItfs, gatherFuturesHandlerPool);
00191 map.get(new SerializableMethod(itfTypeMethod)).add(queue);
00192 }
00193
00194
00195 for (Iterator iter = list.iterator(); iter.hasNext();) {
00196 GatherRequestsQueue queue = (GatherRequestsQueue) iter.next();
00197 if (queue.containsRequestFrom(senderItfID)) {
00198
00199 if (!iter.hasNext()) {
00200
00201
00202 queue = new GatherRequestsQueue(owner, serverItfName,
00203 itfTypeMethod, connectedClientItfs, gatherFuturesHandlerPool);
00204
00205 result = queue.put(senderItfID, r);
00206 list.add(queue);
00207 break;
00208 }
00209 continue;
00210 }
00211
00212
00213
00214 result = queue.put(senderItfID, r);
00215 break;
00216 }
00217 if (logger.isDebugEnabled()) {
00218 logger.debug("added request [" + r.getMethodName() +
00219 "] in gather queue");
00220 }
00221
00222 notifyUpdate(serverItfName, list);
00223
00224 return result;
00225 }
00226
00227 private void notifyUpdate(String serverItfName,
00228 List<GatherRequestsQueue> requestQueues) throws ServeException {
00229
00230
00231 try {
00232 List<ItfID> connectedClientItfs = Fractive.getGathercastController(owner)
00233 .getConnectedClientItfs(serverItfName);
00234 GatherRequestsQueue firstRequestsInLine = requestQueues.get(0);
00235 if (firstRequestsInLine.isFull()) {
00236
00237
00238
00239 Method clientMethod = firstRequestsInLine.getInvokedMethod();
00240 String methodName = clientMethod.getName();
00241 if (logger.isDebugEnabled()) {
00242 logger.debug(
00243 "conditions reached, processing gather request [" +
00244 methodName + "]");
00245 }
00246
00247 Class[] clientMethodParamTypes = clientMethod.getParameterTypes();
00248 Class[] gatherMethodParamTypes = new Class[clientMethodParamTypes.length];
00249
00250 for (int i = 0; i < clientMethodParamTypes.length; i++) {
00251 gatherMethodParamTypes[i] = List.class;
00252 }
00253
00254 Class gatherItfClass = Class.forName(((ProActiveInterfaceType) ((ProActiveInterface) owner.getFcInterface(
00255 serverItfName)).getFcItfType()).getFcItfSignature());
00256
00257 Method gatherMethod = gatherItfClass.getMethod(clientMethod.getName(),
00258 gatherMethodParamTypes);
00259 Object[] gatherEffectiveArguments = new Object[gatherMethodParamTypes.length];
00260
00261
00262 for (int i = 0; i < gatherEffectiveArguments.length; i++) {
00263 List<Object> l = new ArrayList<Object>(connectedClientItfs.size());
00264 for (Iterator iter = connectedClientItfs.iterator();
00265 iter.hasNext();) {
00266 ItfID id = (ItfID) iter.next();
00267
00268 l.add(firstRequestsInLine.get(id).getMethodCall()
00269 .getEffectiveArguments()[i]);
00270 }
00271
00272 gatherEffectiveArguments[i] = l;
00273 }
00274
00275
00276 MethodCall gatherMC = MethodCall.getComponentMethodCall(gatherMethod,
00277 gatherEffectiveArguments, null, serverItfName,
00278 new ItfID(serverItfName, owner.getID()));
00279 long sequenceID = ((ComponentBodyImpl) ProActive.getBodyOnThis()).getNextSequenceID();
00280
00281 ComponentRequest gatherRequest = new ComponentRequestImpl(gatherMC,
00282 ProActive.getBodyOnThis(),
00283 firstRequestsInLine.oneWayMethods(),
00284 sequenceID);
00285
00286
00287 if (logger.isDebugEnabled()) {
00288 logger.debug("gather request queues .serving request [" + gatherRequest.getMethodName()+ "]");
00289 }
00290 Reply reply = gatherRequest.serve(ProActive.getBodyOnThis());
00291
00292
00293 if (reply != null) {
00294 reply.getResult().getResult();
00295 firstRequestsInLine.addFutureForGatheredRequest(reply.getResult());
00296 }
00297
00298
00299 requestQueues.remove(0);
00300 }
00301 } catch (NoSuchInterfaceException e) {
00302 e.printStackTrace();
00303 } catch (ClassNotFoundException e) {
00304 e.printStackTrace();
00305 } catch (SecurityException e) {
00306 e.printStackTrace();
00307 } catch (NoSuchMethodException e) {
00308 e.printStackTrace();
00309 }
00310 }
00311
00312 private ProActiveInterfaceType getItfType(String name) {
00313 for (int i = 0; i < itfTypes.length; i++) {
00314 if (name.equals(itfTypes[i].getFcItfName())) {
00315 return itfTypes[i];
00316 }
00317 }
00318 return null;
00319 }
00320
00321 }