org/objectweb/proactive/core/process/lsf/LSFBSubProcess.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.process.lsf;
00032 
00033 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator;
00034 import org.objectweb.proactive.core.process.ExternalProcess;
00035 import org.objectweb.proactive.core.process.MessageSink;
00036 import org.objectweb.proactive.core.process.SimpleExternalProcess;
00037 import org.objectweb.proactive.core.process.UniversalProcess;
00038 import org.objectweb.proactive.core.util.RemoteProcessMessageLogger;
00039 
00040 
00062 public class LSFBSubProcess extends AbstractExternalProcessDecorator {
00063     protected static final String FILE_SEPARATOR = System.getProperty(
00064             "file.separator");
00065     protected static final String DEFAULT_SCRIPT_LOCATION = System.getProperty(
00066             "user.home") + FILE_SEPARATOR + "ProActive" + FILE_SEPARATOR +
00067         "scripts" + FILE_SEPARATOR + "unix" + FILE_SEPARATOR + "cluster" +
00068         FILE_SEPARATOR + "startRuntime.sh ";
00069     public final static String DEFAULT_LSFPATH = FILE_SEPARATOR + "usr" +
00070         FILE_SEPARATOR + "local" + FILE_SEPARATOR + "lsf" + FILE_SEPARATOR +
00071         "bin";
00072     public final static String DEFAULT_BSUBPATH = DEFAULT_LSFPATH +
00073         FILE_SEPARATOR + "bsub";
00074     public final static String DEFAULT_BJOBPATH = DEFAULT_LSFPATH +
00075         FILE_SEPARATOR + "bjobs";
00076     public static final String DEFAULT_QUEUE_NAME = "normal";
00077     protected static final String DEFAULT_PROCESSOR_NUMBER = "1";
00078     protected int jobID;
00079     protected String queueName = DEFAULT_QUEUE_NAME;
00080     protected String hostList;
00081     protected String scriptLocation = DEFAULT_SCRIPT_LOCATION;
00082     protected String processor = DEFAULT_PROCESSOR_NUMBER;
00083     protected String interactive = "false";
00084     protected String res_requirement = "";
00085     protected String jobname;
00086 
00087     //
00088     // -- CONSTRUCTORS -----------------------------------------------
00089     //
00090 
00095     public LSFBSubProcess() {
00096         super();
00097         setCompositionType(GIVE_COMMAND_AS_PARAMETER);
00098         this.hostname = null;
00099         this.command_path = DEFAULT_BSUBPATH;
00100     }
00101 
00107     public LSFBSubProcess(ExternalProcess targetProcess) {
00108         super(targetProcess);
00109         this.hostname = null;
00110         this.command_path = DEFAULT_BSUBPATH;
00111     }
00112 
00113     //
00114     // -- PUBLIC METHODS -----------------------------------------------
00115     //
00116     public void setInputMessageLogger(
00117         RemoteProcessMessageLogger inputMessageLogger) {
00118         super.setInputMessageLogger(new CompositeMessageLogger(
00119                 new ParserMessageLogger(), inputMessageLogger));
00120     }
00121 
00122     public void setOutputMessageSink(MessageSink outputMessageSink) {
00123         if (outputMessageSink == null) {
00124             super.setOutputMessageSink(new SimpleMessageSink());
00125         } else {
00126             super.setOutputMessageSink(outputMessageSink);
00127         }
00128     }
00129 
00135     public static ExternalProcess buildBKillProcess(int jobID) {
00136         return new SimpleExternalProcess("bkill " + jobID);
00137     }
00138 
00139     public static void main(String[] args) {
00140         try {
00141             LSFBSubProcess p = new LSFBSubProcess(new SimpleExternalProcess(
00142                         "ls -lsa"));
00143             p.startProcess();
00144         } catch (Exception e) {
00145             e.printStackTrace();
00146         }
00147     }
00148 
00153     public int getJobID() {
00154         return jobID;
00155     }
00156 
00161     public String getQueueName() {
00162         return queueName;
00163     }
00164 
00169     public void setQueueName(String queueName) {
00170         checkStarted();
00171         if (queueName == null) {
00172             throw new NullPointerException();
00173         }
00174         this.queueName = queueName;
00175     }
00176 
00181     public void setHostList(String hostList) {
00182         checkStarted();
00183         this.hostList = hostList;
00184     }
00185 
00190     public String getHostList() {
00191         return hostList;
00192     }
00193 
00198     public String isInteractive() {
00199         return interactive;
00200     }
00201 
00206     public void setInteractive(String interactive) {
00207         this.interactive = interactive;
00208     }
00209 
00214     public void setProcessorNumber(String processor) {
00215         checkStarted();
00216         if (processor != null) {
00217             this.processor = processor;
00218         }
00219     }
00220 
00224     public String getProcessId() {
00225         return "lsf_" + targetProcess.getProcessId();
00226     }
00227 
00231     public int getNodeNumber() {
00232         return (new Integer(getProcessorNumber()).intValue());
00233     }
00234 
00238     public UniversalProcess getFinalProcess() {
00239         checkStarted();
00240         ;
00241         return targetProcess.getFinalProcess();
00242     }
00243 
00248     public String getProcessorNumber() {
00249         return processor;
00250     }
00251 
00252     public void setScriptLocation(String location) {
00253         checkStarted();
00254         if (location != null) {
00255             this.scriptLocation = location;
00256         }
00257     }
00258 
00259     public String getScriptLocation() {
00260         return scriptLocation;
00261     }
00262 
00263     public String getRes_requirement() {
00264         return res_requirement;
00265     }
00266 
00267     public void setRes_requirement(String res_requirement) {
00268         this.res_requirement = "-R " + res_requirement + " ";
00269     }
00270 
00271     //
00272     // -- PROTECTED METHODS -----------------------------------------------
00273     //
00274     protected String internalBuildCommand() {
00275         return buildEnvironmentCommand() + buildBSubCommand();
00276     }
00277 
00278     protected String buildBSubCommand() {
00279         StringBuilder bSubCommand = new StringBuilder();
00280         bSubCommand.append(command_path);
00281         if (interactive.equals("true")) {
00282             bSubCommand.append(" -I");
00283         }
00284         bSubCommand.append(" -n " + processor + " -q " + queueName + " ");
00285         if (hostList != null) {
00286             bSubCommand.append("-m " + hostList + " ");
00287         }
00288         if (jobname != null) {
00289             bSubCommand.append("-J " + jobname + " ");
00290         }
00291         if (getCompositionType() == GIVE_COMMAND_AS_PARAMETER) {
00292             bSubCommand.append(getRes_requirement() + scriptLocation + " " +
00293                 getTargetProcess().getCommand());
00294         }
00295 
00296         //logger.info("bsub command is "+bSubCommand.toString());
00297         return bSubCommand.toString();
00298     }
00299 
00300     protected String buildBJobsCommand() {
00301         return DEFAULT_BJOBPATH + " " + jobID;
00302     }
00303 
00311     protected int parseJobID(String message) {
00312         if (logger.isDebugEnabled()) {
00313             logger.debug("parseJobID analyzing " + message);
00314         }
00315         String beginJobIDMarkup = "Job <";
00316         String endJobIDMarkup = ">";
00317         int n1 = message.indexOf(beginJobIDMarkup);
00318         if (n1 == -1) {
00319             return 0;
00320         }
00321         int n2 = message.indexOf(endJobIDMarkup, n1 +
00322                 beginJobIDMarkup.length());
00323         if (n2 == -1) {
00324             return 0;
00325         }
00326         String id = message.substring(n1 + beginJobIDMarkup.length(), n2);
00327         if (logger.isDebugEnabled()) {
00328             logger.debug("!!!!!!!!!!!!!! JOBID = " + id);
00329         }
00330         try {
00331             return Integer.parseInt(id);
00332         } catch (NumberFormatException e) {
00333             return 0;
00334         }
00335     }
00336 
00351     protected String parseHostname(String message) {
00352         if (logger.isDebugEnabled()) {
00353             logger.debug("parseHostname analyzing " + message);
00354         }
00355         java.util.StringTokenizer st = new java.util.StringTokenizer(message);
00356         if (st.countTokens() < 6) {
00357             return null; // we expect at least 6 tokens
00358         }
00359         try {
00360             int currentJobID = Integer.parseInt(st.nextToken());
00361             if (currentJobID != jobID) {
00362                 return null; // not the same id
00363             }
00364         } catch (NumberFormatException e) {
00365             return null;
00366         }
00367         st.nextToken(); // ignore user
00368         String status = st.nextToken();
00369         if (status.equals("PEND")) {
00370             return ""; // not running yet
00371         }
00372         st.nextToken(); // ignore queue
00373         st.nextToken(); // ignore fromHost
00374         String hostname = st.nextToken();
00375         if (logger.isDebugEnabled()) {
00376             logger.debug("!!!!!!!!!!!!!! hostname = " + hostname);
00377         }
00378         logger.info("token " + st.countTokens());
00379         return hostname;
00380     }
00381 
00382     protected void sendJobDetailsCommand() {
00383         outputMessageSink.setMessage(buildBJobsCommand());
00384     }
00385 
00386     //
00387     // -- PRIVATE METHODS -----------------------------------------------
00388     //
00389     //
00390     // -- INNER CLASSES -----------------------------------------------
00391     //
00392 
00396     public class ParserMessageLogger implements RemoteProcessMessageLogger,
00397         java.io.Serializable {
00398         private boolean foundJobID;
00399         private boolean foundHostname;
00400 
00401         public ParserMessageLogger() {
00402         }
00403 
00404         public void log(String message) {
00405             //int nbProcessor = (new Integer(processor)).intValue();
00406             //parseHostname(message);
00407             if (!foundJobID) {
00408                 jobID = parseJobID(message);
00409                 foundJobID = jobID != 0;
00410                 if (foundJobID) {
00411                     sendJobDetailsCommand();
00412                 }
00413             } else if (!foundHostname) {
00414                 hostname = parseHostname(message);
00415                 if (hostname != null) {
00416                     //int counter=1;
00417                     foundHostname = hostname.length() > 0;
00418                     //while(counter < nbProcessor){
00419                     //parseHostname(message);
00420                     //counter ++;
00421                     //}
00422                     if (foundHostname) {
00423                         // we are done
00424                         outputMessageSink.setMessage(null);
00425                     } else {
00426                         // send another command to fetch the hostname
00427                         try {
00428                             Thread.sleep(2000);
00429                         } catch (InterruptedException e) {
00430                         }
00431                         sendJobDetailsCommand();
00432                     }
00433                 }
00434             }
00435         }
00436 
00437         public void log(Throwable t) {
00438         }
00439 
00440         public void log(String message, Throwable t) {
00441         }
00442     } // end inner class CompositeMessageLogger
00443 
00444     public String getJobname() {
00445         return jobname;
00446     }
00447 
00448     public void setJobname(String jobname) {
00449         this.jobname = jobname;
00450     }
00451 }

Generated on Mon Jan 22 15:16:08 2007 for ProActive by  doxygen 1.5.1