00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.mpi.control;
00032
00033 import org.objectweb.proactive.ActiveObjectCreationException;
00034 import org.objectweb.proactive.Body;
00035 import org.objectweb.proactive.InitActive;
00036 import org.objectweb.proactive.ProActive;
00037 import org.objectweb.proactive.core.group.Group;
00038 import org.objectweb.proactive.core.group.ProActiveGroup;
00039 import org.objectweb.proactive.core.node.Node;
00040 import org.objectweb.proactive.core.node.NodeException;
00041 import org.objectweb.proactive.core.node.NodeFactory;
00042
00043 import java.io.IOException;
00044 import java.io.Serializable;
00045
00046 import java.lang.reflect.InvocationTargetException;
00047 import java.lang.reflect.Method;
00048
00049 import java.util.Hashtable;
00050
00051
00052 public class ProActiveMPICoupling implements Serializable, InitActive {
00053
00055 private ProActiveMPIManager manager;
00056
00058 private ProActiveMPIComm target;
00059
00060
00061 private Hashtable userProxyMap;
00062
00063
00064 private int jobID;
00065
00066
00067 private static Hashtable proxyMap;
00068
00069
00070 private Hashtable spmdProxyMap;
00071
00075 public ProActiveMPICoupling() {
00076 }
00077
00078 public ProActiveMPICoupling(String libName, ProActiveMPIManager manager,
00079 Integer jobNum)
00080 throws ActiveObjectCreationException, NodeException,
00081 ClassNotFoundException, InstantiationException,
00082 IllegalAccessException {
00083 this.manager = manager;
00084 this.jobID = jobNum.intValue();
00085 target = new ProActiveMPIComm(libName,
00086 ProActive.getBodyOnThis().getID().hashCode());
00087 }
00088
00089 public void initActivity(Body body) {
00090
00091 this.target.setMyProxy((ProActiveMPICoupling) ProActive.getStubOnThis(),
00092 this.manager, this.jobID);
00093 }
00094
00098 public void registerProcess(int rank) {
00099 this.manager.register(this.jobID, rank,
00100 (ProActiveMPICoupling) ProActive.getStubOnThis());
00101 }
00102
00103 public void register() {
00104 this.manager.register(this.jobID);
00105 }
00106
00107 public void register(int rank) {
00108 this.manager.register(this.jobID, rank);
00109 }
00110
00111 public void unregisterProcess(int rank) {
00112 this.manager.unregister(this.jobID, rank);
00113 }
00114
00115 public void receiveFromMpi(ProActiveMPIData m_r) {
00116 this.target.receiveFromMpi(m_r);
00117 }
00118
00119 public void receiveFromProActive(ProActiveMPIData m_r) {
00120 this.target.receiveFromProActive(m_r);
00121 }
00122
00123 public void sendToMpi(int jobID, ProActiveMPIData m_r)
00124 throws IOException {
00125 int dest = m_r.getDest();
00126 if (jobID < proxyMap.size()) {
00127 ProActiveMPICoupling[] arrayComm = (ProActiveMPICoupling[]) proxyMap.get(new Integer(
00128 jobID));
00129 if ((dest < arrayComm.length) && (arrayComm[dest] != null)) {
00130 arrayComm[dest].receiveFromMpi(m_r);
00131 } else {
00132 throw new IndexOutOfBoundsException(
00133 " ActiveProxyComm destinator " + dest + " is unreachable!");
00134 }
00135 } else {
00136 throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00137 jobID);
00138 }
00139 }
00140
00141 public Ack sendToMpi(int jobID, ProActiveMPIData m_r, boolean b)
00142 throws IOException {
00143 this.sendToMpi(jobID, m_r);
00144 return new Ack();
00145 }
00146
00147 public static void MPISend(byte[] buf, int count, int datatype, int dest,
00148 int tag, int jobID) {
00149
00150 ProActiveMPIData m_r = new ProActiveMPIData();
00151
00152 m_r.setData(buf);
00153 m_r.setCount(count);
00154 m_r.setDatatype(datatype);
00155 m_r.setDest(dest);
00156 m_r.setTag(tag);
00157 m_r.setJobID(jobID);
00158 if (jobID < proxyMap.size()) {
00159 ProActiveMPICoupling[] arrayComm = (ProActiveMPICoupling[]) proxyMap.get(new Integer(
00160 jobID));
00161 if ((dest < arrayComm.length) && (arrayComm[dest] != null)) {
00162 arrayComm[dest].receiveFromProActive(m_r);
00163 } else {
00164 throw new IndexOutOfBoundsException(
00165 " ActiveProxyComm destinator " + dest + " is unreachable!");
00166 }
00167 } else {
00168 throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00169 jobID);
00170 }
00171 }
00172
00176 public Ack initEnvironment() {
00177
00178 this.target.initQueues();
00179 return new Ack();
00180 }
00181
00182 public void createRecvThread() {
00183 this.target.createRecvThread();
00184 }
00185
00186 public void notifyProxy(Hashtable jobList, Hashtable groupList,
00187 Hashtable userProxyMap) {
00188 proxyMap = jobList;
00189 spmdProxyMap = groupList;
00190 this.userProxyMap = userProxyMap;
00191 this.target.sendJobNumberAndRegister();
00192 }
00193
00194 public void wakeUpThread() {
00195 this.target.wakeUpThread();
00196 }
00197
00201 public Node getNode() throws NodeException {
00202 return NodeFactory.getNode(ProActive.getBodyOnThis().getNodeURL());
00203 }
00204
00205 public void allSendToMpi(int jobID, ProActiveMPIData m_r) {
00206 if (jobID < spmdProxyMap.size()) {
00207 ProActiveMPICoupling groupDest = (ProActiveMPICoupling) spmdProxyMap.get(new Integer(
00208 jobID));
00209 groupDest.receiveFromMpi(m_r);
00210 } else {
00211 throw new IndexOutOfBoundsException(" MPI job with such ID: " +
00212 jobID + " doesn't exist");
00213 }
00214 }
00215
00216 public String toString() {
00217 StringBuffer sb = new StringBuffer();
00218 sb.append(target.toString());
00219 sb.append("\n MPIJobNum: " + this.jobID);
00220 return sb.toString();
00221 }
00222
00223 public void sendToProActive(int jobID, ProActiveMPIData m_r)
00224 throws IllegalArgumentException, IllegalAccessException,
00225 InvocationTargetException, SecurityException, NoSuchMethodException,
00226 ClassNotFoundException {
00227 int dest = m_r.getDest();
00228 if (jobID < proxyMap.size()) {
00229 Hashtable proSpmdByClasses = (Hashtable) this.userProxyMap.get(new Integer(
00230 jobID));
00231
00232 Object proSpmdGroup = proSpmdByClasses.get(m_r.getClazz());
00233
00234
00235 if (proSpmdGroup != null) {
00236 Group g = ProActiveGroup.getGroup(proSpmdByClasses.get(
00237 m_r.getClazz()));
00238
00239
00240 if (g != null) {
00241
00242 ((Method) g.get(dest).getClass().getDeclaredMethod(m_r.getMethod(),
00243 new Class[] { ProActiveMPIData.class })).invoke(g.get(dest),
00244 new Object[] { m_r });
00245 } else {
00246 if (((Object[]) proSpmdByClasses.get(m_r.getClazz()))[dest] != null) {
00247 ((Method) ((Object[]) proSpmdByClasses.get(m_r.getClazz()))[dest].getClass()
00248 .getDeclaredMethod(m_r.getMethod(),
00249 new Class[] { ProActiveMPIData.class })).invoke(((Object[]) proSpmdByClasses.get(
00250 m_r.getClazz()))[dest], new Object[] { m_r });
00251 } else {
00252 throw new ClassNotFoundException(
00253 "The Specified User Class *** " + m_r.getClazz() +
00254 "*** doesn't exist !!!");
00255 }
00256 }
00257 }
00258
00259 else {
00260 throw new ClassNotFoundException(
00261 "The Specified User Class *** " + m_r.getClazz() +
00262 "*** doesn't exist !!!");
00263 }
00264 } else {
00265 throw new IndexOutOfBoundsException(" No MPI job exists with num " +
00266 jobID);
00267 }
00268 }
00269 }