org/objectweb/proactive/core/process/globus/GridJob.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.process.globus;
00032 
00033 import org.apache.log4j.Logger;
00034 import org.globus.gram.*;
00035 import org.globus.io.gass.server.*;
00036 
00037 //import org.globus.security.*;
00038 import org.globus.util.deactivator.Deactivator;
00039 import org.gridforum.jgss.ExtendedGSSManager;
00040 import org.ietf.jgss.GSSCredential;
00041 import org.objectweb.proactive.core.util.log.Loggers;
00042 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00043 
00044 
00048 public class GridJob implements GramJobListener {
00049     static Logger logger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_PROCESS);
00050     private GassServer m_gassServer; // GASS Server: required to get job output
00051     private String m_gassURL = null; // URL of the GASS server
00052     private GramJob m_job = null; // GRAM JOB to be executed
00053     private String m_jobOutput = ""; // job output as string
00054     private boolean m_batch = false; // Submission modes:
00055     private String m_remoteHost = null; // host where job will run
00056     private String m_jobid = null; // Globus job id on the form: 
00057     int options = org.globus.io.gass.server.GassServer.READ_ENABLE |
00058         org.globus.io.gass.server.GassServer.WRITE_ENABLE |
00059         org.globus.io.gass.server.GassServer.STDOUT_ENABLE |
00060         org.globus.io.gass.server.GassServer.STDERR_ENABLE;
00061 
00062     //https://server.com:39374/15621/1021382777/
00063     public GridJob(String Contact, boolean batch) {
00064         m_remoteHost = Contact; // remote host
00065         m_batch = batch; // submission mode
00066     }
00067 
00068     public String GlobusRun(String RSL) {
00069         String newRSL = null;
00070         try {
00071             // load default Globus proxy. Java CoG kit must be installed 
00072             // and a user certificate setup properly
00073             ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager.getInstance();
00074             GSSCredential cred = manager.createCredential(GSSCredential.INITIATE_AND_ACCEPT);
00075             if (cred == null) {
00076                 logger.error("credential null");
00077             }
00078 
00079             //We check if both stdout and stderr are already set
00080             //We should test if one of both, but usually we set both or none
00081             //This might be change if it doesn't fit user requirements
00082             if ((RSL.indexOf("stdout") < 0) && (RSL.indexOf("stderr") < 0)) {
00083                 // Start GASS server
00084                 if (!startGassServer(cred)) {
00085                     throw new Exception("Unable to start GASS server.");
00086                 }
00087 
00088                 //         setup Job Output listeners
00089                 initJobOutListeners();
00090 
00091                 // Append GASS URL to job String so we can get some output back
00092                 // if non-batch, then get some output back
00093                 if (!m_batch) {
00094                     newRSL = "&" + RSL.substring(0, RSL.indexOf('&')) +
00095                         "(rsl_substitution=(GLOBUSRUN_GASS_URL " + m_gassURL +
00096                         "))" +
00097                         RSL.substring(RSL.indexOf('&') + 1, RSL.length()) +
00098                         "(stdout=$(GLOBUSRUN_GASS_URL)/dev/stdout-5)(stderr=$(GLOBUSRUN_GASS_URL)/dev/sterr-5)";
00099                     //newRSL = RSL;
00100                 } else {
00101                     //     format batching RSL so output can be retrieved later on using any GTK commands
00102                     newRSL = RSL +
00103                         "(stdout=x-gass-cache://$(GLOBUS_GRAM_JOB_CONTACT)stdout anExtraTag)" +
00104                         "(stderr=x-gass-cache://$(GLOBUS_GRAM_JOB_CONTACT)stderr anExtraTag)";
00105                 }
00106             } else {
00107                 newRSL = RSL;
00108             }
00109             Gram.ping(m_remoteHost);
00110             logger.info("ping successfull");
00111             logger.info(newRSL);
00112             m_job = new GramJob(newRSL);
00113 
00114             //     set proxy. CoG kit and user credentials must be installed and set 
00115             //     up properly
00116             m_job.setCredentials(cred);
00117 
00118             // if non-batch then listen for output
00119             //            if (!m_batch) {
00120             //m_job.addListener(new GramJobListenerImpl());
00121             m_job.addListener(this);
00122             // }
00123             logger.info("Sending job request to: " + m_remoteHost);
00124             m_job.request(m_remoteHost, m_batch, false);
00125 
00126             // Wait for job to complete
00127             //         if (!m_batch) {
00128             //                synchronized (this) {
00129             //                    try {
00130             //                        wait();
00131             //                    } catch (Exception e) {
00132             //                          e.printStackTrace();
00133             //                    }
00134             // }
00135             //} else {
00136             // do not wait for job. Return immediately
00137             m_jobOutput = "Job sent. url=" + m_job.getIDAsString();
00138             // }
00139         } catch (Exception ex) {
00140             if (m_gassServer != null) {
00141                 // unregister from gass server
00142                 m_gassServer.unregisterJobOutputStream("err-5");
00143                 m_gassServer.unregisterJobOutputStream("out-5");
00144             }
00145 
00146             m_jobOutput = "Error submitting job: " + ex.getClass() + ":" +
00147                 ex.getMessage();
00148             ex.printStackTrace();
00149         }
00150 
00151         // cleanup
00152         Deactivator.deactivateAll();
00153         return m_jobOutput;
00154     }
00155 
00160     private boolean startGassServer(GSSCredential cred) {
00161         if (m_gassServer != null) {
00162             return true;
00163         }
00164 
00165         try {
00166             m_gassServer = new GassServer(cred, 0);
00167             m_gassServer.setOptions(options);
00168             m_gassURL = m_gassServer.getURL();
00169             logger.info("gass server started succesfully " + m_gassURL);
00170         } catch (Exception e) {
00171             logger.error("gass server failed to start!");
00172             e.printStackTrace();
00173 
00174             return false;
00175         }
00176 
00177         m_gassServer.registerDefaultDeactivator();
00178 
00179         return true;
00180     }
00181 
00185     private void initJobOutListeners() throws Exception {
00186         // job output vars
00187         JobOutputListenerImpl outListener = new JobOutputListenerImpl();
00188         JobOutputStream outStream = new JobOutputStream(outListener);
00189         m_gassServer.registerJobOutputStream("out-5", outStream);
00190         m_gassServer.registerJobOutputStream("err-5", outStream);
00191     }
00192 
00193     public void statusChanged(GramJob job) {
00194         String status = job.getStatusAsString();
00195         logger.info("status changed " + status);
00196 
00197         try {
00198             if (job.getStatus() == GramJob.STATUS_ACTIVE) {
00199                 // notify waiting thread when job ready
00200                 m_jobOutput = "Job sent. url=" + job.getIDAsString();
00201                 logger.info(m_jobOutput);
00202 
00203                 // if notify enabled return URL as output
00204                 //                                              synchronized (this) {
00205                 //                                                      notify();
00206                 //                                              }
00207             }
00208         } catch (Exception ex) {
00209             logger.error("statusChanged Error:" + ex.getMessage());
00210             ex.printStackTrace();
00211         }
00212     }
00213 
00214     private class JobOutputListenerImpl implements JobOutputListener {
00215         public void outputClosed() {
00216             logger.info("output closed");
00217         }
00218 
00219         public void outputChanged(String output) {
00220             m_jobOutput += output;
00221             logger.info("output changed: " + m_jobOutput);
00222         }
00223     }
00224 }

Generated on Mon Jan 22 15:16:08 2007 for ProActive by  doxygen 1.5.1