org/objectweb/proactive/p2p/service/node/P2PNodeLookup.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.p2p.service.node;
00032 
00033 import java.io.Serializable;
00034 import java.util.HashMap;
00035 import java.util.Vector;
00036 
00037 import org.apache.log4j.Logger;
00038 import org.objectweb.proactive.Body;
00039 import org.objectweb.proactive.EndActive;
00040 import org.objectweb.proactive.InitActive;
00041 import org.objectweb.proactive.ProActive;
00042 import org.objectweb.proactive.ProActiveInternalObject;
00043 import org.objectweb.proactive.RunActive;
00044 import org.objectweb.proactive.Service;
00045 import org.objectweb.proactive.core.ProActiveException;
00046 import org.objectweb.proactive.core.node.Node;
00047 import org.objectweb.proactive.core.node.NodeFactory;
00048 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00049 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00050 import org.objectweb.proactive.core.util.log.Loggers;
00051 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00052 import org.objectweb.proactive.core.util.wrapper.IntWrapper;
00053 import org.objectweb.proactive.p2p.service.P2PService;
00054 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00055 
00056 
00062 public class P2PNodeLookup implements InitActive, RunActive, EndActive,
00063     P2PConstants, Serializable, ProActiveInternalObject {
00064     private static final Logger logger = ProActiveLogger.getLogger(Loggers.P2P_NODES);
00065     private Vector<Node> waitingNodesList;
00066     private Vector<String> nodesToKillList;
00067     private long expirationTime;
00068     private static final long TIMEOUT = Long.parseLong(System.getProperty(
00069                 PROPERTY_NODES_ACQUISITION_T0));
00070     private static final long LOOKUP_FREQ = Long.parseLong(System.getProperty(
00071                 PROPERTY_LOOKUP_FREQ));
00072     private static final int TTL = Integer.parseInt(System.getProperty(
00073                 PROPERTY_TTL));
00074     private int numberOfAskedNodes;
00075     private int acquiredNodes = 0;
00076     private P2PService localP2pService;
00077     private String vnName;
00078     private String jobId;
00079     private P2PNodeLookup stub;
00080     private ProActiveRuntime paRuntime;
00081     private String parUrl;
00082     private HashMap<String, P2PNodeManager> nodeManagerMap = new HashMap<String, P2PNodeManager>();
00083     private boolean killAllFlag = false;
00084     private String nodeFamilyRegexp = null;
00085 
00086     public P2PNodeLookup() {
00087         // the empty constructor
00088     }
00089 
00090     public P2PNodeLookup(Integer numberOfAskedNodes,
00091         P2PService localP2pService, String vnName, String jobId,
00092         String nodeFamilyRegexp) {
00093         this.waitingNodesList = new Vector<Node>();
00094         this.nodesToKillList = new Vector<String>();
00095         this.expirationTime = System.currentTimeMillis() + TIMEOUT;
00096         this.numberOfAskedNodes = numberOfAskedNodes.intValue();
00097         assert (this.numberOfAskedNodes > 0) ||
00098         (this.numberOfAskedNodes == MAX_NODE) : "None authorized value for asked nodes";
00099         // Use special case: do not check TO
00100         //        if (this.numberOfAskedNodes == MAX_NODE) {
00101         //            this.expirationTime = Long.MAX_VALUE;
00102         //        }
00103         this.localP2pService = localP2pService;
00104         this.vnName = vnName;
00105         this.jobId = jobId;
00106         this.nodeFamilyRegexp = nodeFamilyRegexp;
00107     }
00108 
00109     // -------------------------------------------------------------------------
00110     // Access methods
00111     // -------------------------------------------------------------------------
00112 
00119     public boolean allArrived() {
00120         if (this.numberOfAskedNodes != MAX_NODE) {
00121             return (this.numberOfAskedNodes == this.acquiredNodes);
00122         } else {
00123             return false;
00124         }
00125     }
00126 
00132     public boolean nArrived(int n) {
00133         return n <= this.acquiredNodes;
00134     }
00135 
00140     public void killNode(String node) {
00141         try {
00142             Node remoteNode = NodeFactory.getNode(node);
00143             ProActiveRuntime remoteRuntime = remoteNode.getProActiveRuntime();
00144             P2PNodeManager remoteNodeManager = (P2PNodeManager) this.nodeManagerMap.get(node);
00145 
00146             remoteRuntime.unregisterVirtualNode(vnName);
00147             remoteRuntime.rmAcquaintance(this.parUrl);
00148             this.paRuntime.rmAcquaintance(remoteRuntime.getURL());
00149             remoteNodeManager.leaveNode(remoteNode, this.vnName);
00150 
00151             logger.info("Node at " + node + " succefuly removed");
00152 
00153             // Unregister the remote runtime
00154             this.paRuntime.unregister(remoteRuntime, remoteRuntime.getURL(),
00155                 "p2p", System.getProperty(PROPERTY_ACQUISITION) + ":",
00156                 remoteRuntime.getVMInformation().getName());
00157         } catch (Exception e) {
00158             logger.info("Node @" + node + " already down");
00159         } finally {
00160             this.nodesToKillList.remove(node);
00161         }
00162     }
00163 
00169     public void killAllNodes() {
00170         this.killAllFlag = true;
00171         while (this.nodesToKillList.size() > 0) {
00172             String currentNode = this.nodesToKillList.get(0);
00173             this.killNode(currentNode);
00174         }
00175     }
00176 
00180     public void wakeUp() {
00181         // nothing to do, just wake up the run activity
00182     }
00183 
00191     public IntWrapper giveNode(Node givenNode, P2PNodeManager remoteNodeManager) {
00192         if (logger.isDebugEnabled()) {
00193             logger.debug("Given node received from " +
00194                 givenNode.getNodeInformation().getURL());
00195         }
00196 
00197         // Get currrent nodes accessor
00198         if (!this.allArrived()) {
00199             this.waitingNodesList.add(givenNode);
00200             String nodeUrl = givenNode.getNodeInformation().getURL();
00201             this.nodeManagerMap.put(nodeUrl, remoteNodeManager);
00202             this.acquiredNodes++;
00203             ProActiveRuntime remoteRt = givenNode.getProActiveRuntime();
00204             try {
00205                 remoteRt.addAcquaintance(this.parUrl);
00206                 this.paRuntime.addAcquaintance(remoteRt.getURL());
00207                 this.paRuntime.register(remoteRt, remoteRt.getURL(), "p2p",
00208                     System.getProperty(PROPERTY_ACQUISITION) + ":",
00209                     remoteRt.getVMInformation().getName());
00210             } catch (ProActiveException e) {
00211                 logger.warn("Couldn't recgister the remote runtime", e);
00212             }
00213             logger.info("Node at " + nodeUrl + " succefuly added");
00214             logger.info("Lookup got " + this.acquiredNodes + " nodes");
00215             return new IntWrapper(this.numberOfAskedNodes - this.acquiredNodes);
00216         }
00217         return new IntWrapper(-1);
00218     }
00219 
00220     public void giveNodeForMax(Vector<Node> givenNodes,
00221         P2PNodeManager remoteNodeManager) {
00222         // Get currrent nodes accessor
00223         this.waitingNodesList.addAll(givenNodes);
00224         for (int i = 0; i < givenNodes.size(); i++) {
00225             Node current = (Node) givenNodes.get(i);
00226             String nodeUrl = current.getNodeInformation().getURL();
00227             this.nodeManagerMap.put(nodeUrl, remoteNodeManager);
00228             this.acquiredNodes++;
00229             ProActiveRuntime remoteRt = current.getProActiveRuntime();
00230             try {
00231                 remoteRt.addAcquaintance(this.parUrl);
00232                 this.paRuntime.addAcquaintance(remoteRt.getURL());
00233                 this.paRuntime.register(remoteRt, remoteRt.getURL(), "p2p",
00234                     System.getProperty(PROPERTY_ACQUISITION) + ":",
00235                     remoteRt.getVMInformation().getName());
00236             } catch (ProActiveException e) {
00237                 logger.warn("Couldn't recgister the remote runtime", e);
00238             }
00239             logger.info("Node at " + nodeUrl + " succefuly added");
00240         }
00241         logger.info("Lookup MAX got " + this.acquiredNodes + " nodes");
00242     }
00243 
00252     public boolean isActive() {
00253         return ProActive.getBodyOnThis().isActive();
00254     }
00255 
00259     public Vector getAndRemoveNodes() {
00260         Vector<Node> v = new Vector<Node>();
00261         while (this.waitingNodesList.size() != 0) {
00262             Node elem = this.waitingNodesList.get(0);
00263             v.add(elem);
00264             this.waitingNodesList.remove(elem);
00265             this.nodesToKillList.add(elem.getNodeInformation().getURL());
00266         }
00267         return v;
00268     }
00269 
00274     public Vector getNodes() {
00275         Service service = new Service(ProActive.getBodyOnThis());
00276         while (!this.allArrived()) {
00277             this.localP2pService.askingNode(TTL, null, this.localP2pService,
00278                 this.numberOfAskedNodes - this.acquiredNodes, stub,
00279                 this.vnName, this.jobId, this.nodeFamilyRegexp);
00280 
00281             // Serving request
00282             service.blockingServeOldest(LOOKUP_FREQ);
00283             while (service.hasRequestToServe()) {
00284                 service.serveOldest();
00285             }
00286         }
00287         return ((P2PNodeLookup) ProActive.getStubOnThis()).getAndRemoveNodes();
00288     }
00289 
00295     public Vector getNodes(long timeout) {
00296         long endTime = System.currentTimeMillis() + timeout;
00297         Service service = new Service(ProActive.getBodyOnThis());
00298         while (!this.allArrived() && (System.currentTimeMillis() < endTime)) {
00299             this.localP2pService.askingNode(TTL, null, this.localP2pService,
00300                 this.numberOfAskedNodes - this.acquiredNodes, stub,
00301                 this.vnName, this.jobId, this.nodeFamilyRegexp);
00302             // Serving request
00303             if (timeout < LOOKUP_FREQ) {
00304                 service.blockingServeOldest(500);
00305             } else {
00306                 service.blockingServeOldest(LOOKUP_FREQ);
00307             }
00308             while (service.hasRequestToServe()) {
00309                 service.serveOldest();
00310             }
00311         }
00312         return ((P2PNodeLookup) ProActive.getStubOnThis()).getAndRemoveNodes();
00313     }
00314 
00320     public void moveTo(String nodeUrl) {
00321         try {
00322             ProActive.migrateTo(nodeUrl);
00323         } catch (Exception e) {
00324             logger.fatal("Couldn't migrate the node lookup to " + nodeUrl, e);
00325         }
00326     }
00327 
00328     // -------------------------------------------------------------------------
00329     // ProActive methods
00330     // -------------------------------------------------------------------------
00331 
00335     public void initActivity(Body body) {
00336         this.stub = (P2PNodeLookup) ProActive.getStubOnThis();
00337         try {
00338             this.paRuntime = RuntimeFactory.getDefaultRuntime();
00339             this.parUrl = this.paRuntime.getURL();
00340         } catch (ProActiveException e) {
00341             logger.fatal("Problem to get local runtime", e);
00342         }
00343     }
00344 
00348     public void runActivity(Body body) {
00349         logger.info("Looking for " +
00350             ((this.numberOfAskedNodes == MAX_NODE) ? "MAX"
00351                                                    : (this.numberOfAskedNodes +
00352             "")) + " nodes");
00353         Service service = new Service(body);
00354 
00355         String reason = null;
00356         while (true) {
00357             logger.debug("Aksing nodes");
00358 
00359             // Send a message to everybody
00360             this.localP2pService.askingNode(TTL, null, this.localP2pService,
00361                 (this.numberOfAskedNodes == P2PConstants.MAX_NODE)
00362                 ? P2PConstants.MAX_NODE
00363                 : (this.numberOfAskedNodes - this.acquiredNodes), stub,
00364                 this.vnName, this.jobId, this.nodeFamilyRegexp);
00365 
00366             // Serving request
00367             logger.debug("Waiting for requests");
00368             service.blockingServeOldest(LOOKUP_FREQ);
00369             while (service.hasRequestToServe()) {
00370                 if (logger.isDebugEnabled()) {
00371                     logger.info("Serving request: " +
00372                         service.getOldest().getMethodName());
00373                 }
00374                 service.serveOldest();
00375             }
00376 
00377             // Test conditions to go out of the loop
00378             if (this.killAllFlag) {
00379                 reason = "killing nodes request";
00380                 break;
00381             } else if (this.numberOfAskedNodes == MAX_NODE) {
00382                 reason = "Max node asked";
00383                 continue;
00384             } else if (this.allArrived()) {
00385                 reason = "all nodes are arrived";
00386                 break;
00387             } else if ((System.currentTimeMillis() > this.expirationTime)) {
00388                 reason = "timeout is expired";
00389                 break;
00390             } else {
00391                 reason = "Normal case continue";
00392                 continue;
00393             }
00394         }
00395 
00396         if (reason == null) {
00397             reason = "Houston. We have a problem...";
00398         }
00399         logger.info("Ending loop activity because: " + reason);
00400     }
00401 
00407     public void endActivity(Body body) {
00408         Service service = new Service(body);
00409         logger.info("Nodes (" + this.acquiredNodes +
00410             ") arrived ending activity");
00411         this.localP2pService.removeWaitingAccessor(this.stub);
00412         while ((this.waitingNodesList.size() > 0) ||
00413                 (this.nodesToKillList.size() > 0)) {
00414             service.blockingServeOldest(LOOKUP_FREQ);
00415             while (service.hasRequestToServe()) {
00416                 service.serveOldest();
00417             }
00418         }
00419         logger.info("This P2P nodes lookup is no more active, bye..");
00420     }
00421 }

Generated on Mon Jan 22 15:16:11 2007 for ProActive by  doxygen 1.5.1