00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.process.globus;
00032
00033 import org.apache.log4j.Logger;
00034 import org.globus.gram.*;
00035 import org.globus.io.gass.server.*;
00036
00037
00038 import org.globus.util.deactivator.Deactivator;
00039 import org.gridforum.jgss.ExtendedGSSManager;
00040 import org.ietf.jgss.GSSCredential;
00041 import org.objectweb.proactive.core.util.log.Loggers;
00042 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00043
00044
00048 public class GridJob implements GramJobListener {
00049 static Logger logger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_PROCESS);
00050 private GassServer m_gassServer;
00051 private String m_gassURL = null;
00052 private GramJob m_job = null;
00053 private String m_jobOutput = "";
00054 private boolean m_batch = false;
00055 private String m_remoteHost = null;
00056 private String m_jobid = null;
00057 int options = org.globus.io.gass.server.GassServer.READ_ENABLE |
00058 org.globus.io.gass.server.GassServer.WRITE_ENABLE |
00059 org.globus.io.gass.server.GassServer.STDOUT_ENABLE |
00060 org.globus.io.gass.server.GassServer.STDERR_ENABLE;
00061
00062
00063 public GridJob(String Contact, boolean batch) {
00064 m_remoteHost = Contact;
00065 m_batch = batch;
00066 }
00067
00068 public String GlobusRun(String RSL) {
00069 String newRSL = null;
00070 try {
00071
00072
00073 ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager.getInstance();
00074 GSSCredential cred = manager.createCredential(GSSCredential.INITIATE_AND_ACCEPT);
00075 if (cred == null) {
00076 logger.error("credential null");
00077 }
00078
00079
00080
00081
00082 if ((RSL.indexOf("stdout") < 0) && (RSL.indexOf("stderr") < 0)) {
00083
00084 if (!startGassServer(cred)) {
00085 throw new Exception("Unable to start GASS server.");
00086 }
00087
00088
00089 initJobOutListeners();
00090
00091
00092
00093 if (!m_batch) {
00094 newRSL = "&" + RSL.substring(0, RSL.indexOf('&')) +
00095 "(rsl_substitution=(GLOBUSRUN_GASS_URL " + m_gassURL +
00096 "))" +
00097 RSL.substring(RSL.indexOf('&') + 1, RSL.length()) +
00098 "(stdout=$(GLOBUSRUN_GASS_URL)/dev/stdout-5)(stderr=$(GLOBUSRUN_GASS_URL)/dev/sterr-5)";
00099
00100 } else {
00101
00102 newRSL = RSL +
00103 "(stdout=x-gass-cache://$(GLOBUS_GRAM_JOB_CONTACT)stdout anExtraTag)" +
00104 "(stderr=x-gass-cache://$(GLOBUS_GRAM_JOB_CONTACT)stderr anExtraTag)";
00105 }
00106 } else {
00107 newRSL = RSL;
00108 }
00109 Gram.ping(m_remoteHost);
00110 logger.info("ping successfull");
00111 logger.info(newRSL);
00112 m_job = new GramJob(newRSL);
00113
00114
00115
00116 m_job.setCredentials(cred);
00117
00118
00119
00120
00121 m_job.addListener(this);
00122
00123 logger.info("Sending job request to: " + m_remoteHost);
00124 m_job.request(m_remoteHost, m_batch, false);
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137 m_jobOutput = "Job sent. url=" + m_job.getIDAsString();
00138
00139 } catch (Exception ex) {
00140 if (m_gassServer != null) {
00141
00142 m_gassServer.unregisterJobOutputStream("err-5");
00143 m_gassServer.unregisterJobOutputStream("out-5");
00144 }
00145
00146 m_jobOutput = "Error submitting job: " + ex.getClass() + ":" +
00147 ex.getMessage();
00148 ex.printStackTrace();
00149 }
00150
00151
00152 Deactivator.deactivateAll();
00153 return m_jobOutput;
00154 }
00155
00160 private boolean startGassServer(GSSCredential cred) {
00161 if (m_gassServer != null) {
00162 return true;
00163 }
00164
00165 try {
00166 m_gassServer = new GassServer(cred, 0);
00167 m_gassServer.setOptions(options);
00168 m_gassURL = m_gassServer.getURL();
00169 logger.info("gass server started succesfully " + m_gassURL);
00170 } catch (Exception e) {
00171 logger.error("gass server failed to start!");
00172 e.printStackTrace();
00173
00174 return false;
00175 }
00176
00177 m_gassServer.registerDefaultDeactivator();
00178
00179 return true;
00180 }
00181
00185 private void initJobOutListeners() throws Exception {
00186
00187 JobOutputListenerImpl outListener = new JobOutputListenerImpl();
00188 JobOutputStream outStream = new JobOutputStream(outListener);
00189 m_gassServer.registerJobOutputStream("out-5", outStream);
00190 m_gassServer.registerJobOutputStream("err-5", outStream);
00191 }
00192
00193 public void statusChanged(GramJob job) {
00194 String status = job.getStatusAsString();
00195 logger.info("status changed " + status);
00196
00197 try {
00198 if (job.getStatus() == GramJob.STATUS_ACTIVE) {
00199
00200 m_jobOutput = "Job sent. url=" + job.getIDAsString();
00201 logger.info(m_jobOutput);
00202
00203
00204
00205
00206
00207 }
00208 } catch (Exception ex) {
00209 logger.error("statusChanged Error:" + ex.getMessage());
00210 ex.printStackTrace();
00211 }
00212 }
00213
00214 private class JobOutputListenerImpl implements JobOutputListener {
00215 public void outputClosed() {
00216 logger.info("output closed");
00217 }
00218
00219 public void outputChanged(String output) {
00220 m_jobOutput += output;
00221 logger.info("output changed: " + m_jobOutput);
00222 }
00223 }
00224 }