org/objectweb/proactive/core/descriptor/data/VirtualNodeImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.descriptor.data;
00032 
00033 import java.io.File;
00034 import java.io.IOException;
00035 import java.io.Serializable;
00036 import java.rmi.AlreadyBoundException;
00037 import java.util.ArrayList;
00038 import java.util.Collection;
00039 import java.util.HashMap;
00040 import java.util.Hashtable;
00041 import java.util.Iterator;
00042 import java.util.Vector;
00043 import java.util.concurrent.ExecutorService;
00044 import java.util.concurrent.Executors;
00045 
00046 import org.apache.log4j.Logger;
00047 import org.objectweb.proactive.ProActive;
00048 import org.objectweb.proactive.core.ProActiveException;
00049 import org.objectweb.proactive.core.descriptor.services.FaultToleranceService;
00050 import org.objectweb.proactive.core.descriptor.services.P2PDescriptorService;
00051 import org.objectweb.proactive.core.descriptor.services.SchedulerLookupService;
00052 import org.objectweb.proactive.core.descriptor.services.ServiceThread;
00053 import org.objectweb.proactive.core.descriptor.services.ServiceUser;
00054 import org.objectweb.proactive.core.descriptor.services.TechnicalService;
00055 import org.objectweb.proactive.core.descriptor.services.UniversalService;
00056 import org.objectweb.proactive.core.event.NodeCreationEvent;
00057 import org.objectweb.proactive.core.event.NodeCreationEventListener;
00058 import org.objectweb.proactive.core.event.NodeCreationEventProducerImpl;
00059 import org.objectweb.proactive.core.event.RuntimeRegistrationEvent;
00060 import org.objectweb.proactive.core.event.RuntimeRegistrationEventListener;
00061 import org.objectweb.proactive.core.node.Node;
00062 import org.objectweb.proactive.core.node.NodeException;
00063 import org.objectweb.proactive.core.node.NodeFactory;
00064 import org.objectweb.proactive.core.node.NodeImpl;
00065 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator;
00066 import org.objectweb.proactive.core.process.AbstractSequentialListProcessDecorator;
00067 import org.objectweb.proactive.core.process.DependentProcess;
00068 import org.objectweb.proactive.core.process.ExternalProcess;
00069 import org.objectweb.proactive.core.process.ExternalProcessDecorator;
00070 import org.objectweb.proactive.core.process.JVMProcess;
00071 import org.objectweb.proactive.core.process.UniversalProcess;
00072 import org.objectweb.proactive.core.process.filetransfer.FileTransferDefinition;
00073 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00074 import org.objectweb.proactive.core.process.filetransfer.FileTransferDefinition.FileDescription;
00075 import org.objectweb.proactive.core.process.mpi.MPIProcess;
00076 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00077 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00078 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00079 import org.objectweb.proactive.core.util.UrlBuilder;
00080 import org.objectweb.proactive.core.util.log.Loggers;
00081 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00082 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00083 import org.objectweb.proactive.filetransfer.FileTransfer;
00084 import org.objectweb.proactive.filetransfer.FileVector;
00085 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00086 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00087 import org.objectweb.proactive.scheduler.SchedulerConstants;
00088 
00089 
00100 public class VirtualNodeImpl extends NodeCreationEventProducerImpl
00101     implements VirtualNode, Serializable, RuntimeRegistrationEventListener,
00102         NodeCreationEventListener, ServiceUser {
00103 
00105     private final static Logger P2P_LOGGER = ProActiveLogger.getLogger(Loggers.P2P_VN);
00106     private final static Logger FILETRANSFER_LOGGER = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00107     private final static Logger DEPLOYMENT_FILETRANSFER_LOGGER = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_FILETRANSFER);
          // Counter appended (as "_<n>") by the constructor to the name of every
          // main virtual node, so that repeated launches get distinct names.
00108     public static int counter = 0;
00109 
00110     //
00111     //  ----- PRIVATE MEMBERS -----------------------------------------------------------------------------------
00112     //
00113 
          // Local runtime, fetched with ProActiveRuntimeImpl.getProActiveRuntime()
          // in the constructor; transient, so it is not part of the serialized form.
00115     protected transient ProActiveRuntimeImpl proActiveRuntimeImpl;
00116 
          // Name of this virtual node (see the constructor for the main-VN suffix).
00118     private String name;
00119 
          // Free-form value set through setProperty(); not interpreted in the code visible here.
00121     private String property;
00122 
          // Virtual machines mapped to this virtual node through addVirtualMachine().
00124     private java.util.ArrayList<VirtualMachine> virtualMachines;
00125 
          // Protocol names; activate() calls internalCreateNodeOnCurrentJvm() once per entry.
00127     private java.util.ArrayList<String> localVirtualMachines;
00128 
          // Index into virtualMachines of the element returned by getVirtualMachine().
00130     private int lastVirtualMachineIndex;
00131 
00133     private ArrayList<ProActiveRuntime> createdRuntimeF;
00134 
          // Nodes handed out by getNode(), getNodes() and getNodesURL().
00136     private java.util.ArrayList<Node> createdNodes;
00137 
          // File transfer definitions registered through addFileTransferDeploy().
00139     private java.util.ArrayList<FileTransferDefinition> fileTransferDeploy;
00140 
          // File transfer definitions registered through addFileTransferRetrieve().
00142     private java.util.ArrayList<FileTransferDefinition> fileTransferRetrieve;
00143 
          // Looked up by node name in getNode(); when an entry exists, getNode()
          // calls waitForAll() on it before returning the node.
00145     private HashMap<String, FileVector> fileTransferDeployedStatus;
          // Initialised in the constructor from FileBlock.DEFAULT_BLOCK_SIZE.
00146     private int fileBlockSize;
          // Initialised in the constructor from FileTransferService.DEFAULT_MAX_SIMULTANEOUS_BLOCKS.
00147     private int overlapping;
00148 
          // Index into createdNodes of the node returned by the next getNode() call.
00150     private int lastNodeIndex;
00151 
          // Value returned by getNbMappedNodes() once the virtual node is activated.
00153     private int nbMappedNodes;
00154 
00158     private int minNumberOfNodes = 0;
00159 
          // Value returned by getNumberOfCurrentlyCreatedNodes().
00161     private int nbCreatedNodes;
00162 
00164     private boolean nodeCreated = false;
          // Set to true at the end of activate(); addVirtualMachine() refuses new
          // mappings once it is set.
00165     private boolean isActivated = false;
00166 
          // Filled by activate() with the virtual machines that already had a
          // creator id, keyed by that creator id.
00169     private Hashtable<String, VirtualMachine> awaitedVirtualNodes;
00170     private String registrationProtocol;
          // When true, activate() calls register() after the deployment loop.
00171     private boolean registration = false;
00172     private boolean waitForTimeout = false;
00173 
00174     //boolean used to know if the vn is mapped only with a P2P service that request MAX nodes
00175     // indeed the behavior is different when returning nodes
00176     private boolean MAX_P2P = false;
00177 
00178     //protected int MAX_RETRY = 70;
00179 
          // Added to System.currentTimeMillis() to compute globalTimeOut, hence
          // expressed in milliseconds.
00181     protected long timeout = 70000;
00182 
          // Absolute deadline (System.currentTimeMillis() + timeout), refreshed by activate().
00184     protected long globalTimeOut;
00185     private Object uniqueActiveObject = null;
00186 
00187     // Security 
00188     private ProActiveSecurityManager proactiveSecurityManager;
00189     protected String jobID = ProActive.getJobId();
00190 
00191     // FAULT TOLERANCE
          // When non-null, activate() passes the created nodes to registerRessources().
00192     private FaultToleranceService ftService;
00193     private Vector<Node> p2pNodes = new Vector<Node>();
00194 
00195     // PAD infos
00196     private boolean mainVirtualNode;
00197     private String padURL;
00198     private Vector<P2PNodeLookup> p2pNodeslookupList = new Vector<P2PNodeLookup>();
00199 
00200     //REGISTRATION ATTEMPTS
00201     private final int REGISTRATION_ATTEMPTS = 2;
00202 
00203     // MPI Process
          // Set by activate() when a dependent sequential list contains an
          // MPIProcess: that process is stored here instead of being launched.
00204     ExternalProcess mpiProcess = null;
00205     private TechnicalService technicalService;
          // Value of descriptor.getProActiveDescriptorURL(), quoted in NodeException messages.
