00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.descriptor.data;
00032
00033 import java.io.File;
00034 import java.io.IOException;
00035 import java.io.Serializable;
00036 import java.rmi.AlreadyBoundException;
00037 import java.util.ArrayList;
00038 import java.util.Collection;
00039 import java.util.HashMap;
00040 import java.util.Hashtable;
00041 import java.util.Iterator;
00042 import java.util.Vector;
00043 import java.util.concurrent.ExecutorService;
00044 import java.util.concurrent.Executors;
00045
00046 import org.apache.log4j.Logger;
00047 import org.objectweb.proactive.ProActive;
00048 import org.objectweb.proactive.core.ProActiveException;
00049 import org.objectweb.proactive.core.descriptor.services.FaultToleranceService;
00050 import org.objectweb.proactive.core.descriptor.services.P2PDescriptorService;
00051 import org.objectweb.proactive.core.descriptor.services.SchedulerLookupService;
00052 import org.objectweb.proactive.core.descriptor.services.ServiceThread;
00053 import org.objectweb.proactive.core.descriptor.services.ServiceUser;
00054 import org.objectweb.proactive.core.descriptor.services.TechnicalService;
00055 import org.objectweb.proactive.core.descriptor.services.UniversalService;
00056 import org.objectweb.proactive.core.event.NodeCreationEvent;
00057 import org.objectweb.proactive.core.event.NodeCreationEventListener;
00058 import org.objectweb.proactive.core.event.NodeCreationEventProducerImpl;
00059 import org.objectweb.proactive.core.event.RuntimeRegistrationEvent;
00060 import org.objectweb.proactive.core.event.RuntimeRegistrationEventListener;
00061 import org.objectweb.proactive.core.node.Node;
00062 import org.objectweb.proactive.core.node.NodeException;
00063 import org.objectweb.proactive.core.node.NodeFactory;
00064 import org.objectweb.proactive.core.node.NodeImpl;
00065 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator;
00066 import org.objectweb.proactive.core.process.AbstractSequentialListProcessDecorator;
00067 import org.objectweb.proactive.core.process.DependentProcess;
00068 import org.objectweb.proactive.core.process.ExternalProcess;
00069 import org.objectweb.proactive.core.process.ExternalProcessDecorator;
00070 import org.objectweb.proactive.core.process.JVMProcess;
00071 import org.objectweb.proactive.core.process.UniversalProcess;
00072 import org.objectweb.proactive.core.process.filetransfer.FileTransferDefinition;
00073 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00074 import org.objectweb.proactive.core.process.filetransfer.FileTransferDefinition.FileDescription;
00075 import org.objectweb.proactive.core.process.mpi.MPIProcess;
00076 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00077 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
00078 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00079 import org.objectweb.proactive.core.util.UrlBuilder;
00080 import org.objectweb.proactive.core.util.log.Loggers;
00081 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00082 import org.objectweb.proactive.ext.security.ProActiveSecurityManager;
00083 import org.objectweb.proactive.filetransfer.FileTransfer;
00084 import org.objectweb.proactive.filetransfer.FileVector;
00085 import org.objectweb.proactive.p2p.service.node.P2PNodeLookup;
00086 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00087 import org.objectweb.proactive.scheduler.SchedulerConstants;
00088
00089
00100 public class VirtualNodeImpl extends NodeCreationEventProducerImpl
00101 implements VirtualNode, Serializable, RuntimeRegistrationEventListener,
00102 NodeCreationEventListener, ServiceUser {
00103
00105 private final static Logger P2P_LOGGER = ProActiveLogger.getLogger(Loggers.P2P_VN);
00106 private final static Logger FILETRANSFER_LOGGER = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00107 private final static Logger DEPLOYMENT_FILETRANSFER_LOGGER = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_FILETRANSFER);
00108 public static int counter = 0;
00109
00110
00111
00112
00113
00115 protected transient ProActiveRuntimeImpl proActiveRuntimeImpl;
00116
00118 private String name;
00119
00121 private String property;
00122
00124 private java.util.ArrayList<VirtualMachine> virtualMachines;
00125
00127 private java.util.ArrayList<String> localVirtualMachines;
00128
00130 private int lastVirtualMachineIndex;
00131
00133 private ArrayList<ProActiveRuntime> createdRuntimeF;
00134
00136 private java.util.ArrayList<Node> createdNodes;
00137
00139 private java.util.ArrayList<FileTransferDefinition> fileTransferDeploy;
00140
00142 private java.util.ArrayList<FileTransferDefinition> fileTransferRetrieve;
00143
00145 private HashMap<String, FileVector> fileTransferDeployedStatus;
00146 private int fileBlockSize;
00147 private int overlapping;
00148
00150 private int lastNodeIndex;
00151
00153 private int nbMappedNodes;
00154
00158 private int minNumberOfNodes = 0;
00159
00161 private int nbCreatedNodes;
00162
00164 private boolean nodeCreated = false;
00165 private boolean isActivated = false;
00166
00169 private Hashtable<String, VirtualMachine> awaitedVirtualNodes;
00170 private String registrationProtocol;
00171 private boolean registration = false;
00172 private boolean waitForTimeout = false;
00173
00174
00175
00176 private boolean MAX_P2P = false;
00177
00178
00179
00181 protected long timeout = 70000;
00182
00184 protected long globalTimeOut;
00185 private Object uniqueActiveObject = null;
00186
00187
00188 private ProActiveSecurityManager proactiveSecurityManager;
00189 protected String jobID = ProActive.getJobId();
00190
00191
00192 private FaultToleranceService ftService;
00193 private Vector<Node> p2pNodes = new Vector<Node>();
00194
00195
00196 private boolean mainVirtualNode;
00197 private String padURL;
00198 private Vector<P2PNodeLookup> p2pNodeslookupList = new Vector<P2PNodeLookup>();
00199
00200
00201 private final int REGISTRATION_ATTEMPTS = 2;
00202
00203
00204 ExternalProcess mpiProcess = null;
00205 private TechnicalService technicalService;
00206 private String descriptorURL;
00207
00208
00209
00210
00211
/**
 * Package-private no-arg constructor. It initialises nothing beyond the
 * field initialisers.
 */
VirtualNodeImpl() {
    // intentionally empty
}
00217
/**
 * Creates a virtual node bound to the local ProActive runtime.
 *
 * @param name base name; for a main virtual node "_" and the current value
 *        of {@code counter} are appended
 * @param proactiveSecurityManager security manager to use, may be null
 * @param padURL URL of the pad
 * @param isMainVN whether this is a main virtual node
 * @param descriptor descriptor whose URL is recorded for error messages
 */
