00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.core.descriptor.data;
00032 
00033 import java.io.File;
00034 import java.io.IOException;
00035 import java.io.Serializable;
00036 import java.rmi.AlreadyBoundException;
00037 import java.util.ArrayList;
00038 import java.util.Collection;
00039 import java.util.HashMap;
00040 import java.util.Hashtable;
00041 import java.util.Iterator;
00042 import java.util.Vector;
00043 import java.util.concurrent.ExecutorService;
00044 import java.util.concurrent.Executors;
00045 
00046 import org.apache.log4j.Logger;
00047 import org.objectweb.proactive.ProActive;
00048 import org.objectweb.proactive.core.ProActiveException;
00049 import org.objectweb.proactive.core.descriptor.services.FaultToleranceService;
00050 import org.objectweb.proactive.core.descriptor.services.P2PDescriptorService;
00051 import org.objectweb.proactive.core.descriptor.services.SchedulerLookupService;
00052 import org.objectweb.proactive.core.descriptor.services.ServiceThread;
00053 import org.objectweb.proactive.core.descriptor.services.ServiceUser;
00054 import org.objectweb.proactive.core.descriptor.services.TechnicalService;
00055 import org.objectweb.proactive.core.descriptor.services.UniversalService;
00056 import org.objectweb.proactive.core.event.NodeCreationEvent;
00057 import org.objectweb.proactive.core.event.NodeCreationEventListener;
00058 import org.objectweb.proactive.core.event.NodeCreationEventProducerImpl;
00059 import org.objectweb.proactive.core.event.RuntimeRegistrationEvent;
00060 import org.objectweb.proactive.core.event.RuntimeRegistrationEventListener;
00061 import org.objectweb.proactive.core.node.Node;
00062 import org.objectweb.proactive.core.node.NodeException;
00063 import org.objectweb.proactive.core.node.NodeFactory;
00064 import org.objectweb.proactive.core.node.NodeImpl;
00065 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator;
00066 import org.objectweb.proactive.core.process.AbstractSequentialListProcessDecorator;
00067 import org.objectweb.proactive.core.process.DependentProcess;
00068 import org.objectweb.proactive.core.process.ExternalProcess;
00069 import org.objectweb.proactive.core.process.ExternalProcessDecorator;
00070 import org.objectweb.proactive.core.process.JVMProcess;
00071 import org.objectweb.proactive.core.process.UniversalProcess;
00072 import org.objectweb.proactive.core.process.filetransfer.FileTransferDefinition;
00073 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00074 import org.objectweb.proactive.core.process.filetransfer.FileTransferDefinition.FileDescription;
00075 import org.objectweb.proactive.core.process.mpi.MPIProcess;
00076 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00077 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00078 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00079 import org.objectweb.proactive.core.util.UrlBuilder;
00080 import org.objectweb.proactive.core.util.log.Loggers;
00081 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00082 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00083 import org.objectweb.proactive.filetransfer.FileTransfer;
00084 import org.objectweb.proactive.filetransfer.FileVector;
00085 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00086 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00087 import org.objectweb.proactive.scheduler.SchedulerConstants;
00088 
00089 
00100 public class VirtualNodeImpl extends NodeCreationEventProducerImpl
00101     implements VirtualNode, Serializable, RuntimeRegistrationEventListener,
00102         NodeCreationEventListener, ServiceUser {
00103 
00105     private final static Logger P2P_LOGGER = ProActiveLogger.getLogger(Loggers.P2P_VN);
00106     private final static Logger FILETRANSFER_LOGGER = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00107     private final static Logger DEPLOYMENT_FILETRANSFER_LOGGER = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_FILETRANSFER);
00108     public static int counter = 0;
00109 
00110     
00111     
00112     
00113 
00115     protected transient ProActiveRuntimeImpl proActiveRuntimeImpl;
00116 
00118     private String name;
00119 
00121     private String property;
00122 
00124     private java.util.ArrayList<VirtualMachine> virtualMachines;
00125 
00127     private java.util.ArrayList<String> localVirtualMachines;
00128 
00130     private int lastVirtualMachineIndex;
00131 
00133     private ArrayList<ProActiveRuntime> createdRuntimeF;
00134 
00136     private java.util.ArrayList<Node> createdNodes;
00137 
00139     private java.util.ArrayList<FileTransferDefinition> fileTransferDeploy;
00140 
00142     private java.util.ArrayList<FileTransferDefinition> fileTransferRetrieve;
00143 
00145     private HashMap<String, FileVector> fileTransferDeployedStatus;
00146     private int fileBlockSize;
00147     private int overlapping;
00148 
00150     private int lastNodeIndex;
00151 
00153     private int nbMappedNodes;
00154 
00158     private int minNumberOfNodes = 0;
00159 
00161     private int nbCreatedNodes;
00162 
00164     private boolean nodeCreated = false;
00165     private boolean isActivated = false;
00166 
00169     private Hashtable<String, VirtualMachine> awaitedVirtualNodes;
00170     private String registrationProtocol;
00171     private boolean registration = false;
00172     private boolean waitForTimeout = false;
00173 
00174     
00175     
00176     private boolean MAX_P2P = false;
00177 
00178     
00179 
00181     protected long timeout = 70000;
00182 
00184     protected long globalTimeOut;
00185     private Object uniqueActiveObject = null;
00186 
00187     
00188     private ProActiveSecurityManager proactiveSecurityManager;
00189     protected String jobID = ProActive.getJobId();
00190 
00191     
00192     private FaultToleranceService ftService;
00193     private Vector<Node> p2pNodes = new Vector<Node>();
00194 
00195     
00196     private boolean mainVirtualNode;
00197     private String padURL;
00198     private Vector<P2PNodeLookup> p2pNodeslookupList = new Vector<P2PNodeLookup>();
00199 
00200     
00201     private final int REGISTRATION_ATTEMPTS = 2;
00202 
00203     
00204     ExternalProcess mpiProcess = null;
00205     private TechnicalService technicalService;
00206     private String descriptorURL;
00207 
00208     
00209     
00210     
00211 
00215     VirtualNodeImpl() {
00216     }
00217 
00221     VirtualNodeImpl(String name,
00222         ProActiveSecurityManager proactiveSecurityManager, String padURL,
00223         boolean isMainVN, ProActiveDescriptor descriptor) {
00224         
00225         
00226         
00227         
00228         if (isMainVN) {
00229             this.name = name + "_" + (counter++);
00230         } else {
00231             this.name = name;
00232         }
00233 
00234         virtualMachines = new java.util.ArrayList<VirtualMachine>(5);
00235         localVirtualMachines = new java.util.ArrayList<String>();
00236         createdNodes = new java.util.ArrayList<Node>();
00237         createdRuntimeF = new ArrayList<ProActiveRuntime>();
00238         awaitedVirtualNodes = new Hashtable<String, VirtualMachine>();
00239         fileTransferDeploy = new ArrayList<FileTransferDefinition>();
00240         fileTransferDeployedStatus = new HashMap<String, FileVector>();
00241         fileTransferRetrieve = new ArrayList<FileTransferDefinition>();
00242         proActiveRuntimeImpl = (ProActiveRuntimeImpl) ProActiveRuntimeImpl.getProActiveRuntime();
00243         fileBlockSize = org.objectweb.proactive.core.filetransfer.FileBlock.DEFAULT_BLOCK_SIZE;
00244         overlapping = org.objectweb.proactive.core.filetransfer.FileTransferService.DEFAULT_MAX_SIMULTANEOUS_BLOCKS;
00245 
00246         if (logger.isDebugEnabled()) {
00247             logger.debug("vn " + this.name + " registered on " +
00248                 proActiveRuntimeImpl.getVMInformation().getVMID().toString());
00249         }
00250 
00251         
00252         this.proactiveSecurityManager = proactiveSecurityManager;
00253 
00254         
00255         this.mainVirtualNode = isMainVN;
00256         this.padURL = padURL;
00257 
00258         this.descriptorURL = descriptor.getProActiveDescriptorURL();
00259 
00260     }
00261 
00262     
00263     
00264     
00265 
00270     public void setProperty(String value) {
00271         this.property = value;
00272     }
00273 
00274     public String getProperty() {
00275         return property;
00276     }
00277 
00278     public long getTimeout() {
00279         return timeout;
00280     }
00281 
00291     public void setTimeout(long timeout, boolean waitForTimeout) {
00292         this.timeout = timeout;
00293         this.waitForTimeout = waitForTimeout;
00294     }
00295 
00300     public void setName(String s) {
00301         this.name = s;
00302     }
00303 
00304     public String getName() {
00305         return name;
00306     }
00307 
00308     public void addVirtualMachine(VirtualMachine virtualMachine) {
00309         if (isActivated) {
00310             
00311             
00312             logger.error(
00313                 "addVirtualMachine() cannot be called when the virtualNode has been activated");
00314 
00315             return;
00316         }
00317 
00318         virtualMachines.add(virtualMachine);
00319 
00320         if (logger.isDebugEnabled()) {
00321             logger.debug("mapped VirtualNode=" + name +
00322                 " with VirtualMachine=" + virtualMachine.getName());
00323         }
00324     }
00325 
00326     public void addFileTransferDeploy(FileTransferDefinition ft) {
00327         if (ft == null) {
00328             return;
00329         }
00330 
00331         fileTransferDeploy.add(ft);
00332 
00333         if (logger.isDebugEnabled()) {
00334             logger.debug("mapped VirtualNode=" + name +
00335                 " with FileTransferDeploy id=" + ft.getId());
00336         }
00337     }
00338 
00339     public void addFileTransferRetrieve(FileTransferDefinition ft) {
00340         if (ft == null) {
00341             return;
00342         }
00343 
00344         fileTransferRetrieve.add(ft);
00345 
00346         if (logger.isDebugEnabled()) {
00347             logger.debug("mapped VirtualNode=" + name +
00348                 " with FileTransferRetrieve id=" + ft.getId());
00349         }
00350     }
00351 
00352     public VirtualMachine getVirtualMachine() {
00353         if (virtualMachines.isEmpty()) {
00354             return null;
00355         }
00356 
00357         VirtualMachine vm = (VirtualMachine) virtualMachines.get(lastVirtualMachineIndex);
00358 
00359         return vm;
00360     }
00361 
00368     public VirtualMachine getVirtualMachine(String name) {
00369         Iterator it = virtualMachines.iterator();
00370 
00371         while (it.hasNext()) {
00372             VirtualMachine vm = (VirtualMachine) it.next();
00373 
00374             if (vm.getName().equals(name)) {
00375                 return vm;
00376             }
00377         }
00378 
00379         return null;
00380     }
00381 
00385     public void activate() {
00386         if (!isActivated) {
00387             for (Iterator<VirtualMachine> it = virtualMachines.iterator();
00388                     it.hasNext();) {
00389                 VirtualMachine vm = it.next();
00390 
00391                 if (vm.getCreatorId() == null) {
00392                     vm.setCreatorId(this.name);
00393                 } else {
00394                     awaitedVirtualNodes.put(vm.getCreatorId(), vm);
00395                 }
00396             }
00397 
00398             proActiveRuntimeImpl.addRuntimeRegistrationEventListener(this);
00399             proActiveRuntimeImpl.registerLocalVirtualNode(this, this.name);
00400 
00401             for (int i = 0; i < virtualMachines.size(); i++) {
00402                 VirtualMachine vm = getVirtualMachine();
00403 
00404                 
00405                 if (vm.hasProcess()) {
00406                     boolean vmAlreadyAssigned = !((vm.getCreatorId()).equals(this.name));
00407 
00408                     
00409                     ExternalProcess process = getProcess(vm, vmAlreadyAssigned);
00410 
00411                     
00412 
00413 
00414 
00415 
00416 
00417 
00418 
00419 
00420 
00421 
00422 
00423 
00424 
00425 
00426 
00427                     
00428                     int rankOfSequentialProcess = checkForSequentialProcess(process);
00429 
00430                     
00431                     if (rankOfSequentialProcess > -1) {
00432                         ExternalProcess deepCopy = (ExternalProcess) makeDeepCopy(process);
00433 
00434                         
00435                         if (rankOfSequentialProcess > 0) {
00436                             process = getSequentialProcessInHierarchie(process,
00437                                     rankOfSequentialProcess);
00438                         }
00439 
00440                         
00441                         if (((AbstractSequentialListProcessDecorator) process).isFirstElementIsService()) {
00442                             UniversalService firstService = (UniversalService) ((AbstractSequentialListProcessDecorator) process).getFirstService();
00443 
00444                             
00445                             startService(firstService, vm);
00446                             globalTimeOut = System.currentTimeMillis() +
00447                                 timeout;
00448 
00449                             try {
00450                                 waitForAllNodesCreation();
00451                             } catch (NodeException e) {
00452                                 
00453                                 e.printStackTrace();
00454                             }
00455                         } else {
00456                             ExternalProcess firstProcess = (ExternalProcess) ((AbstractSequentialListProcessDecorator) process).getFirstProcess();
00457 
00458                             
00459                             if (rankOfSequentialProcess > 0) {
00460                                 firstProcess = buildProcessWithHierarchie((ExternalProcess) makeDeepCopy(
00461                                             deepCopy), firstProcess,
00462                                         rankOfSequentialProcess);
00463                             }
00464 
00465                             setParameters(firstProcess, vm);
00466 
00467                             try {
00468                                 proActiveRuntimeImpl.createVM(firstProcess);
00469                                 globalTimeOut = System.currentTimeMillis() +
00470                                     timeout;
00471                                 waitForAllNodesCreation();
00472                             } catch (java.io.IOException e) {
00473                                 e.printStackTrace();
00474                             } catch (NodeException e1) {
00475                                 e1.printStackTrace();
00476                             }
00477                         }
00478 
00479                         try {
00480                             ExternalProcess nextProcess = null;
00481 
00482                             
00483                             while ((nextProcess = (ExternalProcess) ((AbstractSequentialListProcessDecorator) process).getNextProcess()) != null) {
00484                                 boolean launchProcessManually = false;
00485 
00486                                 
00487 
00488 
00489 
00490 
00491                                 if (process.isDependent()) {
00492                                     ((DependentProcess) nextProcess).setDependencyParameters(getNodes());
00493 
00494                                     if (nextProcess instanceof MPIProcess) {
00495                                         launchProcessManually = true;
00496                                     }
00497                                 }
00498 
00499                                 
00500                                 if (rankOfSequentialProcess > 0) {
00501                                     nextProcess = this.buildProcessWithHierarchie((ExternalProcess) makeDeepCopy(
00502                                                 deepCopy), nextProcess,
00503                                             rankOfSequentialProcess);
00504                                 }
00505 
00506                                 if (!launchProcessManually) {
00507                                     setParameters(nextProcess, vm);
00508                                     proActiveRuntimeImpl.createVM(nextProcess);
00509                                     
00510                                     globalTimeOut = System.currentTimeMillis() +
00511                                         timeout;
00512                                     waitForAllNodesCreation();
00513                                 } else {
00514                                     mpiProcess = nextProcess;
00515                                 }
00516                             }
00517                         } catch (java.io.IOException e) {
00518                             e.printStackTrace();
00519                         } catch (NodeException e1) {
00520                             e1.printStackTrace();
00521                         }
00522                     } else {
00523                         
00524                         
00525                         
00526                         
00527                         if (!vmAlreadyAssigned) {
00528                             setParameters(process, vm);
00529 
00530                             try {
00531                                 
00532                                 proActiveRuntimeImpl.createVM(process);
00533                             } catch (java.io.IOException e) {
00534                                 e.printStackTrace();
00535                                 logger.error("cannot activate virtualNode " +
00536                                     this.name + " with the process " +
00537                                     process.getCommand());
00538                             }
00539                         }
00540                     }
00541                 } else {
00542                     
00543                     startService(vm);
00544                 }
00545 
00546                 increaseIndex();
00547             }
00548 
00549             
00550             for (int i = 0; i < localVirtualMachines.size(); i++) {
00551                 String protocol = localVirtualMachines.get(i);
00552                 internalCreateNodeOnCurrentJvm(protocol);
00553             }
00554 
00555             
00556             globalTimeOut = System.currentTimeMillis() + timeout;
00557             isActivated = true;
00558 
00559             if (registration) {
00560                 register();
00561             }
00562 
00563             
00564             try {
00565                 if (this.ftService != null) {
00566                     
00567                     this.ftService.registerRessources(this.getNodes());
00568                 }
00569             } catch (NodeException e) {
00570                 logger.error(e.getMessage());
00571             }
00572         } else {
00573             logger.debug("VirtualNode " + this.name + " already activated");
00574         }
00575     }
00576 
00581     public ExternalProcess getMPIProcess() {
00582         return (ExternalProcess) makeDeepCopy(mpiProcess);
00583     }
00584 
00585     public boolean hasMPIProcess() {
00586         return !(mpiProcess == null);
00587     }
00588 
00589     
00590     private ExternalProcess getSequentialProcessInHierarchie(
00591         ExternalProcess process, int rank) {
00592         while (rank > 0) {
00593             process = ((AbstractExternalProcessDecorator) process).getTargetProcess();
00594             rank--;
00595         }
00596 
00597         return process;
00598     }
00599 
00600     
00601     private ExternalProcess buildProcessWithHierarchie(
00602         ExternalProcess process, ExternalProcess finalProcess, int rank) {
00603         if (rank == 0) {
00604             return finalProcess;
00605         } else {
00606             ((AbstractExternalProcessDecorator) process).setTargetProcess(buildProcessWithHierarchie(
00607                     ((AbstractExternalProcessDecorator) process).getTargetProcess(),
00608                     finalProcess, rank - 1));
00609 
00610             return process;
00611         }
00612     }
00613 
00614     
00615     private int checkForSequentialProcess(ExternalProcess process) {
00616         int res = 0;
00617 
00618         while (!(process instanceof JVMProcess)) {
00619             
00620             if (process.isSequential()) {
00621                 return res;
00622             } else {
00623                 res++;
00624                 process = ((ExternalProcess) ((ExternalProcessDecorator) process).getTargetProcess());
00625             }
00626         }
00627 
00628         return -1;
00629     }
00630 
00635     public boolean isMainVirtualNode() {
00636         return mainVirtualNode;
00637     }
00638 
00643     public String getPadURL() {
00644         return padURL;
00645     }
00646 
00651     public void setIsMainVirtualNode(boolean isMainVirtualNode) {
00652         mainVirtualNode = isMainVirtualNode;
00653     }
00654 
00659     public void setPadURL(String XMLpadURL) {
00660         this.padURL = XMLpadURL;
00661     }
00662 
00663     
00664 
00665 
00666 
00667     public int getNbMappedNodes() {
00668         if (isActivated) {
00669             return nbMappedNodes;
00670         } else {
00671             int nbMappedNodesTmp = 0;
00672 
00673             for (int i = 0; i < virtualMachines.size(); i++) {
00674                 VirtualMachine vm = getVirtualMachine();
00675 
00676                 
00677                 if (vm.hasProcess()) {
00678                     ExternalProcess process = vm.getProcess();
00679                     int nbNodesPerCreatedVM = new Integer(vm.getNbNodesOnCreatedVMs()).intValue();
00680 
00681                     if (process.getNodeNumber() == UniversalProcess.UNKNOWN_NODE_NUMBER) {
00682                         return UniversalProcess.UNKNOWN_NODE_NUMBER;
00683                     } else {
00684                         nbMappedNodesTmp = nbMappedNodesTmp +
00685                             (process.getNodeNumber() * nbNodesPerCreatedVM);
00686                     }
00687                 }
00688             }
00689 
00690             return nbMappedNodesTmp;
00691         }
00692     }
00693 
00697     public int createdNodeCount() {
00698         throw new RuntimeException(
00699             "This method is deprecated, use getNumberOfCurrentlyCreatedNodes() or getNumberOfCreatedNodesAfterDeployment()");
00700     }
00701 
00702     
00703 
00704 
00705 
00706     public int getNumberOfCurrentlyCreatedNodes() {
00707         return nbCreatedNodes;
00708     }
00709 
00710     
00711 
00712 
00713 
00714     public int getNumberOfCreatedNodesAfterDeployment() {
00715         try {
00716             waitForAllNodesCreation();
00717         } catch (NodeException e) {
00718             logger.error("Problem occured while waiting for nodes creation");
00719         }
00720 
00721         return nbCreatedNodes;
00722     }
00723 
00729     public Node getNode() throws NodeException {
00730         
00731         Node node;
00732         waitForNodeCreation();
00733 
00734         if (!createdNodes.isEmpty()) {
00735             node = createdNodes.get(lastNodeIndex);
00736             increaseNodeIndex();
00737 
00738             
00739             FileVector fw = fileTransferDeployedStatus.get(node.getNodeInformation()
00740                                                                .getName());
00741 
00742             if (fw != null) {
00743                 fw.waitForAll(); 
00744             }
00745 
00746             return node;
00747         } else {
00748             throw new NodeException("Cannot get a node from Virtual Node " + this.name+". Descriptor in use : \""+descriptorURL+"\"." );
00749         }
00750     }
00751 
00752     @Deprecated
00753     public Node getNode(int index) throws NodeException {
00754         Node node = createdNodes.get(index);
00755 
00756         if (node == null) {
00757             throw new NodeException(
00758                 "Cannot return the first node, no nodes hava been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00759         }
00760 
00761         FileVector fw = fileTransferDeployedStatus.get(node.getNodeInformation()
00762                                                            .getName());
00763 
00764         if (fw != null) {
00765             fw.waitForAll(); 
00766         }
00767 
00768         return node;
00769     }
00770 
00771     public String[] getNodesURL() throws NodeException {
00772         String[] nodeNames;
00773 
00774         try {
00775             waitForAllNodesCreation();
00776         } catch (NodeException e) {
00777             logger.error(e.getMessage());
00778         }
00779 
00780         if (!createdNodes.isEmpty()) {
00781             synchronized (createdNodes) {
00782                 nodeNames = new String[createdNodes.size()];
00783 
00784                 for (int i = 0; i < createdNodes.size(); i++) {
00785                     nodeNames[i] = createdNodes.get(i).getNodeInformation()
00786                                                .getURL();
00787                 }
00788             }
00789         } else {
00790             if (!MAX_P2P) {
00791                 throw new NodeException(
00792                     "Cannot return nodes, no nodes have been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00793             } else {
00794                 logger.warn("WARN: No nodes have yet been created.");
00795                 logger.warn(
00796                     "WARN: This behavior might be normal, since P2P service is used with MAX number of nodes requested");
00797                 logger.warn("WARN: Returning empty array");
00798 
00799                 return new String[0];
00800             }
00801         }
00802 
00803         return nodeNames;
00804     }
00805 
00806     public Node[] getNodes() throws NodeException {
00807         Node[] nodeTab;
00808 
00809         try {
00810             waitForAllNodesCreation();
00811         } catch (NodeException e) {
00812             logger.error(e.getMessage());
00813         }
00814 
00815         if (!createdNodes.isEmpty()) {
00816             synchronized (createdNodes) {
00817                 nodeTab = new Node[createdNodes.size()];
00818 
00819                 for (int i = 0; i < createdNodes.size(); i++) {
00820                     nodeTab[i] = createdNodes.get(i);
00821                 }
00822             }
00823         } else {
00824             if (!MAX_P2P) {
00825                 throw new NodeException(
00826                     "Cannot return nodes, no nodes have been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00827             } else {
00828                 logger.warn("WARN: No nodes have yet been created.");
00829                 logger.warn(
00830                     "WARN: This behavior might be normal, since P2P service is used with MAX number of nodes requested");
00831                 logger.warn("WARN: Returning empty array");
00832 
00833                 return new Node[0];
00834             }
00835         }
00836 
00837         return nodeTab;
00838     }
00839 
00840     public Node getNode(String url) throws NodeException {
00841         Node node = null;
00842 
00843         try {
00844             waitForAllNodesCreation();
00845         } catch (NodeException e) {
00846             logger.error(e.getMessage());
00847         }
00848 
00849         if (!createdNodes.isEmpty()) {
00850             synchronized (createdNodes) {
00851                 for (int i = 0; i < createdNodes.size(); i++) {
00852                     if (createdNodes.get(i).getNodeInformation().getURL()
00853                                         .equals(url)) {
00854                         node = createdNodes.get(i);
00855 
00856                         break;
00857                     }
00858                 }
00859 
00860                 return node;
00861             }
00862         } else {
00863             throw new NodeException(
00864                 "Cannot return nodes, no nodes hava been created. Descriptor in use : \""+descriptorURL+"\".");
00865         }
00866     }
00867 
00868     public void killAll(boolean softly) {
00869         Node node;
00870         ProActiveRuntime part = null;
00871 
00872         if (isActivated) {
00873             
00874             if (this.p2pNodeslookupList.size() > 0) {
00875                 for (int index = 0; index < this.p2pNodeslookupList.size();
00876                         index++) {
00877                     P2PNodeLookup currentNodesLookup = this.p2pNodeslookupList.get(index);
00878                     currentNodesLookup.killAllNodes();
00879                 }
00880             }
00881 
00882             
00883             for (int i = 0; i < createdNodes.size(); i++) {
00884                 node = createdNodes.get(i);
00885                 part = node.getProActiveRuntime();
00886 
00887                 if (this.p2pNodes.contains(node)) {
00888                     continue;
00889                 }
00890 
00891                 
00892                 
00893                 
00894                 if (!NodeFactory.isNodeLocal(node)) {
00895                     try {
00896                         part.killRT(softly);
00897                     } catch (ProActiveException e1) {
00898                         e1.printStackTrace();
00899                     } catch (Exception e) {
00900                         logger.info(" Virtual Machine " +
00901                             part.getVMInformation().getVMID() + " on host " +
00902                             UrlBuilder.getHostNameorIP(
00903                                 part.getVMInformation().getInetAddress()) +
00904                             " terminated!!!");
00905                     }
00906                 } else {
00907                     try {
00908                         
00909                         part.killNode(node.getNodeInformation().getURL());
00910                     } catch (ProActiveException e) {
00911                         e.printStackTrace();
00912                     }
00913                 }
00914             }
00915 
00916             isActivated = false;
00917 
00918             try {
00919                 
00920                 if (registration) {
00921                     ProActive.unregisterVirtualNode(this);
00922                 }
00923                 
00924                 else {
00925                     proActiveRuntimeImpl.unregisterVirtualNode(this.name);
00926                 }
00927             } catch (ProActiveException e) {
00928                 e.printStackTrace();
00929             }
00930 
00931             
00932         } else {
00933             proActiveRuntimeImpl.unregisterVirtualNode(this.name);
00934         }
00935 
00936         for (int i = 0; i < createdRuntimeF.size(); i++) {
00937             part = createdRuntimeF.get(i);
00938 
00939             try {
00940                 part.killRT(true);
00941             } catch (Exception e) {
00942                 logger.info(" Forwarder " + part.getVMInformation().getVMID() +
00943                     " on host " +
00944                     UrlBuilder.getHostNameorIP(part.getVMInformation()
00945                                                    .getInetAddress()) +
00946                     " terminated!!!");
00947             }
00948         }
00949     }
00950 
00951     public void createNodeOnCurrentJvm(String protocol) {
00952         if (protocol == null) {
00953             protocol = System.getProperty("proactive.communication.protocol");
00954         }
00955 
00956         localVirtualMachines.add(protocol);
00957     }
00958 
00959     public Object getUniqueAO() throws ProActiveException {
00960         if (!property.equals("unique_singleAO")) {
00961             logger.warn(
00962                 "!!!!!!!!!!WARNING. This VirtualNode is not defined with unique_single_AO property in the XML descriptor. Calling getUniqueAO() on this VirtualNode can lead to unexpected behaviour");
00963         }
00964 
00965         if (uniqueActiveObject == null) {
00966             try {
00967                 Node node = getNode();
00968 
00969                 if (node.getActiveObjects().length > 1) {
00970                     logger.warn(
00971                         "!!!!!!!!!!WARNING. More than one active object is created on this VirtualNode.");
00972                 }
00973 
00974                 uniqueActiveObject = node.getActiveObjects()[0];
00975             } catch (Exception e) {
00976                 e.printStackTrace();
00977             }
00978         }
00979 
00980         if (uniqueActiveObject == null) {
00981             throw new ProActiveException(
00982                 "No active object are registered on this VirtualNode");
00983         }
00984 
00985         return uniqueActiveObject;
00986     }
00987 
00988     public boolean isActivated() {
00989         return isActivated;
00990     }
00991 
00995     public boolean isLookup() {
00996         return false;
00997     }
00998 
00999     
01000     
01001     
01002 
01006     public String getJobID() {
01007         return this.jobID;
01008     }
01009     
01010     
01011     
01012     
01013     private transient ExecutorService rrThreadpool = Executors.newCachedThreadPool();
01014 
01015 
01016     
01017     
01018     
01019     public void runtimeRegistered(RuntimeRegistrationEvent event) {
01020         if (event.getType() == RuntimeRegistrationEvent.FORWARDER_RUNTIME_REGISTERED) {
01021             forwarderRuntimeRegisteredPerform(event);
01022         } else {
01023             
01024             
01025             rrThreadpool.execute(new RuntimeRegisteredPerform(event));
01026         }
01027     }
01028 
01029     private class RuntimeRegisteredPerform implements Runnable {
01030         private RuntimeRegistrationEvent event;
01031 
01032         public RuntimeRegisteredPerform(RuntimeRegistrationEvent _event) {
01033             this.event = _event;
01034         }
01035 
01036         public void run() {
01037             runtimeRegisteredPerform(event);
01038         }
01039     }
01040     
01041     private synchronized void forwarderRuntimeRegisteredPerform(
01042         RuntimeRegistrationEvent event) {
01043         VirtualMachine virtualMachine = null;
01044 
01045         for (int i = 0; i < virtualMachines.size(); i++) {
01046             if (((VirtualMachine) virtualMachines.get(i)).getName()
01047                      .equals(event.getVmName())) {
01048                 virtualMachine = (VirtualMachine) virtualMachines.get(i);
01049             }
01050         }
01051 
01052         
01053         if ((event.getCreatorID().equals(this.name)) &&
01054                 (virtualMachine != null)) {
01055             if (logger.isDebugEnabled()) {
01056                 logger.debug("forwarder " + event.getCreatorID() +
01057                     " registered on virtualnode " + this.name);
01058             }
01059         }
01060 
01061         createdRuntimeF.add(event.getRegisteredRuntime());
01062     }
01063 
01064     private void runtimeRegisteredPerform(RuntimeRegistrationEvent event) {
01065         VirtualMachine virtualMachine = null;
01066 
01067         
01068         for (int i = 0; i < virtualMachines.size(); i++) {
01069             if (((VirtualMachine) virtualMachines.get(i)).getName()
01070                      .equals(event.getVmName())) {
01071                 virtualMachine = (VirtualMachine) virtualMachines.get(i);
01072             }
01073         }
01074 
01075         
01076         if ((event.getCreatorID().equals(this.name)) &&
01077                 (virtualMachine != null)) {
01078             if (logger.isDebugEnabled()) {
01079                 logger.debug("runtime " + event.getCreatorID() +
01080                     " registered on virtualnode " + this.name);
01081             }
01082 
01083             createNodes(event, virtualMachine);
01084         }
01085 
01086         
01087         
01088         if (awaitedVirtualNodes.containsKey(event.getCreatorID())) {
01089             createNodes(event, awaitedVirtualNodes.get(event.getCreatorID()));
01090         }
01091     }
01092 
01093     private void createNodes(RuntimeRegistrationEvent event,
01094         VirtualMachine _virtualMachine) {
01095         String protocol;
01096         ProActiveRuntime proActiveRuntimeRegistered;
01097         String nodeHost;
01098         int port;
01099         String url;
01100 
01101         protocol = event.getProtocol();
01102         proActiveRuntimeRegistered = event.getRegisteredRuntime();
01103         nodeHost = proActiveRuntimeRegistered.getVMInformation().getHostName();
01104         port = UrlBuilder.getPortFromUrl(proActiveRuntimeRegistered.getURL());
01105         url = null;
01106 
01107         try {
01108             int nodeNumber = (new Integer((String) _virtualMachine.getNbNodesOnCreatedVMs())).intValue();
01109 
01110             for (int i = 1; i <= nodeNumber; i++) {
01111                 ProActiveSecurityManager siblingPSM = null;
01112 
01113                 if (proactiveSecurityManager != null) {
01114                     siblingPSM = proactiveSecurityManager.generateSiblingCertificate(this.name);
01115                 }
01116 
01117                 int registerAttempts = REGISTRATION_ATTEMPTS;
01118 
01119                 while (registerAttempts > 0) { 
01120                                                
01121                                                
01122                                                
01123                                                
01124                     String nodeName = this.name +
01125                         Integer.toString(ProActiveRuntimeImpl.getNextInt());
01126                     url = buildURL(nodeHost, nodeName, protocol, port);
01127 
01128                     
01129                     
01130                     
01131                     
01132                     
01133                     
01134                     try {
01135                         proActiveRuntimeRegistered.createLocalNode(url, false,
01136                             siblingPSM, this.getName(), this.jobID);
01137 
01138                         
01139                         performOperations(proActiveRuntimeRegistered, url,
01140                             protocol, event.getVmName());
01141 
01142                         break;
01143                     } catch (AlreadyBoundException e) {
01144                         registerAttempts--;
01145                     }
01146                 }
01147 
01148                 if (registerAttempts == 0) {
01149                     logger.warn("createLocalNode failed " +
01150                         REGISTRATION_ATTEMPTS + " times for runtime " +
01151                         nodeHost);
01152                 }
01153             }
01154         } catch (ProActiveException e) {
01155             e.printStackTrace();
01156         }
01157     }
01158 
01164     public void setRuntimeInformations(String information, String value)
01165         throws ProActiveException {
01166         
01167         
01168         
01169         
01170         
01171         
01172         
01173         
01174         throw new ProActiveException(
01175             "No property can be set at runtime on this VirtualNode");
01176     }
01177 
01181     public void setService(UniversalService service) throws ProActiveException {
01182         if (FaultToleranceService.FT_SERVICE_NAME.equals(
01183                     service.getServiceName())) {
01184             this.ftService = (FaultToleranceService) service;
01185         } else {
01186             throw new ProActiveException(
01187                 " Unable to bind the given service to a virtual node");
01188         }
01189     }
01190 
01194     public String getUserClass() {
01195         return this.getClass().getName();
01196     }
01197 
01198     public void setRegistrationProtocol(String protocol) {
01199         setRegistrationValue(true);
01200         this.registrationProtocol = protocol;
01201     }
01202 
01203     public String getRegistrationProtocol() {
01204         return this.registrationProtocol;
01205     }
01206 
01214     public void setMinNumberOfNodes(int min) {
01215         this.minNumberOfNodes = min;
01216     }
01217 
01221     public int getMinNumberOfNodes() {
01222         return minNumberOfNodes;
01223     }
01224 
01228     public boolean isMultiple() {
01229         return ((virtualMachines.size() + localVirtualMachines.size()) > 1);
01230     }
01231 
01232     
01233     
01234     
01235     private void internalCreateNodeOnCurrentJvm(String protocol) {
01236         try {
01237             
01238             
01239             
01240             String url = null;
01241             increaseNumberOfNodes(1);
01242 
01243             
01244             ProActiveRuntime defaultRuntime = RuntimeFactory.getProtocolSpecificRuntime(checkProtocol(
01245                         protocol));
01246 
01247             
01248             ProActiveSecurityManager siblingPSM = null;
01249 
01250             if (proactiveSecurityManager != null) {
01251                 siblingPSM = proactiveSecurityManager.generateSiblingCertificate(this.name);
01252             }
01253 
01254             int registrationAttempts = REGISTRATION_ATTEMPTS;
01255 
01256             while (registrationAttempts > 0) { 
01257                 String nodeName = this.name +
01258                     Integer.toString(ProActiveRuntimeImpl.getNextInt());
01259 
01260                 try {
01261                     url = defaultRuntime.createLocalNode(nodeName, false,
01262                             siblingPSM, this.getName(), ProActive.getJobId());
01263                     registrationAttempts = 0;
01264                 } catch (AlreadyBoundException e) {
01265                     registrationAttempts--;
01266                 }
01267             }
01268 
01269             
01270             performOperations(defaultRuntime, url, protocol,
01271                 defaultRuntime.getVMInformation().getDescriptorVMName());
01272         } catch (Exception e) {
01273             e.printStackTrace();
01274         }
01275     }
01276 
01280     private synchronized void waitForNodeCreation() throws NodeException {
01281         while (!nodeCreated) {
01282             if (!timeoutExpired()) {
01283                 try {
01284                     wait(getTimeToSleep());
01285                 } catch (InterruptedException e2) {
01286                     e2.printStackTrace();
01287                 } catch (IllegalStateException e) {
01288                     
01289                     
01290                     
01291                     throw new NodeException(
01292                         "After many retries, not even one node can be found for Virtual Node \"" +
01293                         this.name + "\" in descriptor \"" + this.descriptorURL +
01294                         "\".");
01295                 }
01296             } else {
01297                 throw new NodeException(
01298                     "After many retries, not even one node can be found for Virtual Node \"" +
01299                     this.name + "\" in descriptor \"" + this.descriptorURL + "\".");
01300             }
01301         }
01302 
01303         return;
01304     }
01305 
01309     public void waitForAllNodesCreation() throws NodeException {
01310         int tempNodeCount = nbMappedNodes;
01311 
01312         if (tempNodeCount != P2PConstants.MAX_NODE) {
01313             
01314             
01315             MAX_P2P = false;
01316         }
01317 
01318         if (waitForTimeout) {
01319             try {
01320                 Thread.sleep(timeout);
01321             } catch (InterruptedException e2) {
01322                 e2.printStackTrace();
01323             }
01324         } else {
01325             
01326             
01327             internalWait(tempNodeCount);
01328         }
01329 
01330         
01331         Collection<FileVector> c = fileTransferDeployedStatus.values();
01332         Iterator<FileVector> it = c.iterator();
01333 
01334         while (it.hasNext()) {
01335             FileVector fw = it.next();
01336             fw.waitForAll(); 
01337         }
01338 
01339 
01340 
01341 
01342 
01343 
01344 
01345 
01346 
01347         return;
01348     }
01349 
01350     private synchronized void internalWait(int tempNodeCount)
01351         throws NodeException {
01352         
01353         
01354         if (minNumberOfNodes != 0) {
01355             tempNodeCount = minNumberOfNodes;
01356         }
01357 
01358         while (nbCreatedNodes < tempNodeCount) {
01359             if (!timeoutExpired()) {
01360                 try {
01361                     wait(getTimeToSleep());
01362                 } catch (InterruptedException e2) {
01363                     e2.printStackTrace();
01364                 } catch (IllegalStateException e) {
01365                     
01366                     
01367                     
01368                     throw new NodeException("After many retries, only " +
01369                         nbCreatedNodes + " nodes are created on " +
01370                         tempNodeCount + " expected for Virtual Node \"" +
01371                         this.name + "\" in descriptor \"" + descriptorURL + "\".");
01372                 }
01373             } else {
01374                 throw new NodeException("After many retries, only " +
01375                     nbCreatedNodes + " nodes are created on " + tempNodeCount +
01376                     " expected for Virtual Node \"" + this.name +
01377                     "\" in descriptor \"" + descriptorURL + "\".");
01378             }
01379         }
01380     }
01381 
01382     private boolean timeoutExpired() {
01383         long currentTime = System.currentTimeMillis();
01384 
01385         return (globalTimeOut < currentTime);
01386     }
01387 
01388     private long getTimeToSleep() {
01389         
01390         long timeToSleep = globalTimeOut - System.currentTimeMillis();
01391 
01392         if (timeToSleep > 0) {
01393             return timeToSleep;
01394         } else {
01395             throw new IllegalStateException("Timeout expired");
01396         }
01397     }
01398 
01404     private ExternalProcess getProcess(VirtualMachine vm,
01405         boolean vmAlreadyAssigned) {
01406         ExternalProcess copyProcess;
01407 
01408         
01409         ExternalProcess process = vm.getProcess();
01410 
01411         
01412         
01413         
01414         if (!vmAlreadyAssigned) {
01415             copyProcess = (ExternalProcess) makeDeepCopy(process);
01416             vm.setProcess(copyProcess);
01417 
01418             return copyProcess;
01419         } else {
01420             
01421             increaseNumberOfNodes(process.getNodeNumber() * new Integer(
01422                     vm.getNbNodesOnCreatedVMs()).intValue());
01423 
01424             return process;
01425         }
01426     }
01427 
01432     private void setParameters(ExternalProcess process, VirtualMachine vm) {
01433         JVMProcess jvmProcess;
01434 
01435         
01436         String protocolId = "";
01437         int nodeNumber = new Integer(vm.getNbNodesOnCreatedVMs()).intValue();
01438 
01439         if (logger.isDebugEnabled()) {
01440             logger.debug("asked for " + nodeNumber + " nodes");
01441         }
01442 
01443         protocolId = process.getProcessId();
01444 
01445         int cnt = process.getNodeNumber();
01446 
01447         if (cnt == UniversalProcess.UNKNOWN_NODE_NUMBER) {
01448             waitForTimeout = true;
01449         } else {
01450             increaseNumberOfNodes(cnt * nodeNumber);
01451         }
01452 
01453         
01454         
01455         jvmProcess = (JVMProcess) process.getFinalProcess();
01456         jvmProcess.setJvmOptions("-Dproactive.groupInformation=" +
01457             ProActiveRuntimeImpl.getProActiveRuntime().getVMInformation()
01458                                 .getVMID().toString() + "~" +
01459             jvmProcess.getNewGroupId());
01460 
01461         
01462         if (jvmProcess.getClassname()
01463                           .equals("org.objectweb.proactive.core.runtime.StartRuntime")) {
01464             String vnName = this.name;
01465 
01466             String localruntimeURL = null;
01467 
01468             try {
01469                 localruntimeURL = RuntimeFactory.getDefaultRuntime().getURL();
01470 
01471                 if (process.getUsername() != null) {
01472                     localruntimeURL = System.getProperty("user.name") + "@" +
01473                         localruntimeURL;
01474                 }
01475             } catch (ProActiveException e) {
01476                 e.printStackTrace();
01477             }
01478 
01479             if (logger.isDebugEnabled()) {
01480                 logger.debug(localruntimeURL);
01481             }
01482 
01483             
01484             if (mainVirtualNode || process.isHierarchical()) {
01485                 jvmProcess.setJvmOptions(" -Dproactive.pad=" + padURL);
01486             }
01487 
01488             jvmProcess.setJvmOptions("-Dproactive.jobid=" + jobID);
01489             jvmProcess.setParameters(vnName + " " + localruntimeURL + " " +
01490                 nodeNumber + " " + protocolId + " " + vm.getName());
01491 
01492             
01493             if (this.ftService != null) {
01494                 jvmProcess.setJvmOptions(this.ftService.buildParamsLine());
01495             }
01496         }
01497 
01498         
01499 
01500 
01501         FileTransferWorkShop ftsDeploy = process.getFileTransferWorkShopDeploy();
01502         FileTransferWorkShop ftsRetrieve = process.getFileTransferWorkShopRetrieve();
01503 
01504         if ((ftsDeploy != null) && ftsDeploy.isImplicit()) {
01505             for (int i = 0; i < fileTransferDeploy.size(); i++)
01506                 ftsDeploy.addFileTransfer(fileTransferDeploy.get(i));
01507         }
01508 
01509         if ((ftsRetrieve != null) && ftsRetrieve.isImplicit()) {
01510             for (int i = 0; i < fileTransferRetrieve.size(); i++)
01511                 ftsRetrieve.addFileTransfer(fileTransferRetrieve.get(i));
01512         }
01513     }
01514 
01520     private Object makeDeepCopy(Object process) {
01521         
01522         Object result = null;
01523 
01524         try {
01525             java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
01526             java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(baos);
01527             oos.writeObject(process);
01528             oos.flush();
01529             oos.close();
01530 
01531             java.io.ByteArrayInputStream bais = new java.io.ByteArrayInputStream(baos.toByteArray());
01532             java.io.ObjectInputStream ois = new java.io.ObjectInputStream(bais);
01533             result = ois.readObject();
01534             ois.close();
01535         } catch (Exception e) {
01536             e.printStackTrace();
01537         }
01538 
01539         
01540         return result;
01541     }
01542 
01543     private String buildURL(String host, String name, String protocol, int port) {
01544         if (port != 0) {
01545             return UrlBuilder.buildUrl(host, name, protocol, port);
01546         } else {
01547             return UrlBuilder.buildUrl(host, name, protocol);
01548         }
01549     }
01550 
01551     private void increaseIndex() {
01552         if (virtualMachines.size() > 1) {
01553             lastVirtualMachineIndex = (lastVirtualMachineIndex + 1) % virtualMachines.size();
01554         }
01555     }
01556 
01557     private void increaseNumberOfNodes(int n) {
01558         nbMappedNodes = nbMappedNodes + n;
01559 
01560         if (logger.isDebugEnabled()) {
01561             logger.debug("Number of nodes = " + nbMappedNodes);
01562         }
01563     }
01564 
01565     private void increaseNodeIndex() {
01566         if (createdNodes.size() > 1) {
01567             lastNodeIndex = (lastNodeIndex + 1) % createdNodes.size();
01568         }
01569     }
01570 
01571     private String checkProtocol(String protocol) {
01572         if (protocol.indexOf(":") == -1) {
01573             return protocol.concat(":");
01574         }
01575 
01576         return protocol;
01577     }
01578 
01579     private synchronized void performOperations(ProActiveRuntime part,
01580         String url, String protocol, String vmName) {
01581         Node node = new NodeImpl(part, url, checkProtocol(protocol),
01582                 this.jobID, vmName);
01583        synchronized (createdNodes) {
01584            createdNodes.add(node);      
01585         }
01586        
01587         logger.info("**** Mapping VirtualNode " + this.name + " with Node: " +
01588             url + " done");
01589         nodeCreated = true;
01590         nbCreatedNodes++;
01591 
01592         
01593         FileVector fw = fileTransferDeploy(node);
01594         this.fileTransferDeployedStatus.put(node.getNodeInformation().getName(),
01595             fw);
01596 
01597         if (this.technicalService != null) {
01598             this.technicalService.apply(node);
01599         }
01600 
01601         
01602         notifyAll();
01603 
01604         
01605         notifyListeners(this, NodeCreationEvent.NODE_CREATED, node,
01606             nbCreatedNodes);
01607     }
01608 
01609     private void register() {
01610         try {
01611             waitForAllNodesCreation();
01612 
01613             
01614             
01615             ProActive.registerVirtualNode(this, registrationProtocol, false);
01616         } catch (NodeException e) {
01617             logger.error(e.getMessage());
01618         } catch (ProActiveException e) {
01619             e.printStackTrace();
01620         } catch (AlreadyBoundException e) {
01621             logger.warn("The Virtual Node name " + this.getName() +
01622                 " is already bound in the registry", e);
01623         }
01624     }
01625 
01626     private void setRegistrationValue(boolean value) {
01627         this.registration = value;
01628     }
01629 
01630     private void startService(VirtualMachine vm) {
01631         UniversalService service = vm.getService();
01632 
01633         
01634         
01635         UniversalService copyService = (UniversalService) makeDeepCopy(service);
01636         vm.setService(copyService);
01637 
01638         if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
01639             int nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
01640 
01641             
01642             
01643             if (nodeRequested != ((P2PDescriptorService) service).getMAX()) {
01644                 increaseNumberOfNodes(nodeRequested);
01645 
01646                 
01647                 
01648             } else {
01649                 MAX_P2P = true;
01650             }
01651         } else if (service.getServiceName()
01652                               .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
01653             int nodeRequested = ((SchedulerLookupService) service).getNodeNumber();
01654             increaseNumberOfNodes(nodeRequested);
01655         } else {
01656             
01657             increaseNumberOfNodes(1);
01658         }
01659 
01660         new ServiceThread(this, vm).start();
01661     }
01662 
01663     private void startService(UniversalService service, VirtualMachine vm) {
01664         
01665         
01666         UniversalService copyService = (UniversalService) makeDeepCopy(service);
01667         vm.setService(copyService);
01668 
01669         if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
01670             int nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
01671 
01672             
01673             
01674             if (nodeRequested != ((P2PDescriptorService) service).getMAX()) {
01675                 increaseNumberOfNodes(nodeRequested);
01676 
01677                 
01678                 
01679             } else {
01680                 MAX_P2P = true;
01681             }
01682         } else if (service.getServiceName()
01683                               .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
01684             int nodeRequested = ((SchedulerLookupService) service).getNodeNumber();
01685             increaseNumberOfNodes(nodeRequested);
01686         } else {
01687             
01688             increaseNumberOfNodes(1);
01689         }
01690 
01691         new ServiceThread(this, vm).start();
01692     }
01693 
01694     private void writeObject(java.io.ObjectOutputStream out)
01695         throws java.io.IOException {
01696         if (isActivated) {
01697             try {
01698                 waitForAllNodesCreation();
01699             } catch (NodeException e) {
01700                 out.defaultWriteObject();
01701 
01702                 return;
01703             }
01704         }
01705         
01706         out.defaultWriteObject();
01707     }
01708 
01709     private void readObject(java.io.ObjectInputStream in)
01710         throws java.io.IOException, ClassNotFoundException {
01711         in.defaultReadObject();
01712         this.proActiveRuntimeImpl = (ProActiveRuntimeImpl) ProActiveRuntimeImpl.getProActiveRuntime();
01713         rrThreadpool = Executors.newCachedThreadPool();
01714     }
01715 
01716     
01717     
01718     
01719 
01724     public synchronized void nodeCreated(NodeCreationEvent event) {
01725         Node newNode = event.getNode();
01726         this.createdNodes.add(newNode);
01727         this.p2pNodes.add(newNode);
01728         nbCreatedNodes++;
01729         nodeCreated = true;
01730 
01731         if (this.technicalService != null) {
01732             this.technicalService.apply(newNode);
01733         }
01734 
01735         
01736         notifyAll();
01737         notifyListeners(this, NodeCreationEvent.NODE_CREATED, newNode,
01738             nbCreatedNodes);
01739     }
01740 
01744     public void addP2PNodesLookup(P2PNodeLookup nodesLookup) {
01745         this.p2pNodeslookupList.add(nodesLookup);
01746         P2P_LOGGER.debug("A P2P nodes lookup added to the vn: " + this.name);
01747     }
01748 
01752     public FileVector fileTransferRetrieve()
01753         throws IOException, ProActiveException {
01754         Node[] nodes;
01755         FileVector fileVector = new FileVector();
01756 
01757         try {
01758             nodes = getNodes();
01759         } catch (NodeException e) {
01760             throw new ProActiveException(
01761                 "Can not Retrieve Files, since no nodes where created for Virtual Node" +
01762                 this.getName());
01763         }
01764 
01765         if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01766             FILETRANSFER_LOGGER.debug("Retrieving files for " + nodes.length +
01767                 " node(s).");
01768         }
01769 
01770         
01771 
01772 
01773 
01774 
01775         for (int i = 0; i < nodes.length; i++) {
01776             String vmName = nodes[i].getNodeInformation().getDescriptorVMName();
01777             VirtualMachine vm = getVirtualMachine(vmName);
01778 
01779             if (vm == null) {
01780                 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01781                     FILETRANSFER_LOGGER.info("No VM found with name: " +
01782                         vmName + " for node: " +
01783                         nodes[i].getNodeInformation().getName());
01784                 }
01785 
01786                 continue;
01787             }
01788 
01789             
01790             ExternalProcess eProcess = vm.getProcess();
01791 
01792             if (eProcess == null) {
01793                 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01794                     FILETRANSFER_LOGGER.debug("No Process linked with VM: " +
01795                         vmName + " for node: " +
01796                         nodes[i].getNodeInformation().getName());
01797                 }
01798 
01799                 continue;
01800             }
01801 
01802             FileTransferWorkShop ftwRetrieve = eProcess.getFileTransferWorkShopRetrieve();
01803             FileDescription[] fd = ftwRetrieve.getAllFileDescriptions();
01804 
01805             File[] srcFile = new File[fd.length];
01806             File[] dstFile = new File[fd.length];
01807 
01808             for (int j = 0; j < fd.length; j++) {
01809                 srcFile[j] = new File(ftwRetrieve.getAbsoluteSrcPath(fd[j]));
01810                 dstFile[j] = new File(ftwRetrieve.getAbsoluteDstPath(fd[j]) +
01811                         "-" + nodes[i].getNodeInformation().getName());
01812             }
01813 
01814             long init = System.currentTimeMillis();
01815             fileVector.add(FileTransfer.pullFiles(nodes[i], srcFile, dstFile,
01816                     fileBlockSize, overlapping));
01817 
01818             if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01819                 FILETRANSFER_LOGGER.debug("Returned pullFiles in:" +
01820                     (System.currentTimeMillis() - init));
01821             }
01822         }
01823 
01824         return fileVector;
01825     }
01826 
01835     private FileVector fileTransferDeploy(Node node) {
01836         FileVector fileWrapper = new FileVector();
01837 
01838         if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01839             DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01840                 "File Transfer Deploy files for node" +
01841                 node.getNodeInformation().getName());
01842         }
01843 
01844         String vmName = node.getNodeInformation().getDescriptorVMName();
01845         VirtualMachine vm = getVirtualMachine(vmName);
01846 
01847         if (vm == null) {
01848             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01849                 DEPLOYMENT_FILETRANSFER_LOGGER.debug("No VM found with name: " +
01850                     vmName + " for node: " +
01851                     node.getNodeInformation().getName());
01852             }
01853 
01854             return fileWrapper;
01855         }
01856 
01857         
01858         ExternalProcess eProcess = vm.getProcess();
01859 
01860         if (eProcess == null) {
01861             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01862                 DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01863                     "No Process linked with VM: " + vmName + " for node: " +
01864                     node.getNodeInformation().getName());
01865             }
01866 
01867             return fileWrapper;
01868         }
01869 
01870         
01871         if (!eProcess.isRequiredFileTransferDeployOnNodeCreation()) {
01872             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01873                 DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01874                     "No ProActive FileTransfer API is required for this node.");
01875             }
01876 
01877             return fileWrapper;
01878         }
01879 
01880         FileTransferWorkShop ftwDeploy = eProcess.getFileTransferWorkShopDeploy();
01881         FileDescription[] fd = ftwDeploy.getAllFileDescriptions();
01882 
01883         if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01884             DEPLOYMENT_FILETRANSFER_LOGGER.debug("Transfering " + fd.length +
01885                 " file(s)");
01886         }
01887 
01888         File[] filesSrc = new File[fd.length];
01889         File[] filesDst = new File[fd.length];
01890 
01891         for (int j = 0; j < fd.length; j++) {
01892             File srcFile = new File(ftwDeploy.getAbsoluteSrcPath(fd[j]));
01893             File dstFile = new File(ftwDeploy.getAbsoluteDstPath(fd[j]));
01894             filesSrc[j] = srcFile;
01895             filesDst[j] = dstFile;
01896         }
01897 
01898         try {
01899             fileWrapper.add(FileTransfer.pushFiles(node, filesSrc, filesDst,
01900                     fileBlockSize, overlapping));
01901         } catch (Exception e) {
01902             logger.error("Unable to pushFile files to node " +
01903                 node.getNodeInformation().getName());
01904 
01905             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01906                 DEPLOYMENT_FILETRANSFER_LOGGER.debug(e.getStackTrace());
01907             }
01908         }
01909 
01910         return fileWrapper;
01911     }
01912 
01913     public void setFileTransferParams(int fileBlockSize, int overlapping) {
01914         this.fileBlockSize = fileBlockSize;
01915         this.overlapping = overlapping;
01916     }
01917 
01918     public void addTechnicalService(TechnicalService technicalWrapper) {
01919         this.technicalService = technicalWrapper;
01920     }
01921 
01922     public ArrayList getVirtualMachines() {
01923         return this.virtualMachines;
01924     }
01925 
01926 }