org/objectweb/proactive/mpi/control/ProActiveMPICoupling.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.mpi.control;
00032 
00033 import org.objectweb.proactive.ActiveObjectCreationException;
00034 import org.objectweb.proactive.Body;
00035 import org.objectweb.proactive.InitActive;
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.group.Group;
00038 import org.objectweb.proactive.core.group.ProActiveGroup;
00039 import org.objectweb.proactive.core.node.Node;
00040 import org.objectweb.proactive.core.node.NodeException;
00041 import org.objectweb.proactive.core.node.NodeFactory;
00042 
00043 import java.io.IOException;
00044 import java.io.Serializable;
00045 
00046 import java.lang.reflect.InvocationTargetException;
00047 import java.lang.reflect.Method;
00048 
00049 import java.util.Hashtable;
00050 
00051 
00052 public class ProActiveMPICoupling implements Serializable, InitActive {
00053 
00055     private ProActiveMPIManager manager;
00056 
00058     private ProActiveMPIComm target;
00059 
00060     /*  Hashtable<jobID, Hashtable<class, ProSPMD user class || user proxy array>> */
00061     private Hashtable userProxyMap;
00062 
00063     // job # managed by the Job Manager
00064     private int jobID;
00065 
00066     /*  Hashtable<jobID, ProActiveCoupling []> */
00067     private static Hashtable proxyMap;
00068 
00069     /*  Hashtable<jobID, ProSPMD ProActiveMPICoupling> */
00070     private Hashtable spmdProxyMap;
00071 
00075     public ProActiveMPICoupling() {
00076     }
00077 
00078     public ProActiveMPICoupling(String libName, ProActiveMPIManager manager,
00079         Integer jobNum)
00080         throws ActiveObjectCreationException, NodeException, 
00081             ClassNotFoundException, InstantiationException, 
00082             IllegalAccessException {
00083         this.manager = manager;
00084         this.jobID = jobNum.intValue();
00085         target = new ProActiveMPIComm(libName,
00086                 ProActive.getBodyOnThis().getID().hashCode());
00087     }
00088 
00089     public void initActivity(Body body) {
00090         // update proxy ref 
00091         this.target.setMyProxy((ProActiveMPICoupling) ProActive.getStubOnThis(),
00092             this.manager, this.jobID);
00093     }
00094 
00098     public void registerProcess(int rank) {
00099         this.manager.register(this.jobID, rank,
00100             (ProActiveMPICoupling) ProActive.getStubOnThis());
00101     }
00102 
00103     public void register() {
00104         this.manager.register(this.jobID);
00105     }
00106 
00107     public void register(int rank) {
00108         this.manager.register(this.jobID, rank);
00109     }
00110 
00111     public void unregisterProcess(int rank) {
00112         this.manager.unregister(this.jobID, rank);
00113     }
00114 
00115     public void receiveFromMpi(ProActiveMPIData m_r) {
00116         this.target.receiveFromMpi(m_r);
00117     }
00118 
00119     public void receiveFromProActive(ProActiveMPIData m_r) {
00120         this.target.receiveFromProActive(m_r);
00121     }
00122 
00123     public void sendToMpi(int jobID, ProActiveMPIData m_r)
00124         throws IOException {
00125         int dest = m_r.getDest();
00126         if (jobID < proxyMap.size()) {
00127             ProActiveMPICoupling[] arrayComm = (ProActiveMPICoupling[]) proxyMap.get(new Integer(
00128                         jobID));
00129             if ((dest < arrayComm.length) && (arrayComm[dest] != null)) {
00130                 arrayComm[dest].receiveFromMpi(m_r);
00131             } else {
00132                 throw new IndexOutOfBoundsException(
00133                     " ActiveProxyComm destinator " + dest + " is unreachable!");
00134             }
00135         } else {
00136             throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00137                 jobID);
00138         }
00139     }
00140 
00141     public Ack sendToMpi(int jobID, ProActiveMPIData m_r, boolean b)
00142         throws IOException {
00143         this.sendToMpi(jobID, m_r);
00144         return new Ack();
00145     }
00146 
00147     public static void MPISend(byte[] buf, int count, int datatype, int dest,
00148         int tag, int jobID) {
00149         //create Message to send and use the native method
00150         ProActiveMPIData m_r = new ProActiveMPIData();
00151 
00152         m_r.setData(buf);
00153         m_r.setCount(count);
00154         m_r.setDatatype(datatype);
00155         m_r.setDest(dest);
00156         m_r.setTag(tag);
00157         m_r.setJobID(jobID);
00158         if (jobID < proxyMap.size()) {
00159             ProActiveMPICoupling[] arrayComm = (ProActiveMPICoupling[]) proxyMap.get(new Integer(
00160                         jobID));
00161             if ((dest < arrayComm.length) && (arrayComm[dest] != null)) {
00162                 arrayComm[dest].receiveFromProActive(m_r);
00163             } else {
00164                 throw new IndexOutOfBoundsException(
00165                     " ActiveProxyComm destinator " + dest + " is unreachable!");
00166             }
00167         } else {
00168             throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00169                 jobID);
00170         }
00171     }
00172 
00176     public Ack initEnvironment() {
00177         //      initialize the send & recv queues
00178         this.target.initQueues();
00179         return new Ack();
00180     }
00181 
00182     public void createRecvThread() {
00183         this.target.createRecvThread();
00184     }
00185 
00186     public void notifyProxy(Hashtable jobList, Hashtable groupList,
00187         Hashtable userProxyMap) {
00188         proxyMap = jobList;
00189         spmdProxyMap = groupList;
00190         this.userProxyMap = userProxyMap;
00191         this.target.sendJobNumberAndRegister();
00192     }
00193 
00194     public void wakeUpThread() {
00195         this.target.wakeUpThread();
00196     }
00197 
00201     public Node getNode() throws NodeException {
00202         return NodeFactory.getNode(ProActive.getBodyOnThis().getNodeURL());
00203     }
00204 
00205     public void allSendToMpi(int jobID, ProActiveMPIData m_r) {
00206         if (jobID < spmdProxyMap.size()) {
00207             ProActiveMPICoupling groupDest = (ProActiveMPICoupling) spmdProxyMap.get(new Integer(
00208                         jobID));
00209             groupDest.receiveFromMpi(m_r);
00210         } else {
00211             throw new IndexOutOfBoundsException(" MPI job with such ID: " +
00212                 jobID + " doesn't exist");
00213         }
00214     }
00215 
00216     public String toString() {
00217         StringBuffer sb = new StringBuffer();
00218         sb.append(target.toString());
00219         sb.append("\n MPIJobNum: " + this.jobID);
00220         return sb.toString();
00221     }
00222 
00223     public void sendToProActive(int jobID, ProActiveMPIData m_r)
00224         throws IllegalArgumentException, IllegalAccessException, 
00225             InvocationTargetException, SecurityException, NoSuchMethodException, 
00226             ClassNotFoundException {
00227         int dest = m_r.getDest();
00228         if (jobID < proxyMap.size()) {
00229             Hashtable proSpmdByClasses = (Hashtable) this.userProxyMap.get(new Integer(
00230                         jobID));
00231 
00232             Object proSpmdGroup = proSpmdByClasses.get(m_r.getClazz());
00233 
00234             // if the corresponding object exists, its a -ProSpmd object- or a -proxy-
00235             if (proSpmdGroup != null) {
00236                 Group g = ProActiveGroup.getGroup(proSpmdByClasses.get(
00237                             m_r.getClazz()));
00238 
00239                 // its a ProSpmd Object
00240                 if (g != null) {
00241                     // extract the specified object from the group and call method on it
00242                     ((Method) g.get(dest).getClass().getDeclaredMethod(m_r.getMethod(),
00243                         new Class[] { ProActiveMPIData.class })).invoke(g.get(dest),
00244                         new Object[] { m_r });
00245                 } else {
00246                     if (((Object[]) proSpmdByClasses.get(m_r.getClazz()))[dest] != null) {
00247                         ((Method) ((Object[]) proSpmdByClasses.get(m_r.getClazz()))[dest].getClass()
00248                                                                                          .getDeclaredMethod(m_r.getMethod(),
00249                             new Class[] { ProActiveMPIData.class })).invoke(((Object[]) proSpmdByClasses.get(
00250                                 m_r.getClazz()))[dest], new Object[] { m_r });
00251                     } else {
00252                         throw new ClassNotFoundException(
00253                             "The Specified User Class *** " + m_r.getClazz() +
00254                             "*** doesn't exist !!!");
00255                     }
00256                 }
00257             }
00258             // the specified class doesn't exist  
00259             else {
00260                 throw new ClassNotFoundException(
00261                     "The Specified User Class *** " + m_r.getClazz() +
00262                     "*** doesn't exist !!!");
00263             }
00264         } else {
00265             throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00266                 jobID);
00267         }
00268     }
00269 }

Generated on Mon Jan 22 15:16:11 2007 for ProActive by  doxygen 1.5.1