org/objectweb/proactive/mpi/control/ProActiveMPIComm.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.mpi.control;
00032 
00033 import org.apache.log4j.Logger;
00034 
00035 import org.objectweb.proactive.ProActive;
00036 import org.objectweb.proactive.core.util.log.Loggers;
00037 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00038 
00039 import java.net.UnknownHostException;
00040 
00041 
00042 public class ProActiveMPIComm {
00043     private static Logger logger = ProActiveLogger.getLogger(Loggers.MPI_CONTROL_COUPLING);
00044     private String hostname = "NULL";
00045     private volatile boolean shouldRun = true;
00046 
00047     /* my proxy */
00048     private ProActiveMPICoupling myProxy;
00049 
00050     // rank of mpi process in the job
00051     private int myRank = -1;
00052     private boolean notify = true;
00053     private int jobID;
00054     private int count;
00055 
00056     /* job manager */
00057     private ProActiveMPIManager manager;
00058 
00062     private native int initRecvQueue();
00063 
00064     private native int initSendQueue();
00065 
00066     private native int sendJobNb(int jobNumber);
00067 
00068     private native int init(String userPath, int r);
00069 
00070     private native int closeQueue();
00071 
00072     private native int closeAllQueues();
00073 
00074     private native int sendRequest(ProActiveMPIData m_r, byte[] bs);
00075 
00076     private native byte[] recvRequest(ProActiveMPIData m_r);
00077 
00078     public native int proActiveSendRequest(ProActiveMPIData m_r, byte[] bs);
00079 
00083     public ProActiveMPIComm() {
00084     }
00085 
00086     public ProActiveMPIComm(String libName, int uniqueID) {
00087         try {
00088             hostname = java.net.InetAddress.getLocalHost().getHostName();
00089             logger.info("[REMOTE PROXY] [" + this.hostname +
00090                 "] Constructor> : Loading library.");
00091         } catch (UnknownHostException e) {
00092             e.printStackTrace();
00093         }
00094         System.loadLibrary(libName);
00095         logger.info("[REMOTE PROXY] [" + this.hostname +
00096             "] Constructor> : Library loaded.");
00097         // initialize semaphores & log files
00098         this.init(uniqueID);
00099     }
00100 
00104     public void initQueues() {
00105         logger.info("[REMOTE PROXY] [" + this.hostname +
00106             "] initQueues> : init receiving queue: " + initRecvQueue());
00107 
00108         logger.info("[REMOTE PROXY] [" + this.hostname +
00109             "] initQueues> : init sending queue: " + initSendQueue());
00110     }
00111 
00112     public void closeQueues() {
00113         logger.info("[REMOTE PROXY] [" + this.hostname +
00114             "] closeQueues> : closeQueue: " + closeQueue());
00115     }
00116 
00117     public void closeAllSRQueues() {
00118         logger.info("[REMOTE PROXY] [" + this.hostname +
00119             "] closeAllSRQueues> : closeAllQueues: " + closeAllQueues());
00120     }
00121 
00122     public void createRecvThread() {
00123         this.createThread();
00124     }
00125 
00126     public void createThread() {
00127         Runnable r = new MessageRecvHandler();
00128         Thread t = new Thread(r, "Thread Message Recv");
00129         t.start();
00130     }
00131 
00132     public void sendJobNumberAndRegister() {
00133         sendJobNumber(jobID);
00134         this.manager.register(this.jobID, myRank);
00135     }
00136 
00137     public void wakeUpThread() {
00138         logger.info("[REMOTE PROXY] [" + this.hostname +
00139             "] activeThread> : activate thread");
00140         this.notify = true;
00141     }
00142 
00143     public void asleepThread() {
00144         this.notify = false;
00145     }
00146 
00150     public void setMyProxy(ProActiveMPICoupling myProxy,
00151         ProActiveMPIManager jobManager, int idJob) {
00152         this.myProxy = myProxy;
00153         this.jobID = idJob;
00154         this.manager = jobManager;
00155     }
00156 
00157     public void init(int uniqueID) {
00158         logger.info("[REMOTE PROXY] [" + this.hostname + "] init> : init: " +
00159             init(System.getProperty("user.home"), uniqueID));
00160         this.closeAllSRQueues();
00161     }
00162 
00166     public void receiveFromProActive(ProActiveMPIData m_r) {
00167         proActiveSendRequest(m_r, m_r.getData());
00168     }
00169 
00170     public void sendJobNumber(int jobNumber) {
00171         logger.info("[REMOTE PROXY] [" + this.hostname +
00172             "] sendJobNumber> send job number " + sendJobNb(jobNumber));
00173     }
00174 
00175     public void receiveFromMpi(ProActiveMPIData m_r) {
00176         if (m_r.getData() == null) {
00177             throw new RuntimeException("[REMOTE PROXY] !!! DATA are null ");
00178         }
00179 
00180         // byte[]
00181         sendRequest(m_r, m_r.getData());
00182     }
00183 
00184     public int getMyRank() {
00185         return myRank;
00186     }
00187 
00188     public String toString() {
00189         StringBuffer sb = new StringBuffer();
00190         sb.append("\n Class: ");
00191         sb.append(this.getClass().getName());
00192         sb.append("\n Hostname: " + this.hostname);
00193         sb.append("\n rank: " + this.myRank);
00194         return sb.toString();
00195     }
00196 
00200     protected class MessageRecvHandler implements Runnable {
00201         public MessageRecvHandler() {
00202         }
00203 
00204         public void run() {
00205             //      signal the job manager that this daemon is ok to recv message
00206             myProxy.register();
00207             ProActiveMPIData m_r = new ProActiveMPIData();
00208             byte[] data;
00209             Ack ack = new Ack();
00210             count = 0;
00211             while (shouldRun) {
00212                 if (notify) {
00213                     try {
00214                         if ((data = recvRequest(m_r)) == null) {
00215                             throw new RuntimeException(
00216                                 "[REMOTE PROXY] !!! ERROR data received are NULL from native method");
00217                         }
00218 
00219                         //check TAG1
00220                         if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_INIT) {
00221                             myRank = m_r.getSrc();
00222                             myProxy.registerProcess(myRank);
00223                             asleepThread();
00224                         } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_SEND) {
00225                             m_r.setData(data);
00226                             int jobRecver = m_r.getjobID();
00227                             m_r.setJobID(jobID);
00228                             count++;
00229                             if ((count % 1000) == 0) {
00230                                 // wait for old acknowledge
00231                                 ProActive.waitFor(ack);
00232                                 // create new Acknowledge
00233                                 ack = myProxy.sendToMpi(jobRecver, m_r, false);
00234                             } else {
00235                                 myProxy.sendToMpi(jobRecver, m_r);
00236                             }
00237                         } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_SEND_PROACTIVE) {
00238                             //                                  System.out.println("[" + hostname +
00239                             //                                  "] TREAD] RECVING MESSAGE-> SENDING");
00240                             m_r.setData(data);
00241                             int jobRecver = m_r.getjobID();
00242                             m_r.setJobID(jobID);
00243                             m_r.parseParameters();
00244                             myProxy.sendToProActive(jobRecver, m_r);
00245                         } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_ALLSEND) {
00246                             m_r.setData(data);
00247                             int jobRecver = m_r.getjobID();
00248                             m_r.setJobID(jobID);
00249                             myProxy.allSendToMpi(jobRecver, m_r);
00250                         } else if (m_r.getTag1() == ProActiveMPIConstants.COMM_MSG_FINALIZE) {
00251                             closeQueues();
00252                             myProxy.unregisterProcess(myRank);
00253                             shouldRun = false;
00254                         } else {
00255                             logger.info("[REMOTE PROXY]  TAG UNKNOWN ");
00256                         }
00257                     } catch (Exception e) {
00258                         System.out.println("In Java:\n\t" + e);
00259                         e.printStackTrace();
00260                     }
00261                 }
00262             }
00263         }
00264     }
00265 }

Generated on Mon Jan 22 15:16:11 2007 for ProActive by  doxygen 1.5.1