00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.p2p.service;
00032
00033 import java.io.IOException;
00034 import java.io.Serializable;
00035 import java.util.Random;
00036 import java.util.UUID;
00037 import java.util.Vector;
00038
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.ActiveObjectCreationException;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.InitActive;
00043 import org.objectweb.proactive.ProActive;
00044 import org.objectweb.proactive.ProActiveInternalObject;
00045 import org.objectweb.proactive.Service;
00046 import org.objectweb.proactive.core.Constants;
00047 import org.objectweb.proactive.core.body.UniversalBody;
00048 import org.objectweb.proactive.core.body.request.Request;
00049 import org.objectweb.proactive.core.body.request.RequestFilter;
00050 import org.objectweb.proactive.core.mop.MOP;
00051 import org.objectweb.proactive.core.node.Node;
00052 import org.objectweb.proactive.core.node.NodeException;
00053 import org.objectweb.proactive.core.node.NodeFactory;
00054 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00055 import org.objectweb.proactive.core.util.log.Loggers;
00056 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00057 import org.objectweb.proactive.core.util.wrapper.IntWrapper;
00058 import org.objectweb.proactive.core.util.wrapper.StringWrapper;
00059 import org.objectweb.proactive.p2p.service.exception.P2POldMessageException;
00060 import org.objectweb.proactive.p2p.service.node.P2PNode;
00061 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00062 import org.objectweb.proactive.p2p.service.node.P2PNodeManager;
00063 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00064
00065
00073 public class P2PService implements InitActive, P2PConstants, Serializable,
00074 ProActiveInternalObject {
00075
00077 private static final Logger logger = ProActiveLogger.getLogger(Loggers.P2P_SERVICE);
00078
00080 private P2PService acquaintances;
00081
00085 private P2PAcquaintanceManager acquaintanceManager;
00086
00090 private Node p2pServiceNode = null;
00091 private static final int MSG_MEMORY = Integer.parseInt(System.getProperty(
00092 P2PConstants.PROPERTY_MSG_MEMORY));
00093 private static final int NOA = Integer.parseInt(System.getProperty(
00094 P2PConstants.PROPERTY_NOA));
00095 private static final int EXPL_MSG = Integer.parseInt(System.getProperty(
00096 P2PConstants.PROPERTY_EXPLORING_MSG)) - 1;
00097 private static final long ACQ_TO = Long.parseLong(System.getProperty(
00098 P2PConstants.PROPERTY_NODES_ACQUISITION_T0));
00099
00103 private static final Random randomizer = new Random();
00104
00108 private Vector<UUID> oldMessageList = new Vector<UUID>(MSG_MEMORY);
00109 private P2PNodeManager nodeManager = null;
00110
00114 private Vector<P2PNodeLookup> waitingNodesLookup = new Vector<P2PNodeLookup>();
00115 private Vector<P2PNodeLookup> waitingMaximunNodesLookup = new Vector<P2PNodeLookup>();
00116 private P2PService stubOnThis = null;
00117
00118
00119 private Service service = null;
00120 private RequestFilter filter = new RequestFilter() {
00121
00125 public boolean acceptRequest(Request request) {
00126 String requestName = request.getMethodName();
00127 if (requestName.compareToIgnoreCase("askingNode") == 0) {
00128 return false;
00129 }
00130 return true;
00131 }
00132 };
00133
00134
00135
00136
00137
00143 public P2PService() {
00144
00145 }
00146
00147
00148
00149
00150
00155 public void firstContact(Vector peers) {
00156
00157 Object[] params = new Object[3];
00158 params[0] = peers;
00159 params[1] = this.acquaintanceManager;
00160 params[2] = this.stubOnThis;
00161 try {
00162 ProActive.newActive(P2PFirstContact.class.getName(), params,
00163 this.p2pServiceNode);
00164 } catch (ActiveObjectCreationException e) {
00165 logger.warn("Couldn't active P2PFirstContact", e);
00166 } catch (NodeException e) {
00167 logger.warn("Couldn't active P2PFirstContact", e);
00168 }
00169 }
00170
00176 public void register(P2PService service) {
00177 try {
00178 if (!this.stubOnThis.equals(service)) {
00179 this.acquaintanceManager.add(service);
00180 if (logger.isDebugEnabled()) {
00181 logger.debug("Remote peer localy registered: " +
00182 ProActive.getActiveObjectNodeUrl(service));
00183 }
00184
00185
00186 this.wakeUpEveryBody();
00187 }
00188 } catch (Exception e) {
00189 logger.debug("The remote P2P service is certainly down", e);
00190 }
00191 }
00192
00196 public void heartBeat() {
00197 logger.debug("Heart-beat message received");
00198 }
00199
00208 public void exploring(int ttl, UUID uuid, P2PService remoteService) {
00209 if (uuid != null) {
00210 logger.debug("Exploring message received with #" + uuid);
00211 ttl--;
00212 }
00213
00214 boolean broadcast;
00215 try {
00216 broadcast = broadcaster(ttl, uuid, remoteService);
00217 } catch (P2POldMessageException e) {
00218 return;
00219 }
00220
00221
00222 if (this.shouldBeAcquaintance(remoteService)) {
00223 this.register(remoteService);
00224 try {
00225 remoteService.register(this.stubOnThis);
00226 } catch (Exception e) {
00227 logger.debug("Trouble with registering remote peer", e);
00228 this.acquaintanceManager.remove(remoteService);
00229 }
00230 } else if (broadcast) {
00231
00232 if (uuid == null) {
00233 logger.debug("Generating uuid for exploring message");
00234 uuid = UUID.randomUUID();
00235 }
00236 this.acquaintances.exploring(ttl, uuid, remoteService);
00237 logger.debug("Broadcast exploring message with #" + uuid);
00238 uuid = null;
00239 }
00240 }
00241
00253 public void askingNode(int ttl, UUID uuid, P2PService remoteService,
00254 int numberOfNodes, P2PNodeLookup lookup, String vnName, String jobId,
00255 String nodeFamilyRegexp) {
00256 boolean broadcast;
00257 if (uuid != null) {
00258 logger.debug("AskingNode message received with #" + uuid);
00259 ttl--;
00260 try {
00261 broadcast = broadcaster(ttl, uuid, remoteService);
00262 } catch (P2POldMessageException e) {
00263 return;
00264 }
00265 } else {
00266 broadcast = true;
00267 }
00268
00269
00270 if ((uuid != null) || (numberOfNodes == MAX_NODE)) {
00271 if (numberOfNodes == MAX_NODE) {
00272 Vector nodes = this.nodeManager.askingAllNodes(nodeFamilyRegexp);
00273 for (int i = 0; i < nodes.size(); i++) {
00274 Node current = (Node) nodes.get(i);
00275 if (vnName != null) {
00276 try {
00277 current.getProActiveRuntime()
00278 .registerVirtualNode(vnName, true);
00279 } catch (Exception e) {
00280 logger.warn("Couldn't register " + vnName +
00281 " in the PAR", e);
00282 }
00283 }
00284 if (jobId != null) {
00285 current.getNodeInformation().setJobID(jobId);
00286 }
00287 }
00288 if (nodes.size() > 0) {
00289 lookup.giveNodeForMax(nodes, this.nodeManager);
00290 }
00291 } else {
00292 P2PNode askedNode = this.nodeManager.askingNode(nodeFamilyRegexp);
00293
00294
00295 Node nodeAvailable = askedNode.getNode();
00296 if (nodeAvailable != null) {
00297 IntWrapper nodeAck;
00298
00299 try {
00300 nodeAck = lookup.giveNode(nodeAvailable,
00301 askedNode.getNodeManager());
00302 } catch (Exception lookupExcption) {
00303 logger.info("Cannot contact the remote lookup",
00304 lookupExcption);
00305 this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00306 return;
00307 }
00308 if (nodeAck != null) {
00309
00310 long endTime = System.currentTimeMillis() + ACQ_TO;
00311 while ((System.currentTimeMillis() < endTime) &&
00312 ProActive.isAwaited(nodeAck)) {
00313 if (this.service.hasRequestToServe()) {
00314 service.serveAll(this.filter);
00315 } else {
00316 try {
00317 Thread.sleep(100);
00318 } catch (InterruptedException e) {
00319 logger.debug(e);
00320 }
00321 }
00322 }
00323
00324
00325 if (ProActive.isAwaited(nodeAck)) {
00326
00327
00328 logger.debug("Ack timeout expired");
00329 this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00330 return;
00331 }
00332 }
00333
00334 if (nodeAck.intValue() != -1) {
00335
00336 if (vnName != null) {
00337 try {
00338 nodeAvailable.getProActiveRuntime()
00339 .registerVirtualNode(vnName, true);
00340 } catch (Exception e) {
00341 logger.warn("Couldn't register " + vnName +
00342 " in the PAR", e);
00343 }
00344 }
00345 if (jobId != null) {
00346 nodeAvailable.getNodeInformation().setJobID(jobId);
00347 }
00348 numberOfNodes = (numberOfNodes == MAX_NODE) ? MAX_NODE
00349 : nodeAck.intValue();
00350 logger.info("Giving 1 node to vn: " + vnName);
00351 } else {
00352
00353 logger.debug("NACK node received");
00354 this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00355
00356 return;
00357 }
00358 }
00359
00360
00361 if (numberOfNodes == 0) {
00362 logger.debug("No more nodes are needed");
00363 return;
00364 }
00365 }
00366 }
00367
00368
00369
00370
00371 if (broadcast) {
00372 if (uuid == null) {
00373 logger.debug("Generating uuid for askingNode message");
00374 uuid = UUID.randomUUID();
00375 }
00376 this.acquaintances.askingNode(ttl, uuid, remoteService,
00377 numberOfNodes, lookup, vnName, jobId, nodeFamilyRegexp);
00378 logger.debug("Broadcast askingNode message with #" + uuid);
00379 }
00380 }
00381
00389 public P2PNodeLookup getNodes(int numberOfNodes, String nodeFamilyRegexp,
00390 String vnName, String jobId) {
00391 assert vnName != null : vnName;
00392 assert jobId != null : jobId;
00393 Object[] params = new Object[5];
00394 params[0] = new Integer(numberOfNodes);
00395 params[1] = this.stubOnThis;
00396 params[2] = vnName;
00397 params[3] = jobId;
00398 params[4] = nodeFamilyRegexp;
00399
00400 P2PNodeLookup lookup = null;
00401 try {
00402 lookup = (P2PNodeLookup) ProActive.newActive(P2PNodeLookup.class.getName(),
00403 params, this.p2pServiceNode);
00404 ProActive.enableAC(lookup);
00405 if (numberOfNodes == MAX_NODE) {
00406 this.waitingMaximunNodesLookup.add(lookup);
00407 } else {
00408 this.waitingNodesLookup.add(lookup);
00409 }
00410 } catch (ActiveObjectCreationException e) {
00411 logger.fatal("Couldn't create an active lookup", e);
00412 return null;
00413 } catch (NodeException e) {
00414 logger.fatal("Couldn't connect node to creat", e);
00415 return null;
00416 } catch (IOException e) {
00417 if (logger.isDebugEnabled()) {
00418 logger.debug("Couldn't enable AC for a nodes lookup", e);
00419 }
00420 }
00421
00422 if (logger.isInfoEnabled()) {
00423 if (numberOfNodes != MAX_NODE) {
00424 logger.info("Asking for " + numberOfNodes + " nodes");
00425 } else {
00426 logger.info("Asking for maxinum nodes");
00427 }
00428 }
00429 return lookup;
00430 }
00431
00438 public P2PNodeLookup getNodes(int numberOfNodes, String vnName, String jobId) {
00439 return this.getNodes(numberOfNodes, ".*", vnName, jobId);
00440 }
00441
00449 public Node getANode(String vnName, String jobId) {
00450 return this.stubOnThis.getANode(vnName, jobId, this.stubOnThis);
00451 }
00452
00460 public Node getANode(String vnName, String jobId, P2PService service) {
00461 if (service.equals(this.stubOnThis)) {
00462 return this.acquaintanceManager.randomPeer()
00463 .getANode(vnName, jobId, service);
00464 }
00465 P2PNode askedNode = this.nodeManager.askingNode(null);
00466 Node nodeAvailable = askedNode.getNode();
00467
00468 if (nodeAvailable != null) {
00469 if (vnName != null) {
00470 try {
00471 nodeAvailable.getProActiveRuntime()
00472 .registerVirtualNode(vnName, true);
00473 } catch (Exception e) {
00474 logger.warn("Couldn't register " + vnName + " in the PAR", e);
00475 }
00476 }
00477 if (jobId != null) {
00478 nodeAvailable.getNodeInformation().setJobID(jobId);
00479 }
00480 return nodeAvailable;
00481 }
00482 return this.acquaintanceManager.randomPeer()
00483 .getANode(vnName, jobId, service);
00484 }
00485
00493 public P2PNodeLookup getMaximunNodes(String vnName, String jobId) {
00494 return this.getNodes(P2PConstants.MAX_NODE, vnName, jobId);
00495 }
00496
00501 public StringWrapper getAddress() {
00502 return new StringWrapper(this.p2pServiceNode.getNodeInformation()
00503 .getURL());
00504 }
00505
00511
00519
00534
00576
00599
00616
00629
00674