org/objectweb/proactive/core/descriptor/services/ServiceThread.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.descriptor.services;
00032 
00033 import java.util.Vector;
00034 
00035 import org.apache.log4j.Logger;
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.ProActiveException;
00038 import org.objectweb.proactive.core.descriptor.data.VirtualMachine;
00039 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00040 import org.objectweb.proactive.core.descriptor.data.VirtualNodeImpl;
00041 import org.objectweb.proactive.core.event.NodeCreationEvent;
00042 import org.objectweb.proactive.core.event.RuntimeRegistrationEvent;
00043 import org.objectweb.proactive.core.event.RuntimeRegistrationEventListener;
00044 import org.objectweb.proactive.core.node.Node;
00045 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00046 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00047 import org.objectweb.proactive.core.util.UrlBuilder;
00048 import org.objectweb.proactive.core.util.log.Loggers;
00049 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00050 import org.objectweb.proactive.p2p.service.P2PService;
00051 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00052 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00053 import org.objectweb.proactive.scheduler.SchedulerConstants;
00054 
00055 
00061 public class ServiceThread extends Thread {
00062     private static final long LOOK_UP_FREQ = new Long(System.getProperty(
00063                 P2PConstants.PROPERTY_LOOKUP_FREQ)).longValue();
00064     private static final int MAX_NODE = P2PConstants.MAX_NODE;
00065     private VirtualNode vn;
00066     private UniversalService service;
00067     private VirtualMachine vm;
00068     private ProActiveRuntime localRuntime;
00069     int nodeCount = 0;
00070     long timeout = 0;
00071     int nodeRequested;
00072     public static Logger loggerDeployment = ProActiveLogger.getLogger(Loggers.DEPLOYMENT);
00073     private static final long TIMEOUT = Long.parseLong(System.getProperty(
00074                 P2PConstants.PROPERTY_NODES_ACQUISITION_T0));
00075     private long expirationTime;
00076 
00077     public ServiceThread(VirtualNode vn, VirtualMachine vm) {
00078         this.vn = vn;
00079         this.service = vm.getService();
00080         this.vm = vm;
00081         this.localRuntime = ProActiveRuntimeImpl.getProActiveRuntime();
00082     }
00083 
00084     public void run() {
00085         ProActiveRuntime[] part = null;
00086 
00087         try {
00088             part = service.startService();
00089             nodeCount = nodeCount + ((part != null) ? part.length : 0);
00090             if (part != null) {
00091                 notifyVirtualNode(part);
00092             }
00093 
00094             if (service.getServiceName()
00095                            .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
00096                 SchedulerLookupService schedulerLookupService = ((SchedulerLookupService) service);
00097                 Node[] reservedNodes = schedulerLookupService.getNodes();
00098 
00099                 for (int i = 0; i < reservedNodes.length; i++) {
00100                     Node node = reservedNodes[i];
00101                     nodeCount++;
00102                     ((VirtualNodeImpl) vn).nodeCreated(new NodeCreationEvent(
00103                             vn, NodeCreationEvent.NODE_CREATED, node, nodeCount));
00104                     //                    System.out.println("--------------------" + node.getNodeInformation().getURL());
00105                     if (loggerDeployment.isInfoEnabled()) {
00106                         loggerDeployment.info(
00107                             "Service thread just created event for node: " +
00108                             node.getNodeInformation().getURL());
00109                     }
00110                 }
00111             }
00112 
00113             if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
00114                 // Start asking nodes
00115                 P2PService p2pService = ((P2PDescriptorService) service).getP2PService();
00116                 String nodeFamilyRegexp = ((P2PDescriptorService) service).getNodeFamilyRegexp();
00117                 nodeFamilyRegexp = (nodeFamilyRegexp != null)
00118                     ? nodeFamilyRegexp : "";
00119                 P2PNodeLookup p2pNodesLookup = p2pService.getNodes(((P2PDescriptorService) service).getNodeNumber(),
00120                         nodeFamilyRegexp, this.vn.getName(), this.vn.getJobID());
00121                 ((VirtualNodeImpl) vn).addP2PNodesLookup(p2pNodesLookup);
00122                 this.nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
00123                 // Timeout
00124                 long vnTimeout = vn.getTimeout();
00125                 this.expirationTime = System.currentTimeMillis() + TIMEOUT;
00126                 if (this.nodeRequested == MAX_NODE) {
00127                     this.expirationTime = Long.MAX_VALUE;
00128                     ((VirtualNodeImpl) vn).setTimeout(this.expirationTime, false);
00129                 } else if (vnTimeout < TIMEOUT) {
00130                     ((VirtualNodeImpl) vn).setTimeout(TIMEOUT, false);
00131                 }
00132 
00133                 long step = 100;
00134                 while (askForNodes() &&
00135                         ((nodeRequested == MAX_NODE) ? true
00136                                                          : (System.currentTimeMillis() < this.expirationTime))) {
00137                     if (step > LOOK_UP_FREQ) {
00138                         step = LOOK_UP_FREQ;
00139                     }
00140 
00141                     Vector nodes;
00142                     try {
00143                         Vector future = p2pNodesLookup.getAndRemoveNodes();
00144                         nodes = (Vector) ProActive.getFutureValue(future);
00145                     } catch (Exception e) {
00146                         loggerDeployment.debug("Couldn't contact the lookup", e);
00147                         continue;
00148                     }
00149                     for (int i = 0; i < nodes.size(); i++) {
00150                         Node node = (Node) nodes.get(i);
00151                         nodeCount++;
00152                         ((VirtualNodeImpl) vn).nodeCreated(new NodeCreationEvent(
00153                                 vn, NodeCreationEvent.NODE_CREATED, node,
00154                                 nodeCount));
00155                         if (loggerDeployment.isInfoEnabled()) {
00156                             loggerDeployment.info(
00157                                 "Service thread just created event for node: " +
00158                                 node.getNodeInformation().getURL());
00159                         }
00160                     }
00161 
00162                     // Sleeping with FastStart algo
00163                     if ((this.nodeRequested == MAX_NODE) &&
00164                             (nodes.size() == 0) && (this.nodeCount != 0)) {
00165                         Thread.sleep(LOOK_UP_FREQ);
00166                     } else if (askForNodes() && (this.nodeCount == 0)) {
00167                         // still no node
00168                         Thread.sleep(step);
00169                         step += 100;
00170                     } else {
00171                         // normal waiting
00172                         if (step > LOOK_UP_FREQ) {
00173                             step = LOOK_UP_FREQ;
00174                             Thread.sleep(LOOK_UP_FREQ);
00175                         } else {
00176                             step += 100;
00177                             Thread.sleep(step);
00178                         }
00179                     }
00180                 }
00181             }
00182         } catch (ProActiveException e) {
00183             loggerDeployment.error(
00184                 "An exception occured while starting the service " +
00185                 service.getServiceName() + " for the VirtualNode " +
00186                 vn.getName() + " \n" + e.getMessage());
00187         } catch (InterruptedException e) {
00188             e.printStackTrace();
00189         }
00190     }
00191 
00192     public void notifyVirtualNode(ProActiveRuntime[] part) {
00193         for (int i = 0; i < part.length; i++) {
00194             String url = part[i].getURL();
00195             String protocol = UrlBuilder.getProtocol(url);
00196             RuntimeRegistrationEvent event = new RuntimeRegistrationEvent(localRuntime,
00197                     RuntimeRegistrationEvent.RUNTIME_ACQUIRED, part[i],
00198                     vn.getName(), protocol, vm.getName());
00199             ((RuntimeRegistrationEventListener) vn).runtimeRegistered(event);
00200         }
00201     }
00202 
00207     private boolean askForNodes() {
00208         if (nodeRequested == MAX_NODE) {
00209             // nodeRequested = -1 means try to get the max number of nodes
00210             return true;
00211         }
00212         return nodeCount < nodeRequested;
00213     }
00214 }

Generated on Mon Jan 22 15:16:07 2007 for ProActive by  doxygen 1.5.1