00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.mpi.control;
00032
00033 import org.apache.log4j.Logger;
00034
00035 import org.objectweb.proactive.ActiveObjectCreationException;
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.ProActiveException;
00038 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00039 import org.objectweb.proactive.core.group.spmd.ProSPMD;
00040 import org.objectweb.proactive.core.mop.ClassNotReifiableException;
00041 import org.objectweb.proactive.core.node.Node;
00042 import org.objectweb.proactive.core.node.NodeException;
00043 import org.objectweb.proactive.core.util.log.Loggers;
00044 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00045 import org.objectweb.proactive.filetransfer.*;
00046 import org.objectweb.proactive.mpi.*;
00047
00048 import java.io.*;
00049
00050 import java.util.ArrayList;
00051 import java.util.Hashtable;
00052 import java.util.Iterator;
00053
00054
00055 public class ProActiveMPIManager implements Serializable {
00056 private final static Logger MPI_IMPL_LOGGER = ProActiveLogger.getLogger(Loggers.MPI_CONTROL_MANAGER);
00057 public final static String DEFAULT_LIBRARY_NAME = "libProActiveMPIComm.so";
00058
00060 private static int currentJobNumber = 0;
00061
00063 private ArrayList mpiSpmdList;
00064
00065
00066 private Hashtable proxyMap;
00067
00068
00069 private Hashtable spmdProxyMap;
00070
00071
00072 private Hashtable userProxyMap;
00073
00074
00075 private int[] ackToStart;
00076
00077
00078 private int[] ackToRecv;
00079
00080 public ProActiveMPIManager() {
00081 }
00082
00083 public void deploy(ArrayList spmdList) {
00084 this.mpiSpmdList = spmdList;
00085 this.proxyMap = new Hashtable();
00086 this.spmdProxyMap = new Hashtable();
00087 this.userProxyMap = new Hashtable();
00088 this.ackToStart = new int[spmdList.size()];
00089 this.ackToRecv = new int[spmdList.size()];
00090
00091
00092 try {
00093 for (int i = 0; i < spmdList.size(); i++) {
00094 VirtualNode vn = (VirtualNode) ((MPISpmd) spmdList.get(currentJobNumber)).getVn();
00095 Node[] allNodes;
00096 allNodes = vn.getNodes();
00097 String remoteLibraryPath = ((MPISpmd) spmdList.get(currentJobNumber)).getRemoteLibraryPath();
00098
00099 ClassLoader cl = this.getClass().getClassLoader();
00100 java.net.URL u = cl.getResource(
00101 "org/objectweb/proactive/mpi/control/" +
00102 DEFAULT_LIBRARY_NAME);
00103
00104 File remoteDest = new File(remoteLibraryPath +
00105 "/libProActiveMPIComm.so");
00106 File localSource = new File(u.getFile());
00107
00108 FileVector filePushed = FileTransfer.pushFile(allNodes[0],
00109 localSource, remoteDest);
00110 filePushed.waitForAll();
00111
00112 ackToStart[i] = allNodes.length - 1;
00113 ackToRecv[i] = allNodes.length - 1;
00114 Object[][] params = new Object[allNodes.length][];
00115
00116
00117
00118 for (int j = 0; j < params.length; j++) {
00119 params[j] = new Object[] {
00120 "ProActiveMPIComm",
00121 (ProActiveMPIManager) ProActive.getStubOnThis(),
00122 new Integer(currentJobNumber)
00123 };
00124 }
00125 MPI_IMPL_LOGGER.info("[MANAGER] Create SPMD Proxy for jobID: " +
00126 currentJobNumber);
00127 ProActiveMPICoupling spmdCouplingProxy = (ProActiveMPICoupling) ProSPMD.newSPMDGroup(ProActiveMPICoupling.class.getName(),
00128 params, vn);
00129
00130
00131 this.spmdProxyMap.put(new Integer(currentJobNumber),
00132 spmdCouplingProxy);
00133 MPI_IMPL_LOGGER.info("[MANAGER] Initialize remote environments");
00134
00135 Ack ack = spmdCouplingProxy.initEnvironment();
00136 ProActive.waitFor(ack);
00137 MPI_IMPL_LOGGER.info(
00138 "[MANAGER] Activate remote thread for communication");
00139
00140 spmdCouplingProxy.createRecvThread();
00141
00142 this.proxyMap.put(new Integer(currentJobNumber),
00143 new ProActiveMPICoupling[allNodes.length]);
00144 this.userProxyMap.put(new Integer(currentJobNumber),
00145 new Hashtable());
00146
00147 currentJobNumber++;
00148 }
00149 } catch (NodeException e) {
00150 e.printStackTrace();
00151 } catch (ClassNotReifiableException e) {
00152 e.printStackTrace();
00153 } catch (ActiveObjectCreationException e) {
00154 e.printStackTrace();
00155 } catch (ClassNotFoundException e) {
00156 e.printStackTrace();
00157 } catch (IOException e) {
00158 e.printStackTrace();
00159 } catch (ProActiveException e) {
00160 e.printStackTrace();
00161 }
00162 }
00163
00164 public void register(int jobID, int rank) {
00165
00166
00167 if (ackToRecv[jobID] == 0) {
00168 for (int i = 0; i < currentJobNumber; i++) {
00169 if (ackToRecv[i] != 0) {
00170 return;
00171 }
00172 }
00173 for (int i = 0; i < currentJobNumber; i++) {
00174 ((ProActiveMPICoupling) spmdProxyMap.get(new Integer(i))).wakeUpThread();
00175 }
00176 } else {
00177
00178 ackToRecv[jobID]--;
00179 }
00180 }
00181
00182
00183 public void register(int jobID, int rank,
00184 ProActiveMPICoupling activeProxyComm) {
00185 if (jobID < currentJobNumber) {
00186 MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00187 " register mpi process #" + rank);
00188
00189 ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(jobID)))[rank] = activeProxyComm;
00190
00191
00192 boolean deployUserSpmdObject = true;
00193 for (int i = 0;
00194 i < ((ProActiveMPICoupling[]) this.proxyMap.get(
00195 new Integer(jobID))).length; i++) {
00196 if (((ProActiveMPICoupling[]) this.proxyMap.get(
00197 new Integer(jobID)))[i] == null) {
00198
00199 deployUserSpmdObject = false;
00200 }
00201 }
00202
00203
00204 if (deployUserSpmdObject) {
00205
00206 Node[] orderedNodes = new Node[((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00207 jobID))).length];
00208 for (int i = 0; i < orderedNodes.length; i++) {
00209 try {
00210 orderedNodes[i] = ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00211 jobID)))[i].getNode();
00212 } catch (NodeException e) {
00213 e.printStackTrace();
00214 }
00215 }
00216 deployUserSpmdClasses(jobID, orderedNodes);
00217 deployUserClasses(jobID, orderedNodes);
00218 }
00219
00220 for (int i = 0; i < currentJobNumber; i++) {
00221 int jobListLength = ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00222 i))).length;
00223 for (int j = 0; j < jobListLength; j++) {
00224 if (((ProActiveMPICoupling[]) this.proxyMap.get(
00225 new Integer(i)))[j] == null) {
00226 return;
00227 }
00228 }
00229 }
00230
00231 for (int i = 0; i < currentJobNumber; i++) {
00232
00233
00234 ((ProActiveMPICoupling) spmdProxyMap.get(new Integer(i))).notifyProxy(this.proxyMap,
00235 this.spmdProxyMap, this.userProxyMap);
00236 }
00237 } else {
00238 throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00239 jobID);
00240 }
00241 }
00242
00243 public void deployUserClasses(int jobID, Node[] orderedNodes) {
00244
00245
00246 ArrayList classes = ((MPISpmd) mpiSpmdList.get(jobID)).getClasses();
00247 if (!classes.isEmpty()) {
00248 MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00249 " deploy user classes");
00250
00251 Hashtable paramsTable = ((MPISpmd) mpiSpmdList.get(jobID)).getClassesParams();
00252 Hashtable userProxyList = new Hashtable();
00253 Iterator iterator = classes.iterator();
00254 while (iterator.hasNext()) {
00255 String cl = (String) iterator.next();
00256 try {
00257 Object[] parameters = (Object[]) paramsTable.get(cl);
00258 Object[] proxyList = new Object[parameters.length];
00259 for (int i = 0; i < parameters.length; i++) {
00260 Object[] params = (Object[]) parameters[i];
00261 if (params != null) {
00262 proxyList[i] = ProActive.newActive(cl, params,
00263 orderedNodes[i]);
00264 }
00265 }
00266 userProxyList.put(cl, proxyList);
00267 this.userProxyMap.put(new Integer(jobID), userProxyList);
00268 } catch (ActiveObjectCreationException e) {
00269 e.printStackTrace();
00270 } catch (NodeException e) {
00271 e.printStackTrace();
00272 }
00273 }
00274 }
00275 }
00276
00277 public void deployUserSpmdClasses(int jobID, Node[] orderedNodes) {
00278
00279
00280 ArrayList classes = ((MPISpmd) mpiSpmdList.get(jobID)).getSpmdClasses();
00281 if (!classes.isEmpty()) {
00282 MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00283 " deploy user SPMD classes");
00284
00285 Hashtable paramsTable = ((MPISpmd) mpiSpmdList.get(jobID)).getSpmdClassesParams();
00286 Hashtable userProxyList = new Hashtable();
00287 Iterator iterator = classes.iterator();
00288 while (iterator.hasNext()) {
00289 String cl = (String) iterator.next();
00290 try {
00291 ArrayList parameters = (ArrayList) paramsTable.remove(cl);
00292
00293
00294 if (parameters.get(0) != null) {
00295 Object[] params = (Object[]) parameters.get(0);
00296 Object[][] p = new Object[orderedNodes.length][];
00297 for (int i = 0; i < orderedNodes.length; i++) {
00298 p[i] = params;
00299 }
00300 userProxyList.put(cl,
00301 ProSPMD.newSPMDGroup(cl, p, orderedNodes));
00302 }
00303 else if (parameters.get(1) != null) {
00304 Object[][] params = (Object[][]) parameters.get(1);
00305 userProxyList.put(cl,
00306 ProSPMD.newSPMDGroup(cl, params, orderedNodes));
00307 }
00308 else {
00309 Object[][] params = new Object[orderedNodes.length][];
00310 userProxyList.put(cl,
00311 ProSPMD.newSPMDGroup(cl, params, orderedNodes));
00312 }
00313 this.userProxyMap.put(new Integer(jobID), userProxyList);
00314 } catch (ClassNotReifiableException e) {
00315 e.printStackTrace();
00316 } catch (ActiveObjectCreationException e) {
00317 e.printStackTrace();
00318 } catch (NodeException e) {
00319 e.printStackTrace();
00320 } catch (ClassNotFoundException e) {
00321 e.printStackTrace();
00322 }
00323 }
00324 }
00325 }
00326
00327 public void register(int jobID) {
00328
00329 if (ackToStart[jobID] == 0) {
00330 MPISpmd mpiSpmd = (MPISpmd) mpiSpmdList.get(jobID);
00331 MPIResult res = mpiSpmd.startMPI();
00332
00333
00334
00335 } else {
00336 ackToStart[jobID]--;
00337 }
00338 }
00339
00340 public void unregister(int jobID, int rank) {
00341 if (jobID < currentJobNumber) {
00342 ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(jobID)))[rank] = null;
00343 MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00344 " unregister mpi process #" + rank);
00345 for (int i = 0; i < currentJobNumber; i++) {
00346 int jobListLength = ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00347 i))).length;
00348 for (int j = 0; j < jobListLength; j++) {
00349 if (((ProActiveMPICoupling[]) this.proxyMap.get(
00350 new Integer(i)))[j] != null) {
00351 return;
00352 }
00353 }
00354 }
00355
00356 for (int i = 0; i < this.mpiSpmdList.size(); i++) {
00357 ((VirtualNode) ((MPISpmd) this.mpiSpmdList.get(i)).getVn()).killAll(false);
00358 }
00359 System.exit(0);
00360 } else {
00361 throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00362 jobID);
00363 }
00364 }
00365
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413 }