00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.descriptor.services;
00032
00033 import java.util.Vector;
00034
00035 import org.apache.log4j.Logger;
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.ProActiveException;
00038 import org.objectweb.proactive.core.descriptor.data.VirtualMachine;
00039 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00040 import org.objectweb.proactive.core.descriptor.data.VirtualNodeImpl;
00041 import org.objectweb.proactive.core.event.NodeCreationEvent;
00042 import org.objectweb.proactive.core.event.RuntimeRegistrationEvent;
00043 import org.objectweb.proactive.core.event.RuntimeRegistrationEventListener;
00044 import org.objectweb.proactive.core.node.Node;
00045 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00046 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00047 import org.objectweb.proactive.core.util.UrlBuilder;
00048 import org.objectweb.proactive.core.util.log.Loggers;
00049 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00050 import org.objectweb.proactive.p2p.service.P2PService;
00051 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00052 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00053 import org.objectweb.proactive.scheduler.SchedulerConstants;
00054
00055
00061 public class ServiceThread extends Thread {
00062 private static final long LOOK_UP_FREQ = new Long(System.getProperty(
00063 P2PConstants.PROPERTY_LOOKUP_FREQ)).longValue();
00064 private static final int MAX_NODE = P2PConstants.MAX_NODE;
00065 private VirtualNode vn;
00066 private UniversalService service;
00067 private VirtualMachine vm;
00068 private ProActiveRuntime localRuntime;
00069 int nodeCount = 0;
00070 long timeout = 0;
00071 int nodeRequested;
00072 public static Logger loggerDeployment = ProActiveLogger.getLogger(Loggers.DEPLOYMENT);
00073 private static final long TIMEOUT = Long.parseLong(System.getProperty(
00074 P2PConstants.PROPERTY_NODES_ACQUISITION_T0));
00075 private long expirationTime;
00076
00077 public ServiceThread(VirtualNode vn, VirtualMachine vm) {
00078 this.vn = vn;
00079 this.service = vm.getService();
00080 this.vm = vm;
00081 this.localRuntime = ProActiveRuntimeImpl.getProActiveRuntime();
00082 }
00083
00084 public void run() {
00085 ProActiveRuntime[] part = null;
00086
00087 try {
00088 part = service.startService();
00089 nodeCount = nodeCount + ((part != null) ? part.length : 0);
00090 if (part != null) {
00091 notifyVirtualNode(part);
00092 }
00093
00094 if (service.getServiceName()
00095 .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
00096 SchedulerLookupService schedulerLookupService = ((SchedulerLookupService) service);
00097 Node[] reservedNodes = schedulerLookupService.getNodes();
00098
00099 for (int i = 0; i < reservedNodes.length; i++) {
00100 Node node = reservedNodes[i];
00101 nodeCount++;
00102 ((VirtualNodeImpl) vn).nodeCreated(new NodeCreationEvent(
00103 vn, NodeCreationEvent.NODE_CREATED, node, nodeCount));
00104
00105 if (loggerDeployment.isInfoEnabled()) {
00106 loggerDeployment.info(
00107 "Service thread just created event for node: " +
00108 node.getNodeInformation().getURL());
00109 }
00110 }
00111 }
00112
00113 if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
00114
00115 P2PService p2pService = ((P2PDescriptorService) service).getP2PService();
00116 String nodeFamilyRegexp = ((P2PDescriptorService) service).getNodeFamilyRegexp();
00117 nodeFamilyRegexp = (nodeFamilyRegexp != null)
00118 ? nodeFamilyRegexp : "";
00119 P2PNodeLookup p2pNodesLookup = p2pService.getNodes(((P2PDescriptorService) service).getNodeNumber(),
00120 nodeFamilyRegexp, this.vn.getName(), this.vn.getJobID());
00121 ((VirtualNodeImpl) vn).addP2PNodesLookup(p2pNodesLookup);
00122 this.nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
00123
00124 long vnTimeout = vn.getTimeout();
00125 this.expirationTime = System.currentTimeMillis() + TIMEOUT;
00126 if (this.nodeRequested == MAX_NODE) {
00127 this.expirationTime = Long.MAX_VALUE;
00128 ((VirtualNodeImpl) vn).setTimeout(this.expirationTime, false);
00129 } else if (vnTimeout < TIMEOUT) {
00130 ((VirtualNodeImpl) vn).setTimeout(TIMEOUT, false);
00131 }
00132
00133 long step = 100;
00134 while (askForNodes() &&
00135 ((nodeRequested == MAX_NODE) ? true
00136 : (System.currentTimeMillis() < this.expirationTime))) {
00137 if (step > LOOK_UP_FREQ) {
00138 step = LOOK_UP_FREQ;
00139 }
00140
00141 Vector nodes;
00142 try {
00143 Vector future = p2pNodesLookup.getAndRemoveNodes();
00144 nodes = (Vector) ProActive.getFutureValue(future);
00145 } catch (Exception e) {
00146 loggerDeployment.debug("Couldn't contact the lookup", e);
00147 continue;
00148 }
00149 for (int i = 0; i < nodes.size(); i++) {
00150 Node node = (Node) nodes.get(i);
00151 nodeCount++;
00152 ((VirtualNodeImpl) vn).nodeCreated(new NodeCreationEvent(
00153 vn, NodeCreationEvent.NODE_CREATED, node,
00154 nodeCount));
00155 if (loggerDeployment.isInfoEnabled()) {
00156 loggerDeployment.info(
00157 "Service thread just created event for node: " +
00158 node.getNodeInformation().getURL());
00159 }
00160 }
00161
00162
00163 if ((this.nodeRequested == MAX_NODE) &&
00164 (nodes.size() == 0) && (this.nodeCount != 0)) {
00165 Thread.sleep(LOOK_UP_FREQ);
00166 } else if (askForNodes() && (this.nodeCount == 0)) {
00167
00168 Thread.sleep(step);
00169 step += 100;
00170 } else {
00171
00172 if (step > LOOK_UP_FREQ) {
00173 step = LOOK_UP_FREQ;
00174 Thread.sleep(LOOK_UP_FREQ);
00175 } else {
00176 step += 100;
00177 Thread.sleep(step);
00178 }
00179 }
00180 }
00181 }
00182 } catch (ProActiveException e) {
00183 loggerDeployment.error(
00184 "An exception occured while starting the service " +
00185 service.getServiceName() + " for the VirtualNode " +
00186 vn.getName() + " \n" + e.getMessage());
00187 } catch (InterruptedException e) {
00188 e.printStackTrace();
00189 }
00190 }
00191
00192 public void notifyVirtualNode(ProActiveRuntime[] part) {
00193 for (int i = 0; i < part.length; i++) {
00194 String url = part[i].getURL();
00195 String protocol = UrlBuilder.getProtocol(url);
00196 RuntimeRegistrationEvent event = new RuntimeRegistrationEvent(localRuntime,
00197 RuntimeRegistrationEvent.RUNTIME_ACQUIRED, part[i],
00198 vn.getName(), protocol, vm.getName());
00199 ((RuntimeRegistrationEventListener) vn).runtimeRegistered(event);
00200 }
00201 }
00202
00207 private boolean askForNodes() {
00208 if (nodeRequested == MAX_NODE) {
00209
00210 return true;
00211 }
00212 return nodeCount < nodeRequested;
00213 }
00214 }