00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.component.collectiveitfs;
00032
00033 import java.io.Serializable;
00034 import java.lang.reflect.Method;
00035 import java.util.HashMap;
00036 import java.util.List;
00037 import java.util.Map;
00038 import java.util.Timer;
00039 import java.util.TimerTask;
00040
00041 import org.apache.log4j.Logger;
00042 import org.objectweb.proactive.core.ProActiveRuntimeException;
00043 import org.objectweb.proactive.core.body.future.FutureResult;
00044 import org.objectweb.proactive.core.body.migration.MigrationException;
00045 import org.objectweb.proactive.core.component.exceptions.GathercastTimeoutException;
00046 import org.objectweb.proactive.core.component.identity.ProActiveComponent;
00047 import org.objectweb.proactive.core.component.representative.ItfID;
00048 import org.objectweb.proactive.core.component.request.ComponentRequest;
00049 import org.objectweb.proactive.core.component.type.annotations.gathercast.MethodSynchro;
00050 import org.objectweb.proactive.core.node.Node;
00051 import org.objectweb.proactive.core.util.SerializableMethod;
00052 import org.objectweb.proactive.core.util.log.Loggers;
00053 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00054
00055
00067 public class GatherRequestsQueue implements Serializable {
00068 private ProActiveComponent owner;
00069 private GatherFuturesHandler futuresHandler;
00070 private List<ItfID> connectedClientItfs;
00071 private Map<ItfID, ComponentRequest> requests;
00072 private String serverItfName;
00073 private SerializableMethod itfTypeInvokedMethod;
00074 transient long creationTime = System.currentTimeMillis();
00075 public static final long DEFAULT_TIMEOUT = 1000000;
00076 private Timer timeoutTimer = null;
00077 boolean timedout = false;
00078 boolean thrownTimeoutException = false;
00079 long timeout = DEFAULT_TIMEOUT;
00080 private static Logger logger = ProActiveLogger.getLogger(Loggers.COMPONENTS_GATHERCAST);
00081 GatherFuturesHandlerPool gatherFuturesHandlerPool;
00082 boolean resultsReturned = false;
00083 boolean oneWayCall = true;
00084
00085 public GatherRequestsQueue(ProActiveComponent owner, String serverItfName,
00086 Method itfTypeMethod, List<ItfID> connectedClientItfs,
00087 GatherFuturesHandlerPool gatherFuturesHandlerPool) {
00088 this.owner = owner;
00089 this.serverItfName = serverItfName;
00090
00091
00092 itfTypeInvokedMethod = new SerializableMethod(itfTypeMethod);
00093 this.gatherFuturesHandlerPool = gatherFuturesHandlerPool;
00094 this.connectedClientItfs = connectedClientItfs;
00095 try {
00096 if (logger.isDebugEnabled()) {
00097 logger.debug("adding futures handler for requests on " +
00098 serverItfName + "." + itfTypeMethod.getName());
00099 }
00100
00101
00102 if (!Void.TYPE.equals(itfTypeMethod.getReturnType())) {
00103 oneWayCall = false;
00104
00105 futuresHandler = (GatherFuturesHandler) GatherFuturesHandlerPool.instance()
00106 .borrowFuturesHandler();
00107 futuresHandler.setConnectedClientItfs(connectedClientItfs);
00108 }
00109 } catch (Exception e) {
00110 throw new ProActiveRuntimeException("cannot create futures handler for gather interface",
00111 e);
00112 }
00113
00114 requests = new HashMap<ItfID, ComponentRequest>();
00115
00116
00117
00118 }
00119
00120 public boolean containsRequestFrom(ItfID clientItfID) {
00121 return requests.containsKey(clientItfID);
00122 }
00123
00124 public synchronized Object put(ItfID clientItfID, ComponentRequest request) {
00125 if (isFull()) {
00126 throw new ProActiveRuntimeException("gather requests queue is full");
00127 }
00128 requests.put(clientItfID, request);
00129
00130 if (!oneWayCall) {
00131
00132 if (timeoutTimer == null) {
00133 timeoutTimer = new Timer();
00134 MethodSynchro sc = itfTypeInvokedMethod.getMethod()
00135 .getAnnotation(MethodSynchro.class);
00136 if (sc != null) {
00137 timeout = sc.timeout();
00138 } else {
00139 timeout = DEFAULT_TIMEOUT;
00140 }
00141 if (logger.isDebugEnabled()) {
00142 logger.debug(
00143 "gather request queue timer starting with timeout = " +
00144 timeout);
00145 }
00146 timeoutTimer.schedule(new TimeoutTask(this), timeout);
00147 }
00148
00149 if (isFull()) {
00150 timeoutTimer.cancel();
00151 }
00152
00153 if (((System.currentTimeMillis() - creationTime) / 1000) >= timeout) {
00154
00155 timedout = true;
00156 addFutureForGatheredRequest(null);
00157 }
00158 Object reply = futuresHandler.distribute(clientItfID);
00159
00160
00161 try {
00162 return reply;
00163 } finally {
00164 if (isFull()) {
00165 try {
00166 finalize();
00167 } catch (Throwable e) {
00168 e.printStackTrace();
00169 }
00170 }
00171 }
00172 } else {
00173 return null;
00174 }
00175 }
00176
00177 public ComponentRequest get(ItfID id) {
00178 return requests.get(id);
00179 }
00180
00181 public boolean isFull() {
00182 return (requests.size() == connectedClientItfs.size());
00183 }
00184
00185 public int size() {
00186 return requests.size();
00187 }
00188
00189 public Method getInvokedMethod() {
00190
00191 if (requests.isEmpty()) {
00192 return null;
00193 }
00194 return requests.get(requests.keySet().iterator().next()).getMethodCall()
00195 .getReifiedMethod();
00196 }
00197
00198 public boolean oneWayMethods() {
00199 if (requests.isEmpty()) {
00200 return false;
00201 }
00202 return requests.get(requests.keySet().iterator().next()).isOneWay();
00203 }
00204
00205 public void addFutureForGatheredRequest(FutureResult futureResult) {
00206 if (timedout && !resultsReturned) {
00207
00208 if (!thrownTimeoutException) {
00209 if (logger.isDebugEnabled()) {
00210 logger.debug("timeout reached at " + timeout +
00211 "for gather request on [" +
00212 itfTypeInvokedMethod.getMethod().getName() + "]");
00213 }
00214 thrownTimeoutException = true;
00215 futuresHandler.setFutureOfGatheredInvocation(new FutureResult(
00216 null,
00217 new GathercastTimeoutException("timeout of " + timeout +
00218 " reached before invocations from all clients were received for gather invocation (method " +
00219 itfTypeInvokedMethod.getMethod().toGenericString() +
00220 " on gather interface " + serverItfName), null));
00221 }
00222
00223
00224 } else {
00225 if (!resultsReturned) {
00226
00227 resultsReturned = true;
00228 futuresHandler.setFutureOfGatheredInvocation(futureResult);
00229 } else {
00230
00231 }
00232 try {
00233 GatherFuturesHandlerPool.instance()
00234 .returnFuturesHandler(futuresHandler);
00235 } catch (Exception e) {
00236 e.printStackTrace();
00237 }
00238 }
00239 timeoutTimer.cancel();
00240 }
00241
00245 public long getCreationTime() {
00246 return creationTime;
00247 }
00248
00249 public void returnFuturesHandlerToPool() {
00250 if (futuresHandler != null) {
00251 futuresHandler.passivate();
00252 }
00253 }
00254
00258 public Map<ItfID, ComponentRequest> getRequests() {
00259 return requests;
00260 }
00261
00265 public List<ItfID> getConnectedClientItfs() {
00266 return connectedClientItfs;
00267 }
00268
00269 private class TimeoutTask extends TimerTask {
00270 GatherRequestsQueue requestsQueue;
00271
00272 public TimeoutTask(GatherRequestsQueue requestsQueue) {
00273 this.requestsQueue = requestsQueue;
00274 }
00275
00276 public void run() {
00277 timedout = true;
00278 if (!resultsReturned) {
00279 if (!thrownTimeoutException) {
00280 requestsQueue.addFutureForGatheredRequest(new FutureResult(
00281 null,
00282 new GathercastTimeoutException("timeout of " +
00283 timeout +
00284 " reached before invocations from all clients were received for gather invocation (method " +
00285 itfTypeInvokedMethod.getMethod()
00286 .toGenericString() +
00287 " on gather interface " + serverItfName), null));
00288 }
00289 }
00290 }
00291 }
00292
00293 public void migrateFuturesHandlerTo(Node node) throws MigrationException {
00294 futuresHandler.migrateTo(node);
00295 }
00296
00297 private void writeObject(java.io.ObjectOutputStream out)
00298 throws java.io.IOException {
00299
00300 out.defaultWriteObject();
00301 }
00302
00303 private void readObject(java.io.ObjectInputStream in)
00304 throws java.io.IOException, ClassNotFoundException {
00305
00306 in.defaultReadObject();
00307 }
00308 }