00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.p2p.service;
00032 
00033 import java.io.IOException;
00034 import java.io.Serializable;
00035 import java.util.Random;
00036 import java.util.UUID;
00037 import java.util.Vector;
00038 
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.ActiveObjectCreationException;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.InitActive;
00043 import org.objectweb.proactive.ProActive;
00044 import org.objectweb.proactive.ProActiveInternalObject;
00045 import org.objectweb.proactive.Service;
00046 import org.objectweb.proactive.core.Constants;
00047 import org.objectweb.proactive.core.body.UniversalBody;
00048 import org.objectweb.proactive.core.body.request.Request;
00049 import org.objectweb.proactive.core.body.request.RequestFilter;
00050 import org.objectweb.proactive.core.mop.MOP;
00051 import org.objectweb.proactive.core.node.Node;
00052 import org.objectweb.proactive.core.node.NodeException;
00053 import org.objectweb.proactive.core.node.NodeFactory;
00054 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00055 import org.objectweb.proactive.core.util.log.Loggers;
00056 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00057 import org.objectweb.proactive.core.util.wrapper.IntWrapper;
00058 import org.objectweb.proactive.core.util.wrapper.StringWrapper;
00059 import org.objectweb.proactive.p2p.service.exception.P2POldMessageException;
00060 import org.objectweb.proactive.p2p.service.node.P2PNode;
00061 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00062 import org.objectweb.proactive.p2p.service.node.P2PNodeManager;
00063 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00064 
00065 
00073 public class P2PService implements InitActive, P2PConstants, Serializable,
00074     ProActiveInternalObject {
00075 
00077     private static final Logger logger = ProActiveLogger.getLogger(Loggers.P2P_SERVICE);
00078 
00080     private P2PService acquaintances;
00081 
00085     private P2PAcquaintanceManager acquaintanceManager;
00086 
00090     private Node p2pServiceNode = null;
00091     private static final int MSG_MEMORY = Integer.parseInt(System.getProperty(
00092                 P2PConstants.PROPERTY_MSG_MEMORY));
00093     private static final int NOA = Integer.parseInt(System.getProperty(
00094                 P2PConstants.PROPERTY_NOA));
00095     private static final int EXPL_MSG = Integer.parseInt(System.getProperty(
00096                 P2PConstants.PROPERTY_EXPLORING_MSG)) - 1;
00097     private static final long ACQ_TO = Long.parseLong(System.getProperty(
00098                 P2PConstants.PROPERTY_NODES_ACQUISITION_T0));
00099 
00103     private static final Random randomizer = new Random();
00104 
00108     private Vector<UUID> oldMessageList = new Vector<UUID>(MSG_MEMORY);
00109     private P2PNodeManager nodeManager = null;
00110 
00114     private Vector<P2PNodeLookup> waitingNodesLookup = new Vector<P2PNodeLookup>();
00115     private Vector<P2PNodeLookup> waitingMaximunNodesLookup = new Vector<P2PNodeLookup>();
00116     private P2PService stubOnThis = null;
00117 
00118     
00119     private Service service = null;
00120     private RequestFilter filter = new RequestFilter() {
00121 
00125             public boolean acceptRequest(Request request) {
00126                 String requestName = request.getMethodName();
00127                 if (requestName.compareToIgnoreCase("askingNode") == 0) {
00128                     return false;
00129                 }
00130                 return true;
00131             }
00132         };
00133 
00134     
00135     
00136     
00137 
00143     public P2PService() {
00144         
00145     }
00146 
00147     
00148     
00149     
00150 
00155     public void firstContact(Vector peers) {
00156         
00157         Object[] params = new Object[3];
00158         params[0] = peers;
00159         params[1] = this.acquaintanceManager;
00160         params[2] = this.stubOnThis;
00161         try {
00162             ProActive.newActive(P2PFirstContact.class.getName(), params,
00163                 this.p2pServiceNode);
00164         } catch (ActiveObjectCreationException e) {
00165             logger.warn("Couldn't active P2PFirstContact", e);
00166         } catch (NodeException e) {
00167             logger.warn("Couldn't active P2PFirstContact", e);
00168         }
00169     }
00170 
00176     public void register(P2PService service) {
00177         try {
00178             if (!this.stubOnThis.equals(service)) {
00179                 this.acquaintanceManager.add(service);
00180                 if (logger.isDebugEnabled()) {
00181                     logger.debug("Remote peer localy registered: " +
00182                         ProActive.getActiveObjectNodeUrl(service));
00183                 }
00184 
00185                 
00186                 this.wakeUpEveryBody();
00187             }
00188         } catch (Exception e) {
00189             logger.debug("The remote P2P service is certainly down", e);
00190         }
00191     }
00192 
00196     public void heartBeat() {
00197         logger.debug("Heart-beat message received");
00198     }
00199 
00208     public void exploring(int ttl, UUID uuid, P2PService remoteService) {
00209         if (uuid != null) {
00210             logger.debug("Exploring message received with #" + uuid);
00211             ttl--;
00212         }
00213 
00214         boolean broadcast;
00215         try {
00216             broadcast = broadcaster(ttl, uuid, remoteService);
00217         } catch (P2POldMessageException e) {
00218             return;
00219         }
00220 
00221         
00222         if (this.shouldBeAcquaintance(remoteService)) {
00223             this.register(remoteService);
00224             try {
00225                 remoteService.register(this.stubOnThis);
00226             } catch (Exception e) {
00227                 logger.debug("Trouble with registering remote peer", e);
00228                 this.acquaintanceManager.remove(remoteService);
00229             }
00230         } else if (broadcast) {
00231             
00232             if (uuid == null) {
00233                 logger.debug("Generating uuid for exploring message");
00234                 uuid = UUID.randomUUID();
00235             }
00236             this.acquaintances.exploring(ttl, uuid, remoteService);
00237             logger.debug("Broadcast exploring message with #" + uuid);
00238             uuid = null;
00239         }
00240     }
00241 
00253     public void askingNode(int ttl, UUID uuid, P2PService remoteService,
00254         int numberOfNodes, P2PNodeLookup lookup, String vnName, String jobId,
00255         String nodeFamilyRegexp) {
00256         boolean broadcast;
00257         if (uuid != null) {
00258             logger.debug("AskingNode message received with #" + uuid);
00259             ttl--;
00260             try {
00261                 broadcast = broadcaster(ttl, uuid, remoteService);
00262             } catch (P2POldMessageException e) {
00263                 return;
00264             }
00265         } else {
00266             broadcast = true;
00267         }
00268 
00269         
00270         if ((uuid != null) || (numberOfNodes == MAX_NODE)) {
00271             if (numberOfNodes == MAX_NODE) {
00272                 Vector nodes = this.nodeManager.askingAllNodes(nodeFamilyRegexp);
00273                 for (int i = 0; i < nodes.size(); i++) {
00274                     Node current = (Node) nodes.get(i);
00275                     if (vnName != null) {
00276                         try {
00277                             current.getProActiveRuntime()
00278                                    .registerVirtualNode(vnName, true);
00279                         } catch (Exception e) {
00280                             logger.warn("Couldn't register " + vnName +
00281                                 " in the PAR", e);
00282                         }
00283                     }
00284                     if (jobId != null) {
00285                         current.getNodeInformation().setJobID(jobId);
00286                     }
00287                 }
00288                 if (nodes.size() > 0) {
00289                     lookup.giveNodeForMax(nodes, this.nodeManager);
00290                 }
00291             } else {
00292                 P2PNode askedNode = this.nodeManager.askingNode(nodeFamilyRegexp);
00293 
00294                 
00295                 Node nodeAvailable = askedNode.getNode();
00296                 if (nodeAvailable != null) {
00297                     IntWrapper nodeAck;
00298 
00299                     try {
00300                         nodeAck = lookup.giveNode(nodeAvailable,
00301                                 askedNode.getNodeManager());
00302                     } catch (Exception lookupExcption) {
00303                         logger.info("Cannot contact the remote lookup",
00304                             lookupExcption);
00305                         this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00306                         return;
00307                     }
00308                     if (nodeAck != null) {
00309                         
00310                         long endTime = System.currentTimeMillis() + ACQ_TO;
00311                         while ((System.currentTimeMillis() < endTime) &&
00312                                 ProActive.isAwaited(nodeAck)) {
00313                             if (this.service.hasRequestToServe()) {
00314                                 service.serveAll(this.filter);
00315                             } else {
00316                                 try {
00317                                     Thread.sleep(100);
00318                                 } catch (InterruptedException e) {
00319                                     logger.debug(e);
00320                                 }
00321                             }
00322                         }
00323 
00324                         
00325                         if (ProActive.isAwaited(nodeAck)) {
00326                             
00327                             
00328                             logger.debug("Ack timeout expired");
00329                             this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00330                             return;
00331                         }
00332                     }
00333 
00334                     if (nodeAck.intValue() != -1) {
00335                         
00336                         if (vnName != null) {
00337                             try {
00338                                 nodeAvailable.getProActiveRuntime()
00339                                              .registerVirtualNode(vnName, true);
00340                             } catch (Exception e) {
00341                                 logger.warn("Couldn't register " + vnName +
00342                                     " in the PAR", e);
00343                             }
00344                         }
00345                         if (jobId != null) {
00346                             nodeAvailable.getNodeInformation().setJobID(jobId);
00347                         }
00348                         numberOfNodes = (numberOfNodes == MAX_NODE) ? MAX_NODE
00349                                                                     : nodeAck.intValue();
00350                         logger.info("Giving 1 node to vn: " + vnName);
00351                     } else {
00352                         
00353                         logger.debug("NACK node received");
00354                         this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00355                         
00356                         return;
00357                     }
00358                 }
00359 
00360                 
00361                 if (numberOfNodes == 0) {
00362                     logger.debug("No more nodes are needed");
00363                     return;
00364                 }
00365             }
00366         }
00367 
00368         
00369         
00370         
00371         if (broadcast) {
00372             if (uuid == null) {
00373                 logger.debug("Generating uuid for askingNode message");
00374                 uuid = UUID.randomUUID();
00375             }
00376             this.acquaintances.askingNode(ttl, uuid, remoteService,
00377                 numberOfNodes, lookup, vnName, jobId, nodeFamilyRegexp);
00378             logger.debug("Broadcast askingNode message with #" + uuid);
00379         }
00380     }
00381 
00389     public P2PNodeLookup getNodes(int numberOfNodes, String nodeFamilyRegexp,
00390         String vnName, String jobId) {
00391         assert vnName != null : vnName;
00392         assert jobId != null : jobId;
00393         Object[] params = new Object[5];
00394         params[0] = new Integer(numberOfNodes);
00395         params[1] = this.stubOnThis;
00396         params[2] = vnName;
00397         params[3] = jobId;
00398         params[4] = nodeFamilyRegexp;
00399 
00400         P2PNodeLookup lookup = null;
00401         try {
00402             lookup = (P2PNodeLookup) ProActive.newActive(P2PNodeLookup.class.getName(),
00403                     params, this.p2pServiceNode);
00404             ProActive.enableAC(lookup);
00405             if (numberOfNodes == MAX_NODE) {
00406                 this.waitingMaximunNodesLookup.add(lookup);
00407             } else {
00408                 this.waitingNodesLookup.add(lookup);
00409             }
00410         } catch (ActiveObjectCreationException e) {
00411             logger.fatal("Couldn't create an active lookup", e);
00412             return null;
00413         } catch (NodeException e) {
00414             logger.fatal("Couldn't connect node to creat", e);
00415             return null;
00416         } catch (IOException e) {
00417             if (logger.isDebugEnabled()) {
00418                 logger.debug("Couldn't enable AC for a nodes lookup", e);
00419             }
00420         }
00421 
00422         if (logger.isInfoEnabled()) {
00423             if (numberOfNodes != MAX_NODE) {
00424                 logger.info("Asking for " + numberOfNodes + " nodes");
00425             } else {
00426                 logger.info("Asking for maxinum nodes");
00427             }
00428         }
00429         return lookup;
00430     }
00431 
00438     public P2PNodeLookup getNodes(int numberOfNodes, String vnName, String jobId) {
00439         return this.getNodes(numberOfNodes, ".*", vnName, jobId);
00440     }
00441 
00449     public Node getANode(String vnName, String jobId) {
00450         return this.stubOnThis.getANode(vnName, jobId, this.stubOnThis);
00451     }
00452 
00460     public Node getANode(String vnName, String jobId, P2PService service) {
00461         if (service.equals(this.stubOnThis)) {
00462             return this.acquaintanceManager.randomPeer()
00463                                            .getANode(vnName, jobId, service);
00464         }
00465         P2PNode askedNode = this.nodeManager.askingNode(null);
00466         Node nodeAvailable = askedNode.getNode();
00467 
00468         if (nodeAvailable != null) {
00469             if (vnName != null) {
00470                 try {
00471                     nodeAvailable.getProActiveRuntime()
00472                                  .registerVirtualNode(vnName, true);
00473                 } catch (Exception e) {
00474                     logger.warn("Couldn't register " + vnName + " in the PAR", e);
00475                 }
00476             }
00477             if (jobId != null) {
00478                 nodeAvailable.getNodeInformation().setJobID(jobId);
00479             }
00480             return nodeAvailable;
00481         }
00482         return this.acquaintanceManager.randomPeer()
00483                                        .getANode(vnName, jobId, service);
00484     }
00485 
00493     public P2PNodeLookup getMaximunNodes(String vnName, String jobId) {
00494         return this.getNodes(P2PConstants.MAX_NODE, vnName, jobId);
00495     }
00496 
00501     public StringWrapper getAddress() {
00502         return new StringWrapper(this.p2pServiceNode.getNodeInformation()
00503                                                     .getURL());
00504     }
00505 
00511 
00519 
00534 
00576 
00599 
00616 
00629 
00674