org/objectweb/proactive/mpi/control/ProActiveMPIManager.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.mpi.control;
00032 
00033 import org.apache.log4j.Logger;
00034 
00035 import org.objectweb.proactive.ActiveObjectCreationException;
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.ProActiveException;
00038 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00039 import org.objectweb.proactive.core.group.spmd.ProSPMD;
00040 import org.objectweb.proactive.core.mop.ClassNotReifiableException;
00041 import org.objectweb.proactive.core.node.Node;
00042 import org.objectweb.proactive.core.node.NodeException;
00043 import org.objectweb.proactive.core.util.log.Loggers;
00044 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00045 import org.objectweb.proactive.filetransfer.*;
00046 import org.objectweb.proactive.mpi.*;
00047 
00048 import java.io.*;
00049 
00050 import java.util.ArrayList;
00051 import java.util.Hashtable;
00052 import java.util.Iterator;
00053 
00054 
00055 public class ProActiveMPIManager implements Serializable {
00056     private final static Logger MPI_IMPL_LOGGER = ProActiveLogger.getLogger(Loggers.MPI_CONTROL_MANAGER);
00057     public final static String DEFAULT_LIBRARY_NAME = "libProActiveMPIComm.so";
00058 
00060     private static int currentJobNumber = 0;
00061 
00063     private ArrayList mpiSpmdList;
00064 
00065     /*  Hashtable<jobID, ProActiveCoupling []> */
00066     private Hashtable proxyMap;
00067 
00068     /*  Hashtable<jobID, ProSPMD ProActiveMPICoupling> */
00069     private Hashtable spmdProxyMap;
00070 
00071     /*  Hashtable<jobID, Hashtable<class, ProSPMD user class || user proxy array>> */
00072     private Hashtable userProxyMap;
00073 
00074     /*  ackToStart[jobID] = number of proxy registered */
00075     private int[] ackToStart;
00076 
00077     /*  ackToRecvlist[jobID] = number of proxy ready to begin activities */
00078     private int[] ackToRecv;
00079 
00080     public ProActiveMPIManager() {
00081     }
00082 
00083     public void deploy(ArrayList spmdList) {
00084         this.mpiSpmdList = spmdList;
00085         this.proxyMap = new Hashtable();
00086         this.spmdProxyMap = new Hashtable();
00087         this.userProxyMap = new Hashtable();
00088         this.ackToStart = new int[spmdList.size()];
00089         this.ackToRecv = new int[spmdList.size()];
00090 
00091         // loop on the MPISpmd object list
00092         try {
00093             for (int i = 0; i < spmdList.size(); i++) {
00094                 VirtualNode vn = (VirtualNode) ((MPISpmd) spmdList.get(currentJobNumber)).getVn();
00095                 Node[] allNodes;
00096                 allNodes = vn.getNodes();
00097                 String remoteLibraryPath = ((MPISpmd) spmdList.get(currentJobNumber)).getRemoteLibraryPath();
00098 
00099                 ClassLoader cl = this.getClass().getClassLoader();
00100                 java.net.URL u = cl.getResource(
00101                         "org/objectweb/proactive/mpi/control/" +
00102                         DEFAULT_LIBRARY_NAME);
00103 
00104                 File remoteDest = new File(remoteLibraryPath +
00105                         "/libProActiveMPIComm.so");
00106                 File localSource = new File(u.getFile());
00107 
00108                 FileVector filePushed = FileTransfer.pushFile(allNodes[0],
00109                         localSource, remoteDest);
00110                 filePushed.waitForAll();
00111 
00112                 ackToStart[i] = allNodes.length - 1;
00113                 ackToRecv[i] = allNodes.length - 1;
00114                 Object[][] params = new Object[allNodes.length][];
00115 
00116                 // create parameters
00117                 // "Comm" is the name of the JNI Library
00118                 for (int j = 0; j < params.length; j++) {
00119                     params[j] = new Object[] {
00120                             "ProActiveMPIComm",
00121                             (ProActiveMPIManager) ProActive.getStubOnThis(),
00122                             new Integer(currentJobNumber)
00123                         };
00124                 }
00125                 MPI_IMPL_LOGGER.info("[MANAGER] Create SPMD Proxy for jobID: " +
00126                     currentJobNumber);
00127                 ProActiveMPICoupling spmdCouplingProxy = (ProActiveMPICoupling) ProSPMD.newSPMDGroup(ProActiveMPICoupling.class.getName(),
00128                         params, vn);
00129 
00130                 // create ProSPMD proxy
00131                 this.spmdProxyMap.put(new Integer(currentJobNumber),
00132                     spmdCouplingProxy);
00133                 MPI_IMPL_LOGGER.info("[MANAGER] Initialize remote environments");
00134                 // initialize queues & semaphores and start thread
00135                 Ack ack = spmdCouplingProxy.initEnvironment();
00136                 ProActive.waitFor(ack);
00137                 MPI_IMPL_LOGGER.info(
00138                     "[MANAGER] Activate remote thread for communication");
00139                 // once environment is ready, start thread to get mpi process rank  
00140                 spmdCouplingProxy.createRecvThread();
00141                 // initialize joblist & and userProxyList table
00142                 this.proxyMap.put(new Integer(currentJobNumber),
00143                     new ProActiveMPICoupling[allNodes.length]);
00144                 this.userProxyMap.put(new Integer(currentJobNumber),
00145                     new Hashtable());
00146 
00147                 currentJobNumber++;
00148             }
00149         } catch (NodeException e) {
00150             e.printStackTrace();
00151         } catch (ClassNotReifiableException e) {
00152             e.printStackTrace();
00153         } catch (ActiveObjectCreationException e) {
00154             e.printStackTrace();
00155         } catch (ClassNotFoundException e) {
00156             e.printStackTrace();
00157         } catch (IOException e) {
00158             e.printStackTrace();
00159         } catch (ProActiveException e) {
00160             e.printStackTrace();
00161         }
00162     }
00163 
00164     public void register(int jobID, int rank) {
00165         // ack of corresponding job is null means that the 
00166         // job is ready to recv message from another job
00167         if (ackToRecv[jobID] == 0) {
00168             for (int i = 0; i < currentJobNumber; i++) {
00169                 if (ackToRecv[i] != 0) {
00170                     return;
00171                 }
00172             }
00173             for (int i = 0; i < currentJobNumber; i++) {
00174                 ((ProActiveMPICoupling) spmdProxyMap.get(new Integer(i))).wakeUpThread();
00175             }
00176         } else {
00177             // we decrease the number of daemon rest
00178             ackToRecv[jobID]--;
00179         }
00180     }
00181 
00182     // insert Comm Active Object at the correct location
00183     public void register(int jobID, int rank,
00184         ProActiveMPICoupling activeProxyComm) {
00185         if (jobID < currentJobNumber) {
00186             MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00187                 " register mpi process #" + rank);
00188 
00189             ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(jobID)))[rank] = activeProxyComm;
00190 
00191             // test if this job is totally registered
00192             boolean deployUserSpmdObject = true;
00193             for (int i = 0;
00194                     i < ((ProActiveMPICoupling[]) this.proxyMap.get(
00195                         new Integer(jobID))).length; i++) {
00196                 if (((ProActiveMPICoupling[]) this.proxyMap.get(
00197                             new Integer(jobID)))[i] == null) {
00198                     // not totally registered
00199                     deployUserSpmdObject = false;
00200                 }
00201             }
00202 
00203             //  all proxy are registered
00204             if (deployUserSpmdObject) {
00205                 // create a new array of nodes well ordered
00206                 Node[] orderedNodes = new Node[((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00207                             jobID))).length];
00208                 for (int i = 0; i < orderedNodes.length; i++) {
00209                     try {
00210                         orderedNodes[i] = ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00211                                     jobID)))[i].getNode();
00212                     } catch (NodeException e) {
00213                         e.printStackTrace();
00214                     }
00215                 }
00216                 deployUserSpmdClasses(jobID, orderedNodes);
00217                 deployUserClasses(jobID, orderedNodes);
00218             }
00219 
00220             for (int i = 0; i < currentJobNumber; i++) {
00221                 int jobListLength = ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00222                             i))).length;
00223                 for (int j = 0; j < jobListLength; j++) {
00224                     if (((ProActiveMPICoupling[]) this.proxyMap.get(
00225                                 new Integer(i)))[j] == null) {
00226                         return;
00227                     }
00228                 }
00229             }
00230 
00231             for (int i = 0; i < currentJobNumber; i++) {
00232                 // send the table of User ProSpmd object to all the Proxy 
00233                 //     ((ProActiveMPICoupling) proxySpmdTabByJob.get(new Integer(i))).setUserProSPMDList(this.userSpmdTabByJob);
00234                 ((ProActiveMPICoupling) spmdProxyMap.get(new Integer(i))).notifyProxy(this.proxyMap,
00235                     this.spmdProxyMap, this.userProxyMap);
00236             }
00237         } else {
00238             throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00239                 jobID);
00240         }
00241     }
00242 
00243     public void deployUserClasses(int jobID, Node[] orderedNodes) {
00244         //    get the list of classes to instanciate for this MPISpmd object
00245         //        and send it as parameter.
00246         ArrayList classes = ((MPISpmd) mpiSpmdList.get(jobID)).getClasses();
00247         if (!classes.isEmpty()) {
00248             MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00249                 " deploy user classes");
00250             // get the table of parameters
00251             Hashtable paramsTable = ((MPISpmd) mpiSpmdList.get(jobID)).getClassesParams();
00252             Hashtable userProxyList = new Hashtable();
00253             Iterator iterator = classes.iterator();
00254             while (iterator.hasNext()) {
00255                 String cl = (String) iterator.next();
00256                 try {
00257                     Object[] parameters = (Object[]) paramsTable.get(cl);
00258                     Object[] proxyList = new Object[parameters.length];
00259                     for (int i = 0; i < parameters.length; i++) {
00260                         Object[] params = (Object[]) parameters[i];
00261                         if (params != null) {
00262                             proxyList[i] = ProActive.newActive(cl, params,
00263                                     orderedNodes[i]);
00264                         }
00265                     }
00266                     userProxyList.put(cl, proxyList);
00267                     this.userProxyMap.put(new Integer(jobID), userProxyList);
00268                 } catch (ActiveObjectCreationException e) {
00269                     e.printStackTrace();
00270                 } catch (NodeException e) {
00271                     e.printStackTrace();
00272                 }
00273             }
00274         }
00275     }
00276 
00277     public void deployUserSpmdClasses(int jobID, Node[] orderedNodes) {
00278         //  get the list of SPMD class to instanciate for this MPISpmd object
00279         //        and send it as parameter.
00280         ArrayList classes = ((MPISpmd) mpiSpmdList.get(jobID)).getSpmdClasses();
00281         if (!classes.isEmpty()) {
00282             MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00283                 " deploy user SPMD classes");
00284             // get the table of parameters
00285             Hashtable paramsTable = ((MPISpmd) mpiSpmdList.get(jobID)).getSpmdClassesParams();
00286             Hashtable userProxyList = new Hashtable();
00287             Iterator iterator = classes.iterator();
00288             while (iterator.hasNext()) {
00289                 String cl = (String) iterator.next();
00290                 try {
00291                     ArrayList parameters = (ArrayList) paramsTable.remove(cl);
00292 
00293                     // simple array parameter
00294                     if (parameters.get(0) != null) {
00295                         Object[] params = (Object[]) parameters.get(0);
00296                         Object[][] p = new Object[orderedNodes.length][];
00297                         for (int i = 0; i < orderedNodes.length; i++) {
00298                             p[i] = params;
00299                         }
00300                         userProxyList.put(cl,
00301                             ProSPMD.newSPMDGroup(cl, p, orderedNodes));
00302                     } // matrix parameter 
00303                     else if (parameters.get(1) != null) {
00304                         Object[][] params = (Object[][]) parameters.get(1);
00305                         userProxyList.put(cl,
00306                             ProSPMD.newSPMDGroup(cl, params, orderedNodes));
00307                     } // no parameters 
00308                     else {
00309                         Object[][] params = new Object[orderedNodes.length][];
00310                         userProxyList.put(cl,
00311                             ProSPMD.newSPMDGroup(cl, params, orderedNodes));
00312                     }
00313                     this.userProxyMap.put(new Integer(jobID), userProxyList);
00314                 } catch (ClassNotReifiableException e) {
00315                     e.printStackTrace();
00316                 } catch (ActiveObjectCreationException e) {
00317                     e.printStackTrace();
00318                 } catch (NodeException e) {
00319                     e.printStackTrace();
00320                 } catch (ClassNotFoundException e) {
00321                     e.printStackTrace();
00322                 } // end_try
00323             } // end_while
00324         } // end_if_classes
00325     }
00326 
00327     public void register(int jobID) {
00328         // ack of job is null means we can start MPI application
00329         if (ackToStart[jobID] == 0) {
00330             MPISpmd mpiSpmd = (MPISpmd) mpiSpmdList.get(jobID);
00331             MPIResult res = mpiSpmd.startMPI();
00332 
00333             // the prinln generate a deadlock
00334             //System.out.println(mpiSpmd);
00335         } else {
00336             ackToStart[jobID]--;
00337         }
00338     }
00339 
00340     public void unregister(int jobID, int rank) {
00341         if (jobID < currentJobNumber) {
00342             ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(jobID)))[rank] = null;
00343             MPI_IMPL_LOGGER.info("[MANAGER] JobID #" + jobID +
00344                 " unregister mpi process #" + rank);
00345             for (int i = 0; i < currentJobNumber; i++) {
00346                 int jobListLength = ((ProActiveMPICoupling[]) this.proxyMap.get(new Integer(
00347                             i))).length;
00348                 for (int j = 0; j < jobListLength; j++) {
00349                     if (((ProActiveMPICoupling[]) this.proxyMap.get(
00350                                 new Integer(i)))[j] != null) {
00351                         return;
00352                     }
00353                 }
00354             }
00355 
00356             for (int i = 0; i < this.mpiSpmdList.size(); i++) {
00357                 ((VirtualNode) ((MPISpmd) this.mpiSpmdList.get(i)).getVn()).killAll(false);
00358             }
00359             System.exit(0);
00360         } else {
00361             throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00362                 jobID);
00363         }
00364     }
00365 
00369     //    public void sendMessageToComm(int jobID, MessageRecv m_r) {
00370     //        int dest = m_r.getDest();
00371     //        if (jobID < proxyTabByJob.size()) {
00372     //            ProActiveMPICoupling[] tabOfComm = (ProActiveMPICoupling[]) proxyTabByJob.get(new Integer(
00373     //                        jobID));
00374     //            if ((dest < tabOfComm.length) && (tabOfComm[dest] != null)) {
00375     //                tabOfComm[dest].receiveFromMpi(m_r);
00376     //
00377     //                //                System.out.println(
00378     //                //                    "[JOBMANAGER]sendMessageToComm> One message received from : " +
00379     //                //                    m_r.getSrc() + " Destinator is :" + dest + " Job: " +
00380     //                //                    jobID);
00381     //                // System.out.println(" Message is :" + m_r);
00382     //            } else {
00383     //                throw new IndexOutOfBoundsException(
00384     //                    " ActiveProxyComm destinator " + dest + " is unreachable!");
00385     //            }
00386     //        } else {
00387     //            throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00388     //                jobID);
00389     //        }
00390     //    }
00391     //
00392     //    public void allSendMessageToComm(int jobID, MessageRecv m_r) {
00393     //        if (jobID < proxyTabByJob.size()) {
00394     //            ProActiveMPICoupling[] allDest = (ProActiveMPICoupling[]) proxyTabByJob.get(new Integer(
00395     //                        jobID));
00396     //            for (int i = 0; i < allDest.length; i++) {
00397     //                if (allDest[i] != null) {
00398     //                    allDest[i].receiveFromMpi(m_r);
00399     //                } else {
00400     //                    System.out.println(
00401     //                        "[JOBMANAGER]allSendMessageToComm> on destinator is null  : " +
00402     //                        i + " Job: " + jobID);
00403     //                }
00404     //            }
00405     //            System.out.println("[JOBMANAGER]allSendMessageToComm>  to Job: " +
00406     //                jobID);
00407     //        } else {
00408     //            throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00409     //                jobID);
00410     //        }
00411     //    }
00412     //    
00413 }

Generated on Mon Jan 22 15:16:11 2007 for ProActive by  doxygen 1.5.1