00001 /* 00002 * ################################################################ 00003 * 00004 * ProActive: The Java(TM) library for Parallel, Distributed, 00005 * Concurrent computing with Security and Mobility 00006 * 00007 * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis 00008 * Contact: proactive@objectweb.org 00009 * 00010 * This library is free software; you can redistribute it and/or 00011 * modify it under the terms of the GNU Lesser General Public 00012 * License as published by the Free Software Foundation; either 00013 * version 2.1 of the License, or any later version. 00014 * 00015 * This library is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 * Lesser General Public License for more details. 00019 * 00020 * You should have received a copy of the GNU Lesser General Public 00021 * License along with this library; if not, write to the Free Software 00022 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 00023 * USA 00024 * 00025 * Initial developer(s): The ProActive Team 00026 * http://www.inria.fr/oasis/ProActive/contacts.html 00027 * Contributor(s): 00028 * 00029 * ################################################################ 00030 */ 00031 package org.objectweb.proactive.core.process.lsf; 00032 00033 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator; 00034 import org.objectweb.proactive.core.process.ExternalProcess; 00035 import org.objectweb.proactive.core.process.MessageSink; 00036 import org.objectweb.proactive.core.process.SimpleExternalProcess; 00037 import org.objectweb.proactive.core.process.UniversalProcess; 00038 import org.objectweb.proactive.core.util.RemoteProcessMessageLogger; 00039 00040 00062 public class LSFBSubProcess extends AbstractExternalProcessDecorator { 00063 protected static final String FILE_SEPARATOR = System.getProperty( 00064 "file.separator"); 00065 protected static final String DEFAULT_SCRIPT_LOCATION = System.getProperty( 00066 "user.home") + FILE_SEPARATOR + "ProActive" + FILE_SEPARATOR + 00067 "scripts" + FILE_SEPARATOR + "unix" + FILE_SEPARATOR + "cluster" + 00068 FILE_SEPARATOR + "startRuntime.sh "; 00069 public final static String DEFAULT_LSFPATH = FILE_SEPARATOR + "usr" + 00070 FILE_SEPARATOR + "local" + FILE_SEPARATOR + "lsf" + FILE_SEPARATOR + 00071 "bin"; 00072 public final static String DEFAULT_BSUBPATH = DEFAULT_LSFPATH + 00073 FILE_SEPARATOR + "bsub"; 00074 public final static String DEFAULT_BJOBPATH = DEFAULT_LSFPATH + 00075 FILE_SEPARATOR + "bjobs"; 00076 public static final String DEFAULT_QUEUE_NAME = "normal"; 00077 protected static final String DEFAULT_PROCESSOR_NUMBER = "1"; 00078 protected int jobID; 00079 protected String queueName = DEFAULT_QUEUE_NAME; 00080 protected String hostList; 00081 protected String scriptLocation = DEFAULT_SCRIPT_LOCATION; 00082 protected String processor = DEFAULT_PROCESSOR_NUMBER; 00083 protected String interactive = "false"; 00084 protected String res_requirement = ""; 00085 protected String jobname; 00086 00087 // 00088 // -- CONSTRUCTORS ----------------------------------------------- 00089 // 00090 00095 public LSFBSubProcess() { 00096 super(); 00097 setCompositionType(GIVE_COMMAND_AS_PARAMETER); 00098 this.hostname = null; 00099 this.command_path = DEFAULT_BSUBPATH; 00100 } 00101 00107 public LSFBSubProcess(ExternalProcess targetProcess) { 00108 super(targetProcess); 00109 this.hostname = null; 00110 this.command_path = DEFAULT_BSUBPATH; 00111 } 00112 00113 // 00114 // -- PUBLIC METHODS ----------------------------------------------- 00115 // 00116 public void setInputMessageLogger( 00117 RemoteProcessMessageLogger inputMessageLogger) { 00118 super.setInputMessageLogger(new CompositeMessageLogger( 00119 new ParserMessageLogger(), inputMessageLogger)); 00120 } 00121 00122 public void setOutputMessageSink(MessageSink outputMessageSink) { 00123 if (outputMessageSink == null) { 00124 super.setOutputMessageSink(new SimpleMessageSink()); 00125 } else { 00126 super.setOutputMessageSink(outputMessageSink); 00127 } 00128 } 00129 00135 public static ExternalProcess buildBKillProcess(int jobID) { 00136 return new SimpleExternalProcess("bkill " + jobID); 00137 } 00138 00139 public static void main(String[] args) { 00140 try { 00141 LSFBSubProcess p = new LSFBSubProcess(new SimpleExternalProcess( 00142 "ls -lsa")); 00143 p.startProcess(); 00144 } catch (Exception e) { 00145 e.printStackTrace(); 00146 } 00147 } 00148 00153 public int getJobID() { 00154 return jobID; 00155 } 00156 00161 public String getQueueName() { 00162 return queueName; 00163 } 00164 00169 public void setQueueName(String queueName) { 00170 checkStarted(); 00171 if (queueName == null) { 00172 throw new NullPointerException(); 00173 } 00174 this.queueName = queueName; 00175 } 00176 00181 public void setHostList(String hostList) { 00182 checkStarted(); 00183 this.hostList = hostList; 00184 } 00185 00190 public String getHostList() { 00191 return hostList; 00192 } 00193 00198 public String isInteractive() { 00199 return interactive; 00200 } 00201 00206 public void setInteractive(String interactive) { 00207 this.interactive = interactive; 00208 } 00209 00214 public void setProcessorNumber(String processor) { 00215 checkStarted(); 00216 if (processor != null) { 00217 this.processor = processor; 00218 } 00219 } 00220 00224 public String getProcessId() { 00225 return "lsf_" + targetProcess.getProcessId(); 00226 } 00227 00231 public int getNodeNumber() { 00232 return (new Integer(getProcessorNumber()).intValue()); 00233 } 00234 00238 public UniversalProcess getFinalProcess() { 00239 checkStarted(); 00240 ; 00241 return targetProcess.getFinalProcess(); 00242 } 00243 00248 public String getProcessorNumber() { 00249 return processor; 00250 } 00251 00252 public void setScriptLocation(String location) { 00253 checkStarted(); 00254 if (location != null) { 00255 this.scriptLocation = location; 00256 } 00257 } 00258 00259 public String getScriptLocation() { 00260 return scriptLocation; 00261 } 00262 00263 public String getRes_requirement() { 00264 return res_requirement; 00265 } 00266 00267 public void setRes_requirement(String res_requirement) { 00268 this.res_requirement = "-R " + res_requirement + " "; 00269 } 00270 00271 // 00272 // -- PROTECTED METHODS ----------------------------------------------- 00273 // 00274 protected String internalBuildCommand() { 00275 return buildEnvironmentCommand() + buildBSubCommand(); 00276 } 00277 00278 protected String buildBSubCommand() { 00279 StringBuilder bSubCommand = new StringBuilder(); 00280 bSubCommand.append(command_path); 00281 if (interactive.equals("true")) { 00282 bSubCommand.append(" -I"); 00283 } 00284 bSubCommand.append(" -n " + processor + " -q " + queueName + " "); 00285 if (hostList != null) { 00286 bSubCommand.append("-m " + hostList + " "); 00287 } 00288 if (jobname != null) { 00289 bSubCommand.append("-J " + jobname + " "); 00290 } 00291 if (getCompositionType() == GIVE_COMMAND_AS_PARAMETER) { 00292 bSubCommand.append(getRes_requirement() + scriptLocation + " " + 00293 getTargetProcess().getCommand()); 00294 } 00295 00296 //logger.info("bsub command is "+bSubCommand.toString()); 00297 return bSubCommand.toString(); 00298 } 00299 00300 protected String buildBJobsCommand() { 00301 return DEFAULT_BJOBPATH + " " + jobID; 00302 } 00303 00311 protected int parseJobID(String message) { 00312 if (logger.isDebugEnabled()) { 00313 logger.debug("parseJobID analyzing " + message); 00314 } 00315 String beginJobIDMarkup = "Job <"; 00316 String endJobIDMarkup = ">"; 00317 int n1 = message.indexOf(beginJobIDMarkup); 00318 if (n1 == -1) { 00319 return 0; 00320 } 00321 int n2 = message.indexOf(endJobIDMarkup, n1 + 00322 beginJobIDMarkup.length()); 00323 if (n2 == -1) { 00324 return 0; 00325 } 00326 String id = message.substring(n1 + beginJobIDMarkup.length(), n2); 00327 if (logger.isDebugEnabled()) { 00328 logger.debug("!!!!!!!!!!!!!! JOBID = " + id); 00329 } 00330 try { 00331 return Integer.parseInt(id); 00332 } catch (NumberFormatException e) { 00333 return 0; 00334 } 00335 } 00336 00351 protected String parseHostname(String message) { 00352 if (logger.isDebugEnabled()) { 00353 logger.debug("parseHostname analyzing " + message); 00354 } 00355 java.util.StringTokenizer st = new java.util.StringTokenizer(message); 00356 if (st.countTokens() < 6) { 00357 return null; // we expect at least 6 tokens 00358 } 00359 try { 00360 int currentJobID = Integer.parseInt(st.nextToken()); 00361 if (currentJobID != jobID) { 00362 return null; // not the same id 00363 } 00364 } catch (NumberFormatException e) { 00365 return null; 00366 } 00367 st.nextToken(); // ignore user 00368 String status = st.nextToken(); 00369 if (status.equals("PEND")) { 00370 return ""; // not running yet 00371 } 00372 st.nextToken(); // ignore queue 00373 st.nextToken(); // ignore fromHost 00374 String hostname = st.nextToken(); 00375 if (logger.isDebugEnabled()) { 00376 logger.debug("!!!!!!!!!!!!!! hostname = " + hostname); 00377 } 00378 logger.info("token " + st.countTokens()); 00379 return hostname; 00380 } 00381 00382 protected void sendJobDetailsCommand() { 00383 outputMessageSink.setMessage(buildBJobsCommand()); 00384 } 00385 00386 // 00387 // -- PRIVATE METHODS ----------------------------------------------- 00388 // 00389 // 00390 // -- INNER CLASSES ----------------------------------------------- 00391 // 00392 00396 public class ParserMessageLogger implements RemoteProcessMessageLogger, 00397 java.io.Serializable { 00398 private boolean foundJobID; 00399 private boolean foundHostname; 00400 00401 public ParserMessageLogger() { 00402 } 00403 00404 public void log(String message) { 00405 //int nbProcessor = (new Integer(processor)).intValue(); 00406 //parseHostname(message); 00407 if (!foundJobID) { 00408 jobID = parseJobID(message); 00409 foundJobID = jobID != 0; 00410 if (foundJobID) { 00411 sendJobDetailsCommand(); 00412 } 00413 } else if (!foundHostname) { 00414 hostname = parseHostname(message); 00415 if (hostname != null) { 00416 //int counter=1; 00417 foundHostname = hostname.length() > 0; 00418 //while(counter < nbProcessor){ 00419 //parseHostname(message); 00420 //counter ++; 00421 //} 00422 if (foundHostname) { 00423 // we are done 00424 outputMessageSink.setMessage(null); 00425 } else { 00426 // send another command to fetch the hostname 00427 try { 00428 Thread.sleep(2000); 00429 } catch (InterruptedException e) { 00430 } 00431 sendJobDetailsCommand(); 00432 } 00433 } 00434 } 00435 } 00436 00437 public void log(Throwable t) { 00438 } 00439 00440 public void log(String message, Throwable t) { 00441 } 00442 } // end inner class CompositeMessageLogger 00443 00444 public String getJobname() { 00445 return jobname; 00446 } 00447 00448 public void setJobname(String jobname) { 00449 this.jobname = jobname; 00450 } 00451 }