00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.mpi.control;
00032
00033 import org.apache.log4j.Logger;
00034
00035 import org.objectweb.proactive.ProActive;
00036 import org.objectweb.proactive.core.util.log.Loggers;
00037 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00038
00039 import java.net.UnknownHostException;
00040
00041
00042 public class ProActiveMPIComm {
00043 private static Logger logger = ProActiveLogger.getLogger(Loggers.MPI_CONTROL_COUPLING);
00044 private String hostname = "NULL";
00045 private volatile boolean shouldRun = true;
00046
00047
00048 private ProActiveMPICoupling myProxy;
00049
00050
00051 private int myRank = -1;
00052 private boolean notify = true;
00053 private int jobID;
00054 private int count;
00055
00056
00057 private ProActiveMPIManager manager;
00058
00062 private native int initRecvQueue();
00063
00064 private native int initSendQueue();
00065
00066 private native int sendJobNb(int jobNumber);
00067
00068 private native int init(String userPath, int r);
00069
00070 private native int closeQueue();
00071
00072 private native int closeAllQueues();
00073
00074 private native int sendRequest(ProActiveMPIData m_r, byte[] bs);
00075
00076 private native byte[] recvRequest(ProActiveMPIData m_r);
00077
00078 public native int proActiveSendRequest(ProActiveMPIData m_r, byte[] bs);
00079
00083 public ProActiveMPIComm() {
00084 }
00085
00086 public ProActiveMPIComm(String libName, int uniqueID) {
00087 try {
00088 hostname = java.net.InetAddress.getLocalHost().getHostName();
00089 logger.info("[REMOTE PROXY] [" + this.hostname +
00090 "] Constructor> : Loading library.");
00091 } catch (UnknownHostException e) {
00092 e.printStackTrace();
00093 }
00094 System.loadLibrary(libName);
00095 logger.info("[REMOTE PROXY] [" + this.hostname +
00096 "] Constructor> : Library loaded.");
00097
00098 this.init(uniqueID);
00099 }
00100
00104 public void initQueues() {
00105 logger.info("[REMOTE PROXY] [" + this.hostname +
00106 "] initQueues> : init receiving queue: " + initRecvQueue());
00107
00108 logger.info("[REMOTE PROXY] [" + this.hostname +
00109 "] initQueues> : init sending queue: " + initSendQueue());
00110 }
00111
00112 public void closeQueues() {
00113 logger.info("[REMOTE PROXY] [" + this.hostname +
00114 "] closeQueues> : closeQueue: " + closeQueue());
00115 }
00116
00117 public void closeAllSRQueues() {
00118 logger.info("[REMOTE PROXY] [" + this.hostname +
00119 "] closeAllSRQueues> : closeAllQueues: " + closeAllQueues());
00120 }
00121
00122 public void createRecvThread() {
00123 this.createThread();
00124 }
00125
00126 public void createThread() {
00127 Runnable r = new MessageRecvHandler();
00128 Thread t = new Thread(r, "Thread Message Recv");
00129 t.start();
00130 }
00131
00132 public void sendJobNumberAndRegister() {
00133 sendJobNumber(jobID);
00134 this.manager.register(this.jobID, myRank);
00135 }
00136
00137 public void wakeUpThread() {
00138 logger.info("[REMOTE PROXY] [" + this.hostname +
00139 "] activeThread> : activate thread");
00140 this.notify = true;
00141 }
00142
00143 public void asleepThread() {
00144 this.notify = false;
00145 }
00146
00150 public void setMyProxy(ProActiveMPICoupling myProxy,
00151 ProActiveMPIManager jobManager, int idJob) {
00152 this.myProxy = myProxy;
00153 this.jobID = idJob;
00154 this.manager = jobManager;
00155 }
00156
00157 public void init(int uniqueID) {
00158 logger.info("[REMOTE PROXY] [" + this.hostname + "] init> : init: " +
00159 init(System.getProperty("user.home"), uniqueID));
00160 this.closeAllSRQueues();
00161 }
00162
00166 public void receiveFromProActive(ProActiveMPIData m_r) {
00167 proActiveSendRequest(m_r, m_r.getData());
00168 }
00169
00170 public void sendJobNumber(int jobNumber) {
00171 logger.info("[REMOTE PROXY] [" + this.hostname +
00172 "] sendJobNumber> send job number " + sendJobNb(jobNumber));
00173 }
00174
00175 public void receiveFromMpi(ProActiveMPIData m_r) {
00176 if (m_r.getData() == null) {
00177 throw new RuntimeException("[REMOTE PROXY] !!! DATA are null ");
00178 }
00179
00180
00181 sendRequest(m_r, m_r.getData());
00182 }
00183
00184 public int getMyRank() {
00185 return myRank;
00186 }
00187
00188 public String toString() {
00189 StringBuffer sb = new StringBuffer();
00190 sb.append("\n Class: ");
00191 sb.append(this.getClass().getName());
00192 sb.append("\n Hostname: " + this.hostname);
00193 sb.append("\n rank: " + this.myRank);
00194 return sb.toString();
00195 }
00196
00200 protected class MessageRecvHandler implements Runnable {
00201 public MessageRecvHandler() {
00202 }
00203
00204 public void run() {
00205
00206 myProxy.register();
00207 ProActiveMPIData m_r = new ProActiveMPIData();
00208 byte[] data;
00209 Ack ack = new Ack();
00210 count = 0;
00211 while (shouldRun) {
00212 if (notify) {
00213 try {
00214 if ((data = recvRequest(m_r)) == null) {
00215 throw new RuntimeException(
00216 "[REMOTE PROXY] !!! ERROR data received are NULL from native method");
00217 }
00218
00219
00220 if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_INIT) {
00221 myRank = m_r.getSrc();
00222 myProxy.registerProcess(myRank);
00223 asleepThread();
00224 } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_SEND) {
00225 m_r.setData(data);
00226 int jobRecver = m_r.getjobID();
00227 m_r.setJobID(jobID);
00228 count++;
00229 if ((count % 1000) == 0) {
00230
00231 ProActive.waitFor(ack);
00232
00233 ack = myProxy.sendToMpi(jobRecver, m_r, false);
00234 } else {
00235 myProxy.sendToMpi(jobRecver, m_r);
00236 }
00237 } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_SEND_PROACTIVE) {
00238
00239
00240 m_r.setData(data);
00241 int jobRecver = m_r.getjobID();
00242 m_r.setJobID(jobID);
00243 m_r.parseParameters();
00244 myProxy.sendToProActive(jobRecver, m_r);
00245 } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_ALLSEND) {
00246 m_r.setData(data);
00247 int jobRecver = m_r.getjobID();
00248 m_r.setJobID(jobID);
00249 myProxy.allSendToMpi(jobRecver, m_r);
00250 } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_FINALIZE) {
00251 closeQueues();
00252 myProxy.unregisterProcess(myRank);
00253 shouldRun = false;
00254 } else {
00255 logger.info("[REMOTE PROXY] TAG UNKNOWN ");
00256 }
00257 } catch (Exception e) {
00258 System.out.println("In Java:\n\t" + e);
00259 e.printStackTrace();
00260 }
00261 }
00262 }
00263 }
00264 }
00265 }