00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.p2p.service.node;
00032
00033 import java.io.Serializable;
00034 import java.util.HashMap;
00035 import java.util.Vector;
00036
00037 import org.apache.log4j.Logger;
00038 import org.objectweb.proactive.Body;
00039 import org.objectweb.proactive.EndActive;
00040 import org.objectweb.proactive.InitActive;
00041 import org.objectweb.proactive.ProActive;
00042 import org.objectweb.proactive.ProActiveInternalObject;
00043 import org.objectweb.proactive.RunActive;
00044 import org.objectweb.proactive.Service;
00045 import org.objectweb.proactive.core.ProActiveException;
00046 import org.objectweb.proactive.core.node.Node;
00047 import org.objectweb.proactive.core.node.NodeFactory;
00048 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00049 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00050 import org.objectweb.proactive.core.util.log.Loggers;
00051 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00052 import org.objectweb.proactive.core.util.wrapper.IntWrapper;
00053 import org.objectweb.proactive.p2p.service.P2PService;
00054 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00055
00056
00062 public class P2PNodeLookup implements InitActive, RunActive, EndActive,
00063 P2PConstants, Serializable, ProActiveInternalObject {
00064 private static final Logger logger = ProActiveLogger.getLogger(Loggers.P2P_NODES);
00065 private Vector<Node> waitingNodesList;
00066 private Vector<String> nodesToKillList;
00067 private long expirationTime;
00068 private static final long TIMEOUT = Long.parseLong(System.getProperty(
00069 PROPERTY_NODES_ACQUISITION_T0));
00070 private static final long LOOKUP_FREQ = Long.parseLong(System.getProperty(
00071 PROPERTY_LOOKUP_FREQ));
00072 private static final int TTL = Integer.parseInt(System.getProperty(
00073 PROPERTY_TTL));
00074 private int numberOfAskedNodes;
00075 private int acquiredNodes = 0;
00076 private P2PService localP2pService;
00077 private String vnName;
00078 private String jobId;
00079 private P2PNodeLookup stub;
00080 private ProActiveRuntime paRuntime;
00081 private String parUrl;
00082 private HashMap<String, P2PNodeManager> nodeManagerMap = new HashMap<String, P2PNodeManager>();
00083 private boolean killAllFlag = false;
00084 private String nodeFamilyRegexp = null;
00085
00086 public P2PNodeLookup() {
00087
00088 }
00089
00090 public P2PNodeLookup(Integer numberOfAskedNodes,
00091 P2PService localP2pService, String vnName, String jobId,
00092 String nodeFamilyRegexp) {
00093 this.waitingNodesList = new Vector<Node>();
00094 this.nodesToKillList = new Vector<String>();
00095 this.expirationTime = System.currentTimeMillis() + TIMEOUT;
00096 this.numberOfAskedNodes = numberOfAskedNodes.intValue();
00097 assert (this.numberOfAskedNodes > 0) ||
00098 (this.numberOfAskedNodes == MAX_NODE) : "None authorized value for asked nodes";
00099
00100
00101
00102
00103 this.localP2pService = localP2pService;
00104 this.vnName = vnName;
00105 this.jobId = jobId;
00106 this.nodeFamilyRegexp = nodeFamilyRegexp;
00107 }
00108
00109
00110
00111
00112
00119 public boolean allArrived() {
00120 if (this.numberOfAskedNodes != MAX_NODE) {
00121 return (this.numberOfAskedNodes == this.acquiredNodes);
00122 } else {
00123 return false;
00124 }
00125 }
00126
00132 public boolean nArrived(int n) {
00133 return n <= this.acquiredNodes;
00134 }
00135
00140 public void killNode(String node) {
00141 try {
00142 Node remoteNode = NodeFactory.getNode(node);
00143 ProActiveRuntime remoteRuntime = remoteNode.getProActiveRuntime();
00144 P2PNodeManager remoteNodeManager = (P2PNodeManager) this.nodeManagerMap.get(node);
00145
00146 remoteRuntime.unregisterVirtualNode(vnName);
00147 remoteRuntime.rmAcquaintance(this.parUrl);
00148 this.paRuntime.rmAcquaintance(remoteRuntime.getURL());
00149 remoteNodeManager.leaveNode(remoteNode, this.vnName);
00150
00151 logger.info("Node at " + node + " succefuly removed");
00152
00153
00154 this.paRuntime.unregister(remoteRuntime, remoteRuntime.getURL(),
00155 "p2p", System.getProperty(PROPERTY_ACQUISITION) + ":",
00156 remoteRuntime.getVMInformation().getName());
00157 } catch (Exception e) {
00158 logger.info("Node @" + node + " already down");
00159 } finally {
00160 this.nodesToKillList.remove(node);
00161 }
00162 }
00163
00169 public void killAllNodes() {
00170 this.killAllFlag = true;
00171 while (this.nodesToKillList.size() > 0) {
00172 String currentNode = this.nodesToKillList.get(0);
00173 this.killNode(currentNode);
00174 }
00175 }
00176
00180 public void wakeUp() {
00181
00182 }
00183
00191 public IntWrapper giveNode(Node givenNode, P2PNodeManager remoteNodeManager) {
00192 if (logger.isDebugEnabled()) {
00193 logger.debug("Given node received from " +
00194 givenNode.getNodeInformation().getURL());
00195 }
00196
00197
00198 if (!this.allArrived()) {
00199 this.waitingNodesList.add(givenNode);
00200 String nodeUrl = givenNode.getNodeInformation().getURL();
00201 this.nodeManagerMap.put(nodeUrl, remoteNodeManager);
00202 this.acquiredNodes++;
00203 ProActiveRuntime remoteRt = givenNode.getProActiveRuntime();
00204 try {
00205 remoteRt.addAcquaintance(this.parUrl);
00206 this.paRuntime.addAcquaintance(remoteRt.getURL());
00207 this.paRuntime.register(remoteRt, remoteRt.getURL(), "p2p",
00208 System.getProperty(PROPERTY_ACQUISITION) + ":",
00209 remoteRt.getVMInformation().getName());
00210 } catch (ProActiveException e) {
00211 logger.warn("Couldn't recgister the remote runtime", e);
00212 }
00213 logger.info("Node at " + nodeUrl + " succefuly added");
00214 logger.info("Lookup got " + this.acquiredNodes + " nodes");
00215 return new IntWrapper(this.numberOfAskedNodes - this.acquiredNodes);
00216 }
00217 return new IntWrapper(-1);
00218 }
00219
00220 public void giveNodeForMax(Vector<Node> givenNodes,
00221 P2PNodeManager remoteNodeManager) {
00222
00223 this.waitingNodesList.addAll(givenNodes);
00224 for (int i = 0; i < givenNodes.size(); i++) {
00225 Node current = (Node) givenNodes.get(i);
00226 String nodeUrl = current.getNodeInformation().getURL();
00227 this.nodeManagerMap.put(nodeUrl, remoteNodeManager);
00228 this.acquiredNodes++;
00229 ProActiveRuntime remoteRt = current.getProActiveRuntime();
00230 try {
00231 remoteRt.addAcquaintance(this.parUrl);
00232 this.paRuntime.addAcquaintance(remoteRt.getURL());
00233 this.paRuntime.register(remoteRt, remoteRt.getURL(), "p2p",
00234 System.getProperty(PROPERTY_ACQUISITION) + ":",
00235 remoteRt.getVMInformation().getName());
00236 } catch (ProActiveException e) {
00237 logger.warn("Couldn't recgister the remote runtime", e);
00238 }
00239 logger.info("Node at " + nodeUrl + " succefuly added");
00240 }
00241 logger.info("Lookup MAX got " + this.acquiredNodes + " nodes");
00242 }
00243
00252 public boolean isActive() {
00253 return ProActive.getBodyOnThis().isActive();
00254 }
00255
00259 public Vector getAndRemoveNodes() {
00260 Vector<Node> v = new Vector<Node>();
00261 while (this.waitingNodesList.size() != 0) {
00262 Node elem = this.waitingNodesList.get(0);
00263 v.add(elem);
00264 this.waitingNodesList.remove(elem);
00265 this.nodesToKillList.add(elem.getNodeInformation().getURL());
00266 }
00267 return v;
00268 }
00269
00274 public Vector getNodes() {
00275 Service service = new Service(ProActive.getBodyOnThis());
00276 while (!this.allArrived()) {
00277 this.localP2pService.askingNode(TTL, null, this.localP2pService,
00278 this.numberOfAskedNodes - this.acquiredNodes, stub,
00279 this.vnName, this.jobId, this.nodeFamilyRegexp);
00280
00281
00282 service.blockingServeOldest(LOOKUP_FREQ);
00283 while (service.hasRequestToServe()) {
00284 service.serveOldest();
00285 }
00286 }
00287 return ((P2PNodeLookup) ProActive.getStubOnThis()).getAndRemoveNodes();
00288 }
00289
00295 public Vector getNodes(long timeout) {
00296 long endTime = System.currentTimeMillis() + timeout;
00297 Service service = new Service(ProActive.getBodyOnThis());
00298 while (!this.allArrived() && (System.currentTimeMillis() < endTime)) {
00299 this.localP2pService.askingNode(TTL, null, this.localP2pService,
00300 this.numberOfAskedNodes - this.acquiredNodes, stub,
00301 this.vnName, this.jobId, this.nodeFamilyRegexp);
00302
00303 if (timeout < LOOKUP_FREQ) {
00304 service.blockingServeOldest(500);
00305 } else {
00306 service.blockingServeOldest(LOOKUP_FREQ);
00307 }
00308 while (service.hasRequestToServe()) {
00309 service.serveOldest();
00310 }
00311 }
00312 return ((P2PNodeLookup) ProActive.getStubOnThis()).getAndRemoveNodes();
00313 }
00314
00320 public void moveTo(String nodeUrl) {
00321 try {
00322 ProActive.migrateTo(nodeUrl);
00323 } catch (Exception e) {
00324 logger.fatal("Couldn't migrate the node lookup to " + nodeUrl, e);
00325 }
00326 }
00327
00328
00329
00330
00331
00335 public void initActivity(Body body) {
00336 this.stub = (P2PNodeLookup) ProActive.getStubOnThis();
00337 try {
00338 this.paRuntime = RuntimeFactory.getDefaultRuntime();
00339 this.parUrl = this.paRuntime.getURL();
00340 } catch (ProActiveException e) {
00341 logger.fatal("Problem to get local runtime", e);
00342 }
00343 }
00344
00348 public void runActivity(Body body) {
00349 logger.info("Looking for " +
00350 ((this.numberOfAskedNodes == MAX_NODE) ? "MAX"
00351 : (this.numberOfAskedNodes +
00352 "")) + " nodes");
00353 Service service = new Service(body);
00354
00355 String reason = null;
00356 while (true) {
00357 logger.debug("Aksing nodes");
00358
00359
00360 this.localP2pService.askingNode(TTL, null, this.localP2pService,
00361 (this.numberOfAskedNodes == P2PConstants.MAX_NODE)
00362 ? P2PConstants.MAX_NODE
00363 : (this.numberOfAskedNodes - this.acquiredNodes), stub,
00364 this.vnName, this.jobId, this.nodeFamilyRegexp);
00365
00366
00367 logger.debug("Waiting for requests");
00368 service.blockingServeOldest(LOOKUP_FREQ);
00369 while (service.hasRequestToServe()) {
00370 if (logger.isDebugEnabled()) {
00371 logger.info("Serving request: " +
00372 service.getOldest().getMethodName());
00373 }
00374 service.serveOldest();
00375 }
00376
00377
00378 if (this.killAllFlag) {
00379 reason = "killing nodes request";
00380 break;
00381 } else if (this.numberOfAskedNodes == MAX_NODE) {
00382 reason = "Max node asked";
00383 continue;
00384 } else if (this.allArrived()) {
00385 reason = "all nodes are arrived";
00386 break;
00387 } else if ((System.currentTimeMillis() > this.expirationTime)) {
00388 reason = "timeout is expired";
00389 break;
00390 } else {
00391 reason = "Normal case continue";
00392 continue;
00393 }
00394 }
00395
00396 if (reason == null) {
00397 reason = "Houston. We have a problem...";
00398 }
00399 logger.info("Ending loop activity because: " + reason);
00400 }
00401
00407 public void endActivity(Body body) {
00408 Service service = new Service(body);
00409 logger.info("Nodes (" + this.acquiredNodes +
00410 ") arrived ending activity");
00411 this.localP2pService.removeWaitingAccessor(this.stub);
00412 while ((this.waitingNodesList.size() > 0) ||
00413 (this.nodesToKillList.size() > 0)) {
00414 service.blockingServeOldest(LOOKUP_FREQ);
00415 while (service.hasRequestToServe()) {
00416 service.serveOldest();
00417 }
00418 }
00419 logger.info("This P2P nodes lookup is no more active, bye..");
00420 }
00421 }