Chapter 41. Wrapping MPI Legacy code

The Message Passing Interface (MPI) is a widely adopted communication library for parallel and distributed computing. This work has been designed to make it easier to wrap, deploy and couple several MPI legacy codes, especially on the Grid.

On the one hand, we propose a simple wrapping method designed to automatically deploy MPI applications on clusters or desktop Grids through the use of a deployment descriptor, allowing an MPI application to be deployed using most protocols and schedulers (LSF, PBS, SSH, SunGRID, etc.). The proposed wrapping also permits programmers to develop conventional stand-alone Java applications that use MPI legacy codes.

On the other hand, we propose a wrapping method with control, designed to let the SPMD processes associated with one code communicate with the SPMD processes associated with another simulation code. This feature extends the parallel capability of MPI to the Grid, with ProActive providing inter-process communication between MPI processes located at different Grid points. A special feature of the proposed wrapping is the support of "MPI to/from Java application" communications, which permits users to exchange data between the two worlds.

The API is organized in the package org.objectweb.proactive.mpi, with the class org.objectweb.proactive.mpi.MPI gathering static methods, and the class org.objectweb.proactive.mpi.MPISpmd, whose instances represent and allow control over a given deployed MPI code.

In sum, two wrapping modes are proposed: simple wrapping (Section 41.1) and wrapping with control (Section 41.2).

41.1. Simple Wrapping

41.1.1. Principles

This work is mainly intended to deploy MPI parallel applications on clusters automatically and transparently. Transparency means that the deployer does not need to know which particular resources provide the computing power. The deployer should only have to write the deployment descriptor file and to get back the result of the application, without worrying about resource selection, resource locations and types, or the mapping of processes onto resources.

One of the main principles is to specify and wrap the MPI code in an XML descriptor.

Figure 41.1. File transfer and asking for resources

Main Features for Deployment:

  • File Transfer [using XML deployment descriptor]

    The primary objective is to provide the deployer with automatic deployment of the application through an XML deployment descriptor. ProActive provides support for File Transfer, so the deployer can transfer MPI application input data and/or MPI application code to the remote host. The File Transfer happens before the deployer launches the application. For more details about File Transfer see Section 23.1, “Introduction and Concepts”.

  • Asking for resources [using XML deployment descriptor]

    The deployer describes the MPI job requirements in the deployment descriptor file using one or several Virtual Nodes, and gets back a set of Nodes corresponding to the remote hosts available for the MPI job execution. For more details (or usage examples) about resource booking, have a look at Section 41.1.4, “Using the Infrastructure”.

  • Control MPI process [using ProActive API]

    After deployment, the deployer obtains the Virtual Node containing the resources required for the MPI job, that is, a set of Nodes. The MPI API provides the programmer with the ability to create a stateful MPISpmd object from the Virtual Node obtained. Through this object, the programmer is able to control the MPI program, that is: trigger the job execution, kill the job, synchronize the job, get the object status/result, etc. This API is detailed in the next section.

41.1.2. API For Deploying MPI Codes

41.1.2.1. API Definition

  • What is an MPISpmd object?

    An MPISpmd object is regarded as an MPI code wrapper. It has the following features:

    • it holds a state (which can take different values and reflects the MPI code status)

    • it can be controlled through an API (presented in the next section)

  • MPISpmd object creation methods

    import org.objectweb.proactive.mpi.*;
    
    /**
     * Creates an MPISpmd object from a Virtual Node which represents the deployment of an MPI code.
     * Activates the virtual node (i.e. activates all the Nodes mapped to this VirtualNode
     * in the XML Descriptor) if not already activated, and returns an object representing
     * the MPI deployment process.
     * The MPI code being deployed is specified in the XML descriptor where the Virtual Node
     * is defined.
     */
    
    static public MPISpmd MPI.newMPISpmd(VirtualNode virtualNode);
    
  • MPISpmd object control methods

    /**
     * Triggers the execution of the process represented by the MPISpmd object on the resources
     * previously allocated. This call is an asynchronous request: it does not block, but
     * immediately returns a future on an MPIResult object.
     * As a consequence, the application can go on executing its code as long as it does not
     * invoke a method on the returned MPIResult object; in that case the calling thread is
     * automatically blocked if the result is not yet available (wait-by-necessity).
     * In practice, this call also launches mpirun.
     */
    
    public MPIResult startMPI();
    
    /**
     * Restarts the process represented by the MPISpmd object on the same resources. The process
     * has to have been started once with the startMPI method, otherwise the method throws an
     * IllegalMPIStateException. If the current status is MPI_RUNNING, the
     * process is killed, a new independent computation is triggered,
     * and a new MPIResult object is created. This is also an asynchronous method which returns
     * a future on an MPIResult object.
     */
    
    public MPIResult reStartMPI();
    
    /** 
     * Kills the process and the OS-level MPI processes represented by the MPISpmd object.
     * Returns true if the process was running when it was killed, false otherwise.
     */
    
    public boolean killMPI();
    
    /**
     * Returns the current status of the MPISpmd object. The possible statuses are listed below.
     */
    
    public String getStatus();
    
    /**
     * Adds or modifies the MPI command arguments. It allows programmers to specify arguments
     * to the MPI code.
     * This method has to be called before startMPI or reStartMPI.
     */
    
    public void setCommandArguments(String arguments);
    
  • MPIResult object

    An MPIResult object is obtained from a startMPI/reStartMPI call. More precisely, these methods return a future on an MPIResult object, which does not block the application as long as no method is called on it. As soon as the MPIResult object is used, the application blocks until it has been updated, that is, until the MPI program has terminated. The following method gets the exit value of the MPI program.

    import org.objectweb.proactive.mpi.MPIResult; 
    
    /**
     * Returns the exit value of the MPI program. 
     * By usual convention, the value 0 indicates normal termination.
     */
    
    public int getReturnValue();
    
  • MPISpmd object status

    import org.objectweb.proactive.mpi.*;
    
    MPIConstants.MPI_UNSTARTED; // default status - MPISpmd object creation (newMPISpmd)
    MPIConstants.MPI_RUNNING;   // MPISpmd object has been started or restarted
    MPIConstants.MPI_FINISHED;  // MPISpmd object has finished
    MPIConstants.MPI_KILLED;    // MPISpmd object has been killed
    

    Each status defines the current state of the MPISpmd object. This guarantees application consistency and gives better control over the application when several MPISpmd objects are used.

    Figure 41.2. State transition diagram

41.1.3. How to write an application with the XML and the API

First, finalize the XML descriptor file to specify the MPI code, the files that have to be transferred to the remote hosts, and the resource requirements, as explained in Section 41.1.4, “Using the Infrastructure”. Then, in a Java file, import the package org.objectweb.proactive.mpi. To keep the application consistent, the MPISpmd object makes use of its status: it guarantees that a method called on the object is either coherent or throws an exception. In particular, the exception IllegalMPIStateException signals that a method has been called at an illegal or inappropriate time, in other words that the application is not in an appropriate state for the requested operation.

An application is not required to declare IllegalMPIStateException in its throws clause because it is a subclass of RuntimeException. The state transition diagram above (Figure 41.2) presents a kind of finite state machine, that is, a model of behavior composed of states (statuses of the MPISpmd object) and transition actions (methods of the API). Once the MPISpmd object is created (newMPISpmd), the object enters the initial state: MPIConstants.MPI_UNSTARTED.
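As an illustration of this state machine, here is a minimal sketch that guards the control calls with the status constants. It assumes an already created MPISpmd object named jacobiSpmd, as in the sample below, and that the MPIConstants status values are the strings returned by getStatus(); it is a sketch, not part of the release sample.

// restart the computation only once the previous run has finished
if (MPIConstants.MPI_FINISHED.equals(jacobiSpmd.getStatus())) {
    MPIResult result = jacobiSpmd.reStartMPI();
}

// calling reStartMPI on an MPISpmd object that has never been started is illegal
try {
    jacobiSpmd.reStartMPI();
} catch (IllegalMPIStateException e) {
    logger.info("Operation rejected in state " + jacobiSpmd.getStatus());
}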

Sample code (available in the release): the following lines show how to execute the MPI executable jacobi and how to get its return value once it has finished. No modification has to be made to the MPI source code.

import org.objectweb.proactive.mpi.*;

...
// load the file descriptor 
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");

// get the Virtual Node that references the jacobi MPI code you want to execute
VirtualNode jacobiVN = pad.getVirtualNode("JACOBIVN");

// activate Virtual Node (it's not mandatory because the MPI.newMPISpmd method does
// it automatically if it has not been already done)
jacobiVN.activate();

// create the MPISpmd object with the Virtual Node
MPISpmd jacobiSpmd = MPI.newMPISpmd(jacobiVN);

// trigger jacobi mpi code execution and get a future on the MPIResult
MPIResult jacobiResult = jacobiSpmd.startMPI();

// print current status
logger.info("Current status: "+jacobiSpmd.getStatus());


// get return value (block the thread until the jacobiResult is available)
logger.info("Return value: "+jacobiResult.getReturnValue());

// print the MPISpmd object characteristics (name, current status, number of processes, ...)
logger.info(jacobiSpmd);

...
	

41.1.4. Using the Infrastructure

Resource booking and the MPI code are specified using ProActive Descriptors. We explain the operation with an example included in the release: the deployment goes through SSH, then PBS, before launching the MPI code on 16 nodes of a cluster. The entire file is available in Example C.37, “MPI Wrapping: mpi_files/MPIRemote-descriptor.xml”.

  • File Transfer: specify all the files which have to be transferred to the remote host, such as binary code and input data. In the following example, jacobi is the binary of the MPI program. For further details about File Transfer see Section 23.1, “Introduction and Concepts”.

    <componentDefinition>
        <virtualNodesDefinition>
            <virtualNode name="JACOBIVN" />
        </virtualNodesDefinition>
    </componentDefinition>
    <deployment>
        ...
    </deployment>
    <fileTransferDefinitions>
        <fileTransfer id="jacobiCodeTransfer">
            <file src="jacobi" dest="jacobi" />
        </fileTransfer>
    </fileTransferDefinitions>
    
  • Resource allocation: define processes for resource reservation. See Section 21.7, “Infrastructure and processes” for more details on processes.

    • SSHProcess: first define the process used to contact the remote host on which resources will be reserved. Reference the FileTransfer previously defined through its ID, and reference the DependentProcessSequence process explained below.

      <processDefinition id="sshProcess">
          <sshProcess class="org.objectweb.proactive.core.process.ssh.SSHProcess" 
              hostname="nef.inria.fr"
              username="user">
              <processReference refid="jacobiDependentProcess"  />
              <fileTransferDeploy refid="jacobiCodeTransfer">
                  <copyProtocol>scp</copyProtocol>
                  <sourceInfo prefix=
      "/user/user/home/ProActive/src/org/objectweb/proactive/examples/mpi" />
                <destinationInfo prefix="/home/user/MyApp"/>
              </fileTransferDeploy>
          </sshProcess>
      </processDefinition>
      
    • DependentProcessSequence: this process is used when a process depends on another process. The first process of the list can be any process of the ProActive process infrastructure, but the second has to be a DependentProcess, that is, it must implement the org.objectweb.proactive.core.process.DependentProcess interface. The following lines express that the mpiProcess depends on the resources allocated by the pbsProcess.

      <processDefinition id="jacobiDependentProcess">
          <dependentProcessSequence class="org.objectweb.proactive.core.process.DependentListProcess">
              <processReference refid="jacobiPBSProcess"/>
              <processReference refid="jacobiMPIProcess"/>
         </dependentProcessSequence>
      </processDefinition>
      
    • PBS Process: note that you can use any of the services defined in ProActive to allocate resources instead of the PBS one.

      <processDefinition id="jacobiPBSProcess">
          <pbsProcess class="org.objectweb.proactive.core.process.pbs.PBSSubProcess">
              <processReference refid="jvmProcess" />
              <commandPath value="/opt/torque/bin/qsub" />
              <pbsOption>
                  <hostsNumber>16</hostsNumber>
                  <processorPerNode>1</processorPerNode>
                  <bookingDuration>00:02:00</bookingDuration>
                  <scriptPath>
                      <absolutePath value="/home/smariani/pbsStartRuntime.sh" />
                  </scriptPath>
              </pbsOption>
          </pbsProcess>
      </processDefinition>
      

  • MPI process: defines the actual MPI code to be deployed (executable) and its attributes. It is possible to pass a command option to mpirun by filling the attribute mpiCommandOptions. Specify the number of hosts you wish the application to be deployed on, and at least the MPI code local path. The local path is the path from which you start the application.

    <processDefinition id="jacobiMPIProcess">
        <mpiProcess class="org.objectweb.proactive.core.process.mpi.MPIDependentProcess"
            mpiFileName="jacobi"
            mpiCommandOptions="input_file.dat output_file.dat">
            <commandPath value="/usr/src/redhat/BUILD/mpich-1.2.6/bin/mpirun" />
            <mpiOptions>
                <processNumber>16</processNumber>
                <localRelativePath>
                    <relativePath origin="user.home" value="/ProActive/scripts/unix"/>
                </localRelativePath>
                <remoteAbsolutePath>
                    <absolutePath value="/home/smariani/MyApp"/>
                </remoteAbsolutePath>
            </mpiOptions>
        </mpiProcess>
    </processDefinition>
    

41.1.5. Example with several codes

Let's assume we want to interconnect several modules (VibroToAcous, AcousToVibro, Vibro, Acous, CheckConvergency), each of which is a parallel MPI binary code.

import org.objectweb.proactive.ProActive;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.config.ProActiveConfiguration;
import org.objectweb.proactive.core.descriptor.data.ProActiveDescriptor;
import org.objectweb.proactive.core.descriptor.data.VirtualNode;

...
// load the file descriptor 
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");

// get the Virtual Nodes which references all the MPI code we want to use
VirtualNode VibToAc = pad.getVirtualNode("VibToAc");
VirtualNode AcToVib = pad.getVirtualNode("AcToVib");
VirtualNode Vibro = pad.getVirtualNode("Vibro");
VirtualNode Acous = pad.getVirtualNode("Acous");
VirtualNode CheckConvergency = pad.getVirtualNode("CheckConvergency");

// it's not necessary to activate manually each Virtual Node because it's done
// when creating the MPISpmd object with the Virtual Node

// create MPISpmd objects from Virtual Nodes
MPISpmd vibToAc = MPI.newMPISpmd(VibToAc);
MPISpmd acToVib = MPI.newMPISpmd(AcToVib);
MPISpmd vibro = MPI.newMPISpmd(Vibro);
MPISpmd acous = MPI.newMPISpmd(Acous);

// create two different MPISpmd objects from a  single Virtual Node 
MPISpmd checkVibro = MPI.newMPISpmd(CheckConvergency);
MPISpmd checkAcous = MPI.newMPISpmd(CheckConvergency);

 // create MPIResult object for each MPISpmd object
MPIResult vibToAcRes, acToVibRes, vibroRes, acousRes, checkVibroRes, checkAcousRes;

boolean convergence = false;
boolean firstLoop = true;

while (!convergence)
{
	//  trigger execution of vibToAc and acToVib MPISpmd object
	if (firstLoop){
		vibToAcRes = vibToAc.startMPI();
		acToVibRes = acToVib.startMPI();
	}else{
		vibToAcRes = vibToAc.reStartMPI();
		acToVibRes = acToVib.reStartMPI();
	}
	
	// good termination?
	if (( vibToAcRes.getReturnValue() < 0 ) || ( acToVibRes.getReturnValue() < 0 ))
		System.exit(-1);
   
	//  trigger execution of vibro and acous MPISpmd object
	if (firstLoop){
		vibroRes = vibro.startMPI();
		acousRes = acous.startMPI();
	}else{
		vibroRes = vibro.reStartMPI();
		acousRes = acous.reStartMPI();
	}

	
	// good termination?
	if (( vibroRes.getReturnValue() < 0 ) || ( acousRes.getReturnValue() < 0 ))
		System.exit(-1);
	
		
	// Check convergency of acoustic part and structure part
	if (firstLoop){
		// modify argument  
		checkVibro.setCommandArguments("oldVibro.res newVibro.res");
		checkAcous.setCommandArguments("oldAcous.res newAcous.res");
		checkVibroRes = checkVibro.startMPI();
		checkAcousRes = checkAcous.startMPI();
	}else{
		checkVibroRes = checkVibro.reStartMPI();
		checkAcousRes = checkAcous.reStartMPI();
	}

	
	// Convergency?
	if (( checkVibroRes.getReturnValue() == 0 ) || ( checkAcousRes.getReturnValue() == 0 ))
	{
		convergence = true;
	}
	firstLoop = false;
}

	
// free resources
VibToAc.killAll(false);
AcToVib.killAll(false);
Vibro.killAll(false);
Acous.killAll(false);
CheckConvergency.killAll(false);

				

41.2. Wrapping with control

Some MPI applications may decompose naturally into components that are better suited to execute on different platforms, e.g., a simulation component and a visualization component; other applications may be too large to fit on one system. If each subsystem is a parallel system, then MPI is likely to be used for "intra-system" communication, in order to achieve better performance thanks to vendor MPI libraries, as compared to generic TCP/IP implementations.

ProActive makes it possible to deploy at once a set of MPI applications on a set of clusters or desktop machines. Moreover, this section also demonstrates how to deploy at the same time a set of ProActive JVMs, which are mainly used for two purposes:

  • communicating between the different codes,

  • controlling and synchronizing the execution of several (coupled) MPI codes.

"Inter-system" message passing is implemented by ProActive asynchronous remote method invocations. An MPI process may participate both in intra-system communication, using the native MPI implementation, and in inter-system communication, with ProActive through JNI (Java Native Interface) layered on top of IPC system V.

This wrapping defines a cross-implementation protocol for MPI that enables MPI implementations to run very efficiently on each subsystem, while ProActive provides interoperability between the subsystems. A parallel computation is thus able to span multiple systems, using both the native vendor message passing library and ProActive on each system. A new ProActive-specific MPI API supports these features. The goal is to support some point-to-point communication functions for communication across systems, as well as some collectives. This binding assumes that inter-system communication uses ProActive between each pair of communicating systems, while intra-system communication uses proprietary protocols, at the discretion of each vendor MPI implementation.

The API for the wrapping with control is organized in the package org.objectweb.proactive.mpi.control, with the class org.objectweb.proactive.mpi.control.ProActiveMPI gathering static methods for deployment.

41.2.1. One Active Object per MPI process

First, the principle of wrapping MPI code is similar to the Simple Wrapping method: the deployer describes the MPI job requirements in the deployment descriptor file using a Virtual Node and gets back a set of Nodes corresponding to the remote hosts available for the MPI job execution. After deployment, the deployer obtains the Virtual Node containing the set of Nodes on which the MPI processes will be mapped.

Further, to ensure control, an Active Object is deployed on each Node where an MPI process resides. This Active Object plays the role of a wrapper/proxy, forwarding outgoing messages of the local MPI process to the remote recipient(s) and delivering incoming messages to the local MPI process. For more details, please refer to Section 41.2.4, “MPI to MPI Communications through ProActive”.

This approach provides the programmer with the ability to deploy instances of his own classes on any Node(s) using the API defined below. It permits the programmer to capture output messages of an MPI process in his own classes, and to send new messages towards any MPI process of the whole application. For more details, please refer to Section 41.2.2, “MPI to ProActive Communications” and Section 41.2.3, “ProActive to MPI Communications”. The deployment of Java Active Objects takes place after all MPI processes have started and once the ProActiveMPI_Init() function has been called. That way the implementation can ensure that, when an SPMD group of Active Objects is created by calling the newActiveSpmd method on an MPISpmd object, the ranks of the programmer's SPMD instances match those of the MPI processes.

41.2.1.1. Java API

  • MPISpmd object methods

    For more details about MPISpmd object creation, please refer to Section 41.1.2, “API For Deploying MPI Codes”.

    import org.objectweb.proactive.mpi.*;
    
    /**
     * Builds (and deploys) an 'SPMD' group of Active Objects of type className, with all
     * references between them set up for communication. The objects are created with no
     * parameters on the same Nodes on which this MPISpmd object has deployed the MPI application.
     * There is a bijection between the rank of each MPI process of the application deployed by
     * this MPISpmd object and the rank of each active object of the 'SPMD' group.
     */
    
    public void newActiveSpmd(String className);
    
    import org.objectweb.proactive.mpi.*;
    
    /**
     * Builds (and deploys) an 'SPMD' group of Active Objects of type className on the same
     * Nodes on which this MPISpmd object has deployed the MPI application.
     * params contains the parameters used to build each group member.
     * There is a bijection between the rank of each MPI process of the application deployed by
     * this MPISpmd object and the rank of each active object of the 'SPMD' group.
     */
    
    public void newActiveSpmd(String className, Object[] params);
    
    import org.objectweb.proactive.mpi.*;
    
    /**
     * Builds (and deploys) an 'SPMD' group of Active Objects of type className on the same
     * Nodes on which this MPISpmd object has deployed the MPI application.
     * params contains, for each member, the parameters used to build that group member.
     * There is a bijection between the rank of each MPI process of the application deployed by
     * this MPISpmd object and the rank of each active object of the 'SPMD' group.
     */
    
    public void newActiveSpmd(String className, Object[][] params);
    
    import org.objectweb.proactive.mpi.*;
    
    /**
     * Builds (and deploys) an Active Object of type className on the same Node as the MPI
     * process of rank rank in the application deployed with this MPISpmd object.
     * params contains the parameters used to build the active object.
     * Throws ArrayIndexOutOfBoundsException if the specified rank is greater than the number
     * of Nodes.
     */
    
    public void newActive(String className, Object[] params, int rank);
    
  • Deployment method

    The MPI API in the package org.objectweb.proactive.mpi provides the programmer with the ability to create an MPISpmd object from the Virtual Node obtained. The following static method performs the registration of the MPI processes and the attribution of job numbers. Each MPI process belongs to a global job, which makes it possible to distinguish between two MPI processes with the same rank in the whole application. For instance, there would be a first root process belonging to job 0 (the first MPI application) and a second root process belonging to job 1 (the second MPI application). The JobID of an MPI code is directly given by the position of its MPISpmd object in the ArrayList at deployment time.

    import org.objectweb.proactive.mpi.*;
    
    /**
     * Deploys and starts (startMPI() being called) all MPISpmd objects contained in the list
     * mpiSpmdObjectList.
     */
    
    static public void ProActiveMPI.deploy(ArrayList mpiSpmdObjectList);
    

41.2.1.2. Example

The following piece of code is an example of a Java main program which shows how to use the wrapping with control feature with two codes. The XML descriptor file is written in exactly the same manner as for the Simple Wrapping. For more details about writing a descriptor file, please refer to Section 41.1.4, “Using the Infrastructure”.

import org.objectweb.proactive.mpi.*;

...
// load the file descriptor 
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");

// get the Virtual Nodes which reference the different MPI codes
VirtualNode vnA = pad.getVirtualNode("CLUSTER_A");
VirtualNode vnB = pad.getVirtualNode("CLUSTER_B");

// create the MPISpmd objects with the Virtual Nodes
MPISpmd spmdA = MPI.newMPISpmd(vnA);
MPISpmd spmdB = MPI.newMPISpmd(vnB);

Object[][] params = new Object[][]{{param_on_node_1},{param_on_node_2}, {param_on_node_3}};

// deploy "MyClass" as an 'SPMD' group on same nodes that spmdA object, with the list of parameters
// defined above
spmdA.newActiveSpmd("MyClass", params);

// deploy "AnotherClass" on the node where the mpi process of the application is rank 0,
// with no parameters
spmdB.newActiveSpmd("AnotherClass", new Object[]{}, 0);

// create the list of MPISpmd objects (the first MPI job is job 0, the second is job 1, etc.)
ArrayList spmdList = new ArrayList();
spmdList.add(spmdA); spmdList.add(spmdB);

// deploy and start the listed MPISpmd objects
ProActiveMPI.deploy(spmdList);

...

41.2.2. MPI to ProActive Communications

The wrapping with control allows the programmer to send messages from MPI to Java objects. Of course, these classes have to be previously deployed using the API seen above. This feature can be useful, for example, if the simulation code is an MPI computation and the visualization component is a Java code. All MPI code that needs to be controlled or to communicate through ProActive must call the ProActiveMPI_Init() function detailed in Section 41.2.4, “MPI to MPI Communications through ProActive”.

41.2.2.1. MPI API

ProActiveSend
    Performs a basic send from mpi side to a ProActive java class

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveSend(void* buf, int count, MPI_Datatype datatype, int dest, char* className, char*
 methodName, int jobID, ...);

Input Parameters
    buf        initial address of send buffer
    count      number of elements in send buffer (nonnegative integer)
    datatype   datatype of each send buffer element
    dest       rank of destination (integer)
    className  name of the class
    methodName name of the method to be called
    jobID      remote or local job (integer)
    ...        string parameters (variable arguments) to be passed to the method

41.2.2.2. ProActiveMPIData Object

The ProActiveMPIData class belongs to the package org.objectweb.proactive.mpi.control. When a message is sent from the MPI side, a corresponding ProActiveMPIData object is created on the Java side and passed as a parameter to the method whose name is specified in the ProActiveSend call made by MPI. The ProActiveMPIData object contains several fields that can be useful to the programmer. The following methods are available:

import org.objectweb.proactive.mpi.control.*;

/**
 * Returns the rank of the MPI process that sent this message.
 */

public int getSrc();

/**
 * Returns the sender job ID.
 */

public int getJobID();

/**
 * Returns the type of the elements in the data buffer contained in the message.
 * The type can be compared with the constants defined in the class ProActiveMPIConstants
 * in the same package.
 */

public int getDatatype();

/**
 * Returns the parameters, specified in the ProActiveSend method call, as an array of String.
 */

public String[] getParameters();

/**
 * Returns the data buffer as an array of the primitive type byte.
 */

public byte[] getData();

/**
 * Returns the number of elements in the buffer.
 */

public int getCount();
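
For illustration, a hypothetical callback on the Java side could use these accessors as follows; the class context and the method name onMpiMessage are the programmer's own, and only the getters listed above are assumed.

// hypothetical callback invoked through ProActiveSend from the MPI side
public void onMpiMessage(ProActiveMPIData data) {
    int srcRank   = data.getSrc();          // rank of the sending MPI process
    int jobID     = data.getJobID();        // job the sender belongs to
    int datatype  = data.getDatatype();     // to be compared with ProActiveMPIConstants
    String[] args = data.getParameters();   // string parameters of the ProActiveSend call
    byte[] raw    = data.getData();         // raw payload, data.getCount() elements

    System.out.println("received " + data.getCount() + " elements from rank "
            + srcRank + " of job " + jobID);
}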

41.2.2.3. ProActiveMPIUtil Class

The ProActiveMPIUtil class in the package org.objectweb.proactive.mpi.control.util brings together a set of static functions for conversion. The programmer may use the following functions to convert bytes from a byte array into values of different primitive types:

/* Given a byte array, restore it as an int
 * param bytes the byte array
 * param startIndex the starting index of the place the int is stored
 */
 
 public static int bytesToInt(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a float
 * param bytes the byte array
 * param startIndex the starting index of the place the float is stored
 */
  
 public static float bytesToFloat(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a short
 * param bytes the byte array
 * param startIndex the starting index of the place the short is stored
 */
  
 public static short bytesToShort(byte[] bytes, int startIndex);
/*
 * Given a byte array, restore a String out of it.
 * the first cell stores the length of the String
 * param bytes the byte array
 * param startIndex the starting index where the string is stored,
 * the first cell stores the length
 * ret the string out of the byte array.
 */

 public static String bytesToString(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a long
 * param bytes the byte array
 * param startIndex the starting index of the place the long is stored
 */
  
 public static long bytesToLong(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a double
 * param bytes the byte array
 * param startIndex the starting index of the place the double is stored
 */
  
 public static double bytesToDouble(byte[] bytes, int startIndex);
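
As a small usage sketch (assuming a ProActiveMPIData message named data carrying MPI_INT values, and that each int occupies 4 bytes in the byte array, as implied by the bytesToInt signature above):

// decode a received buffer of ints inside a callback that was given a ProActiveMPIData 'data'
byte[] raw = data.getData();
int[] values = new int[data.getCount()];
for (int i = 0; i < values.length; i++) {
    // each element starts 4 bytes after the previous one
    values[i] = ProActiveMPIUtil.bytesToInt(raw, i * 4);
}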

41.2.2.4. Example

  • Main program [ProActive deployment part]

    import org.objectweb.proactive.mpi.*;
    
    ...
    // load the file descriptor 
    ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");
    
    // get the Virtual Nodes which reference the different MPI codes
    VirtualNode vnA = pad.getVirtualNode("CLUSTER_A");
    
    // create the MPISpmd object with the Virtual Node
    MPISpmd spmdA = MPI.newMPISpmd(vnA);
    
    // deploy "MyClass" on same node that mpi process #3
    spmdA.newActive("MyClass", new Object[]{}, 3);
    
    // create the list of MPISpmd objects
    ArrayList spmdList = new ArrayList();
    spmdList.add(spmdA);
    
    // deploy and start the listed MPISpmd objects
    ProActiveMPI.deploy(spmdList);
    
    ...
    
  • Programmer class definition

    public class MyClass{
    
        public MyClass() {
        }
        
        // create a method with a ProActiveMPIData parameter which will be called by the MPI part
        public void foo(ProActiveMPIData data){ 
          int icnt = data.getCount();
          for (int start = 0; start < data.getData().length; start = start + 8) {
              // print the buffer received by converting the bytes array to an array of doubles
              System.out.print(" buf["+(icnt++)+"]= " +
                               ProActiveMPIUtil.bytesToDouble(data.getData(), start));
          }
        }
    }
    
  • MPI Side

    #include <stdio.h>
    #include "mpi.h"
    #include "ProActiveMPI.h"
    
    
    // variables declaration
        ...
       
    // initialize MPI environment
        MPI_Init( &argc, &argv );
        MPI_Comm_rank( MPI_COMM_WORLD, &rank );
        MPI_Comm_size( MPI_COMM_WORLD, &size);
    
    // initialize MPI with ProActive environment
        ProActiveMPI_Init(rank);
    
    // get this process job number
        ProActiveMPI_Job(&myjob);	
    
    // send a buffer of maxn doubles to the "MyClass" Active Object, located on the same
    // host as mpi process #3 of job #0, by calling the method "foo" with some parameters.
        if ((rank == 0) && (myjob == 0)){ 
            error = ProActiveSend(xlocal[0], maxn, MPI_DOUBLE, 3, "MyClass", "foo", 0, "params1", 
    "params2", NULL );
            if (error < 0){
                printf("!!! Error Method call ProActiveSend \n");
            }
        }
    
        ProActiveMPI_Finalize();
        MPI_Finalize( );
        return 0;
    }
    
  • Snapshot of this example

    Figure 41.3. MPI to ProActive communication

41.2.3. ProActive to MPI Communications

The wrapping with control allows the programmer to pass messages from his own classes to the MPI computation. Of course, these classes have to be previously deployed using the API seen in Section 41.2.1.1, “Java API”. This feature can be useful, for example, if the programmer wants to control the MPI code by sending "start" or "stop" messages during the computation (see the sketch after the datatype list below).

41.2.3.1. ProActive API

  • Send Function

    import org.objectweb.proactive.mpi.control.*;
    
    /**
     * Sends a buffer of bytes containing count elements of type datatype
     * to destination dest of job jobID
     * The datatypes are listed below
     */
    
    static public void ProActiveMPICoupling.MPISend(byte[] buf, int count, int datatype, int dest, int
     tag, int jobID);
    
  • Datatypes

    The following constants have to be used with the ProActiveMPICoupling.MPISend method to fill the datatype parameter.

    import org.objectweb.proactive.mpi.control.*;
    
    ProActiveMPIConstants.MPI_CHAR;
    ProActiveMPIConstants.MPI_UNSIGNED_CHAR;
    ProActiveMPIConstants.MPI_BYTE;
    ProActiveMPIConstants.MPI_SHORT;
    ProActiveMPIConstants.MPI_UNSIGNED_SHORT;
    ProActiveMPIConstants.MPI_INT;
    ProActiveMPIConstants.MPI_UNSIGNED;
    ProActiveMPIConstants.MPI_LONG;
    ProActiveMPIConstants.MPI_UNSIGNED_LONG;
    ProActiveMPIConstants.MPI_FLOAT;
    ProActiveMPIConstants.MPI_DOUBLE;
    ProActiveMPIConstants.MPI_LONG_DOUBLE;
    ProActiveMPIConstants.MPI_LONG_LONG_INT;
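
    Tying these pieces together, the following minimal sketch sends a single integer command from Java to an MPI process, using only calls presented in this chapter; the surrounding class, the command value, the destination rank 3 and the job ID 0 are hypothetical.

    // encode an int command into a byte array with the helpers of Section 41.2.2.3
    int command = 1;   // e.g. a "stop" request agreed upon with the MPI code
    byte[] payload = new byte[4];
    ProActiveMPIUtil.intToBytes(command, payload, 0);
    
    // send one MPI_INT element to the MPI process of rank 3 in job 0, with tag 0
    ProActiveMPICoupling.MPISend(payload, 1, ProActiveMPIConstants.MPI_INT, 3, 0, 0);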
    

41.2.3.2. MPI API

ProActiveRecv
    Performs a blocking receive from mpi side to receive data from a ProActive java class

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID);

Output Parameters
    buf   initial address of receive buffer  

Input Parameters
    count    number of elements in receive buffer (nonnegative integer) 
    datatype datatype of each recv buffer element  
    src      rank of source (integer) 
    tag      message tag (integer) 
    jobID    remote job (integer)
ProActiveIRecv
    Performs a non blocking receive from mpi side to receive data from a ProActive java class

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveIRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID,
 ProActiveMPI_Request *request);

Output Parameters
    request   communication request (handle)  

Input Parameters
    buf      initial address of receive buffer  
    count    number of elements in receive buffer (nonnegative integer) 
    datatype datatype of each recv buffer element  
    src      rank of source (integer) 
    tag      message tag (integer) 
    jobID    remote job (integer)
ProActiveTest
    Tests for the completion of receive from a ProActive java class
Synopsis
    #include "ProActiveMPI.h"
    int ProActiveTest(ProActiveMPI_Request *request, int *flag);

Output Parameters
    flag     true if operation completed (logical)  

Input Parameters
    request  communication request (handle)  
ProActiveWait
    Waits for an MPI receive from a ProActive java class to complete
Synopsis
    #include "ProActiveMPI.h"
    int ProActiveWait(ProActiveMPI_Request *request);

Input Parameters
    request  communication request (handle)  

41.2.3.3. Example

The following example shows how to send messages from a ProActive class to the MPI computation.

  • Main program [ProActive deployment part]

    import org.objectweb.proactive.mpi.*;
    
    ...
    // load the file descriptor 
    ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");
    
    // get the Virtual Nodes which reference the different MPI codes
    VirtualNode vnA = pad.getVirtualNode("CLUSTER_A");
    
    // create the MPISpmd object with the Virtual Node
    MPISpmd spmdA = MPI.newMPISpmd(vnA);
    
    // deploy "MyClass" on same node that mpi process #3
    spmdA.newActive("MyClass", new Object[]{}, 3);
    
    // create the list of MPISpmd objects
    ArrayList spmdList = new ArrayList();
    spmdList.add(spmdA);
    
    // deploy and start the listed MPISpmd objects
    ProActiveMPI.deploy(spmdList);
    
    ...
    
  • Programmer class definition

    Consider for example the "postTreatmentForVisualization" method below. It is called at each iteration from the MPI part, gets the current array of doubles generated by the MPI computation and performs a Java post-treatment in order to visualize them in a Java viewer. If the Java computation fails, the method sends a message to the MPI side to abort the computation.

    import org.objectweb.proactive.mpi.control.*;
    
    public class MyClass{
    
        public MyClass() {
        }
    
        // create a method with a ProActiveMPIData parameter
        public void postTreatmentForVisualization(ProActiveMPIData data){ 
          int icnt = data.getCount();
          double [] buf = new double [icnt];
          int error = 0;
          for (int start = 0; start < data.getData().length; start = start + 8) {
              // save double in a buffer
              buf[start/8]=ProActiveMPIUtil.bytesToDouble(data.getData(), start);
          }
    
          // make data post-treatment for visualization 
          ...
    		
    
          if (error == -1){
                // convert the int error code to a byte array
                byte [] byteArray = new byte [4];
                ProActiveMPIUtil.intToBytes(error, byteArray, 0);
    
                // send a message to the local MPI process to abort the computation
                ProActiveMPICoupling.MPISend(byteArray, 1, ProActiveMPIConstants.MPI_INT, 3, 0, 0);
          }
        }
    }
    
  • MPI Side

    #include <stdio.h>
    #include "mpi.h"
    #include "ProActiveMPI.h"
    
    
    // variables declaration
        int buf;
        ProActiveMPI_Request request;
        int flag;
    
    // initialize MPI environment
        MPI_Init( &argc, &argv );
        MPI_Comm_rank( MPI_COMM_WORLD, &rank );
        MPI_Comm_size( MPI_COMM_WORLD, &size);
    
    // initialize MPI with ProActive environment
        ProActiveMPI_Init(rank);
    
    // get this process job number
        ProActiveMPI_Job(&myjob);	
    
    // computation
        for (itcnt=0; itcnt<10000; itcnt++){
    
            // call the "postTreatmentForVisualization" method in "MyClass" Active Object,
            // located on the same host that root process of job #0 and send the current data
            // generated by the computation
            if ((rank == 0) && (myjob == 0)){ 
                error = ProActiveSend(xlocal[0], 1, MPI_DOUBLE, 3, "MyClass", 
    "postTreatmentForVisualization", 0,NULL );
                if (error < 0){
                    printf("!!! Error Method call ProActiveSend \n");
                }
            }
    
            // perform a non-blocking recv
            if ((rank == 3) && (myjob == 0)){
                error = ProActiveIRecv(&buf, 1 , MPI_INT, 3, 0, 0, &request);
                if (error < 0){
                    printf("!!! Error Method call ProActiveIRecv \n");
                }
            }
    
            // do computation
            ...
    		
            // check if a message arrived from ProActive side
            if ((rank == 3) && (myjob == 0)){
                error = ProActiveTest(&request, &flag);
                if (error < 0){
                    printf("!!! Error Method call ProActiveTest \n");
                }
    
                // if a message is captured, flag is true and buf contains message
                // it is not mandatory to check the value of the buffer because we know that
                // the reception of a message is due to a failure of java side computation.
                if (flag == 1){
                       MPI_Abort(MPI_COMM_WORLD, 1); 
                }
            }
        }
     
        ProActiveMPI_Finalize();
        MPI_Finalize( );
        return 0;
    }
    
  • Snapshot of this example

    Figure 41.4. ProActive to MPI communication

41.2.4. MPI to MPI Communications through ProActive

The ProActiveMPI feature handles the details of starting and shutting down processes on the different systems and of coordinating their execution. However, passing data between the processes is explicitly specified by the programmer in the source code: depending on whether messages are passed within a system or between systems, the programmer chooses respectively either the MPI API or the ProActiveMPI API defined below.

Figure 41.5. File transfer and asking for resources

41.2.4.1. MPI API

ProActiveMPI_Init
    Initializes the MPI with ProActive execution environment

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Init(int rank);

Input Parameters
    rank	the rank of the MPI process, as previously obtained after MPI_Init (e.g. via MPI_Comm_rank)
ProActiveMPI_Job
    Initializes the job environment variable

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Job(int *job);

Output Parameters
    job	job the mpi process belongs to
ProActiveMPI_Finalize
    Terminates MPI with ProActive execution environment

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Finalize();
ProActiveMPI_Send
    Performs a basic send

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, int jobID
 );

Input Parameters
    buf      initial address of send buffer  
    count    number of elements in send buffer (nonnegative integer) 
    datatype datatype of each send buffer element  
    dest     rank of destination (integer) 
    tag      message tag (integer) 
    jobID    remote job (integer)
ProActiveMPI_Recv
    Performs a basic Recv

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int
 jobID);

Output Parameters
    buf   initial address of receive buffer (choice) 

Input Parameters
    count    number of elements in recv buffer (nonnegative integer) 
    datatype datatype of each recv buffer element  
    src      rank of source (integer) 
    tag      message tag (integer) 
    jobID    remote job (integer)
ProActiveMPI_IRecv
    Performs a non blocking receive

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_IRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int
 jobID, ProActiveMPI_Request *request);

Output Parameters
    request   communication request (handle)  

Input Parameters
    buf      initial address of receive buffer  
    count    number of elements in receive buffer (nonnegative integer) 
    datatype datatype of each recv buffer element  
    src      rank of source (integer) 
    tag      message tag (integer) 
    jobID    remote job (integer)
ProActiveMPI_Test
    Tests for the completion of receive
Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Test(ProActiveMPI_Request *request, int *flag);

Output Parameters
    flag     true if operation completed (logical)  

Input Parameters
    request  communication request (handle)  
ProActiveMPI_Wait
    Waits for an MPI receive to complete
Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Wait(ProActiveMPI_Request *request);

Input Parameters
    request  communication request (handle)  
ProActiveMPI_AllSend
    Performs a basic send to all processes of a remote job

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_AllSend(void *buf, int count, MPI_Datatype datatype, int tag, int jobID);

Input Parameters
    buf      initial address of send buffer  
    count    number of elements in send buffer (nonnegative integer) 
    datatype datatype of each recv buffer element  
    tag      message tag (integer) 
    jobID    remote job (integer)
ProActiveMPI_Barrier
    Blocks until all processes of the specified job have reached this routine.
    No synchronization is enforced if jobID is different from the current jobID, and -1 is returned.

Synopsis
    #include "ProActiveMPI.h"
    int ProActiveMPI_Barrier(int jobID);

Input Parameters
    jobID	jobID for which the caller is blocked until all members have entered the call.

41.2.4.2. Example

#include <stdio.h>
#include "mpi.h"
#include "ProActiveMPI.h"


// variables declaration
    ...
   
// initialize MPI environment
    MPI_Init( &argc, &argv );
    MPI_Comm_rank( MPI_COMM_WORLD, &rank );
    MPI_Comm_size( MPI_COMM_WORLD, &size);

// initialize MPI with ProActive environment
    ProActiveMPI_Init(rank);

// get this process job number
    ProActiveMPI_Job(&myjob);	

// send from process (#size-1, #0) to (#0, #1)  [#num_process, #num_job]
    if ((rank == size-1) && (myjob==0)){ 
        error = ProActiveMPI_Send(xlocal[maxn/size], maxn, MPI_DOUBLE, 0, 0, 1);
        if (error < 0){
           printf(" Error while sending from #%d-%d \n", rank, myjob);}
    }
// recv on (#0, #1) from (#size-1, #0)
    if ((rank == 0) && (myjob==1)) {
        error = ProActiveMPI_Recv(xlocal[0], maxn, MPI_DOUBLE, size-1, 0, 0);
        if (error < 0){
           printf(" Error while recving with #%d-%d \n", rank, myjob);}
    }

    ProActiveMPI_Finalize();
    MPI_Finalize( );
    return 0;
}

41.2.5. USER STEPS - The Jacobi Relaxation example

The Jacobi relaxation method for solving the Poisson equation has become a classic example of applying domain decomposition to parallelize a problem. Briefly, the original domain is divided into sub-domains. The figure below illustrates dividing a 12x12 domain into two domains with two 12x3 sub-domains each (one-dimensional decomposition). Each sub-domain is associated with a single CPU of a cluster, but one can divide the original domain into as many domains as there are clusters and as many sub-domains as there are CPUs. The iterations in the interior (green) cells can proceed independently of each other. Only the perimeter (red) cells need information from the neighbouring sub-domains. Thus, the values of the solution on the perimeter must be sent to the "ghost" (blue) cells of the neighbours, as indicated by the arrows. The amount of data that must be transferred between cells (and the corresponding nodes) is proportional to the number of cells in one dimension, N.

Figure 41.6. Jacobi Relaxation - Domain Decomposition

In the example below, the domain decomposition is applied on two clusters. The domain is a 1680x1680 mesh divided into 16 sub-domains of 1680x280 on each cluster.
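
To give an order of magnitude (assuming standard 8-byte double-precision values), each ghost-row exchange on this 1680-wide mesh moves 1680 x 8 = 13,440 bytes, i.e. roughly 13 KB per neighbour per iteration.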

41.2.5.1. Compiling the ProActiveMPI package

To compile the ProActiveMPI package, you may enter the ProActive/compile directory and type:

linux> build clean ProActiveMPI

Note

The compilation requires an implementation of MPI to be installed on your machine, otherwise it will fail with an error.

If the build is successful, it will:

  • compile recursively all java classes in the org.objectweb.proactive.mpi package.

  • generate the native library that all wrapper/proxy Active Objects will load in their JVM.

  • execute the configure script in the directory org/objectweb/proactive/mpi/control/config. The configure script generates a Makefile in the same directory, which is used to compile MPI source code containing the ProActiveMPI functions.

41.2.5.2. Defining the infrastructure

For more details about writing a file descriptor, please refer to Section 41.1.4, “Using the Infrastructure”.

            <?xml version="1.0" encoding="UTF-8"?>
<ProActiveDescriptor xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation=
"http://www-sop.inria.fr/oasis/proactive/schema/3.2/DescriptorSchema.xsd">
  <variables>
    <descriptorVariable name="PROACTIVE_HOME" value="ProActive"/>
    <descriptorVariable name="LOCAL_HOME" value="/home/smariani"/>
    <descriptorVariable name="REMOTE_HOME_NEF" value="/home/smariani"/>
    <descriptorVariable name="REMOTE_HOME_NINA" value="/user/smariani/home"/>
    <descriptorVariable name="MPIRUN_PATH_NEF" value=
"/usr/src/redhat/BUILD/mpich-1.2.6/bin/mpirun"/>
    <descriptorVariable name="MPIRUN_PATH_NINA" value=
"/user/smariani/home/mpich-1.2.6/bin/mpirun"/>
    <descriptorVariable name="QSUB_PATH" value="/opt/torque/bin/qsub"/>
  </variables>
  <componentDefinition>
    <virtualNodesDefinition>
      <virtualNode name="Cluster_Nef" />
      <virtualNode name="Cluster_Nina" />
    </virtualNodesDefinition>
  </componentDefinition>
  <deployment>
    <mapping>
      <map virtualNode="Cluster_Nef">
        <jvmSet>
          <vmName value="Jvm1" />
        </jvmSet>
      </map>
      <map virtualNode="Cluster_Nina">
        <jvmSet>
          <vmName value="Jvm2" />
        </jvmSet>
        
      </map>
    </mapping>
    <jvms>
      <jvm name="Jvm1">
        <creation>
          <processReference refid="sshProcess_nef" />
        </creation>
      </jvm>
      <jvm name="Jvm2">
        <creation>
          <processReference refid="sshProcess_nina" />
        </creation>
      </jvm>
    </jvms>
  </deployment>
  <fileTransferDefinitions>
    <fileTransfer id="JACOBI">
      <!-- Transfer mpi program on remote hosts -->
      <file src="jacobi" dest="jacobi" />
    </fileTransfer>
  </fileTransferDefinitions>
  <infrastructure>
    <processes>      

      <processDefinition id="localJVM_NEF">
        <jvmProcess class="org.objectweb.proactive.core.process.JVMNodeProcess">
          <classpath>
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/ProActive.jar" />
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/asm.jar" />
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/log4j.jar" />
            <absolutePath value=
"${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/components/fractal.jar" />
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/xercesImpl.jar" />
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/bouncycastle.jar" />
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/jsch.jar" />
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/javassist.jar" />
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/classes" />
          </classpath>
          <javaPath>
            <absolutePath value="${REMOTE_HOME_NEF}/jdk1.5.0_05/bin/java" />
          </javaPath>
          <policyFile>
            <absolutePath value="${REMOTE_HOME_NEF}/proactive.java.policy" />
          </policyFile>
          <log4jpropertiesFile>
            <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/compile/proactive-log4j"
 />
          </log4jpropertiesFile>
          <jvmParameters>
            <parameter value="-Dproactive.useIPaddress=true" />
            <parameter value="-Dproactive.rmi.port=6099" />
            <!--  DO NOT FORGET TO SET THE java.library.path VARIABLE to the remote directory
 path of the application -->
            <parameter value="-Djava.library.path=${REMOTE_HOME_NEF}/MyApp" />
          </jvmParameters>
        </jvmProcess>
      </processDefinition>
      
      <processDefinition id="localJVM_NINA">
        <jvmProcess class="org.objectweb.proactive.core.process.JVMNodeProcess">
          <classpath>
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/ProActive.jar" />
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/asm.jar" />
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/log4j.jar" />
            <absolutePath value=
"${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/components/fractal.jar" />
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/xercesImpl.jar" />
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/bouncycastle.jar" />
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/jsch.jar" />
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/javassist.jar" />
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/classes" />
          </classpath>
          <javaPath>
            <absolutePath value="/user/smariani/home/NOSAVE/jdk1.5.0_05/bin/java"/>
          </javaPath>
          <policyFile>
            <absolutePath value="${REMOTE_HOME_NINA}/proactive.java.policy"/>
          </policyFile>
          <log4jpropertiesFile>
            <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/compile/proactive-log4j"
/>
          </log4jpropertiesFile>
          <jvmParameters>
            <parameter value="-Dproactive.useIPaddress=true" />
            <parameter value="-Dproactive.rmi.port=6099" />
            <!--  DO NOT FORGET TO SET THE java.library.path VARIABLE to the remote directory
 path of the application -->
            <parameter value="-Djava.library.path=${REMOTE_HOME_NINA}/MyApp" />
          </jvmParameters>
        </jvmProcess>
      </processDefinition>
      
      
      <!-- pbs Process -->
      <processDefinition id="pbsProcess">
        <pbsProcess class="org.objectweb.proactive.core.process.pbs.PBSSubProcess">
          <processReference refid="localJVM_NEF" />
          <commandPath value="${QSUB_PATH}" />
          <pbsOption>
            <!-- ask for 16 nodes on cluster nef (8 hosts, 2 nodes per machine)-->
            <hostsNumber>8</hostsNumber>
            <processorPerNode>2</processorPerNode>
            <bookingDuration>01:00:00</bookingDuration>
            <scriptPath>
              <absolutePath value="${REMOTE_HOME_NEF}/pbsStartRuntime.sh" />
            </scriptPath>
          </pbsOption>
        </pbsProcess>
      </processDefinition>
      
      
      <processDefinition id="lsfProcess">
        <bsubProcess class="org.objectweb.proactive.core.process.lsf.LSFBSubProcess">
          <processReference refid="localJVM_NINA"/>
            <bsubOption>
            <!-- ask for 16 nodes on cluster nina (8 hosts, 2 nodes per machine)-->
            <processor>16</processor>
            <resourceRequirement value="span[ptile=2]"/>
            <scriptPath>
              <absolutePath value="${REMOTE_HOME_NINA}/startRuntime.sh"/>
            </scriptPath>
            </bsubOption>
        </bsubProcess>
      </processDefinition>
      
      <!-- mpi Process -->
      <processDefinition id="mpiProcess_nef">
        <mpiProcess class="org.objectweb.proactive.core.process.mpi.MPIDependentProcess"  
mpiFileName="jacobi" >
          <commandPath value="${MPIRUN_PATH_NEF}" />
          <mpiOptions>
            <processNumber>16</processNumber>
            <localRelativePath>
              <relativePath origin="user.home" value="Test" />
            </localRelativePath>
            <remoteAbsolutePath>
              <absolutePath value="${REMOTE_HOME_NEF}/MyApp" />
            </remoteAbsolutePath>
          </mpiOptions>
        </mpiProcess>
      </processDefinition>
      
      <!-- mpi Process -->
      <processDefinition id="mpiProcess_nina">
        <mpiProcess class="org.objectweb.proactive.core.process.mpi.MPIDependentProcess"  
mpiFileName="jacobi" >
          <commandPath value="${MPIRUN_PATH_NINA}" />
          <mpiOptions>
            <processNumber>16</processNumber>
            <localRelativePath>
              <relativePath origin="user.home" value="Test" />
            </localRelativePath>
            <remoteAbsolutePath>
              <absolutePath value="${REMOTE_HOME_NINA}/MyApp" />
            </remoteAbsolutePath>
          </mpiOptions>
        </mpiProcess>
      </processDefinition>
      
      <!-- dependent process -->
      <processDefinition id="dpsProcess_nef">
        <dependentProcessSequence class="org.objectweb.proactive.core.process.DependentListProcess">
          <processReference refid="pbsProcess" />
          <processReference refid="mpiProcess_nef" />
        </dependentProcessSequence>
      </processDefinition>
      
      <!-- dependent process -->
      <processDefinition id="dpsProcess_nina">
        <dependentProcessSequence class="org.objectweb.proactive.core.process.DependentListProcess">
          <processReference refid="lsfProcess" />
          <processReference refid="mpiProcess_nina" />
        </dependentProcessSequence>
      </processDefinition>
      
      <!-- ssh process -->
      <processDefinition id="sshProcess_nef">
        <sshProcess class="org.objectweb.proactive.core.process.ssh.SSHProcess" hostname="nef.inria.fr" username="smariani">
          <processReference refid="dpsProcess_nef" />
          <fileTransferDeploy refid="JACOBI">
            <copyProtocol>processDefault, scp, rcp</copyProtocol>
            <!-- local host path -->
            <sourceInfo prefix="${PROACTIVE_HOME}/src/org/objectweb/proactive/mpi/control/config/bin" />
            <!-- remote host path -->
            <destinationInfo prefix="${REMOTE_HOME_NEF}/MyApp" />
          </fileTransferDeploy>
        </sshProcess>
      </processDefinition>
            
      <!-- ssh process -->
      <processDefinition id="sshProcess_nina">
        <sshProcess class="org.objectweb.proactive.core.process.ssh.SSHProcess" hostname="cluster.inria.fr" username="smariani">
          <processReference refid="dpsProcess_nina" />
          <fileTransferDeploy refid="JACOBI">
            <copyProtocol>scp</copyProtocol>
            <!-- local host path -->
            <sourceInfo prefix="${PROACTIVE_HOME}/src/org/objectweb/proactive/mpi/control/config/bin" />
            <!-- remote host path -->
            <destinationInfo prefix="${REMOTE_HOME_NINA}/MyApp" />
          </fileTransferDeploy>
        </sshProcess>
      </processDefinition>
    </processes>
  </infrastructure>
</ProActiveDescriptor>

          
[Note]Note

To interface with the native code, each wrapper/proxy loads a library in its JVM context. The java.library.path variable of each JVM must therefore be set to the remote directory path of the application. To do so, add the following tag to each jvmProcess definition:

<parameter value="-Djava.library.path=${REMOTE_HOME_NEF}/MyApp" />

41.2.5.3. Writing the MPI source code

Place the source file in the org/objectweb/proactive/mpi/control/config/src directory:

             
#include <stdio.h>
#include "mpi.h"
#include "ProActiveMPI.h"
#include <time.h>

/* This example handles a 1680x1680 mesh, on 2 clusters with 16 nodes (2 ppn) for each  */
#define maxn 1680
#define size 840
#define JOB_ZERO 0
#define JOB_ONE  1
#define NB_ITER  10000

int main( int argc, char **argv )
{
	int        rank, initValue, i, j, itcnt, idjob, nb_proc, error;
	int        i_first, i_last;
	double     xlocal[(size/3)+2][maxn];
	double     xnew[(size/3)+3][maxn];
	double     diffnorm;
	double     t_00, t_01, waitForRecv = 0.0;  /* timing of ProActiveMPI_Recv */
	char       processor_name[MPI_MAX_PROCESSOR_NAME];
	int        namelen;
	MPI_Status status;
	
	// MPI initialization 
	MPI_Init( &argc, &argv );
	MPI_Comm_rank( MPI_COMM_WORLD, &rank );
	MPI_Comm_size( MPI_COMM_WORLD, &nb_proc );
	MPI_Get_processor_name(processor_name,&namelen);

	// ProActive with MPI initialization  
	error = ProActiveMPI_Init(rank);
	if (error < 0){
		printf("[MPI] !!! Error ProActiveMPI init \n");
		MPI_Abort( MPI_COMM_WORLD, 1 );
	}
	
	// get this process job ID
	ProActiveMPI_Job(&idjob);
	if (nb_proc != 16) MPI_Abort( MPI_COMM_WORLD, 1 );
	
	/* xlocal[0][] holds the lower ghost row, xlocal[size/nb_proc+1][] the upper one */
	
	/*
	 * Note that top and bottom processes have one less row of interior points
	 */
	i_first = 1;
	i_last = size/nb_proc;
	
	if ((rank == 0) && (idjob == JOB_ZERO)) i_first++;
	if ((rank == nb_proc - 1) && (idjob == JOB_ONE)) i_last--;
	
	// matrix initialization
	if (idjob==JOB_ZERO) initValue=rank;
	else {initValue = nb_proc+rank;} 
	
		
	/* Fill the data as specified */
	for (i=1; i<=size/nb_proc; i++)
		for (j=0; j<maxn; j++)
			xlocal[i][j] = initValue;
	for (j=0; j<maxn; j++) {
		xlocal[i_first-1][j] = -1;
		xlocal[i_last+1][j] = -1;
	}
	
	itcnt = 0;
	do {
	
		/*----+----+----+----+----+----+ MPI COMMS +----+----+----+----+----+----+*/
		/* Send up unless I'm at the top, then receive from below */
		/* Note the use of xlocal[i] for &xlocal[i][0] */
		if (rank < nb_proc - 1) 
			MPI_Send( xlocal[size/nb_proc], maxn, MPI_DOUBLE, rank + 1, 0, 
					MPI_COMM_WORLD );
		
		if (rank > 0)			
			MPI_Recv( xlocal[0], maxn, MPI_DOUBLE, rank - 1, 0, 
					MPI_COMM_WORLD, &status );
	
		/*----+----+----+----+----+----+ PROACTIVE COMMS +----+----+----+----+----+----+*/
		if ((rank == nb_proc - 1) && (idjob == JOB_ZERO)){ 
			error = ProActiveMPI_Send(xlocal[size/nb_proc], maxn, MPI_DOUBLE, 0, 0, JOB_ONE);
			if (error < 0){
				printf("[MPI] !!! Error ProActiveMPI send #15/0 -> #0/1 \n");}
		}
		
		if ((rank == 0) && (idjob==JOB_ONE)) {
			error = ProActiveMPI_Recv(xlocal[0], maxn, MPI_DOUBLE, nb_proc - 1, 0, JOB_ZERO);
			if (error < 0){
				printf("[MPI] !!! Error ProActiveMPI recv #0/1 <- #15/0 \n");}
			
		}
		
		
		/*----+----+----+----+----+----+ MPI COMMS +----+----+----+----+----+----+*/
		/* Send down unless I'm at the bottom */
		if (rank > 0) 
			MPI_Send( xlocal[1], maxn, MPI_DOUBLE, rank - 1, 1, 
					MPI_COMM_WORLD );
		
		if (rank < nb_proc - 1) 
			MPI_Recv( xlocal[size/nb_proc+1], maxn, MPI_DOUBLE, rank + 1, 1, 
					MPI_COMM_WORLD, &status );
		
		
		/*----+----+----+----+----+----+ PROACTIVE COMMS +----+----+----+----+----+----+*/
		if ((rank == 0) && (idjob==JOB_ONE)){
			error = ProActiveMPI_Send(xlocal[1], maxn, MPI_DOUBLE, nb_proc - 1, 1, JOB_ZERO);
			if (error < 0){
				printf("[MPI] !!! Error ProActiveMPI send #0/1 -> #15/0 \n");}
			
		}
		
		if ((rank == nb_proc - 1) && (idjob==JOB_ZERO)) {
			t_00 = MPI_Wtime();
			error = ProActiveMPI_Recv(xlocal[size/nb_proc+1], maxn, MPI_DOUBLE, 0, 1, JOB_ONE);
			t_01 = MPI_Wtime();
			if (error < 0){
				printf("[MPI] !!! Error ProActiveMPI recv  #15/0 <- #0/1 \n");}
			waitForRecv += t_01 - t_00;
			
		}
		/*----+----+----+----+----+----+ COMPUTATION +----+----+----+----+----+----+*/
		/* Compute new values (but not on boundary) */
		itcnt ++;
		diffnorm = 0.0;
		for (i=i_first; i<=i_last; i++) 
			for (j=1; j<maxn-1; j++) {
				xnew[i][j] = (xlocal[i][j+1] + xlocal[i][j-1] +
						xlocal[i+1][j] + xlocal[i-1][j]) / 4.0;
				diffnorm += (xnew[i][j] - xlocal[i][j]) * 
				(xnew[i][j] - xlocal[i][j]);
			}
		/* Only transfer the interior points */
		for (i=i_first; i<=i_last; i++) 
			for (j=1; j<maxn-1; j++) 
				xlocal[i][j] = xnew[i][j];
		
		if (rank == 0) printf( "[MPI] At iteration %d, job %d \n", itcnt, idjob );
	} while (itcnt < NB_ITER);
	
	// print this process buffer 
	printf("[MPI] Rank: %d Job: %d \n",rank, idjob );
	for (i=1; i<(size/16); i++){
		printf("[");
		for (j=0; j<maxn; j++)
			printf( "%f ",xlocal[i][j]);
		printf("] \n");
	}
	
	// clean environment
	ProActiveMPI_Finalize();
	MPI_Finalize( );
	return 0;
}




          

41.2.5.4. Compiling the MPI source code

To compile the MPI code with the added features for wrapping, you may enter the org/objectweb/proactive/mpi/control/config directory and type:

linux> make clean
linux> make mpicode=jacobi

[Note]Note

The mpicode value is the name of the source file without its extension. The Makefile generates a binary with the same name in the bin directory.

41.2.5.5. Writing the ProActive Main program

import org.apache.log4j.Logger;

import org.objectweb.proactive.ProActive;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.config.ProActiveConfiguration;
import org.objectweb.proactive.core.descriptor.data.ProActiveDescriptor;
import org.objectweb.proactive.core.descriptor.data.VirtualNode;
import org.objectweb.proactive.core.node.Node;
import org.objectweb.proactive.core.util.log.Loggers;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.mpi.MPI;
import org.objectweb.proactive.mpi.MPISpmd;
import org.objectweb.proactive.mpi.control.ProActiveMPI;

import java.util.ArrayList;
import java.util.Vector;


public class Main {
    public static void main(String[] args) {
        Logger logger = ProActiveLogger.getLogger(Loggers.EXAMPLES);

        if (args.length != 1) {
            logger.error("Usage: java " + Main.class.getName() +
                " <deployment file>");
            System.exit(0);
        }

        ProActiveConfiguration.load();

        VirtualNode jacobiOnNina;
        VirtualNode jacobiOnNef;
        ProActiveDescriptor pad = null;

        try {
            pad = ProActive.getProactiveDescriptor("file:" + args[0]);

            // gets virtual node 
            jacobiOnNef = pad.getVirtualNode("Cluster_Nef");
            jacobiOnNina = pad.getVirtualNode("Cluster_Nina");
            
            MPISpmd nefMPISpmd = MPI.newMPISpmd(jacobiOnNef);
            MPISpmd ninaMPISpmd = MPI.newMPISpmd(jacobiOnNina);

            ArrayList my_jobs = new ArrayList();
            my_jobs.add(nefMPISpmd);
            my_jobs.add(ninaMPISpmd);
            ProActiveMPI.deploy(my_jobs);

        } catch (ProActiveException e) {
            e.printStackTrace();
            logger.error("Pb when reading descriptor");
        }
    }
}

          

41.2.5.6. Executing application

Deploy the ProActive main program above like any other ProActive application, using a script like the following one:

#!/bin/sh

echo --- ProActive/MPI JACOBI example ---------------------------------------------

workingDir=`dirname $0`
. $workingDir/env.sh

XMLDESCRIPTOR=/user/smariani/home/Test/MPI-Jacobi-nina-nef.xml

$JAVACMD -classpath $CLASSPATH -Djava.security.policy=$PROACTIVE/compile/proactive.java.policy \
  -Dproactive.rmi.port=6099 \
  -Dlog4j.configuration=file:$PROACTIVE/compile/proactive-log4j Main $XMLDESCRIPTOR

41.2.5.7. The Output

The deployment descriptor is read and 16 Nodes are returned from the first cluster (Nef) and 16 Nodes from the second cluster (Nina):


************* Reading deployment descriptor: file:/user/smariani/home/TestLoadLib/MPI-Jacobi-nina-nef.xml ******************** 
created VirtualNode name=Cluster_Nef 
created VirtualNode name=Cluster_Nina 
...
**** Mapping VirtualNode Cluster_Nef with Node: //193.51.209.75:6099/Cluster_Nef932675317 done 
**** Mapping VirtualNode Cluster_Nef with Node: //193.51.209.76:6099/Cluster_Nef1864357984 done 
**** Mapping VirtualNode Cluster_Nef with Node: //193.51.209.70:6099/Cluster_Nef1158912343 done 
...

**** Mapping VirtualNode Cluster_Nina with Node: //193.51.209.47:6099/Cluster_Nina1755746262 done 
**** Mapping VirtualNode Cluster_Nina with Node: //193.51.209.47:6099/Cluster_Nina-1139061904 done 
**** Mapping VirtualNode Cluster_Nina with Node: //193.51.209.45:6099/Cluster_Nina-941377986 done 
...

Deployment of proxies on remote nodes and environment initialization

[MANAGER] Create SPMD Proxy for jobID: 0
[MANAGER] Initialize remote environments
[MANAGER] Activate remote thread for communication 
[MANAGER] Create SPMD Proxy for jobID: 1 
[MANAGER] Initialize remote environments 
[MANAGER] Activate remote thread for communication 

Processes registration

[MANAGER] JobID #0 register mpi process #12 
[MANAGER] JobID #0 register mpi process #3 
[MANAGER] JobID #0 register mpi process #1 
[MANAGER] JobID #0 register mpi process #15 
[MANAGER] JobID #0 register mpi process #4 
[MANAGER] JobID #0 register mpi process #7 
[MANAGER] JobID #0 register mpi process #0 
[MANAGER] JobID #0 register mpi process #9 
[MANAGER] JobID #0 register mpi process #2 
[MANAGER] JobID #0 register mpi process #13 
[MANAGER] JobID #0 register mpi process #10 
[MANAGER] JobID #0 register mpi process #5 
[MANAGER] JobID #0 register mpi process #11 
[MANAGER] JobID #0 register mpi process #14 
[MANAGER] JobID #0 register mpi process #6 
[MANAGER] JobID #0 register mpi process #8 
[MANAGER] JobID #1 register mpi process #10 
[MANAGER] JobID #1 register mpi process #13 
[MANAGER] JobID #1 register mpi process #6 
[MANAGER] JobID #1 register mpi process #3 
[MANAGER] JobID #1 register mpi process #7 
[MANAGER] JobID #1 register mpi process #8 
[MANAGER] JobID #1 register mpi process #15 
[MANAGER] JobID #1 register mpi process #9 
[MANAGER] JobID #1 register mpi process #4 
[MANAGER] JobID #1 register mpi process #1 
[MANAGER] JobID #1 register mpi process #0 
[MANAGER] JobID #1 register mpi process #11 
[MANAGER] JobID #1 register mpi process #2 
[MANAGER] JobID #1 register mpi process #5 
[MANAGER] JobID #1 register mpi process #12 
[MANAGER] JobID #1 register mpi process #14

Starting computation

[MPI] At iteration 1, job 1  
[MPI] At iteration 2, job 1  
[MPI] At iteration 3, job 1  
[MPI] At iteration 4, job 1  
[MPI] At iteration 5, job 1  
...
[MPI] At iteration 1, job 0  
[MPI] At iteration 2, job 0  
[MPI] At iteration 3, job 0  
[MPI] At iteration 4, job 0  
[MPI] At iteration 5, job 0  
[MPI] At iteration 6, job 0  
...

[MPI] At iteration 9996, job 1  
[MPI] At iteration 9997, job 1  
[MPI] At iteration 9998, job 1  
[MPI] At iteration 9999, job 1  
[MPI] At iteration 10000, job 1  
...
[MPI] At iteration 9996, job 0  
[MPI] At iteration 9997, job 0  
[MPI] At iteration 9998, job 0  
[MPI] At iteration 9999, job 0  
[MPI] At iteration 10000, job 0  

Displaying each process result, for example

[MPI] Rank: 15 Job: 1  
[31.000000 27.482592 24.514056 ...  24.514056 27.482592 31.000000 ]  
[31.000000 26.484765 22.663677 ...  22.663677 26.484765 31.000000 ]  
[31.000000 24.765592 19.900617 ...  19.900617 24.765592 31.000000 ]  

All processes unregistration

[MANAGER] JobID #1 unregister mpi process #15 
[MANAGER] JobID #1 unregister mpi process #14 
[MANAGER] JobID #0 unregister mpi process #0 
[MANAGER] JobID #1 unregister mpi process #13 
[MANAGER] JobID #0 unregister mpi process #1 
[MANAGER] JobID #1 unregister mpi process #12 
[MANAGER] JobID #0 unregister mpi process #2
... 

The following snapshot shows the 32 required Nodes, distributed over 16 hosts (two processes per host, 8 hosts on each cluster). Each Node contains its local wrapper, a ProActiveMPICoupling Active Object. One can notice the ProActive communication between two MPI processes through the communication between two proxies belonging to Nodes residing on different clusters.

IC2D Snapshot

Figure 41.7. IC2D Snapshot

41.3. Design and Implementation

41.3.1. Simple wrapping

41.3.1.1. Structural Design

Proxy Pattern

Figure 41.8. Proxy Pattern

  • The proxy plays the role of a smart reference that performs additional actions when the MPISpmdImpl Active Object is accessed. In particular, the proxy forwards a request to the Active Object only if the Active Object is in an appropriate state; otherwise an IllegalMPIStateException is thrown, as sketched below.
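
A minimal Java sketch of this smart-reference behaviour is shown below. It only illustrates the pattern and is not the actual ProActive implementation: the class name MPISpmdProxy, the constructor used for IllegalMPIStateException and the exact status test are assumptions.

// Illustrative sketch of the proxy as a smart reference (hypothetical class name).
public class MPISpmdProxy {
    private final MPISpmdImpl target;   // the wrapped Active Object

    public MPISpmdProxy(MPISpmdImpl target) {
        this.target = target;
    }

    public MPIResult startMPI() {
        // Forward the request only if the MPI job has not been started yet,
        // otherwise signal the illegal state to the caller.
        if (!MPIConstants.MPI_UNSTARTED.equals(target.getStatus())) {
            throw new IllegalMPIStateException();
        }
        return target.startMPI();
    }
}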

41.3.1.2. Infrastructure of processes

Process Package Architecture

Figure 41.9. Process Package Architecture

  • DependentListProcess and IndependentListProcess (left part of the picture)

    The SequentialListProcess-related classes are defined in the org.objectweb.proactive.core.process package. The two classes share the same characteristic: both contain a list of processes that have to be executed sequentially. This dependence constraint was introduced to satisfy the MPI process requirement. Indeed, the DependentListProcess class specifies a list of processes that must implement the DependentProcess interface, except the head process, which is a plain resource allocation process. It guarantees the deployer that a dependent process is executed only once it has obtained the parameters it depends on.

  • MPIDependentProcess (right part of the picture)

    The MPI-related classes are defined in the org.objectweb.proactive.core.process.mpi package. An MPI process first requires a list of hosts for job execution, so this process has to implement the DependentProcess interface. See Section 11.7, "Infrastructure and processes" (Part III) for more details on processes.

41.4. Summary of the API

41.4.1. Simple Wrapping and Deployment of MPI Code

org.objectweb.proactive.mpi

public class MPI

  static MPISpmd newMPISpmd(VirtualNode virtualNode) throws IllegalMPIStateException
      Creates an MPISpmd object from an existing VirtualNode

public class MPISpmd

  MPIResult startMPI() throws IllegalMPIStateException
      Triggers MPI code execution and returns a future on an MPIResult object

  MPIResult reStartMPI() throws IllegalMPIStateException
      Restarts MPI code execution and returns a new future on an MPIResult object

  boolean killMPI() throws IllegalMPIStateException
      Kills the MPI code execution

  String getStatus()
      Returns the current status of MPI code execution

  void setCommandArguments(String arguments)
      Adds or modifies the MPI command parameters

public class MPIResult

  int getReturnValue()
      Returns the exit value of the MPI code

public class MPIConstants

  static final String MPI_UNSTARTED
      MPISpmd object status after creation

  static final String MPI_RUNNING
      MPISpmd object has been started or restarted

  static final String MPI_KILLED
      MPISpmd object has been killed

  static final String MPI_FINISHED
      MPISpmd object has finished

Table 41.1. Simple Wrapping of MPI Code
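
For illustration, the following minimal sketch shows a typical use of this API. The descriptor path MyApp.xml and the virtual node name MyVirtualNode are placeholders, and exception handling is reduced to a throws clause.

import org.objectweb.proactive.ProActive;
import org.objectweb.proactive.core.config.ProActiveConfiguration;
import org.objectweb.proactive.core.descriptor.data.ProActiveDescriptor;
import org.objectweb.proactive.core.descriptor.data.VirtualNode;
import org.objectweb.proactive.mpi.MPI;
import org.objectweb.proactive.mpi.MPIResult;
import org.objectweb.proactive.mpi.MPISpmd;

public class SimpleWrappingExample {
    public static void main(String[] args) throws Exception {
        ProActiveConfiguration.load();

        // Read the deployment descriptor and get back the booked resources
        // (descriptor path and virtual node name are placeholders).
        ProActiveDescriptor pad =
            ProActive.getProactiveDescriptor("file:MyApp.xml");
        VirtualNode vn = pad.getVirtualNode("MyVirtualNode");

        // Create a controllable MPISpmd object and trigger the MPI execution.
        MPISpmd mpiSpmd = MPI.newMPISpmd(vn);
        MPIResult result = mpiSpmd.startMPI();   // future on the MPI result

        System.out.println("Status: " + mpiSpmd.getStatus());
        // Reading the return value waits for the MPI code to terminate.
        System.out.println("Exit value: " + result.getReturnValue());
    }
}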

41.4.2. Wrapping with Control

41.4.2.1. One Active Object per MPI process

org.objectweb.proactive.mpi

public class MPISpmd

  void newActiveSpmd(String class)
      Deploys an SPMD group of Active Objects on each of the MPISpmd object's Nodes

  void newActiveSpmd(String class, Object[] params)
      Deploys an SPMD group of Active Objects with specific constructor parameters on each of the MPISpmd object's Nodes

  void newActiveSpmd(String class, Object[][] params)
      Deploys an SPMD group of Active Objects with specific constructor parameters on each of the MPISpmd object's Nodes

  void newActive(String class, Object[] params, int rank) throws ArrayIndexOutOfBoundsException
      Deploys an Active Object with specific constructor parameters on the single Node specified by rank

org.objectweb.proactive.mpi.control

public class ProActiveMPI

  void deploy(ArrayList mpiSpmdList)
      Deploys and starts all MPISpmd objects in the list

Table 41.2. API for creating one Active Object per MPI process
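
As an example, the sketch below extends the try block of the Main program of Section 41.2.5.5: it attaches one user Active Object per MPI process before deploying both jobs. The class name MyCouplingClass is a placeholder for a user-provided class.

MPISpmd nefMPISpmd = MPI.newMPISpmd(jacobiOnNef);
MPISpmd ninaMPISpmd = MPI.newMPISpmd(jacobiOnNina);

// Deploy an SPMD group of user Active Objects on the Nodes of each MPISpmd
// object ("MyCouplingClass" is a hypothetical user class).
nefMPISpmd.newActiveSpmd("MyCouplingClass");
ninaMPISpmd.newActiveSpmd("MyCouplingClass");

// Deploy and start both MPI jobs together.
ArrayList my_jobs = new ArrayList();
my_jobs.add(nefMPISpmd);
my_jobs.add(ninaMPISpmd);
ProActiveMPI.deploy(my_jobs);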

41.4.2.2. MPI to ProActive Communications

int ProActiveSend(void* buf, int count, MPI_Datatype datatype, int dest, char* className, char* methodName, int jobID, ...)
    Performs a basic send from the MPI side to a ProActive Java class

Table 41.3. MPI to ProActive Communications API

org.objectweb.proactive.mpi.control

public class ProActiveMPIData

  int getSrc()
      Returns the rank of the MPI sender process

  int getJobID()
      Returns the jobID of the MPI sender process

  int getDataType()
      Returns the type of the data

  String[] getParameters()
      Returns the parameters passed in the ProActiveSend method call

  byte[] getData()
      Returns the data as a byte array

  int getCount()
      Returns the number of elements in the data array

org.objectweb.proactive.mpi.control.util

public class ProActiveMPIUtil

  static int bytesToInt(byte[] bytes, int startIndex)
      Given a byte array, restores it as an int

  static float bytesToFloat(byte[] bytes, int startIndex)
      Given a byte array, restores it as a float

  static short bytesToShort(byte[] bytes, int startIndex)
      Given a byte array, restores it as a short

  static long bytesToLong(byte[] bytes, int startIndex)
      Given a byte array, restores it as a long

  static double bytesToDouble(byte[] bytes, int startIndex)
      Given a byte array, restores it as a double

  static String bytesToString(byte[] bytes, int startIndex)
      Given a byte array, restores a string out of it

  static int intTobytes(int num, byte[] bytes, int startIndex)
      Translates an int into bytes, stored in the byte array

  static int floatToByte(float num, byte[] bytes, int startIndex)
      Translates a float into bytes, stored in the byte array

  static int shortToBytes(short num, byte[] bytes, int startIndex)
      Translates a short into bytes, stored in the byte array

  static int stringToBytes(String str, byte[] bytes, int startIndex)
      Given a String of less than 255 bytes, stores it in the byte array

  static int longToBytes(long num, byte[] bytes, int startIndex)
      Translates a long into bytes, stored in the byte array

  static int doubleToBytes(double num, byte[] bytes, int startIndex)
      Translates a double into bytes, stored in the byte array

Table 41.4. Java API for MPI message conversion
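
To illustrate how these classes fit together on the Java side, the hedged sketch below decodes a message sent with ProActiveSend. It assumes that the method named in the ProActiveSend call is invoked with a ProActiveMPIData argument, and that the buffer contains consecutively packed doubles (8 bytes each); the class and method names are placeholders.

import org.objectweb.proactive.mpi.control.ProActiveMPIData;
import org.objectweb.proactive.mpi.control.util.ProActiveMPIUtil;

// Hypothetical user class whose name and method are passed to ProActiveSend
// on the MPI side.
public class MyReceiver {
    public void receiveFromMPI(ProActiveMPIData m) {
        System.out.println("Message from rank " + m.getSrc() +
            " of job " + m.getJobID());

        // Decode the raw buffer, assuming it contains m.getCount() doubles.
        byte[] data = m.getData();
        double[] values = new double[m.getCount()];
        for (int i = 0; i < values.length; i++) {
            values[i] = ProActiveMPIUtil.bytesToDouble(data, i * 8);
        }
    }
}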

41.4.2.3. ProActive to MPI Communications

org.objectweb.proactive.mpi.control

public class ProActiveMPICoupling

  static void MPISend(byte[] buf, int count, int datatype, int dest, int tag, int jobID)
      Sends a buffer of bytes to the specified MPI process

public class ProActiveMPIConstants

  static final int MPI_CHAR            char
  static final int MPI_UNSIGNED_CHAR   unsigned char
  static final int MPI_BYTE            byte
  static final int MPI_SHORT           short
  static final int MPI_UNSIGNED_SHORT  unsigned short
  static final int MPI_INT             int
  static final int MPI_UNSIGNED        unsigned int
  static final int MPI_LONG            long
  static final int MPI_UNSIGNED_LONG   unsigned long
  static final int MPI_FLOAT           float
  static final int MPI_DOUBLE          double
  static final int MPI_LONG_DOUBLE     long double
  static final int MPI_LONG_LONG_INT   long long int

Table 41.5. ProActiveMPI API for sending messages to MPI
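
In the other direction, the sketch below encodes two doubles with ProActiveMPIUtil and hands the buffer to the coupling wrapper for delivery to an MPI process. The destination rank, tag and jobID are arbitrary example values, and the packing of the buffer (8 bytes per double) is an assumption made for illustration.

import org.objectweb.proactive.mpi.control.ProActiveMPIConstants;
import org.objectweb.proactive.mpi.control.ProActiveMPICoupling;
import org.objectweb.proactive.mpi.control.util.ProActiveMPIUtil;

public class SendToMPIExample {
    // Sketch: encode two doubles and send them to the MPI process of rank 3
    // in job 1 (rank, tag and jobID are arbitrary example values).
    public static void sendValues() {
        double[] values = { 3.14, 2.72 };
        byte[] buf = new byte[values.length * 8];
        for (int i = 0; i < values.length; i++) {
            ProActiveMPIUtil.doubleToBytes(values[i], buf, i * 8);
        }
        ProActiveMPICoupling.MPISend(buf, values.length,
            ProActiveMPIConstants.MPI_DOUBLE, 3, 0, 1);
    }
}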

int ProActiveRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID)
    Performs a blocking receive on the MPI side of data sent by a ProActive Java class

int ProActiveIRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID, ProActiveMPI_Request *request)
    Performs a non-blocking receive on the MPI side of data sent by a ProActive Java class

int ProActiveTest(ProActiveMPI_Request *request, int *flag)
    Tests for the completion of a receive from a ProActive Java class

int ProActiveWait(ProActiveMPI_Request *request)
    Waits for a receive from a ProActive Java class to complete

Table 41.6. MPI message reception from ProActive

41.4.2.4. MPI to MPI Communications through ProActive

int ProActiveMPI_Init(int rank)
    Initializes the MPI with ProActive execution environment

int ProActiveMPI_Job(int *job)
    Initializes the variable with the job ID

int ProActiveMPI_Finalize()
    Terminates the MPI with ProActive execution environment

int ProActiveMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, int jobID)
    Performs a basic send

int ProActiveMPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID)
    Performs a basic receive

int ProActiveMPI_IRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID, ProActiveMPI_Request *request)
    Performs a non-blocking receive

int ProActiveMPI_Test(ProActiveMPI_Request *request, int *flag)
    Tests for the completion of a receive

int ProActiveMPI_Wait(ProActiveMPI_Request *request)
    Waits for a receive to complete

int ProActiveMPI_AllSend(void *buf, int count, MPI_Datatype datatype, int tag, int jobID)
    Performs a basic send to all processes of a remote job

int ProActiveMPI_Barrier(int jobID)
    Blocks until all processes of the specified job have reached this routine

Table 41.7. MPI to MPI through ProActive C API
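
The Jacobi example of Section 41.2.5.3 shows these primitives in use (ProActiveMPI_Init, ProActiveMPI_Job, ProActiveMPI_Send, ProActiveMPI_Recv and ProActiveMPI_Finalize).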

Datatypes: MPI_CHAR, MPI_UNSIGNED_CHAR, MPI_BYTE, MPI_SHORT, MPI_UNSIGNED_SHORT, MPI_INT, MPI_UNSIGNED, MPI_LONG, MPI_UNSIGNED_LONG, MPI_FLOAT, MPI_DOUBLE, MPI_LONG_DOUBLE, MPI_LONG_LONG_INT

Call PROACTIVEMPI_INIT(rank, err)
    integer :: rank, err
    Initializes the MPI with ProActive execution environment

Call PROACTIVEMPI_JOB(job, err)
    integer :: job, err
    Initializes the job environment variable

Call PROACTIVEMPI_FINALIZE(err)
    integer :: err
    Terminates the MPI with ProActive execution environment

Call PROACTIVEMPI_SEND(buf, count, datatype, dest, tag, jobID, err)
    <type>, dimension(*) :: buf
    integer :: count, datatype, dest, tag, jobID, err
    Performs a basic send

Call PROACTIVEMPI_RECV(buf, count, datatype, src, tag, jobID, err)
    <type>, dimension(*) :: buf
    integer :: count, datatype, src, tag, jobID, err
    Performs a basic receive

Call PROACTIVEMPI_ALLSEND(buf, count, datatype, tag, jobID, err)
    <type>, dimension(*) :: buf
    integer :: count, datatype, tag, jobID, err
    Performs a basic send to all processes of a remote job

Call PROACTIVEMPI_BARRIER(jobID, err)
    integer :: jobID, err
    Blocks until all processes of the specified job have reached this routine

Table 41.8. MPI to MPI through ProActive Fortran API

Datatypes: MPI_CHARACTER, MPI_BYTE, MPI_INTEGER, MPI_DOUBLE