00206     private String descriptorURL;
00207 
00208     //
00209     //  ----- CONSTRUCTORS -----------------------------------------------------------------------------------
00210     //
00211 
00215     VirtualNodeImpl() {
00216     }
00217 
00221     VirtualNodeImpl(String name,
00222         ProActiveSecurityManager proactiveSecurityManager, String padURL,
00223         boolean isMainVN, ProActiveDescriptor descriptor) {
00224         // if we launch several times the same application 
00225         // we have to change the name of the main VNs because of
00226         // the register, otherwise we will monitor each time all the last
00227         // main VNs with the same name.
00228         if (isMainVN) {
00229             this.name = name + "_" + (counter++);
00230         } else {
00231             this.name = name;
00232         }
00233 
00234         virtualMachines = new java.util.ArrayList<VirtualMachine>(5);
00235         localVirtualMachines = new java.util.ArrayList<String>();
00236         createdNodes = new java.util.ArrayList<Node>();
00237         createdRuntimeF = new ArrayList<ProActiveRuntime>();
00238         awaitedVirtualNodes = new Hashtable<String, VirtualMachine>();
00239         fileTransferDeploy = new ArrayList<FileTransferDefinition>();
00240         fileTransferDeployedStatus = new HashMap<String, FileVector>();
00241         fileTransferRetrieve = new ArrayList<FileTransferDefinition>();
00242         proActiveRuntimeImpl = (ProActiveRuntimeImpl) ProActiveRuntimeImpl.getProActiveRuntime();
00243         fileBlockSize = org.objectweb.proactive.core.filetransfer.FileBlock.DEFAULT_BLOCK_SIZE;
00244         overlapping = org.objectweb.proactive.core.filetransfer.FileTransferService.DEFAULT_MAX_SIMULTANEOUS_BLOCKS;
00245 
00246         if (logger.isDebugEnabled()) {
00247             logger.debug("vn " + this.name + " registered on " +
00248                 proActiveRuntimeImpl.getVMInformation().getVMID().toString());
00249         }
00250 
00251         // SECURITY
00252         this.proactiveSecurityManager = proactiveSecurityManager;
00253 
00254         // added for main infos
00255         this.mainVirtualNode = isMainVN;
00256         this.padURL = padURL;
00257 
00258         this.descriptorURL = descriptor.getProActiveDescriptorURL();
00259 
00260     }
00261 
00262     //
00263     //  ----- PUBLIC METHODS -----------------------------------------------------------------------------------
00264     //
00265 
00270     public void setProperty(String value) {
00271         this.property = value;
00272     }
00273 
00274     public String getProperty() {
00275         return property;
00276     }
00277 
00278     public long getTimeout() {
00279         return timeout;
00280     }
00281 
00291     public void setTimeout(long timeout, boolean waitForTimeout) {
00292         this.timeout = timeout;
00293         this.waitForTimeout = waitForTimeout;
00294     }
00295 
00300     public void setName(String s) {
00301         this.name = s;
00302     }
00303 
00304     public String getName() {
00305         return name;
00306     }
00307 
00308     public void addVirtualMachine(VirtualMachine virtualMachine) {
00309         if (isActivated) {
00310             // Do not call this method when the VN has been activated or it will open the door
00311             // to various race conditions.
00312             logger.error(
00313                 "addVirtualMachine() cannot be called when the virtualNode has been activated");
00314 
00315             return;
00316         }
00317 
00318         virtualMachines.add(virtualMachine);
00319 
00320         if (logger.isDebugEnabled()) {
00321             logger.debug("mapped VirtualNode=" + name +
00322                 " with VirtualMachine=" + virtualMachine.getName());
00323         }
00324     }
00325 
00326     public void addFileTransferDeploy(FileTransferDefinition ft) {
00327         if (ft == null) {
00328             return;
00329         }
00330 
00331         fileTransferDeploy.add(ft);
00332 
00333         if (logger.isDebugEnabled()) {
00334             logger.debug("mapped VirtualNode=" + name +
00335                 " with FileTransferDeploy id=" + ft.getId());
00336         }
00337     }
00338 
00339     public void addFileTransferRetrieve(FileTransferDefinition ft) {
00340         if (ft == null) {
00341             return;
00342         }
00343 
00344         fileTransferRetrieve.add(ft);
00345 
00346         if (logger.isDebugEnabled()) {
00347             logger.debug("mapped VirtualNode=" + name +
00348                 " with FileTransferRetrieve id=" + ft.getId());
00349         }
00350     }
00351 
00352     public VirtualMachine getVirtualMachine() {
00353         if (virtualMachines.isEmpty()) {
00354             return null;
00355         }
00356 
00357         VirtualMachine vm = (VirtualMachine) virtualMachines.get(lastVirtualMachineIndex);
00358 
00359         return vm;
00360     }
00361 
00368     public VirtualMachine getVirtualMachine(String name) {
00369         Iterator it = virtualMachines.iterator();
00370 
00371         while (it.hasNext()) {
00372             VirtualMachine vm = (VirtualMachine) it.next();
00373 
00374             if (vm.getName().equals(name)) {
00375                 return vm;
00376             }
00377         }
00378 
00379         return null;
00380     }
00381 
00385     public void activate() {
00386         if (!isActivated) {
00387             for (Iterator<VirtualMachine> it = virtualMachines.iterator();
00388                     it.hasNext();) {
00389                 VirtualMachine vm = it.next();
00390 
00391                 if (vm.getCreatorId() == null) {
00392                     vm.setCreatorId(this.name);
00393                 } else {
00394                     awaitedVirtualNodes.put(vm.getCreatorId(), vm);
00395                 }
00396             }
00397 
00398             proActiveRuntimeImpl.addRuntimeRegistrationEventListener(this);
00399             proActiveRuntimeImpl.registerLocalVirtualNode(this, this.name);
00400 
00401             for (int i = 0; i < virtualMachines.size(); i++) {
00402                 VirtualMachine vm = getVirtualMachine();
00403 
00404                 // first check if it is a process that is attached to the vm
00405                 if (vm.hasProcess()) {
00406                     boolean vmAlreadyAssigned = !((vm.getCreatorId()).equals(this.name));
00407 
00408                     //boolean vmAlreadyAssigned = vm.isActivated();
00409                     ExternalProcess process = getProcess(vm, vmAlreadyAssigned);
00410 
00411                     /*   //check if it's a gLiteProcess. If it's a gLiteProcess, get the cpu number and run the glite submission command "cpuNumber" times.
00412                        int cpuNumber = checkGLiteProcess(process);
00413                        if (cpuNumber > -1) {
00414                            while (cpuNumber > 0) {
00415                                try {
00416                                    setParameters(process, vm);
00417                                    process.startProcess();
00418                                    process.setStarted(false);
00419                                } catch (IOException e) {
00420                                    e.printStackTrace();
00421                                }
00422                                cpuNumber--;
00423                            }
00424                        }
00425                      */
00426 
00427                     //  get the rank of sequential process - return -1 if it does not exist
00428                     int rankOfSequentialProcess = checkForSequentialProcess(process);
00429 
00430                     // there's a sequential process in the hierarchical process
00431                     if (rankOfSequentialProcess > -1) {
00432                         ExternalProcess deepCopy = (ExternalProcess) makeDeepCopy(process);
00433 
00434                         // there's a process before the sequential one
00435                         if (rankOfSequentialProcess > 0) {
00436                             process = getSequentialProcessInHierarchie(process,
00437                                     rankOfSequentialProcess);
00438                         }
00439 
00440                         // check if the first element is a Service or a process
00441                         if (((AbstractSequentialListProcessDecorator) process).isFirstElementIsService()) {
00442                             UniversalService firstService = (UniversalService) ((AbstractSequentialListProcessDecorator) process).getFirstService();
00443 
00444                             //   It is a service that is mapped to the vm.
00445                             startService(firstService, vm);
00446                             globalTimeOut = System.currentTimeMillis() +
00447                                 timeout;
00448 
00449                             try {
00450                                 waitForAllNodesCreation();
00451                             } catch (NodeException e) {
00452                                 // TODO Auto-generated catch block
00453                                 e.printStackTrace();
00454                             }
00455                         } else {
00456                             ExternalProcess firstProcess = (ExternalProcess) ((AbstractSequentialListProcessDecorator) process).getFirstProcess();
00457 
00458                             // build the process with the rest of hierarchie
00459                             if (rankOfSequentialProcess > 0) {
00460                                 firstProcess = buildProcessWithHierarchie((ExternalProcess) makeDeepCopy(
00461                                             deepCopy), firstProcess,
00462                                         rankOfSequentialProcess);
00463                             }
00464 
00465                             setParameters(firstProcess, vm);
00466 
00467                             try {
00468                                 proActiveRuntimeImpl.createVM(firstProcess);
00469                                 globalTimeOut = System.currentTimeMillis() +
00470                                     timeout;
00471                                 waitForAllNodesCreation();
00472                             } catch (java.io.IOException e) {
00473                                 e.printStackTrace();
00474                             } catch (NodeException e1) {
00475                                 e1.printStackTrace();
00476                             }
00477                         }
00478 
00479                         try {
00480                             ExternalProcess nextProcess = null;
00481 
00482                             // loop on each process in the sequence
00483                             while ((nextProcess = (ExternalProcess) ((AbstractSequentialListProcessDecorator) process).getNextProcess()) != null) {
00484                                 boolean launchProcessManually = false;
00485 
00486                                 /* if process is a dependent process then each process
00487                                  * in the sequence list except the first one have to receive
00488                                  * an array of objects relative to their dependence.
00489                                  * we assume that this dependence is the generation ov nodes
00490                                  */
00491                                 if (process.isDependent()) {
00492                                     ((DependentProcess) nextProcess).setDependencyParameters(getNodes());
00493 
00494                                     if (nextProcess instanceof MPIProcess) {
00495                                         launchProcessManually = true;
00496                                     }
00497                                 }
00498 
00499                                 // rebuild the process with the rest of hierarchie
00500                                 if (rankOfSequentialProcess > 0) {
00501                                     nextProcess = this.buildProcessWithHierarchie((ExternalProcess) makeDeepCopy(
00502                                                 deepCopy), nextProcess,
00503                                             rankOfSequentialProcess);
00504                                 }
00505 
00506                                 if (!launchProcessManually) {
00507                                     setParameters(nextProcess, vm);
00508                                     proActiveRuntimeImpl.createVM(nextProcess);
00509                                     // initialization of the global timeout
00510                                     globalTimeOut = System.currentTimeMillis() +
00511                                         timeout;
00512                                     waitForAllNodesCreation();
00513                                 } else {
00514                                     mpiProcess = nextProcess;
00515                                 }
00516                             }
00517                         } catch (java.io.IOException e) {
00518                             e.printStackTrace();
00519                         } catch (NodeException e1) {
00520                             e1.printStackTrace();
00521                         }
00522                     } else {
00523                         // Test if that is this virtual Node that originates the creation of the vm
00524                         // else the vm was already created by another virtualNode, in that case, nothing is
00525                         // done at this point, nodes creation will occur when the runtime associated with the jvm
00526                         // will register.
00527                         if (!vmAlreadyAssigned) {
00528                             setParameters(process, vm);
00529 
00530                             try {
00531                                 // It is this virtual Node that originates the creation of the vm
00532                                 proActiveRuntimeImpl.createVM(process);
00533                             } catch (java.io.IOException e) {
00534                                 e.printStackTrace();
00535                                 logger.error("cannot activate virtualNode " +
00536                                     this.name + " with the process " +
00537                                     process.getCommand());
00538                             }
00539                         }
00540                     }
00541                 } else {
00542                     // It is a service that is mapped to the vm.
00543                     startService(vm);
00544                 }
00545 
00546                 increaseIndex();
00547             }
00548 
00549             // local nodes creation
00550             for (int i = 0; i < localVirtualMachines.size(); i++) {
00551                 String protocol = localVirtualMachines.get(i);
00552                 internalCreateNodeOnCurrentJvm(protocol);
00553             }
00554 
00555             //initialization of the global timeout
00556             globalTimeOut = System.currentTimeMillis() + timeout;
00557             isActivated = true;
00558 
00559             if (registration) {
00560                 register();
00561             }
00562 
00563             // FAULT TOLERANCE
00564             try {
00565                 if (this.ftService != null) {
00566                     // register nodes only if ressource is not null
00567                     this.ftService.registerRessources(this.getNodes());
00568                 }
00569             } catch (NodeException e) {
00570                 logger.error(e.getMessage());
00571             }
00572         } else {
00573             logger.debug("VirtualNode " + this.name + " already activated");
00574         }
00575     }
00576 
00581     public ExternalProcess getMPIProcess() {
00582         return (ExternalProcess) makeDeepCopy(mpiProcess);
00583     }
00584 
00585     public boolean hasMPIProcess() {
00586         return !(mpiProcess == null);
00587     }
00588 
00589     //returns the sequential process in the process hierarchie
00590     private ExternalProcess getSequentialProcessInHierarchie(
00591         ExternalProcess process, int rank) {
00592         while (rank > 0) {
00593             process = ((AbstractExternalProcessDecorator) process).getTargetProcess();
00594             rank--;
00595         }
00596 
00597         return process;
00598     }
00599 
00600     // returns a process such that the target of p is finalProcess
00601     private ExternalProcess buildProcessWithHierarchie(
00602         ExternalProcess process, ExternalProcess finalProcess, int rank) {
00603         if (rank == 0) {
00604             return finalProcess;
00605         } else {
00606             ((AbstractExternalProcessDecorator) process).setTargetProcess(buildProcessWithHierarchie(
00607                     ((AbstractExternalProcessDecorator) process).getTargetProcess(),
00608                     finalProcess, rank - 1));
00609 
00610             return process;
00611         }
00612     }
00613 
00614     // returns the rank of sequential process in the processes hierarchie, -1 otherwise
00615     private int checkForSequentialProcess(ExternalProcess process) {
00616         int res = 0;
00617 
00618         while (!(process instanceof JVMProcess)) {
00619             // a sequential process was found return its rank
00620             if (process.isSequential()) {
00621                 return res;
00622             } else {
00623                 res++;
00624                 process = ((ExternalProcess) ((ExternalProcessDecorator) process).getTargetProcess());
00625             }
00626         }
00627 
00628         return -1;
00629     }
00630 
00635     public boolean isMainVirtualNode() {
00636         return mainVirtualNode;
00637     }
00638 
00643     public String getPadURL() {
00644         return padURL;
00645     }
00646 
00651     public void setIsMainVirtualNode(boolean isMainVirtualNode) {
00652         mainVirtualNode = isMainVirtualNode;
00653     }
00654 
00659     public void setPadURL(String XMLpadURL) {
00660         this.padURL = XMLpadURL;
00661     }
00662 
00663     /*
00664      *
00665      * @see org.objectweb.proactive.core.descriptor.data.VirtualNode#getNbMappedNodes()
00666      */
00667     public int getNbMappedNodes() {
00668         if (isActivated) {
00669             return nbMappedNodes;
00670         } else {
00671             int nbMappedNodesTmp = 0;
00672 
00673             for (int i = 0; i < virtualMachines.size(); i++) {
00674                 VirtualMachine vm = getVirtualMachine();
00675 
00676                 // first check if it is a process that is attached to the vm
00677                 if (vm.hasProcess()) {
00678                     ExternalProcess process = vm.getProcess();
00679                     int nbNodesPerCreatedVM = new Integer(vm.getNbNodesOnCreatedVMs()).intValue();
00680 
00681                     if (process.getNodeNumber() == UniversalProcess.UNKNOWN_NODE_NUMBER) {
00682                         return UniversalProcess.UNKNOWN_NODE_NUMBER;
00683                     } else {
00684                         nbMappedNodesTmp = nbMappedNodesTmp +
00685                             (process.getNodeNumber() * nbNodesPerCreatedVM);
00686                     }
00687                 }
00688             }
00689 
00690             return nbMappedNodesTmp;
00691         }
00692     }
00693 
00697     public int createdNodeCount() {
00698         throw new RuntimeException(
00699             "This method is deprecated, use getNumberOfCurrentlyCreatedNodes() or getNumberOfCreatedNodesAfterDeployment()");
00700     }
00701 
00702     /*
00703      *  (non-Javadoc)
00704      * @see org.objectweb.proactive.core.descriptor.data.VirtualNode#getNumberOfCurrentlyCreatedNodes()
00705      */
00706     public int getNumberOfCurrentlyCreatedNodes() {
00707         return nbCreatedNodes;
00708     }
00709 
00710     /*
00711      *  (non-Javadoc)
00712      * @see org.objectweb.proactive.core.descriptor.data.VirtualNode#getNumberOfCreatedNodesAfterDeployment()
00713      */
00714     public int getNumberOfCreatedNodesAfterDeployment() {
00715         try {
00716             waitForAllNodesCreation();
00717         } catch (NodeException e) {
00718             logger.error("Problem occured while waiting for nodes creation");
00719         }
00720 
00721         return nbCreatedNodes;
00722     }
00723 
00729     public Node getNode() throws NodeException {
00730         //try first to get the Node from the createdNodes array to be continued
00731         Node node;
00732         waitForNodeCreation();
00733 
00734         if (!createdNodes.isEmpty()) {
00735             node = createdNodes.get(lastNodeIndex);
00736             increaseNodeIndex();
00737 
00738             //wait for pending file transfer
00739             FileVector fw = fileTransferDeployedStatus.get(node.getNodeInformation()
00740                                                                .getName());
00741 
00742             if (fw != null) {
00743                 fw.waitForAll(); //wait-by-necessity
00744             }
00745 
00746             return node;
00747         } else {
00748             throw new NodeException("Cannot get a node from Virtual Node " + this.name+". Descriptor in use : \""+descriptorURL+"\"." );
00749         }
00750     }
00751 
00752     @Deprecated
00753     public Node getNode(int index) throws NodeException {
00754         Node node = createdNodes.get(index);
00755 
00756         if (node == null) {
00757             throw new NodeException(
00758                 "Cannot return the first node, no nodes hava been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00759         }
00760 
00761         FileVector fw = fileTransferDeployedStatus.get(node.getNodeInformation()
00762                                                            .getName());
00763 
00764         if (fw != null) {
00765             fw.waitForAll(); //wait-by-necessity
00766         }
00767 
00768         return node;
00769     }
00770 
00771     public String[] getNodesURL() throws NodeException {
00772         String[] nodeNames;
00773 
00774         try {
00775             waitForAllNodesCreation();
00776         } catch (NodeException e) {
00777             logger.error(e.getMessage());
00778         }
00779 
00780         if (!createdNodes.isEmpty()) {
00781             synchronized (createdNodes) {
00782                 nodeNames = new String[createdNodes.size()];
00783 
00784                 for (int i = 0; i < createdNodes.size(); i++) {
00785                     nodeNames[i] = createdNodes.get(i).getNodeInformation()
00786                                                .getURL();
00787                 }
00788             }
00789         } else {
00790             if (!MAX_P2P) {
00791                 throw new NodeException(
00792                     "Cannot return nodes, no nodes have been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00793             } else {
00794                 logger.warn("WARN: No nodes have yet been created.");
00795                 logger.warn(
00796                     "WARN: This behavior might be normal, since P2P service is used with MAX number of nodes requested");
00797                 logger.warn("WARN: Returning empty array");
00798 
00799                 return new String[0];
00800             }
00801         }
00802 
00803         return nodeNames;
00804     }
00805 
00806     public Node[] getNodes() throws NodeException {
00807         Node[] nodeTab;
00808 
00809         try {
00810             waitForAllNodesCreation();
00811         } catch (NodeException e) {
00812             logger.error(e.getMessage());
00813         }
00814 
00815         if (!createdNodes.isEmpty()) {
00816             synchronized (createdNodes) {
00817                 nodeTab = new Node[createdNodes.size()];
00818 
00819                 for (int i = 0; i < createdNodes.size(); i++) {
00820                     nodeTab[i] = createdNodes.get(i);
00821                 }
00822             }
00823         } else {
00824             if (!MAX_P2P) {
00825                 throw new NodeException(
00826                     "Cannot return nodes, no nodes have been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00827             } else {
00828                 logger.warn("WARN: No nodes have yet been created.");
00829                 logger.warn(
00830                     "WARN: This behavior might be normal, since P2P service is used with MAX number of nodes requested");
00831                 logger.warn("WARN: Returning empty array");
00832 
00833                 return new Node[0];
00834             }
00835         }
00836 
00837         return nodeTab;
00838     }
00839 
00840     public Node getNode(String url) throws NodeException {
00841         Node node = null;
00842 
00843         try {
00844             waitForAllNodesCreation();
00845         } catch (NodeException e) {
00846             logger.error(e.getMessage());
00847         }
00848 
00849         if (!createdNodes.isEmpty()) {
00850             synchronized (createdNodes) {
00851                 for (int i = 0; i < createdNodes.size(); i++) {
00852                     if (createdNodes.get(i).getNodeInformation().getURL()
00853                                         .equals(url)) {
00854                         node = createdNodes.get(i);
00855 
00856                         break;
00857                     }
00858                 }
00859 
00860                 return node;
00861             }
00862         } else {
00863             throw new NodeException(
00864                 "Cannot return nodes, no nodes hava been created. Descriptor in use : \""+descriptorURL+"\".");
00865         }
00866     }
00867 
00868     public void killAll(boolean softly) {
00869         Node node;
00870         ProActiveRuntime part = null;
00871 
00872         if (isActivated) {
00873             // Killing p2p nodes
00874             if (this.p2pNodeslookupList.size() > 0) {
00875                 for (int index = 0; index < this.p2pNodeslookupList.size();
00876                         index++) {
00877                     P2PNodeLookup currentNodesLookup = this.p2pNodeslookupList.get(index);
00878                     currentNodesLookup.killAllNodes();
00879                 }
00880             }
00881 
00882             // Killing other nodes
00883             for (int i = 0; i < createdNodes.size(); i++) {
00884                 node = createdNodes.get(i);
00885                 part = node.getProActiveRuntime();
00886 
00887                 if (this.p2pNodes.contains(node)) {
00888                     continue;
00889                 }
00890 
00891                 //we have to be carefull. Indeed if the node is local, we do not
00892                 // want to kill the runtime, otherwise the application is over
00893                 // so if the node is local, we just unregister this node from any registry
00894                 if (!NodeFactory.isNodeLocal(node)) {
00895                     try {
00896                         part.killRT(softly);
00897                     } catch (ProActiveException e1) {
00898                         e1.printStackTrace();
00899                     } catch (Exception e) {
00900                         logger.info(" Virtual Machine " +
00901                             part.getVMInformation().getVMID() + " on host " +
00902                             UrlBuilder.getHostNameorIP(
00903                                 part.getVMInformation().getInetAddress()) +
00904                             " terminated!!!");
00905                     }
00906                 } else {
00907                     try {
00908                         //                                      the node is local, unregister it.
00909                         part.killNode(node.getNodeInformation().getURL());
00910                     } catch (ProActiveException e) {
00911                         e.printStackTrace();
00912                     }
00913                 }
00914             }
00915 
00916             isActivated = false;
00917 
00918             try {
00919                 //if registered in any regigistry, unregister everything
00920                 if (registration) {
00921                     ProActive.unregisterVirtualNode(this);
00922                 }
00923                 //else unregister just in the local runtime
00924                 else {
00925                     proActiveRuntimeImpl.unregisterVirtualNode(this.name);
00926                 }
00927             } catch (ProActiveException e) {
00928                 e.printStackTrace();
00929             }
00930 
00931             // if not activated unregister it from the local runtime
00932         } else {
00933             proActiveRuntimeImpl.unregisterVirtualNode(this.name);
00934         }
00935 
00936         for (int i = 0; i < createdRuntimeF.size(); i++) {
00937             part = createdRuntimeF.get(i);
00938 
00939             try {
00940                 part.killRT(true);
00941             } catch (Exception e) {
00942                 logger.info(" Forwarder " + part.getVMInformation().getVMID() +
00943                     " on host " +
00944                     UrlBuilder.getHostNameorIP(part.getVMInformation()
00945                                                    .getInetAddress()) +
00946                     " terminated!!!");
00947             }
00948         }
00949     }
00950 
00951     public void createNodeOnCurrentJvm(String protocol) {
00952         if (protocol == null) {
00953             protocol = System.getProperty("proactive.communication.protocol");
00954         }
00955 
00956         localVirtualMachines.add(protocol);
00957     }
00958 
00959     public Object getUniqueAO() throws ProActiveException {
00960         if (!property.equals("unique_singleAO")) {
00961             logger.warn(
00962                 "!!!!!!!!!!WARNING. This VirtualNode is not defined with unique_single_AO property in the XML descriptor. Calling getUniqueAO() on this VirtualNode can lead to unexpected behaviour");
00963         }
00964 
00965         if (uniqueActiveObject == null) {
00966             try {
00967                 Node node = getNode();
00968 
00969                 if (node.getActiveObjects().length > 1) {
00970                     logger.warn(
00971                         "!!!!!!!!!!WARNING. More than one active object is created on this VirtualNode.");
00972                 }
00973 
00974                 uniqueActiveObject = node.getActiveObjects()[0];
00975             } catch (Exception e) {
00976                 e.printStackTrace();
00977             }
00978         }
00979 
00980         if (uniqueActiveObject == null) {
00981             throw new ProActiveException(
00982                 "No active object are registered on this VirtualNode");
00983         }
00984 
00985         return uniqueActiveObject;
00986     }
00987 
00988     public boolean isActivated() {
00989         return isActivated;
00990     }
00991 
00995     public boolean isLookup() {
00996         return false;
00997     }
00998 
00999     //
01000     //-------------------IMPLEMENTS Job-----------------------------------
01001     //
01002 
01006     public String getJobID() {
01007         return this.jobID;
01008     }
01009     
01010     //
01011     //-------------------IMPLEMENTS RuntimeRegistrationEventListener------------
01012     //
01013     private transient ExecutorService rrThreadpool = Executors.newCachedThreadPool();
01014 
01015 
01016     //
01017     //-------------------IMPLEMENTS RuntimeRegistrationEventListener------------
01018     //
01019     public void runtimeRegistered(RuntimeRegistrationEvent event) {
01020         if (event.getType() == RuntimeRegistrationEvent.FORWARDER_RUNTIME_REGISTERED) {
01021             forwarderRuntimeRegisteredPerform(event);
01022         } else {
01023             // Try to avoid the contention at EventListener level
01024             // A listener shouldn't perform long computation, so a thread is spawned
01025             rrThreadpool.execute(new RuntimeRegisteredPerform(event));
01026         }
01027     }
01028 
01029     private class RuntimeRegisteredPerform implements Runnable {
01030         private RuntimeRegistrationEvent event;
01031 
01032         public RuntimeRegisteredPerform(RuntimeRegistrationEvent _event) {
01033             this.event = _event;
01034         }
01035 
01036         public void run() {
01037             runtimeRegisteredPerform(event);
01038         }
01039     }
01040     
01041     private synchronized void forwarderRuntimeRegisteredPerform(
01042         RuntimeRegistrationEvent event) {
01043         VirtualMachine virtualMachine = null;
01044 
01045         for (int i = 0; i < virtualMachines.size(); i++) {
01046             if (((VirtualMachine) virtualMachines.get(i)).getName()
01047                      .equals(event.getVmName())) {
01048                 virtualMachine = (VirtualMachine) virtualMachines.get(i);
01049             }
01050         }
01051 
01052         //Check if it this virtualNode that originates the process
01053         if ((event.getCreatorID().equals(this.name)) &&
01054                 (virtualMachine != null)) {
01055             if (logger.isDebugEnabled()) {
01056                 logger.debug("forwarder " + event.getCreatorID() +
01057                     " registered on virtualnode " + this.name);
01058             }
01059         }
01060 
01061         createdRuntimeF.add(event.getRegisteredRuntime());
01062     }
01063 
01064     private void runtimeRegisteredPerform(RuntimeRegistrationEvent event) {
01065         VirtualMachine virtualMachine = null;
01066 
01067         // Not synchronized because it is now impossible to add a new VirtualMachine
01068         for (int i = 0; i < virtualMachines.size(); i++) {
01069             if (((VirtualMachine) virtualMachines.get(i)).getName()
01070                      .equals(event.getVmName())) {
01071                 virtualMachine = (VirtualMachine) virtualMachines.get(i);
01072             }
01073         }
01074 
01075         //Check if it this virtualNode that originates the process
01076         if ((event.getCreatorID().equals(this.name)) &&
01077                 (virtualMachine != null)) {
01078             if (logger.isDebugEnabled()) {
01079                 logger.debug("runtime " + event.getCreatorID() +
01080                     " registered on virtualnode " + this.name);
01081             }
01082 
01083             createNodes(event, virtualMachine);
01084         }
01085 
01086         // Check if the virtualNode that originates the process is among awaited
01087         // VirtualNodes
01088         if (awaitedVirtualNodes.containsKey(event.getCreatorID())) {
01089             createNodes(event, awaitedVirtualNodes.get(event.getCreatorID()));
01090         }
01091     }
01092 
01093     private void createNodes(RuntimeRegistrationEvent event,
01094         VirtualMachine _virtualMachine) {
01095         String protocol;
01096         ProActiveRuntime proActiveRuntimeRegistered;
01097         String nodeHost;
01098         int port;
01099         String url;
01100 
01101         protocol = event.getProtocol();
01102         proActiveRuntimeRegistered = event.getRegisteredRuntime();
01103         nodeHost = proActiveRuntimeRegistered.getVMInformation().getHostName();
01104         port = UrlBuilder.getPortFromUrl(proActiveRuntimeRegistered.getURL());
01105         url = null;
01106 
01107         try {
01108             int nodeNumber = (new Integer((String) _virtualMachine.getNbNodesOnCreatedVMs())).intValue();
01109 
01110             for (int i = 1; i <= nodeNumber; i++) {
01111                 ProActiveSecurityManager siblingPSM = null;
01112 
01113                 if (proactiveSecurityManager != null) {
01114                     siblingPSM = proactiveSecurityManager.generateSiblingCertificate(this.name);
01115                 }
01116 
01117                 int registerAttempts = REGISTRATION_ATTEMPTS;
01118 
01119                 while (registerAttempts > 0) { // If there is an
01120                                                // AlreadyBoundException, we
01121                                                // will gerate an other
01122                                                // random node's name and
01123                                                // try to register it again
01124                     String nodeName = this.name +
01125                         Integer.toString(ProActiveRuntimeImpl.getNextInt());
01126                     url = buildURL(nodeHost, nodeName, protocol, port);
01127 
01128                     // nodes are created from the registered runtime, since
01129                     // this
01130                     // virtualNode is
01131                     // waiting for runtime registration to perform
01132                     // co-allocation
01133                     // in the jvm.
01134                     try {
01135                         proActiveRuntimeRegistered.createLocalNode(url, false,
01136                             siblingPSM, this.getName(), this.jobID);
01137 
01138                         // Registration has succeded.
01139                         performOperations(proActiveRuntimeRegistered, url,
01140                             protocol, event.getVmName());
01141 
01142                         break;
01143                     } catch (AlreadyBoundException e) {
01144                         registerAttempts--;
01145                     }
01146                 }
01147 
01148                 if (registerAttempts == 0) {
01149                     logger.warn("createLocalNode failed " +
01150                         REGISTRATION_ATTEMPTS + " times for runtime " +
01151                         nodeHost);
01152                 }
01153             }
01154         } catch (ProActiveException e) {
01155             e.printStackTrace();
01156         }
01157     }
01158 
01164     public void setRuntimeInformations(String information, String value)
01165         throws ProActiveException {
01166         //        try {
01167         //            checkProperty(information);
01168         //        } catch (ProActiveException e) {
01169         //            throw new ProActiveException("No property can be set at runtime on this VirtualNode",
01170         //                e);
01171         //        }
01172         //No need to check if the property exist since no property can be set 
01173         // at runtime on a VNImpl. This might change in the future.
01174         throw new ProActiveException(
01175             "No property can be set at runtime on this VirtualNode");
01176     }
01177 
01181     public void setService(UniversalService service) throws ProActiveException {
01182         if (FaultToleranceService.FT_SERVICE_NAME.equals(
01183                     service.getServiceName())) {
01184             this.ftService = (FaultToleranceService) service;
01185         } else {
01186             throw new ProActiveException(
01187                 " Unable to bind the given service to a virtual node");
01188         }
01189     }
01190 
01194     public String getUserClass() {
01195         return this.getClass().getName();
01196     }
01197 
01198     public void setRegistrationProtocol(String protocol) {
01199         setRegistrationValue(true);
01200         this.registrationProtocol = protocol;
01201     }
01202 
01203     public String getRegistrationProtocol() {
01204         return this.registrationProtocol;
01205     }
01206 
01214     public void setMinNumberOfNodes(int min) {
01215         this.minNumberOfNodes = min;
01216     }
01217 
01221     public int getMinNumberOfNodes() {
01222         return minNumberOfNodes;
01223     }
01224 
01228     public boolean isMultiple() {
01229         return ((virtualMachines.size() + localVirtualMachines.size()) > 1);
01230     }
01231 
01232     //
01233     //-------------------PRIVATE METHODS--------------------------------------
01234     //
01235     private void internalCreateNodeOnCurrentJvm(String protocol) {
01236         try {
01237             // this method should be called when in the xml document the tag currenJVM is encountered. It means that one node must be created
01238             // on the jvm that originates the creation of this virtualNode(the current jvm) and mapped on this virtualNode
01239             // we must increase the node count
01240             String url = null;
01241             increaseNumberOfNodes(1);
01242 
01243             // get the Runtime for the given protocol
01244             ProActiveRuntime defaultRuntime = RuntimeFactory.getProtocolSpecificRuntime(checkProtocol(
01245                         protocol));
01246 
01247             //create the node
01248             ProActiveSecurityManager siblingPSM = null;
01249 
01250             if (proactiveSecurityManager != null) {
01251                 siblingPSM = proactiveSecurityManager.generateSiblingCertificate(this.name);
01252             }
01253 
01254             int registrationAttempts = REGISTRATION_ATTEMPTS;
01255 
01256             while (registrationAttempts > 0) { //If there is an AlreadyBoundException, we generate an other random node name
01257                 String nodeName = this.name +
01258                     Integer.toString(ProActiveRuntimeImpl.getNextInt());
01259 
01260                 try {
01261                     url = defaultRuntime.createLocalNode(nodeName, false,
01262                             siblingPSM, this.getName(), ProActive.getJobId());
01263                     registrationAttempts = 0;
01264                 } catch (AlreadyBoundException e) {
01265                     registrationAttempts--;
01266                 }
01267             }
01268 
01269             //add this node to this virtualNode
01270             performOperations(defaultRuntime, url, protocol,
01271                 defaultRuntime.getVMInformation().getDescriptorVMName());
01272         } catch (Exception e) {
01273             e.printStackTrace();
01274         }
01275     }
01276 
01280     private synchronized void waitForNodeCreation() throws NodeException {
01281         while (!nodeCreated) {
01282             if (!timeoutExpired()) {
01283                 try {
01284                     wait(getTimeToSleep());
01285                 } catch (InterruptedException e2) {
01286                     e2.printStackTrace();
01287                 } catch (IllegalStateException e) {
01288                     // it may happen that we entered in the loop and just after
01289                     // the timeToSleep is < 0. It means that the timeout expired
01290                     // that is why we catch the runtime exception
01291                     throw new NodeException(
01292                         "After many retries, not even one node can be found for Virtual Node \"" +
01293                         this.name + "\" in descriptor \"" + this.descriptorURL +
01294                         "\".");
01295                 }
01296             } else {
01297                 throw new NodeException(
01298                     "After many retries, not even one node can be found for Virtual Node \"" +
01299                     this.name + "\" in descriptor \"" + this.descriptorURL + "\".");
01300             }
01301         }
01302 
01303         return;
01304     }
01305 
01309     public void waitForAllNodesCreation() throws NodeException {
01310         int tempNodeCount = nbMappedNodes;
01311 
01312         if (tempNodeCount != P2PConstants.MAX_NODE) {
01313             //nodeCount equal 0 means there is only a P2P service with MAX number of nodes requested
01314             // so if different of 0, we can set to false the boolean
01315             MAX_P2P = false;
01316         }
01317 
01318         if (waitForTimeout) {
01319             try {
01320                 Thread.sleep(timeout);
01321             } catch (InterruptedException e2) {
01322                 e2.printStackTrace();
01323             }
01324         } else {
01325             //behavior has to be moved in a synchronized method to avoid useless lock
01326             // when sleeping
01327             internalWait(tempNodeCount);
01328         }
01329 
01330         //wait for the nodes to complete their deployment file transfer
01331         Collection<FileVector> c = fileTransferDeployedStatus.values();
01332         Iterator<FileVector> it = c.iterator();
01333 
01334         while (it.hasNext()) {
01335             FileVector fw = it.next();
01336             fw.waitForAll(); //wait-by-necessity
01337         }
01338 
01339 //        rrThreadpool.shutdown();
01340 //        while (!rrThreadpool.isTerminated()) {
01341 //              try {
01342 //                Thread.sleep(100);
01343 //            } catch (InterruptedException e) {
01344 //                e.printStackTrace();
01345 //            }
01346 //        }
01347         return;
01348     }
01349 
01350     private synchronized void internalWait(int tempNodeCount)
01351         throws NodeException {
01352         // check if we can release the vn before all nodes expected, are created
01353         // i.e the minNumber of nodes is set
01354         if (minNumberOfNodes != 0) {
01355             tempNodeCount = minNumberOfNodes;
01356         }
01357 
01358         while (nbCreatedNodes < tempNodeCount) {
01359             if (!timeoutExpired()) {
01360                 try {
01361                     wait(getTimeToSleep());
01362                 } catch (InterruptedException e2) {
01363                     e2.printStackTrace();
01364                 } catch (IllegalStateException e) {
01365                     // it may happen that we entered in the loop and just after
01366                     // the timeToSleep is < 0. It means that the timeout expired
01367                     // that is why we catch the runtime exception
01368                     throw new NodeException("After many retries, only " +
01369                         nbCreatedNodes + " nodes are created on " +
01370                         tempNodeCount + " expected for Virtual Node \"" +
01371                         this.name + "\" in descriptor \"" + descriptorURL + "\".");
01372                 }
01373             } else {
01374                 throw new NodeException("After many retries, only " +
01375                     nbCreatedNodes + " nodes are created on " + tempNodeCount +
01376                     " expected for Virtual Node \"" + this.name +
01377                     "\" in descriptor \"" + descriptorURL + "\".");
01378             }
01379         }
01380     }
01381 
01382     private boolean timeoutExpired() {
01383         long currentTime = System.currentTimeMillis();
01384 
01385         return (globalTimeOut < currentTime);
01386     }
01387 
01388     private long getTimeToSleep() {
01389         // if timeToSleep is < 0 we throw an exception
01390         long timeToSleep = globalTimeOut - System.currentTimeMillis();
01391 
01392         if (timeToSleep > 0) {
01393             return timeToSleep;
01394         } else {
01395             throw new IllegalStateException("Timeout expired");
01396         }
01397     }
01398 
01404     private ExternalProcess getProcess(VirtualMachine vm,
01405         boolean vmAlreadyAssigned) {
01406         ExternalProcess copyProcess;
01407 
01408         //VirtualMachine vm = getVirtualMachine();
01409         ExternalProcess process = vm.getProcess();
01410 
01411         // we need to do a deep copy of the process otherwise,
01412         //modifications will be applied on one object that might 
01413         // be referenced by other virtualNodes .i.e check started
01414         if (!vmAlreadyAssigned) {
01415             copyProcess = (ExternalProcess) makeDeepCopy(process);
01416             vm.setProcess(copyProcess);
01417 
01418             return copyProcess;
01419         } else {
01420             //increment the node count by askedNodes
01421             increaseNumberOfNodes(process.getNodeNumber() * new Integer(
01422                     vm.getNbNodesOnCreatedVMs()).intValue());
01423 
01424             return process;
01425         }
01426     }
01427 
01432     private void setParameters(ExternalProcess process, VirtualMachine vm) {
01433         JVMProcess jvmProcess;
01434 
01435         //jobID = ProActive.getJobId();
01436         String protocolId = "";
01437         int nodeNumber = new Integer(vm.getNbNodesOnCreatedVMs()).intValue();
01438 
01439         if (logger.isDebugEnabled()) {
01440             logger.debug("asked for " + nodeNumber + " nodes");
01441         }
01442 
01443         protocolId = process.getProcessId();
01444 
01445         int cnt = process.getNodeNumber();
01446 
01447         if (cnt == UniversalProcess.UNKNOWN_NODE_NUMBER) {
01448             waitForTimeout = true;
01449         } else {
01450             increaseNumberOfNodes(cnt * nodeNumber);
01451         }
01452 
01453         //When the virtualNode will be activated, it has to launch the process
01454         //with such parameter.See StartRuntime
01455         jvmProcess = (JVMProcess) process.getFinalProcess();
01456         jvmProcess.setJvmOptions("-Dproactive.groupInformation=" +
01457             ProActiveRuntimeImpl.getProActiveRuntime().getVMInformation()
01458                                 .getVMID().toString() + "~" +
01459             jvmProcess.getNewGroupId());
01460 
01461         //if the target class is StartRuntime, then give parameters otherwise keep parameters
01462         if (jvmProcess.getClassname()
01463                           .equals("org.objectweb.proactive.core.runtime.StartRuntime")) {
01464             String vnName = this.name;
01465 
01466             String localruntimeURL = null;
01467 
01468             try {
01469                 localruntimeURL = RuntimeFactory.getDefaultRuntime().getURL();
01470 
01471                 if (process.getUsername() != null) {
01472                     localruntimeURL = System.getProperty("user.name") + "@" +
01473                         localruntimeURL;
01474                 }
01475             } catch (ProActiveException e) {
01476                 e.printStackTrace();
01477             }
01478 
01479             if (logger.isDebugEnabled()) {
01480                 logger.debug(localruntimeURL);
01481             }
01482 
01483             // if it is a main node we set the property for retrieving the pad
01484             if (mainVirtualNode || process.isHierarchical()) {
01485                 jvmProcess.setJvmOptions(" -Dproactive.pad=" + padURL);
01486             }
01487 
01488             jvmProcess.setJvmOptions("-Dproactive.jobid=" + jobID);
01489             jvmProcess.setParameters(vnName + " " + localruntimeURL + " " +
01490                 nodeNumber + " " + protocolId + " " + vm.getName());
01491 
01492             // FAULT TOLERANCE settings
01493             if (this.ftService != null) {
01494                 jvmProcess.setJvmOptions(this.ftService.buildParamsLine());
01495             }
01496         }
01497 
01498         /* Setting the file transfer definitions associated with the current process,
01499          * and defined at the process level.
01500          */
01501         FileTransferWorkShop ftsDeploy = process.getFileTransferWorkShopDeploy();
01502         FileTransferWorkShop ftsRetrieve = process.getFileTransferWorkShopRetrieve();
01503 
01504         if ((ftsDeploy != null) && ftsDeploy.isImplicit()) {
01505             for (int i = 0; i < fileTransferDeploy.size(); i++)
01506                 ftsDeploy.addFileTransfer(fileTransferDeploy.get(i));
01507         }
01508 
01509         if ((ftsRetrieve != null) && ftsRetrieve.isImplicit()) {
01510             for (int i = 0; i < fileTransferRetrieve.size(); i++)
01511                 ftsRetrieve.addFileTransfer(fileTransferRetrieve.get(i));
01512         }
01513     }
01514 
01520     private Object makeDeepCopy(Object process) {
01521         //deepCopyTag = true;
01522         Object result = null;
01523 
01524         try {
01525             java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
01526             java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(baos);
01527             oos.writeObject(process);
01528             oos.flush();
01529             oos.close();
01530 
01531             java.io.ByteArrayInputStream bais = new java.io.ByteArrayInputStream(baos.toByteArray());
01532             java.io.ObjectInputStream ois = new java.io.ObjectInputStream(bais);
01533             result = ois.readObject();
01534             ois.close();
01535         } catch (Exception e) {
01536             e.printStackTrace();
01537         }
01538 
01539         //deepCopyTag = false;
01540         return result;
01541     }
01542 
01543     private String buildURL(String host, String name, String protocol, int port) {
01544         if (port != 0) {
01545             return UrlBuilder.buildUrl(host, name, protocol, port);
01546         } else {
01547             return UrlBuilder.buildUrl(host, name, protocol);
01548         }
01549     }
01550 
01551     private void increaseIndex() {
01552         if (virtualMachines.size() > 1) {
01553             lastVirtualMachineIndex = (lastVirtualMachineIndex + 1) % virtualMachines.size();
01554         }
01555     }
01556 
01557     private void increaseNumberOfNodes(int n) {
01558         nbMappedNodes = nbMappedNodes + n;
01559 
01560         if (logger.isDebugEnabled()) {
01561             logger.debug("Number of nodes = " + nbMappedNodes);
01562         }
01563     }
01564 
01565     private void increaseNodeIndex() {
01566         if (createdNodes.size() > 1) {
01567             lastNodeIndex = (lastNodeIndex + 1) % createdNodes.size();
01568         }
01569     }
01570 
01571     private String checkProtocol(String protocol) {
01572         if (protocol.indexOf(":") == -1) {
01573             return protocol.concat(":");
01574         }
01575 
01576         return protocol;
01577     }
01578 
01579     private synchronized void performOperations(ProActiveRuntime part,
01580         String url, String protocol, String vmName) {
01581         Node node = new NodeImpl(part, url, checkProtocol(protocol),
01582                 this.jobID, vmName);
01583        synchronized (createdNodes) {
01584            createdNodes.add(node);      
01585         }
01586        
01587         logger.info("**** Mapping VirtualNode " + this.name + " with Node: " +
01588             url + " done");
01589         nodeCreated = true;
01590         nbCreatedNodes++;
01591 
01592         //Performe FileTransferDeploy (if needed)       
01593         FileVector fw = fileTransferDeploy(node);
01594         this.fileTransferDeployedStatus.put(node.getNodeInformation().getName(),
01595             fw);
01596 
01597         if (this.technicalService != null) {
01598             this.technicalService.apply(node);
01599         }
01600 
01601         // wakes up Thread that are waiting for the node creation 
01602         notifyAll();
01603 
01604         //notify all listeners that a node has been created
01605         notifyListeners(this, NodeCreationEvent.NODE_CREATED, node,
01606             nbCreatedNodes);
01607     }
01608 
01609     private void register() {
01610         try {
01611             waitForAllNodesCreation();
01612 
01613             //          ProActiveRuntime part = RuntimeFactory.getProtocolSpecificRuntime(registrationProtocol);
01614             //          part.registerVirtualnode(this.name,false);
01615             ProActive.registerVirtualNode(this, registrationProtocol, false);
01616         } catch (NodeException e) {
01617             logger.error(e.getMessage());
01618         } catch (ProActiveException e) {
01619             e.printStackTrace();
01620         } catch (AlreadyBoundException e) {
01621             logger.warn("The Virtual Node name " + this.getName() +
01622                 " is already bound in the registry", e);
01623         }
01624     }
01625 
01626     private void setRegistrationValue(boolean value) {
01627         this.registration = value;
01628     }
01629 
01630     private void startService(VirtualMachine vm) {
01631         UniversalService service = vm.getService();
01632 
01633         //we need to perform a deep copy. Indeed if several vm reference
01634         // the same service this might lead to unexpected behaviour
01635         UniversalService copyService = (UniversalService) makeDeepCopy(service);
01636         vm.setService(copyService);
01637 
01638         if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
01639             int nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
01640 
01641             // if it is a P2Pservice we must increase the node count with the number
01642             // of nodes requested
01643             if (nodeRequested != ((P2PDescriptorService) service).getMAX()) {
01644                 increaseNumberOfNodes(nodeRequested);
01645 
01646                 //nodeRequested = MAX means that the service will try to get every nodes 
01647                 // it can. So we can't predict how many nodes will return.
01648             } else {
01649                 MAX_P2P = true;
01650             }
01651         } else if (service.getServiceName()
01652                               .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
01653             int nodeRequested = ((SchedulerLookupService) service).getNodeNumber();
01654             increaseNumberOfNodes(nodeRequested);
01655         } else {
01656             //increase with 1 node
01657             increaseNumberOfNodes(1);
01658         }
01659 
01660         new ServiceThread(this, vm).start();
01661     }
01662 
01663     private void startService(UniversalService service, VirtualMachine vm) {
01664         //we need to perform a deep copy. Indeed if several vm reference
01665         // the same service this might lead to unexpected behaviour
01666         UniversalService copyService = (UniversalService) makeDeepCopy(service);
01667         vm.setService(copyService);
01668 
01669         if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
01670             int nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
01671 
01672             // if it is a P2Pservice we must increase the node count with the number
01673             // of nodes requested
01674             if (nodeRequested != ((P2PDescriptorService) service).getMAX()) {
01675                 increaseNumberOfNodes(nodeRequested);
01676 
01677                 //nodeRequested = MAX means that the service will try to get every nodes 
01678                 // it can. So we can't predict how many nodes will return.
01679             } else {
01680                 MAX_P2P = true;
01681             }
01682         } else if (service.getServiceName()
01683                               .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
01684             int nodeRequested = ((SchedulerLookupService) service).getNodeNumber();
01685             increaseNumberOfNodes(nodeRequested);
01686         } else {
01687             //increase with 1 node
01688             increaseNumberOfNodes(1);
01689         }
01690 
01691         new ServiceThread(this, vm).start();
01692     }
01693 
01694     private void writeObject(java.io.ObjectOutputStream out)
01695         throws java.io.IOException {
01696         if (isActivated) {
01697             try {
01698                 waitForAllNodesCreation();
01699             } catch (NodeException e) {
01700                 out.defaultWriteObject();
01701 
01702                 return;
01703             }
01704         }
01705         
01706         out.defaultWriteObject();
01707     }
01708 
01709     private void readObject(java.io.ObjectInputStream in)
01710         throws java.io.IOException, ClassNotFoundException {
01711         in.defaultReadObject();
01712         this.proActiveRuntimeImpl = (ProActiveRuntimeImpl) ProActiveRuntimeImpl.getProActiveRuntime();
01713         rrThreadpool = Executors.newCachedThreadPool();
01714     }
01715 
01716     // -------------------------------------------------------------------------
01717     // For P2P nodes acquisition
01718     // -------------------------------------------------------------------------
01719 
01724     public synchronized void nodeCreated(NodeCreationEvent event) {
01725         Node newNode = event.getNode();
01726         this.createdNodes.add(newNode);
01727         this.p2pNodes.add(newNode);
01728         nbCreatedNodes++;
01729         nodeCreated = true;
01730 
01731         if (this.technicalService != null) {
01732             this.technicalService.apply(newNode);
01733         }
01734 
01735         //notify all listeners that a node has been created
01736         notifyAll();
01737         notifyListeners(this, NodeCreationEvent.NODE_CREATED, newNode,
01738             nbCreatedNodes);
01739     }
01740 
01744     public void addP2PNodesLookup(P2PNodeLookup nodesLookup) {
01745         this.p2pNodeslookupList.add(nodesLookup);
01746         P2P_LOGGER.debug("A P2P nodes lookup added to the vn: " + this.name);
01747     }
01748 
01752     public FileVector fileTransferRetrieve()
01753         throws IOException, ProActiveException {
01754         Node[] nodes;
01755         FileVector fileVector = new FileVector();
01756 
01757         try {
01758             nodes = getNodes();
01759         } catch (NodeException e) {
01760             throw new ProActiveException(
01761                 "Can not Retrieve Files, since no nodes where created for Virtual Node" +
01762                 this.getName());
01763         }
01764 
01765         if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01766             FILETRANSFER_LOGGER.debug("Retrieving files for " + nodes.length +
01767                 " node(s).");
01768         }
01769 
01770         /* For all the nodes we get the VirtualMachine that spawned it, and
01771          * then the process linked with this VirtualMachine. We then obtain
01772          * the FileTransfer Retrieve Workshop from the process and using
01773          * the FileTransfer API we retrieve the Files.
01774          */
01775         for (int i = 0; i < nodes.length; i++) {
01776             String vmName = nodes[i].getNodeInformation().getDescriptorVMName();
01777             VirtualMachine vm = getVirtualMachine(vmName);
01778 
01779             if (vm == null) {
01780                 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01781                     FILETRANSFER_LOGGER.info("No VM found with name: " +
01782                         vmName + " for node: " +
01783                         nodes[i].getNodeInformation().getName());
01784                 }
01785 
01786                 continue;
01787             }
01788 
01789             //TODO We only get the VN for the first process in the chain. We should check if it is a SSH RSH,etc...
01790             ExternalProcess eProcess = vm.getProcess();
01791 
01792             if (eProcess == null) {
01793                 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01794                     FILETRANSFER_LOGGER.debug("No Process linked with VM: " +
01795                         vmName + " for node: " +
01796                         nodes[i].getNodeInformation().getName());
01797                 }
01798 
01799                 continue;
01800             }
01801 
01802             FileTransferWorkShop ftwRetrieve = eProcess.getFileTransferWorkShopRetrieve();
01803             FileDescription[] fd = ftwRetrieve.getAllFileDescriptions();
01804 
01805             File[] srcFile = new File[fd.length];
01806             File[] dstFile = new File[fd.length];
01807 
01808             for (int j = 0; j < fd.length; j++) {
01809                 srcFile[j] = new File(ftwRetrieve.getAbsoluteSrcPath(fd[j]));
01810                 dstFile[j] = new File(ftwRetrieve.getAbsoluteDstPath(fd[j]) +
01811                         "-" + nodes[i].getNodeInformation().getName());
01812             }
01813 
01814             long init = System.currentTimeMillis();
01815             fileVector.add(FileTransfer.pullFiles(nodes[i], srcFile, dstFile,
01816                     fileBlockSize, overlapping));
01817 
01818             if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01819                 FILETRANSFER_LOGGER.debug("Returned pullFiles in:" +
01820                     (System.currentTimeMillis() - init));
01821             }
01822         }
01823 
01824         return fileVector;
01825     }
01826 
01835     private FileVector fileTransferDeploy(Node node) {
01836         FileVector fileWrapper = new FileVector();
01837 
01838         if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01839             DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01840                 "File Transfer Deploy files for node" +
01841                 node.getNodeInformation().getName());
01842         }
01843 
01844         String vmName = node.getNodeInformation().getDescriptorVMName();
01845         VirtualMachine vm = getVirtualMachine(vmName);
01846 
01847         if (vm == null) {
01848             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01849                 DEPLOYMENT_FILETRANSFER_LOGGER.debug("No VM found with name: " +
01850                     vmName + " for node: " +
01851                     node.getNodeInformation().getName());
01852             }
01853 
01854             return fileWrapper;
01855         }
01856 
01857         //TODO We only get the VN for the first process in the chain. We should check if it is a SSH, SSH, etc...
01858         ExternalProcess eProcess = vm.getProcess();
01859 
01860         if (eProcess == null) {
01861             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01862                 DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01863                     "No Process linked with VM: " + vmName + " for node: " +
01864                     node.getNodeInformation().getName());
01865             }
01866 
01867             return fileWrapper;
01868         }
01869 
01870         //if the process handled the FileTransfer we have nothing to do
01871         if (!eProcess.isRequiredFileTransferDeployOnNodeCreation()) {
01872             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01873                 DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01874                     "No ProActive FileTransfer API is required for this node.");
01875             }
01876 
01877             return fileWrapper;
01878         }
01879 
01880         FileTransferWorkShop ftwDeploy = eProcess.getFileTransferWorkShopDeploy();
01881         FileDescription[] fd = ftwDeploy.getAllFileDescriptions();
01882 
01883         if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01884             DEPLOYMENT_FILETRANSFER_LOGGER.debug("Transfering " + fd.length +
01885                 " file(s)");
01886         }
01887 
01888         File[] filesSrc = new File[fd.length];
01889         File[] filesDst = new File[fd.length];
01890 
01891         for (int j = 0; j < fd.length; j++) {
01892             File srcFile = new File(ftwDeploy.getAbsoluteSrcPath(fd[j]));
01893             File dstFile = new File(ftwDeploy.getAbsoluteDstPath(fd[j]));
01894             filesSrc[j] = srcFile;
01895             filesDst[j] = dstFile;
01896         }
01897 
01898         try {
01899             fileWrapper.add(FileTransfer.pushFiles(node, filesSrc, filesDst,
01900                     fileBlockSize, overlapping));
01901         } catch (Exception e) {
01902             logger.error("Unable to pushFile files to node " +
01903                 node.getNodeInformation().getName());
01904 
01905             if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01906                 DEPLOYMENT_FILETRANSFER_LOGGER.debug(e.getStackTrace());
01907             }
01908         }
01909 
01910         return fileWrapper;
01911     }
01912 
01913     public void setFileTransferParams(int fileBlockSize, int overlapping) {
01914         this.fileBlockSize = fileBlockSize;
01915         this.overlapping = overlapping;
01916     }
01917 
01918     public void addTechnicalService(TechnicalService technicalWrapper) {
01919         this.technicalService = technicalWrapper;
01920     }
01921 
01922     public ArrayList getVirtualMachines() {
01923         return this.virtualMachines;
01924     }
01925 
01926 }

Generated on Mon Jan 22 15:16:07 2007 for ProActive by  doxygen 1.5.1