org/objectweb/proactive/p2p/service/P2PService.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.p2p.service;
00032 
00033 import java.io.IOException;
00034 import java.io.Serializable;
00035 import java.util.Random;
00036 import java.util.UUID;
00037 import java.util.Vector;
00038 
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.ActiveObjectCreationException;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.InitActive;
00043 import org.objectweb.proactive.ProActive;
00044 import org.objectweb.proactive.ProActiveInternalObject;
00045 import org.objectweb.proactive.Service;
00046 import org.objectweb.proactive.core.Constants;
00047 import org.objectweb.proactive.core.body.UniversalBody;
00048 import org.objectweb.proactive.core.body.request.Request;
00049 import org.objectweb.proactive.core.body.request.RequestFilter;
00050 import org.objectweb.proactive.core.mop.MOP;
00051 import org.objectweb.proactive.core.node.Node;
00052 import org.objectweb.proactive.core.node.NodeException;
00053 import org.objectweb.proactive.core.node.NodeFactory;
00054 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00055 import org.objectweb.proactive.core.util.log.Loggers;
00056 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00057 import org.objectweb.proactive.core.util.wrapper.IntWrapper;
00058 import org.objectweb.proactive.core.util.wrapper.StringWrapper;
00059 import org.objectweb.proactive.p2p.service.exception.P2POldMessageException;
00060 import org.objectweb.proactive.p2p.service.node.P2PNode;
00061 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00062 import org.objectweb.proactive.p2p.service.node.P2PNodeManager;
00063 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00064 
00065 
00073 public class P2PService implements InitActive, P2PConstants, Serializable,
00074     ProActiveInternalObject {
00075 
00077     private static final Logger logger = ProActiveLogger.getLogger(Loggers.P2P_SERVICE);
00078 
00080     private P2PService acquaintances;
00081 
00085     private P2PAcquaintanceManager acquaintanceManager;
00086 
00090     private Node p2pServiceNode = null;
00091     private static final int MSG_MEMORY = Integer.parseInt(System.getProperty(
00092                 P2PConstants.PROPERTY_MSG_MEMORY));
00093     private static final int NOA = Integer.parseInt(System.getProperty(
00094                 P2PConstants.PROPERTY_NOA));
00095     private static final int EXPL_MSG = Integer.parseInt(System.getProperty(
00096                 P2PConstants.PROPERTY_EXPLORING_MSG)) - 1;
00097     private static final long ACQ_TO = Long.parseLong(System.getProperty(
00098                 P2PConstants.PROPERTY_NODES_ACQUISITION_T0));
00099 
00103     private static final Random randomizer = new Random();
00104 
00108     private Vector<UUID> oldMessageList = new Vector<UUID>(MSG_MEMORY);
00109     private P2PNodeManager nodeManager = null;
00110 
00114     private Vector<P2PNodeLookup> waitingNodesLookup = new Vector<P2PNodeLookup>();
00115     private Vector<P2PNodeLookup> waitingMaximunNodesLookup = new Vector<P2PNodeLookup>();
00116     private P2PService stubOnThis = null;
00117 
00118     // For asking nodes
00119     private Service service = null;
00120     private RequestFilter filter = new RequestFilter() {
00121 
00125             public boolean acceptRequest(Request request) {
00126                 String requestName = request.getMethodName();
00127                 if (requestName.compareToIgnoreCase("askingNode") == 0) {
00128                     return false;
00129                 }
00130                 return true;
00131             }
00132         };
00133 
00134     //--------------------------------------------------------------------------
00135     // Class Constructors
00136     //--------------------------------------------------------------------------
00137 
00143     public P2PService() {
00144         // empty
00145     }
00146 
00147     //--------------------------------------------------------------------------
00148     // Public Class methods
00149     // -------------------------------------------------------------------------
00150 
00155     public void firstContact(Vector peers) {
00156         // Creating an active P2PFirstContact
00157         Object[] params = new Object[3];
00158         params[0] = peers;
00159         params[1] = this.acquaintanceManager;
00160         params[2] = this.stubOnThis;
00161         try {
00162             ProActive.newActive(P2PFirstContact.class.getName(), params,
00163                 this.p2pServiceNode);
00164         } catch (ActiveObjectCreationException e) {
00165             logger.warn("Couldn't active P2PFirstContact", e);
00166         } catch (NodeException e) {
00167             logger.warn("Couldn't active P2PFirstContact", e);
00168         }
00169     }
00170 
00176     public void register(P2PService service) {
00177         try {
00178             if (!this.stubOnThis.equals(service)) {
00179                 this.acquaintanceManager.add(service);
00180                 if (logger.isDebugEnabled()) {
00181                     logger.debug("Remote peer localy registered: " +
00182                         ProActive.getActiveObjectNodeUrl(service));
00183                 }
00184 
00185                 // Wake up all node accessor, because new peers are know
00186                 this.wakeUpEveryBody();
00187             }
00188         } catch (Exception e) {
00189             logger.debug("The remote P2P service is certainly down", e);
00190         }
00191     }
00192 
00196     public void heartBeat() {
00197         logger.debug("Heart-beat message received");
00198     }
00199 
00208     public void exploring(int ttl, UUID uuid, P2PService remoteService) {
00209         if (uuid != null) {
00210             logger.debug("Exploring message received with #" + uuid);
00211             ttl--;
00212         }
00213 
00214         boolean broadcast;
00215         try {
00216             broadcast = broadcaster(ttl, uuid, remoteService);
00217         } catch (P2POldMessageException e) {
00218             return;
00219         }
00220 
00221         // This should be register
00222         if (this.shouldBeAcquaintance(remoteService)) {
00223             this.register(remoteService);
00224             try {
00225                 remoteService.register(this.stubOnThis);
00226             } catch (Exception e) {
00227                 logger.debug("Trouble with registering remote peer", e);
00228                 this.acquaintanceManager.remove(remoteService);
00229             }
00230         } else if (broadcast) {
00231             // Forwarding the message
00232             if (uuid == null) {
00233                 logger.debug("Generating uuid for exploring message");
00234                 uuid = UUID.randomUUID();
00235             }
00236             this.acquaintances.exploring(ttl, uuid, remoteService);
00237             logger.debug("Broadcast exploring message with #" + uuid);
00238             uuid = null;
00239         }
00240     }
00241 
00253     public void askingNode(int ttl, UUID uuid, P2PService remoteService,
00254         int numberOfNodes, P2PNodeLookup lookup, String vnName, String jobId,
00255         String nodeFamilyRegexp) {
00256         boolean broadcast;
00257         if (uuid != null) {
00258             logger.debug("AskingNode message received with #" + uuid);
00259             ttl--;
00260             try {
00261                 broadcast = broadcaster(ttl, uuid, remoteService);
00262             } catch (P2POldMessageException e) {
00263                 return;
00264             }
00265         } else {
00266             broadcast = true;
00267         }
00268 
00269         // Do not give a local node to a local request
00270         if ((uuid != null) || (numberOfNodes == MAX_NODE)) {
00271             if (numberOfNodes == MAX_NODE) {
00272                 Vector nodes = this.nodeManager.askingAllNodes(nodeFamilyRegexp);
00273                 for (int i = 0; i < nodes.size(); i++) {
00274                     Node current = (Node) nodes.get(i);
00275                     if (vnName != null) {
00276                         try {
00277                             current.getProActiveRuntime()
00278                                    .registerVirtualNode(vnName, true);
00279                         } catch (Exception e) {
00280                             logger.warn("Couldn't register " + vnName +
00281                                 " in the PAR", e);
00282                         }
00283                     }
00284                     if (jobId != null) {
00285                         current.getNodeInformation().setJobID(jobId);
00286                     }
00287                 }
00288                 if (nodes.size() > 0) {
00289                     lookup.giveNodeForMax(nodes, this.nodeManager);
00290                 }
00291             } else {
00292                 P2PNode askedNode = this.nodeManager.askingNode(nodeFamilyRegexp);
00293 
00294                 // Asking node available?
00295                 Node nodeAvailable = askedNode.getNode();
00296                 if (nodeAvailable != null) {
00297                     IntWrapper nodeAck;
00298 
00299                     try {
00300                         nodeAck = lookup.giveNode(nodeAvailable,
00301                                 askedNode.getNodeManager());
00302                     } catch (Exception lookupExcption) {
00303                         logger.info("Cannot contact the remote lookup",
00304                             lookupExcption);
00305                         this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00306                         return;
00307                     }
00308                     if (nodeAck != null) {
00309                         // Waitng the ACK
00310                         long endTime = System.currentTimeMillis() + ACQ_TO;
00311                         while ((System.currentTimeMillis() < endTime) &&
00312                                 ProActive.isAwaited(nodeAck)) {
00313                             if (this.service.hasRequestToServe()) {
00314                                 service.serveAll(this.filter);
00315                             } else {
00316                                 try {
00317                                     Thread.sleep(100);
00318                                 } catch (InterruptedException e) {
00319                                     logger.debug(e);
00320                                 }
00321                             }
00322                         }
00323 
00324                         // Testing future is here or timeout is expired??
00325                         if (ProActive.isAwaited(nodeAck)) {
00326                             // Do not forward the message
00327                             // Prevent from deadlock
00328                             logger.debug("Ack timeout expired");
00329                             this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00330                             return;
00331                         }
00332                     }
00333 
00334                     if (nodeAck.intValue() != -1) {
00335                         // Setting vnInformation and JobId
00336                         if (vnName != null) {
00337                             try {
00338                                 nodeAvailable.getProActiveRuntime()
00339                                              .registerVirtualNode(vnName, true);
00340                             } catch (Exception e) {
00341                                 logger.warn("Couldn't register " + vnName +
00342                                     " in the PAR", e);
00343                             }
00344                         }
00345                         if (jobId != null) {
00346                             nodeAvailable.getNodeInformation().setJobID(jobId);
00347                         }
00348                         numberOfNodes = (numberOfNodes == MAX_NODE) ? MAX_NODE
00349                                                                     : nodeAck.intValue();
00350                         logger.info("Giving 1 node to vn: " + vnName);
00351                     } else {
00352                         // It's a NACK node
00353                         logger.debug("NACK node received");
00354                         this.nodeManager.noMoreNodeNeeded(nodeAvailable);
00355                         // No more nodes needed
00356                         return;
00357                     }
00358                 }
00359 
00360                 // Do we need more nodes?
00361                 if (numberOfNodes == 0) {
00362                     logger.debug("No more nodes are needed");
00363                     return;
00364                 }
00365             }
00366         }
00367 
00368         // My friend needs more nodes, so I'm broadcasting his request to my own
00369         // friends
00370         // Forwarding the message
00371         if (broadcast) {
00372             if (uuid == null) {
00373                 logger.debug("Generating uuid for askingNode message");
00374                 uuid = UUID.randomUUID();
00375             }
00376             this.acquaintances.askingNode(ttl, uuid, remoteService,
00377                 numberOfNodes, lookup, vnName, jobId, nodeFamilyRegexp);
00378             logger.debug("Broadcast askingNode message with #" + uuid);
00379         }
00380     }
00381 
00389     public P2PNodeLookup getNodes(int numberOfNodes, String nodeFamilyRegexp,
00390         String vnName, String jobId) {
00391         assert vnName != null : vnName;
00392         assert jobId != null : jobId;
00393         Object[] params = new Object[5];
00394         params[0] = new Integer(numberOfNodes);
00395         params[1] = this.stubOnThis;
00396         params[2] = vnName;
00397         params[3] = jobId;
00398         params[4] = nodeFamilyRegexp;
00399 
00400         P2PNodeLookup lookup = null;
00401         try {
00402             lookup = (P2PNodeLookup) ProActive.newActive(P2PNodeLookup.class.getName(),
00403                     params, this.p2pServiceNode);
00404             ProActive.enableAC(lookup);
00405             if (numberOfNodes == MAX_NODE) {
00406                 this.waitingMaximunNodesLookup.add(lookup);
00407             } else {
00408                 this.waitingNodesLookup.add(lookup);
00409             }
00410         } catch (ActiveObjectCreationException e) {
00411             logger.fatal("Couldn't create an active lookup", e);
00412             return null;
00413         } catch (NodeException e) {
00414             logger.fatal("Couldn't connect node to creat", e);
00415             return null;
00416         } catch (IOException e) {
00417             if (logger.isDebugEnabled()) {
00418                 logger.debug("Couldn't enable AC for a nodes lookup", e);
00419             }
00420         }
00421 
00422         if (logger.isInfoEnabled()) {
00423             if (numberOfNodes != MAX_NODE) {
00424                 logger.info("Asking for " + numberOfNodes + " nodes");
00425             } else {
00426                 logger.info("Asking for maxinum nodes");
00427             }
00428         }
00429         return lookup;
00430     }
00431 
00438     public P2PNodeLookup getNodes(int numberOfNodes, String vnName, String jobId) {
00439         return this.getNodes(numberOfNodes, ".*", vnName, jobId);
00440     }
00441 
00449     public Node getANode(String vnName, String jobId) {
00450         return this.stubOnThis.getANode(vnName, jobId, this.stubOnThis);
00451     }
00452 
00460     public Node getANode(String vnName, String jobId, P2PService service) {
00461         if (service.equals(this.stubOnThis)) {
00462             return this.acquaintanceManager.randomPeer()
00463                                            .getANode(vnName, jobId, service);
00464         }
00465         P2PNode askedNode = this.nodeManager.askingNode(null);
00466         Node nodeAvailable = askedNode.getNode();
00467 
00468         if (nodeAvailable != null) {
00469             if (vnName != null) {
00470                 try {
00471                     nodeAvailable.getProActiveRuntime()
00472                                  .registerVirtualNode(vnName, true);
00473                 } catch (Exception e) {
00474                     logger.warn("Couldn't register " + vnName + " in the PAR", e);
00475                 }
00476             }
00477             if (jobId != null) {
00478                 nodeAvailable.getNodeInformation().setJobID(jobId);
00479             }
00480             return nodeAvailable;
00481         }
00482         return this.acquaintanceManager.randomPeer()
00483                                        .getANode(vnName, jobId, service);
00484     }
00485 
00493     public P2PNodeLookup getMaximunNodes(String vnName, String jobId) {
00494         return this.getNodes(P2PConstants.MAX_NODE, vnName, jobId);
00495     }
00496 
00501     public StringWrapper getAddress() {
00502         return new StringWrapper(this.p2pServiceNode.getNodeInformation()
00503                                                     .getURL());
00504     }
00505 
00511 
00519 
00534 
00576 
00599 
00616 
00629 
00674 

Generated on Mon Jan 22 15:16:11 2007 for ProActive by  doxygen 1.5.1