VirtualNodeImpl(String name,
    ProActiveSecurityManager proactiveSecurityManager, String padURL,
    boolean isMainVN, ProActiveDescriptor descriptor) {
    // Main virtual nodes get a unique suffix taken from the shared counter.
    this.name = isMainVN ? (name + "_" + (counter++)) : name;

    this.virtualMachines = new ArrayList<VirtualMachine>(5);
    this.localVirtualMachines = new ArrayList<String>();
    this.createdNodes = new ArrayList<Node>();
    this.createdRuntimeF = new ArrayList<ProActiveRuntime>();
    this.awaitedVirtualNodes = new Hashtable<String, VirtualMachine>();
    this.fileTransferDeploy = new ArrayList<FileTransferDefinition>();
    this.fileTransferDeployedStatus = new HashMap<String, FileVector>();
    this.fileTransferRetrieve = new ArrayList<FileTransferDefinition>();
    this.proActiveRuntimeImpl = (ProActiveRuntimeImpl) ProActiveRuntimeImpl.getProActiveRuntime();
    this.fileBlockSize = org.objectweb.proactive.core.filetransfer.FileBlock.DEFAULT_BLOCK_SIZE;
    this.overlapping = org.objectweb.proactive.core.filetransfer.FileTransferService.DEFAULT_MAX_SIMULTANEOUS_BLOCKS;

    if (logger.isDebugEnabled()) {
        String vmId = proActiveRuntimeImpl.getVMInformation().getVMID().toString();
        logger.debug("vn " + this.name + " registered on " + vmId);
    }

    this.proactiveSecurityManager = proactiveSecurityManager;
    this.mainVirtualNode = isMainVN;
    this.padURL = padURL;
    this.descriptorURL = descriptor.getProActiveDescriptorURL();
}
00261
00262
00263
00264
00265
00270 public void setProperty(String value) {
00271 this.property = value;
00272 }
00273
00274 public String getProperty() {
00275 return property;
00276 }
00277
00278 public long getTimeout() {
00279 return timeout;
00280 }
00281
00291 public void setTimeout(long timeout, boolean waitForTimeout) {
00292 this.timeout = timeout;
00293 this.waitForTimeout = waitForTimeout;
00294 }
00295
00300 public void setName(String s) {
00301 this.name = s;
00302 }
00303
00304 public String getName() {
00305 return name;
00306 }
00307
00308 public void addVirtualMachine(VirtualMachine virtualMachine) {
00309 if (isActivated) {
00310
00311
00312 logger.error(
00313 "addVirtualMachine() cannot be called when the virtualNode has been activated");
00314
00315 return;
00316 }
00317
00318 virtualMachines.add(virtualMachine);
00319
00320 if (logger.isDebugEnabled()) {
00321 logger.debug("mapped VirtualNode=" + name +
00322 " with VirtualMachine=" + virtualMachine.getName());
00323 }
00324 }
00325
00326 public void addFileTransferDeploy(FileTransferDefinition ft) {
00327 if (ft == null) {
00328 return;
00329 }
00330
00331 fileTransferDeploy.add(ft);
00332
00333 if (logger.isDebugEnabled()) {
00334 logger.debug("mapped VirtualNode=" + name +
00335 " with FileTransferDeploy id=" + ft.getId());
00336 }
00337 }
00338
00339 public void addFileTransferRetrieve(FileTransferDefinition ft) {
00340 if (ft == null) {
00341 return;
00342 }
00343
00344 fileTransferRetrieve.add(ft);
00345
00346 if (logger.isDebugEnabled()) {
00347 logger.debug("mapped VirtualNode=" + name +
00348 " with FileTransferRetrieve id=" + ft.getId());
00349 }
00350 }
00351
00352 public VirtualMachine getVirtualMachine() {
00353 if (virtualMachines.isEmpty()) {
00354 return null;
00355 }
00356
00357 VirtualMachine vm = (VirtualMachine) virtualMachines.get(lastVirtualMachineIndex);
00358
00359 return vm;
00360 }
00361
00368 public VirtualMachine getVirtualMachine(String name) {
00369 Iterator it = virtualMachines.iterator();
00370
00371 while (it.hasNext()) {
00372 VirtualMachine vm = (VirtualMachine) it.next();
00373
00374 if (vm.getName().equals(name)) {
00375 return vm;
00376 }
00377 }
00378
00379 return null;
00380 }
00381
00385 public void activate() {
00386 if (!isActivated) {
00387 for (Iterator<VirtualMachine> it = virtualMachines.iterator();
00388 it.hasNext();) {
00389 VirtualMachine vm = it.next();
00390
00391 if (vm.getCreatorId() == null) {
00392 vm.setCreatorId(this.name);
00393 } else {
00394 awaitedVirtualNodes.put(vm.getCreatorId(), vm);
00395 }
00396 }
00397
00398 proActiveRuntimeImpl.addRuntimeRegistrationEventListener(this);
00399 proActiveRuntimeImpl.registerLocalVirtualNode(this, this.name);
00400
00401 for (int i = 0; i < virtualMachines.size(); i++) {
00402 VirtualMachine vm = getVirtualMachine();
00403
00404
00405 if (vm.hasProcess()) {
00406 boolean vmAlreadyAssigned = !((vm.getCreatorId()).equals(this.name));
00407
00408
00409 ExternalProcess process = getProcess(vm, vmAlreadyAssigned);
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428 int rankOfSequentialProcess = checkForSequentialProcess(process);
00429
00430
00431 if (rankOfSequentialProcess > -1) {
00432 ExternalProcess deepCopy = (ExternalProcess) makeDeepCopy(process);
00433
00434
00435 if (rankOfSequentialProcess > 0) {
00436 process = getSequentialProcessInHierarchie(process,
00437 rankOfSequentialProcess);
00438 }
00439
00440
00441 if (((AbstractSequentialListProcessDecorator) process).isFirstElementIsService()) {
00442 UniversalService firstService = (UniversalService) ((AbstractSequentialListProcessDecorator) process).getFirstService();
00443
00444
00445 startService(firstService, vm);
00446 globalTimeOut = System.currentTimeMillis() +
00447 timeout;
00448
00449 try {
00450 waitForAllNodesCreation();
00451 } catch (NodeException e) {
00452
00453 e.printStackTrace();
00454 }
00455 } else {
00456 ExternalProcess firstProcess = (ExternalProcess) ((AbstractSequentialListProcessDecorator) process).getFirstProcess();
00457
00458
00459 if (rankOfSequentialProcess > 0) {
00460 firstProcess = buildProcessWithHierarchie((ExternalProcess) makeDeepCopy(
00461 deepCopy), firstProcess,
00462 rankOfSequentialProcess);
00463 }
00464
00465 setParameters(firstProcess, vm);
00466
00467 try {
00468 proActiveRuntimeImpl.createVM(firstProcess);
00469 globalTimeOut = System.currentTimeMillis() +
00470 timeout;
00471 waitForAllNodesCreation();
00472 } catch (java.io.IOException e) {
00473 e.printStackTrace();
00474 } catch (NodeException e1) {
00475 e1.printStackTrace();
00476 }
00477 }
00478
00479 try {
00480 ExternalProcess nextProcess = null;
00481
00482
00483 while ((nextProcess = (ExternalProcess) ((AbstractSequentialListProcessDecorator) process).getNextProcess()) != null) {
00484 boolean launchProcessManually = false;
00485
00486
00487
00488
00489
00490
00491 if (process.isDependent()) {
00492 ((DependentProcess) nextProcess).setDependencyParameters(getNodes());
00493
00494 if (nextProcess instanceof MPIProcess) {
00495 launchProcessManually = true;
00496 }
00497 }
00498
00499
00500 if (rankOfSequentialProcess > 0) {
00501 nextProcess = this.buildProcessWithHierarchie((ExternalProcess) makeDeepCopy(
00502 deepCopy), nextProcess,
00503 rankOfSequentialProcess);
00504 }
00505
00506 if (!launchProcessManually) {
00507 setParameters(nextProcess, vm);
00508 proActiveRuntimeImpl.createVM(nextProcess);
00509
00510 globalTimeOut = System.currentTimeMillis() +
00511 timeout;
00512 waitForAllNodesCreation();
00513 } else {
00514 mpiProcess = nextProcess;
00515 }
00516 }
00517 } catch (java.io.IOException e) {
00518 e.printStackTrace();
00519 } catch (NodeException e1) {
00520 e1.printStackTrace();
00521 }
00522 } else {
00523
00524
00525
00526
00527 if (!vmAlreadyAssigned) {
00528 setParameters(process, vm);
00529
00530 try {
00531
00532 proActiveRuntimeImpl.createVM(process);
00533 } catch (java.io.IOException e) {
00534 e.printStackTrace();
00535 logger.error("cannot activate virtualNode " +
00536 this.name + " with the process " +
00537 process.getCommand());
00538 }
00539 }
00540 }
00541 } else {
00542
00543 startService(vm);
00544 }
00545
00546 increaseIndex();
00547 }
00548
00549
00550 for (int i = 0; i < localVirtualMachines.size(); i++) {
00551 String protocol = localVirtualMachines.get(i);
00552 internalCreateNodeOnCurrentJvm(protocol);
00553 }
00554
00555
00556 globalTimeOut = System.currentTimeMillis() + timeout;
00557 isActivated = true;
00558
00559 if (registration) {
00560 register();
00561 }
00562
00563
00564 try {
00565 if (this.ftService != null) {
00566
00567 this.ftService.registerRessources(this.getNodes());
00568 }
00569 } catch (NodeException e) {
00570 logger.error(e.getMessage());
00571 }
00572 } else {
00573 logger.debug("VirtualNode " + this.name + " already activated");
00574 }
00575 }
00576
00581 public ExternalProcess getMPIProcess() {
00582 return (ExternalProcess) makeDeepCopy(mpiProcess);
00583 }
00584
00585 public boolean hasMPIProcess() {
00586 return !(mpiProcess == null);
00587 }
00588
00589
00590 private ExternalProcess getSequentialProcessInHierarchie(
00591 ExternalProcess process, int rank) {
00592 while (rank > 0) {
00593 process = ((AbstractExternalProcessDecorator) process).getTargetProcess();
00594 rank--;
00595 }
00596
00597 return process;
00598 }
00599
00600
00601 private ExternalProcess buildProcessWithHierarchie(
00602 ExternalProcess process, ExternalProcess finalProcess, int rank) {
00603 if (rank == 0) {
00604 return finalProcess;
00605 } else {
00606 ((AbstractExternalProcessDecorator) process).setTargetProcess(buildProcessWithHierarchie(
00607 ((AbstractExternalProcessDecorator) process).getTargetProcess(),
00608 finalProcess, rank - 1));
00609
00610 return process;
00611 }
00612 }
00613
00614
00615 private int checkForSequentialProcess(ExternalProcess process) {
00616 int res = 0;
00617
00618 while (!(process instanceof JVMProcess)) {
00619
00620 if (process.isSequential()) {
00621 return res;
00622 } else {
00623 res++;
00624 process = ((ExternalProcess) ((ExternalProcessDecorator) process).getTargetProcess());
00625 }
00626 }
00627
00628 return -1;
00629 }
00630
00635 public boolean isMainVirtualNode() {
00636 return mainVirtualNode;
00637 }
00638
00643 public String getPadURL() {
00644 return padURL;
00645 }
00646
00651 public void setIsMainVirtualNode(boolean isMainVirtualNode) {
00652 mainVirtualNode = isMainVirtualNode;
00653 }
00654
00659 public void setPadURL(String XMLpadURL) {
00660 this.padURL = XMLpadURL;
00661 }
00662
00663
00664
00665
00666
00667 public int getNbMappedNodes() {
00668 if (isActivated) {
00669 return nbMappedNodes;
00670 } else {
00671 int nbMappedNodesTmp = 0;
00672
00673 for (int i = 0; i < virtualMachines.size(); i++) {
00674 VirtualMachine vm = getVirtualMachine();
00675
00676
00677 if (vm.hasProcess()) {
00678 ExternalProcess process = vm.getProcess();
00679 int nbNodesPerCreatedVM = new Integer(vm.getNbNodesOnCreatedVMs()).intValue();
00680
00681 if (process.getNodeNumber() == UniversalProcess.UNKNOWN_NODE_NUMBER) {
00682 return UniversalProcess.UNKNOWN_NODE_NUMBER;
00683 } else {
00684 nbMappedNodesTmp = nbMappedNodesTmp +
00685 (process.getNodeNumber() * nbNodesPerCreatedVM);
00686 }
00687 }
00688 }
00689
00690 return nbMappedNodesTmp;
00691 }
00692 }
00693
00697 public int createdNodeCount() {
00698 throw new RuntimeException(
00699 "This method is deprecated, use getNumberOfCurrentlyCreatedNodes() or getNumberOfCreatedNodesAfterDeployment()");
00700 }
00701
00702
00703
00704
00705
00706 public int getNumberOfCurrentlyCreatedNodes() {
00707 return nbCreatedNodes;
00708 }
00709
00710
00711
00712
00713
00714 public int getNumberOfCreatedNodesAfterDeployment() {
00715 try {
00716 waitForAllNodesCreation();
00717 } catch (NodeException e) {
00718 logger.error("Problem occured while waiting for nodes creation");
00719 }
00720
00721 return nbCreatedNodes;
00722 }
00723
00729 public Node getNode() throws NodeException {
00730
00731 Node node;
00732 waitForNodeCreation();
00733
00734 if (!createdNodes.isEmpty()) {
00735 node = createdNodes.get(lastNodeIndex);
00736 increaseNodeIndex();
00737
00738
00739 FileVector fw = fileTransferDeployedStatus.get(node.getNodeInformation()
00740 .getName());
00741
00742 if (fw != null) {
00743 fw.waitForAll();
00744 }
00745
00746 return node;
00747 } else {
00748 throw new NodeException("Cannot get a node from Virtual Node " + this.name+". Descriptor in use : \""+descriptorURL+"\"." );
00749 }
00750 }
00751
00752 @Deprecated
00753 public Node getNode(int index) throws NodeException {
00754 Node node = createdNodes.get(index);
00755
00756 if (node == null) {
00757 throw new NodeException(
00758 "Cannot return the first node, no nodes hava been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00759 }
00760
00761 FileVector fw = fileTransferDeployedStatus.get(node.getNodeInformation()
00762 .getName());
00763
00764 if (fw != null) {
00765 fw.waitForAll();
00766 }
00767
00768 return node;
00769 }
00770
00771 public String[] getNodesURL() throws NodeException {
00772 String[] nodeNames;
00773
00774 try {
00775 waitForAllNodesCreation();
00776 } catch (NodeException e) {
00777 logger.error(e.getMessage());
00778 }
00779
00780 if (!createdNodes.isEmpty()) {
00781 synchronized (createdNodes) {
00782 nodeNames = new String[createdNodes.size()];
00783
00784 for (int i = 0; i < createdNodes.size(); i++) {
00785 nodeNames[i] = createdNodes.get(i).getNodeInformation()
00786 .getURL();
00787 }
00788 }
00789 } else {
00790 if (!MAX_P2P) {
00791 throw new NodeException(
00792 "Cannot return nodes, no nodes have been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00793 } else {
00794 logger.warn("WARN: No nodes have yet been created.");
00795 logger.warn(
00796 "WARN: This behavior might be normal, since P2P service is used with MAX number of nodes requested");
00797 logger.warn("WARN: Returning empty array");
00798
00799 return new String[0];
00800 }
00801 }
00802
00803 return nodeNames;
00804 }
00805
00806 public Node[] getNodes() throws NodeException {
00807 Node[] nodeTab;
00808
00809 try {
00810 waitForAllNodesCreation();
00811 } catch (NodeException e) {
00812 logger.error(e.getMessage());
00813 }
00814
00815 if (!createdNodes.isEmpty()) {
00816 synchronized (createdNodes) {
00817 nodeTab = new Node[createdNodes.size()];
00818
00819 for (int i = 0; i < createdNodes.size(); i++) {
00820 nodeTab[i] = createdNodes.get(i);
00821 }
00822 }
00823 } else {
00824 if (!MAX_P2P) {
00825 throw new NodeException(
00826 "Cannot return nodes, no nodes have been created for Virtual Node "+this.name+". Descriptor in use : \""+descriptorURL+"\".");
00827 } else {
00828 logger.warn("WARN: No nodes have yet been created.");
00829 logger.warn(
00830 "WARN: This behavior might be normal, since P2P service is used with MAX number of nodes requested");
00831 logger.warn("WARN: Returning empty array");
00832
00833 return new Node[0];
00834 }
00835 }
00836
00837 return nodeTab;
00838 }
00839
00840 public Node getNode(String url) throws NodeException {
00841 Node node = null;
00842
00843 try {
00844 waitForAllNodesCreation();
00845 } catch (NodeException e) {
00846 logger.error(e.getMessage());
00847 }
00848
00849 if (!createdNodes.isEmpty()) {
00850 synchronized (createdNodes) {
00851 for (int i = 0; i < createdNodes.size(); i++) {
00852 if (createdNodes.get(i).getNodeInformation().getURL()
00853 .equals(url)) {
00854 node = createdNodes.get(i);
00855
00856 break;
00857 }
00858 }
00859
00860 return node;
00861 }
00862 } else {
00863 throw new NodeException(
00864 "Cannot return nodes, no nodes hava been created. Descriptor in use : \""+descriptorURL+"\".");
00865 }
00866 }
00867
00868 public void killAll(boolean softly) {
00869 Node node;
00870 ProActiveRuntime part = null;
00871
00872 if (isActivated) {
00873
00874 if (this.p2pNodeslookupList.size() > 0) {
00875 for (int index = 0; index < this.p2pNodeslookupList.size();
00876 index++) {
00877 P2PNodeLookup currentNodesLookup = this.p2pNodeslookupList.get(index);
00878 currentNodesLookup.killAllNodes();
00879 }
00880 }
00881
00882
00883 for (int i = 0; i < createdNodes.size(); i++) {
00884 node = createdNodes.get(i);
00885 part = node.getProActiveRuntime();
00886
00887 if (this.p2pNodes.contains(node)) {
00888 continue;
00889 }
00890
00891
00892
00893
00894 if (!NodeFactory.isNodeLocal(node)) {
00895 try {
00896 part.killRT(softly);
00897 } catch (ProActiveException e1) {
00898 e1.printStackTrace();
00899 } catch (Exception e) {
00900 logger.info(" Virtual Machine " +
00901 part.getVMInformation().getVMID() + " on host " +
00902 UrlBuilder.getHostNameorIP(
00903 part.getVMInformation().getInetAddress()) +
00904 " terminated!!!");
00905 }
00906 } else {
00907 try {
00908
00909 part.killNode(node.getNodeInformation().getURL());
00910 } catch (ProActiveException e) {
00911 e.printStackTrace();
00912 }
00913 }
00914 }
00915
00916 isActivated = false;
00917
00918 try {
00919
00920 if (registration) {
00921 ProActive.unregisterVirtualNode(this);
00922 }
00923
00924 else {
00925 proActiveRuntimeImpl.unregisterVirtualNode(this.name);
00926 }
00927 } catch (ProActiveException e) {
00928 e.printStackTrace();
00929 }
00930
00931
00932 } else {
00933 proActiveRuntimeImpl.unregisterVirtualNode(this.name);
00934 }
00935
00936 for (int i = 0; i < createdRuntimeF.size(); i++) {
00937 part = createdRuntimeF.get(i);
00938
00939 try {
00940 part.killRT(true);
00941 } catch (Exception e) {
00942 logger.info(" Forwarder " + part.getVMInformation().getVMID() +
00943 " on host " +
00944 UrlBuilder.getHostNameorIP(part.getVMInformation()
00945 .getInetAddress()) +
00946 " terminated!!!");
00947 }
00948 }
00949 }
00950
00951 public void createNodeOnCurrentJvm(String protocol) {
00952 if (protocol == null) {
00953 protocol = System.getProperty("proactive.communication.protocol");
00954 }
00955
00956 localVirtualMachines.add(protocol);
00957 }
00958
00959 public Object getUniqueAO() throws ProActiveException {
00960 if (!property.equals("unique_singleAO")) {
00961 logger.warn(
00962 "!!!!!!!!!!WARNING. This VirtualNode is not defined with unique_single_AO property in the XML descriptor. Calling getUniqueAO() on this VirtualNode can lead to unexpected behaviour");
00963 }
00964
00965 if (uniqueActiveObject == null) {
00966 try {
00967 Node node = getNode();
00968
00969 if (node.getActiveObjects().length > 1) {
00970 logger.warn(
00971 "!!!!!!!!!!WARNING. More than one active object is created on this VirtualNode.");
00972 }
00973
00974 uniqueActiveObject = node.getActiveObjects()[0];
00975 } catch (Exception e) {
00976 e.printStackTrace();
00977 }
00978 }
00979
00980 if (uniqueActiveObject == null) {
00981 throw new ProActiveException(
00982 "No active object are registered on this VirtualNode");
00983 }
00984
00985 return uniqueActiveObject;
00986 }
00987
00988 public boolean isActivated() {
00989 return isActivated;
00990 }
00991
00995 public boolean isLookup() {
00996 return false;
00997 }
00998
00999
01000
01001
01002
01006 public String getJobID() {
01007 return this.jobID;
01008 }
01009
01010
01011
01012
01013 private transient ExecutorService rrThreadpool = Executors.newCachedThreadPool();
01014
01015
01016
01017
01018
01019 public void runtimeRegistered(RuntimeRegistrationEvent event) {
01020 if (event.getType() == RuntimeRegistrationEvent.FORWARDER_RUNTIME_REGISTERED) {
01021 forwarderRuntimeRegisteredPerform(event);
01022 } else {
01023
01024
01025 rrThreadpool.execute(new RuntimeRegisteredPerform(event));
01026 }
01027 }
01028
01029 private class RuntimeRegisteredPerform implements Runnable {
01030 private RuntimeRegistrationEvent event;
01031
01032 public RuntimeRegisteredPerform(RuntimeRegistrationEvent _event) {
01033 this.event = _event;
01034 }
01035
01036 public void run() {
01037 runtimeRegisteredPerform(event);
01038 }
01039 }
01040
01041 private synchronized void forwarderRuntimeRegisteredPerform(
01042 RuntimeRegistrationEvent event) {
01043 VirtualMachine virtualMachine = null;
01044
01045 for (int i = 0; i < virtualMachines.size(); i++) {
01046 if (((VirtualMachine) virtualMachines.get(i)).getName()
01047 .equals(event.getVmName())) {
01048 virtualMachine = (VirtualMachine) virtualMachines.get(i);
01049 }
01050 }
01051
01052
01053 if ((event.getCreatorID().equals(this.name)) &&
01054 (virtualMachine != null)) {
01055 if (logger.isDebugEnabled()) {
01056 logger.debug("forwarder " + event.getCreatorID() +
01057 " registered on virtualnode " + this.name);
01058 }
01059 }
01060
01061 createdRuntimeF.add(event.getRegisteredRuntime());
01062 }
01063
01064 private void runtimeRegisteredPerform(RuntimeRegistrationEvent event) {
01065 VirtualMachine virtualMachine = null;
01066
01067
01068 for (int i = 0; i < virtualMachines.size(); i++) {
01069 if (((VirtualMachine) virtualMachines.get(i)).getName()
01070 .equals(event.getVmName())) {
01071 virtualMachine = (VirtualMachine) virtualMachines.get(i);
01072 }
01073 }
01074
01075
01076 if ((event.getCreatorID().equals(this.name)) &&
01077 (virtualMachine != null)) {
01078 if (logger.isDebugEnabled()) {
01079 logger.debug("runtime " + event.getCreatorID() +
01080 " registered on virtualnode " + this.name);
01081 }
01082
01083 createNodes(event, virtualMachine);
01084 }
01085
01086
01087
01088 if (awaitedVirtualNodes.containsKey(event.getCreatorID())) {
01089 createNodes(event, awaitedVirtualNodes.get(event.getCreatorID()));
01090 }
01091 }
01092
01093 private void createNodes(RuntimeRegistrationEvent event,
01094 VirtualMachine _virtualMachine) {
01095 String protocol;
01096 ProActiveRuntime proActiveRuntimeRegistered;
01097 String nodeHost;
01098 int port;
01099 String url;
01100
01101 protocol = event.getProtocol();
01102 proActiveRuntimeRegistered = event.getRegisteredRuntime();
01103 nodeHost = proActiveRuntimeRegistered.getVMInformation().getHostName();
01104 port = UrlBuilder.getPortFromUrl(proActiveRuntimeRegistered.getURL());
01105 url = null;
01106
01107 try {
01108 int nodeNumber = (new Integer((String) _virtualMachine.getNbNodesOnCreatedVMs())).intValue();
01109
01110 for (int i = 1; i <= nodeNumber; i++) {
01111 ProActiveSecurityManager siblingPSM = null;
01112
01113 if (proactiveSecurityManager != null) {
01114 siblingPSM = proactiveSecurityManager.generateSiblingCertificate(this.name);
01115 }
01116
01117 int registerAttempts = REGISTRATION_ATTEMPTS;
01118
01119 while (registerAttempts > 0) {
01120
01121
01122
01123
01124 String nodeName = this.name +
01125 Integer.toString(ProActiveRuntimeImpl.getNextInt());
01126 url = buildURL(nodeHost, nodeName, protocol, port);
01127
01128
01129
01130
01131
01132
01133
01134 try {
01135 proActiveRuntimeRegistered.createLocalNode(url, false,
01136 siblingPSM, this.getName(), this.jobID);
01137
01138
01139 performOperations(proActiveRuntimeRegistered, url,
01140 protocol, event.getVmName());
01141
01142 break;
01143 } catch (AlreadyBoundException e) {
01144 registerAttempts--;
01145 }
01146 }
01147
01148 if (registerAttempts == 0) {
01149 logger.warn("createLocalNode failed " +
01150 REGISTRATION_ATTEMPTS + " times for runtime " +
01151 nodeHost);
01152 }
01153 }
01154 } catch (ProActiveException e) {
01155 e.printStackTrace();
01156 }
01157 }
01158
01164 public void setRuntimeInformations(String information, String value)
01165 throws ProActiveException {
01166
01167
01168
01169
01170
01171
01172
01173
01174 throw new ProActiveException(
01175 "No property can be set at runtime on this VirtualNode");
01176 }
01177
01181 public void setService(UniversalService service) throws ProActiveException {
01182 if (FaultToleranceService.FT_SERVICE_NAME.equals(
01183 service.getServiceName())) {
01184 this.ftService = (FaultToleranceService) service;
01185 } else {
01186 throw new ProActiveException(
01187 " Unable to bind the given service to a virtual node");
01188 }
01189 }
01190
01194 public String getUserClass() {
01195 return this.getClass().getName();
01196 }
01197
01198 public void setRegistrationProtocol(String protocol) {
01199 setRegistrationValue(true);
01200 this.registrationProtocol = protocol;
01201 }
01202
01203 public String getRegistrationProtocol() {
01204 return this.registrationProtocol;
01205 }
01206
01214 public void setMinNumberOfNodes(int min) {
01215 this.minNumberOfNodes = min;
01216 }
01217
01221 public int getMinNumberOfNodes() {
01222 return minNumberOfNodes;
01223 }
01224
01228 public boolean isMultiple() {
01229 return ((virtualMachines.size() + localVirtualMachines.size()) > 1);
01230 }
01231
01232
01233
01234
01235 private void internalCreateNodeOnCurrentJvm(String protocol) {
01236 try {
01237
01238
01239
01240 String url = null;
01241 increaseNumberOfNodes(1);
01242
01243
01244 ProActiveRuntime defaultRuntime = RuntimeFactory.getProtocolSpecificRuntime(checkProtocol(
01245 protocol));
01246
01247
01248 ProActiveSecurityManager siblingPSM = null;
01249
01250 if (proactiveSecurityManager != null) {
01251 siblingPSM = proactiveSecurityManager.generateSiblingCertificate(this.name);
01252 }
01253
01254 int registrationAttempts = REGISTRATION_ATTEMPTS;
01255
01256 while (registrationAttempts > 0) {
01257 String nodeName = this.name +
01258 Integer.toString(ProActiveRuntimeImpl.getNextInt());
01259
01260 try {
01261 url = defaultRuntime.createLocalNode(nodeName, false,
01262 siblingPSM, this.getName(), ProActive.getJobId());
01263 registrationAttempts = 0;
01264 } catch (AlreadyBoundException e) {
01265 registrationAttempts--;
01266 }
01267 }
01268
01269
01270 performOperations(defaultRuntime, url, protocol,
01271 defaultRuntime.getVMInformation().getDescriptorVMName());
01272 } catch (Exception e) {
01273 e.printStackTrace();
01274 }
01275 }
01276
01280 private synchronized void waitForNodeCreation() throws NodeException {
01281 while (!nodeCreated) {
01282 if (!timeoutExpired()) {
01283 try {
01284 wait(getTimeToSleep());
01285 } catch (InterruptedException e2) {
01286 e2.printStackTrace();
01287 } catch (IllegalStateException e) {
01288
01289
01290
01291 throw new NodeException(
01292 "After many retries, not even one node can be found for Virtual Node \"" +
01293 this.name + "\" in descriptor \"" + this.descriptorURL +
01294 "\".");
01295 }
01296 } else {
01297 throw new NodeException(
01298 "After many retries, not even one node can be found for Virtual Node \"" +
01299 this.name + "\" in descriptor \"" + this.descriptorURL + "\".");
01300 }
01301 }
01302
01303 return;
01304 }
01305
01309 public void waitForAllNodesCreation() throws NodeException {
01310 int tempNodeCount = nbMappedNodes;
01311
01312 if (tempNodeCount != P2PConstants.MAX_NODE) {
01313
01314
01315 MAX_P2P = false;
01316 }
01317
01318 if (waitForTimeout) {
01319 try {
01320 Thread.sleep(timeout);
01321 } catch (InterruptedException e2) {
01322 e2.printStackTrace();
01323 }
01324 } else {
01325
01326
01327 internalWait(tempNodeCount);
01328 }
01329
01330
01331 Collection<FileVector> c = fileTransferDeployedStatus.values();
01332 Iterator<FileVector> it = c.iterator();
01333
01334 while (it.hasNext()) {
01335 FileVector fw = it.next();
01336 fw.waitForAll();
01337 }
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347 return;
01348 }
01349
01350 private synchronized void internalWait(int tempNodeCount)
01351 throws NodeException {
01352
01353
01354 if (minNumberOfNodes != 0) {
01355 tempNodeCount = minNumberOfNodes;
01356 }
01357
01358 while (nbCreatedNodes < tempNodeCount) {
01359 if (!timeoutExpired()) {
01360 try {
01361 wait(getTimeToSleep());
01362 } catch (InterruptedException e2) {
01363 e2.printStackTrace();
01364 } catch (IllegalStateException e) {
01365
01366
01367
01368 throw new NodeException("After many retries, only " +
01369 nbCreatedNodes + " nodes are created on " +
01370 tempNodeCount + " expected for Virtual Node \"" +
01371 this.name + "\" in descriptor \"" + descriptorURL + "\".");
01372 }
01373 } else {
01374 throw new NodeException("After many retries, only " +
01375 nbCreatedNodes + " nodes are created on " + tempNodeCount +
01376 " expected for Virtual Node \"" + this.name +
01377 "\" in descriptor \"" + descriptorURL + "\".");
01378 }
01379 }
01380 }
01381
01382 private boolean timeoutExpired() {
01383 long currentTime = System.currentTimeMillis();
01384
01385 return (globalTimeOut < currentTime);
01386 }
01387
01388 private long getTimeToSleep() {
01389
01390 long timeToSleep = globalTimeOut - System.currentTimeMillis();
01391
01392 if (timeToSleep > 0) {
01393 return timeToSleep;
01394 } else {
01395 throw new IllegalStateException("Timeout expired");
01396 }
01397 }
01398
01404 private ExternalProcess getProcess(VirtualMachine vm,
01405 boolean vmAlreadyAssigned) {
01406 ExternalProcess copyProcess;
01407
01408
01409 ExternalProcess process = vm.getProcess();
01410
01411
01412
01413
01414 if (!vmAlreadyAssigned) {
01415 copyProcess = (ExternalProcess) makeDeepCopy(process);
01416 vm.setProcess(copyProcess);
01417
01418 return copyProcess;
01419 } else {
01420
01421 increaseNumberOfNodes(process.getNodeNumber() * new Integer(
01422 vm.getNbNodesOnCreatedVMs()).intValue());
01423
01424 return process;
01425 }
01426 }
01427
01432 private void setParameters(ExternalProcess process, VirtualMachine vm) {
01433 JVMProcess jvmProcess;
01434
01435
01436 String protocolId = "";
01437 int nodeNumber = new Integer(vm.getNbNodesOnCreatedVMs()).intValue();
01438
01439 if (logger.isDebugEnabled()) {
01440 logger.debug("asked for " + nodeNumber + " nodes");
01441 }
01442
01443 protocolId = process.getProcessId();
01444
01445 int cnt = process.getNodeNumber();
01446
01447 if (cnt == UniversalProcess.UNKNOWN_NODE_NUMBER) {
01448 waitForTimeout = true;
01449 } else {
01450 increaseNumberOfNodes(cnt * nodeNumber);
01451 }
01452
01453
01454
01455 jvmProcess = (JVMProcess) process.getFinalProcess();
01456 jvmProcess.setJvmOptions("-Dproactive.groupInformation=" +
01457 ProActiveRuntimeImpl.getProActiveRuntime().getVMInformation()
01458 .getVMID().toString() + "~" +
01459 jvmProcess.getNewGroupId());
01460
01461
01462 if (jvmProcess.getClassname()
01463 .equals("org.objectweb.proactive.core.runtime.StartRuntime")) {
01464 String vnName = this.name;
01465
01466 String localruntimeURL = null;
01467
01468 try {
01469 localruntimeURL = RuntimeFactory.getDefaultRuntime().getURL();
01470
01471 if (process.getUsername() != null) {
01472 localruntimeURL = System.getProperty("user.name") + "@" +
01473 localruntimeURL;
01474 }
01475 } catch (ProActiveException e) {
01476 e.printStackTrace();
01477 }
01478
01479 if (logger.isDebugEnabled()) {
01480 logger.debug(localruntimeURL);
01481 }
01482
01483
01484 if (mainVirtualNode || process.isHierarchical()) {
01485 jvmProcess.setJvmOptions(" -Dproactive.pad=" + padURL);
01486 }
01487
01488 jvmProcess.setJvmOptions("-Dproactive.jobid=" + jobID);
01489 jvmProcess.setParameters(vnName + " " + localruntimeURL + " " +
01490 nodeNumber + " " + protocolId + " " + vm.getName());
01491
01492
01493 if (this.ftService != null) {
01494 jvmProcess.setJvmOptions(this.ftService.buildParamsLine());
01495 }
01496 }
01497
01498
01499
01500
01501 FileTransferWorkShop ftsDeploy = process.getFileTransferWorkShopDeploy();
01502 FileTransferWorkShop ftsRetrieve = process.getFileTransferWorkShopRetrieve();
01503
01504 if ((ftsDeploy != null) && ftsDeploy.isImplicit()) {
01505 for (int i = 0; i < fileTransferDeploy.size(); i++)
01506 ftsDeploy.addFileTransfer(fileTransferDeploy.get(i));
01507 }
01508
01509 if ((ftsRetrieve != null) && ftsRetrieve.isImplicit()) {
01510 for (int i = 0; i < fileTransferRetrieve.size(); i++)
01511 ftsRetrieve.addFileTransfer(fileTransferRetrieve.get(i));
01512 }
01513 }
01514
01520 private Object makeDeepCopy(Object process) {
01521
01522 Object result = null;
01523
01524 try {
01525 java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
01526 java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(baos);
01527 oos.writeObject(process);
01528 oos.flush();
01529 oos.close();
01530
01531 java.io.ByteArrayInputStream bais = new java.io.ByteArrayInputStream(baos.toByteArray());
01532 java.io.ObjectInputStream ois = new java.io.ObjectInputStream(bais);
01533 result = ois.readObject();
01534 ois.close();
01535 } catch (Exception e) {
01536 e.printStackTrace();
01537 }
01538
01539
01540 return result;
01541 }
01542
01543 private String buildURL(String host, String name, String protocol, int port) {
01544 if (port != 0) {
01545 return UrlBuilder.buildUrl(host, name, protocol, port);
01546 } else {
01547 return UrlBuilder.buildUrl(host, name, protocol);
01548 }
01549 }
01550
01551 private void increaseIndex() {
01552 if (virtualMachines.size() > 1) {
01553 lastVirtualMachineIndex = (lastVirtualMachineIndex + 1) % virtualMachines.size();
01554 }
01555 }
01556
01557 private void increaseNumberOfNodes(int n) {
01558 nbMappedNodes = nbMappedNodes + n;
01559
01560 if (logger.isDebugEnabled()) {
01561 logger.debug("Number of nodes = " + nbMappedNodes);
01562 }
01563 }
01564
01565 private void increaseNodeIndex() {
01566 if (createdNodes.size() > 1) {
01567 lastNodeIndex = (lastNodeIndex + 1) % createdNodes.size();
01568 }
01569 }
01570
01571 private String checkProtocol(String protocol) {
01572 if (protocol.indexOf(":") == -1) {
01573 return protocol.concat(":");
01574 }
01575
01576 return protocol;
01577 }
01578
01579 private synchronized void performOperations(ProActiveRuntime part,
01580 String url, String protocol, String vmName) {
01581 Node node = new NodeImpl(part, url, checkProtocol(protocol),
01582 this.jobID, vmName);
01583 synchronized (createdNodes) {
01584 createdNodes.add(node);
01585 }
01586
01587 logger.info("**** Mapping VirtualNode " + this.name + " with Node: " +
01588 url + " done");
01589 nodeCreated = true;
01590 nbCreatedNodes++;
01591
01592
01593 FileVector fw = fileTransferDeploy(node);
01594 this.fileTransferDeployedStatus.put(node.getNodeInformation().getName(),
01595 fw);
01596
01597 if (this.technicalService != null) {
01598 this.technicalService.apply(node);
01599 }
01600
01601
01602 notifyAll();
01603
01604
01605 notifyListeners(this, NodeCreationEvent.NODE_CREATED, node,
01606 nbCreatedNodes);
01607 }
01608
01609 private void register() {
01610 try {
01611 waitForAllNodesCreation();
01612
01613
01614
01615 ProActive.registerVirtualNode(this, registrationProtocol, false);
01616 } catch (NodeException e) {
01617 logger.error(e.getMessage());
01618 } catch (ProActiveException e) {
01619 e.printStackTrace();
01620 } catch (AlreadyBoundException e) {
01621 logger.warn("The Virtual Node name " + this.getName() +
01622 " is already bound in the registry", e);
01623 }
01624 }
01625
01626 private void setRegistrationValue(boolean value) {
01627 this.registration = value;
01628 }
01629
01630 private void startService(VirtualMachine vm) {
01631 UniversalService service = vm.getService();
01632
01633
01634
01635 UniversalService copyService = (UniversalService) makeDeepCopy(service);
01636 vm.setService(copyService);
01637
01638 if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
01639 int nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
01640
01641
01642
01643 if (nodeRequested != ((P2PDescriptorService) service).getMAX()) {
01644 increaseNumberOfNodes(nodeRequested);
01645
01646
01647
01648 } else {
01649 MAX_P2P = true;
01650 }
01651 } else if (service.getServiceName()
01652 .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
01653 int nodeRequested = ((SchedulerLookupService) service).getNodeNumber();
01654 increaseNumberOfNodes(nodeRequested);
01655 } else {
01656
01657 increaseNumberOfNodes(1);
01658 }
01659
01660 new ServiceThread(this, vm).start();
01661 }
01662
01663 private void startService(UniversalService service, VirtualMachine vm) {
01664
01665
01666 UniversalService copyService = (UniversalService) makeDeepCopy(service);
01667 vm.setService(copyService);
01668
01669 if (service.getServiceName().equals(P2PConstants.P2P_NODE_NAME)) {
01670 int nodeRequested = ((P2PDescriptorService) service).getNodeNumber();
01671
01672
01673
01674 if (nodeRequested != ((P2PDescriptorService) service).getMAX()) {
01675 increaseNumberOfNodes(nodeRequested);
01676
01677
01678
01679 } else {
01680 MAX_P2P = true;
01681 }
01682 } else if (service.getServiceName()
01683 .equals(SchedulerConstants.SCHEDULER_NODE_NAME)) {
01684 int nodeRequested = ((SchedulerLookupService) service).getNodeNumber();
01685 increaseNumberOfNodes(nodeRequested);
01686 } else {
01687
01688 increaseNumberOfNodes(1);
01689 }
01690
01691 new ServiceThread(this, vm).start();
01692 }
01693
01694 private void writeObject(java.io.ObjectOutputStream out)
01695 throws java.io.IOException {
01696 if (isActivated) {
01697 try {
01698 waitForAllNodesCreation();
01699 } catch (NodeException e) {
01700 out.defaultWriteObject();
01701
01702 return;
01703 }
01704 }
01705
01706 out.defaultWriteObject();
01707 }
01708
01709 private void readObject(java.io.ObjectInputStream in)
01710 throws java.io.IOException, ClassNotFoundException {
01711 in.defaultReadObject();
01712 this.proActiveRuntimeImpl = (ProActiveRuntimeImpl) ProActiveRuntimeImpl.getProActiveRuntime();
01713 rrThreadpool = Executors.newCachedThreadPool();
01714 }
01715
01716
01717
01718
01719
01724 public synchronized void nodeCreated(NodeCreationEvent event) {
01725 Node newNode = event.getNode();
01726 this.createdNodes.add(newNode);
01727 this.p2pNodes.add(newNode);
01728 nbCreatedNodes++;
01729 nodeCreated = true;
01730
01731 if (this.technicalService != null) {
01732 this.technicalService.apply(newNode);
01733 }
01734
01735
01736 notifyAll();
01737 notifyListeners(this, NodeCreationEvent.NODE_CREATED, newNode,
01738 nbCreatedNodes);
01739 }
01740
01744 public void addP2PNodesLookup(P2PNodeLookup nodesLookup) {
01745 this.p2pNodeslookupList.add(nodesLookup);
01746 P2P_LOGGER.debug("A P2P nodes lookup added to the vn: " + this.name);
01747 }
01748
01752 public FileVector fileTransferRetrieve()
01753 throws IOException, ProActiveException {
01754 Node[] nodes;
01755 FileVector fileVector = new FileVector();
01756
01757 try {
01758 nodes = getNodes();
01759 } catch (NodeException e) {
01760 throw new ProActiveException(
01761 "Can not Retrieve Files, since no nodes where created for Virtual Node" +
01762 this.getName());
01763 }
01764
01765 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01766 FILETRANSFER_LOGGER.debug("Retrieving files for " + nodes.length +
01767 " node(s).");
01768 }
01769
01770
01771
01772
01773
01774
01775 for (int i = 0; i < nodes.length; i++) {
01776 String vmName = nodes[i].getNodeInformation().getDescriptorVMName();
01777 VirtualMachine vm = getVirtualMachine(vmName);
01778
01779 if (vm == null) {
01780 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01781 FILETRANSFER_LOGGER.info("No VM found with name: " +
01782 vmName + " for node: " +
01783 nodes[i].getNodeInformation().getName());
01784 }
01785
01786 continue;
01787 }
01788
01789
01790 ExternalProcess eProcess = vm.getProcess();
01791
01792 if (eProcess == null) {
01793 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01794 FILETRANSFER_LOGGER.debug("No Process linked with VM: " +
01795 vmName + " for node: " +
01796 nodes[i].getNodeInformation().getName());
01797 }
01798
01799 continue;
01800 }
01801
01802 FileTransferWorkShop ftwRetrieve = eProcess.getFileTransferWorkShopRetrieve();
01803 FileDescription[] fd = ftwRetrieve.getAllFileDescriptions();
01804
01805 File[] srcFile = new File[fd.length];
01806 File[] dstFile = new File[fd.length];
01807
01808 for (int j = 0; j < fd.length; j++) {
01809 srcFile[j] = new File(ftwRetrieve.getAbsoluteSrcPath(fd[j]));
01810 dstFile[j] = new File(ftwRetrieve.getAbsoluteDstPath(fd[j]) +
01811 "-" + nodes[i].getNodeInformation().getName());
01812 }
01813
01814 long init = System.currentTimeMillis();
01815 fileVector.add(FileTransfer.pullFiles(nodes[i], srcFile, dstFile,
01816 fileBlockSize, overlapping));
01817
01818 if (FILETRANSFER_LOGGER.isDebugEnabled()) {
01819 FILETRANSFER_LOGGER.debug("Returned pullFiles in:" +
01820 (System.currentTimeMillis() - init));
01821 }
01822 }
01823
01824 return fileVector;
01825 }
01826
01835 private FileVector fileTransferDeploy(Node node) {
01836 FileVector fileWrapper = new FileVector();
01837
01838 if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01839 DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01840 "File Transfer Deploy files for node" +
01841 node.getNodeInformation().getName());
01842 }
01843
01844 String vmName = node.getNodeInformation().getDescriptorVMName();
01845 VirtualMachine vm = getVirtualMachine(vmName);
01846
01847 if (vm == null) {
01848 if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01849 DEPLOYMENT_FILETRANSFER_LOGGER.debug("No VM found with name: " +
01850 vmName + " for node: " +
01851 node.getNodeInformation().getName());
01852 }
01853
01854 return fileWrapper;
01855 }
01856
01857
01858 ExternalProcess eProcess = vm.getProcess();
01859
01860 if (eProcess == null) {
01861 if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01862 DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01863 "No Process linked with VM: " + vmName + " for node: " +
01864 node.getNodeInformation().getName());
01865 }
01866
01867 return fileWrapper;
01868 }
01869
01870
01871 if (!eProcess.isRequiredFileTransferDeployOnNodeCreation()) {
01872 if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01873 DEPLOYMENT_FILETRANSFER_LOGGER.debug(
01874 "No ProActive FileTransfer API is required for this node.");
01875 }
01876
01877 return fileWrapper;
01878 }
01879
01880 FileTransferWorkShop ftwDeploy = eProcess.getFileTransferWorkShopDeploy();
01881 FileDescription[] fd = ftwDeploy.getAllFileDescriptions();
01882
01883 if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01884 DEPLOYMENT_FILETRANSFER_LOGGER.debug("Transfering " + fd.length +
01885 " file(s)");
01886 }
01887
01888 File[] filesSrc = new File[fd.length];
01889 File[] filesDst = new File[fd.length];
01890
01891 for (int j = 0; j < fd.length; j++) {
01892 File srcFile = new File(ftwDeploy.getAbsoluteSrcPath(fd[j]));
01893 File dstFile = new File(ftwDeploy.getAbsoluteDstPath(fd[j]));
01894 filesSrc[j] = srcFile;
01895 filesDst[j] = dstFile;
01896 }
01897
01898 try {
01899 fileWrapper.add(FileTransfer.pushFiles(node, filesSrc, filesDst,
01900 fileBlockSize, overlapping));
01901 } catch (Exception e) {
01902 logger.error("Unable to pushFile files to node " +
01903 node.getNodeInformation().getName());
01904
01905 if (DEPLOYMENT_FILETRANSFER_LOGGER.isDebugEnabled()) {
01906 DEPLOYMENT_FILETRANSFER_LOGGER.debug(e.getStackTrace());
01907 }
01908 }
01909
01910 return fileWrapper;
01911 }
01912
01913 public void setFileTransferParams(int fileBlockSize, int overlapping) {
01914 this.fileBlockSize = fileBlockSize;
01915 this.overlapping = overlapping;
01916 }
01917
01918 public void addTechnicalService(TechnicalService technicalWrapper) {
01919 this.technicalService = technicalWrapper;
01920 }
01921
01922 public ArrayList getVirtualMachines() {
01923 return this.virtualMachines;
01924 }
01925
01926 }