00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.core.descriptor.services;
00032 
00033 import java.util.Vector;
00034 
00035 import org.apache.log4j.Logger;
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.ProActiveException;
00038 import org.objectweb.proactive.core.descriptor.data.VirtualMachine;
00039 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00040 import org.objectweb.proactive.core.descriptor.data.VirtualNodeImpl;
00041 import org.objectweb.proactive.core.event.NodeCreationEvent;
00042 import org.objectweb.proactive.core.event.RuntimeRegistrationEvent;
00043 import org.objectweb.proactive.core.event.RuntimeRegistrationEventListener;
00044 import org.objectweb.proactive.core.node.Node;
00045 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00046 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00047 import org.objectweb.proactive.core.util.UrlBuilder;
00048 import org.objectweb.proactive.core.util.log.Loggers;
00049 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00050 import org.objectweb.proactive.p2p.service.P2PService;
00051 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00052 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00053 import org.objectweb.proactive.scheduler.SchedulerConstants;
00054 
00055 
00061 public class ServiceThread extends Thread {
00062     private static final long LOOK_UP_FREQ = new Long(System.getProperty(
00063                 P2PConstants.PROPERTY_LOOKUP_FREQ)).longValue();
00064     private static final int MAX_NODE = P2PConstants.MAX_NODE;
00065     private VirtualNode vn;
00066     private UniversalService service;
00067     private VirtualMachine vm;
00068     private ProActiveRuntime localRuntime;
00069     int nodeCount = 0;
00070     long timeout = 0;
00071     int nodeRequested;
00072     public static Logger loggerDeployment = ProActiveLogger.getLogger(Loggers.DEPLOYMENT);
00073     private static final long TIMEOUT = Long.parseLong(System.getProperty(
00074                 P2PConstants.PROPERTY_NODES_ACQUISITION_T0));
00075     private long expirationTime;
00076 
00077     public ServiceThread(VirtualNode vn, VirtualMachine vm) {
00078         this.vn = vn;
00079         this.service = vm.getService();
00080         this.vm = vm;
00081         this.localRuntime = ProActiveRuntimeImpl.getProActiveRuntime();
00082     }
00083 
00084     public void run() {
00085         ProActiveRuntime[] part = null;
00086 
00087         try {
00088             part = service.startService();
00089             nodeCount = nodeCount + ((part != null) ? part.length : 0);
00090             if (part != null) {
00091                 notifyVirtualNode(part);
00092             }
00093 
00094             if (service.getServiceName()
00095                            .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
00096                 SchedulerLookupService schedulerLookupService = ((SchedulerLookupService) service);
00097                 Node[] reservedNodes = schedulerLookupService.getNodes();
00098 
00099                 for (int i = 0; i < reservedNodes.length; i++) {
00100                     Node node = reservedNodes[i];
00101                     nodeCount++;
00102                     ((VirtualNodeImpl) vn).nodeCreated(new NodeCreationEvent(
00103                             vn, NodeCreationEvent.NODE_CREATED, node, nodeCount));
00104                     
00105                     if (loggerDeployment.isInfoEnabled()) {
00106                         loggerDeployment.info(
00107                             "Service thread just created event for node: " +
00108                             node.getNodeInformation().getURL());
00109                     }
00110                 }
00111             }
00112 
00113             if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
00114                 
00115                 P2PService p2pService = ((P2PDescriptorService) service).getP2PService();
00116                 String nodeFamilyRegexp = ((P2PDescriptorService) service).getNodeFamilyRegexp();
00117                 nodeFamilyRegexp = (nodeFamilyRegexp != null)
00118                     ? nodeFamilyRegexp : "";
00119                 P2PNodeLookup p2pNodesLookup = p2pService.getNodes(((P2PDescriptorService) service).getNodeNumber(),
00120                         nodeFamilyRegexp, this.vn.getName(), this.vn.getJobID());
00121                 ((VirtualNodeImpl) vn).addP2PNodesLookup(p2pNodesLookup);
00122                 this.nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
00123                 
00124                 long vnTimeout = vn.getTimeout();
00125                 this.expirationTime = System.currentTimeMillis() + TIMEOUT;
00126                 if (this.nodeRequested == MAX_NODE) {
00127                     this.expirationTime = Long.MAX_VALUE;
00128                     ((VirtualNodeImpl) vn).setTimeout(this.expirationTime, false);
00129                 } else if (vnTimeout < TIMEOUT) {
00130                     ((VirtualNodeImpl) vn).setTimeout(TIMEOUT, false);
00131                 }
00132 
00133                 long step = 100;
00134                 while (askForNodes() &&
00135                         ((nodeRequested == MAX_NODE) ? true
00136                                                          : (System.currentTimeMillis() < this.expirationTime))) {
00137                     if (step > LOOK_UP_FREQ) {
00138                         step = LOOK_UP_FREQ;
00139                     }
00140 
00141                     Vector nodes;
00142                     try {
00143                         Vector future = p2pNodesLookup.getAndRemoveNodes();
00144                         nodes = (Vector) ProActive.getFutureValue(future);
00145                     } catch (Exception e) {
00146                         loggerDeployment.debug("Couldn't contact the lookup", e);
00147                         continue;
00148                     }
00149                     for (int i = 0; i < nodes.size(); i++) {
00150                         Node node = (Node) nodes.get(i);
00151                         nodeCount++;
00152                         ((VirtualNodeImpl) vn).nodeCreated(new NodeCreationEvent(
00153                                 vn, NodeCreationEvent.NODE_CREATED, node,
00154                                 nodeCount));
00155                         if (loggerDeployment.isInfoEnabled()) {
00156                             loggerDeployment.info(
00157                                 "Service thread just created event for node: " +
00158                                 node.getNodeInformation().getURL());
00159                         }
00160                     }
00161 
00162                     
00163                     if ((this.nodeRequested == MAX_NODE) &&
00164                             (nodes.size() == 0) && (this.nodeCount != 0)) {
00165                         Thread.sleep(LOOK_UP_FREQ);
00166                     } else if (askForNodes() && (this.nodeCount == 0)) {
00167                         
00168                         Thread.sleep(step);
00169                         step += 100;
00170                     } else {
00171                         
00172                         if (step > LOOK_UP_FREQ) {
00173                             step = LOOK_UP_FREQ;
00174                             Thread.sleep(LOOK_UP_FREQ);
00175                         } else {
00176                             step += 100;
00177                             Thread.sleep(step);
00178                         }
00179                     }
00180                 }
00181             }
00182         } catch (ProActiveException e) {
00183             loggerDeployment.error(
00184                 "An exception occured while starting the service " +
00185                 service.getServiceName() + " for the VirtualNode " +
00186                 vn.getName() + " \n" + e.getMessage());
00187         } catch (InterruptedException e) {
00188             e.printStackTrace();
00189         }
00190     }
00191 
00192     public void notifyVirtualNode(ProActiveRuntime[] part) {
00193         for (int i = 0; i < part.length; i++) {
00194             String url = part[i].getURL();
00195             String protocol = UrlBuilder.getProtocol(url);
00196             RuntimeRegistrationEvent event = new RuntimeRegistrationEvent(localRuntime,
00197                     RuntimeRegistrationEvent.RUNTIME_ACQUIRED, part[i],
00198                     vn.getName(), protocol, vm.getName());
00199             ((RuntimeRegistrationEventListener) vn).runtimeRegistered(event);
00200         }
00201     }
00202 
00207     private boolean askForNodes() {
00208         if (nodeRequested == MAX_NODE) {
00209             
00210             return true;
00211         }
00212         return nodeCount < nodeRequested;
00213     }
00214 }