00001 /* 00002 * ################################################################ 00003 * 00004 * ProActive: The Java(TM) library for Parallel, Distributed, 00005 * Concurrent computing with Security and Mobility 00006 * 00007 * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis 00008 * Contact: proactive@objectweb.org 00009 * 00010 * This library is free software; you can redistribute it and/or 00011 * modify it under the terms of the GNU Lesser General Public 00012 * License as published by the Free Software Foundation; either 00013 * version 2.1 of the License, or any later version. 00014 * 00015 * This library is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 * Lesser General Public License for more details. 00019 * 00020 * You should have received a copy of the GNU Lesser General Public 00021 * License along with this library; if not, write to the Free Software 00022 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 00023 * USA 00024 * 00025 * Initial developer(s): The ProActive Team 00026 * http://www.inria.fr/oasis/ProActive/contacts.html 00027 * Contributor(s): 00028 * 00029 * ################################################################ 00030 */ 00031 package org.objectweb.proactive.core.process.pbs; 00032 00033 import java.util.ArrayList; 00034 import java.util.StringTokenizer; 00035 00036 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator; 00037 import org.objectweb.proactive.core.process.ExternalProcess; 00038 import org.objectweb.proactive.core.process.MessageSink; 00039 import org.objectweb.proactive.core.process.UniversalProcess; 00040 00041 00064 public class PBSSubProcess extends AbstractExternalProcessDecorator { 00065 private static final String FILE_SEPARATOR = System.getProperty( 00066 "file.separator"); 00067 public final static String DEFAULT_QSUBPATH = "/usr/local/bin/qsub"; 00068 private static final String DEFAULT_SCRIPT_LOCATION = System.getProperty( 00069 "user.home") + FILE_SEPARATOR + "ProActive" + FILE_SEPARATOR + 00070 "scripts" + FILE_SEPARATOR + "unix" + FILE_SEPARATOR + "cluster" + 00071 FILE_SEPARATOR + "pbsStartRuntime.sh "; 00072 protected static final String DEFAULT_HOSTS_NUMBER = "1"; 00073 protected static final String DEFAULT_PROCESSOR_NUMBER = "1"; 00074 protected static final String DEFAULT_BOOKING_DURATION = "00:01:00"; 00075 protected String hostNumber = DEFAULT_HOSTS_NUMBER; 00076 protected String processorPerNode = DEFAULT_PROCESSOR_NUMBER; 00077 protected String bookingDuration = DEFAULT_BOOKING_DURATION; 00078 protected String interactive = "false"; 00079 protected String outputFile; 00080 protected int jobID; 00081 protected String queueName; 00082 protected String scriptLocation = DEFAULT_SCRIPT_LOCATION; 00083 00084 //Following options is not yet available for pbs, it might be in the future 00085 protected String hostList; 00086 00087 public PBSSubProcess() { 00088 super(); 00089 setCompositionType(GIVE_COMMAND_AS_PARAMETER); 00090 this.hostname = null; 00091 this.command_path = DEFAULT_QSUBPATH; 00092 } 00093 00094 public PBSSubProcess(ExternalProcess targetProcess) { 00095 super(targetProcess); 00096 this.hostname = null; 00097 this.command_path = DEFAULT_QSUBPATH; 00098 } 00099 00100 // ---------------------------------------------------------------------------------------- 00101 //-----------------------Extends AbstractExternalProcessDecorator------------------------- 00102 // ---------------------------------------------------------------------------------------- 00103 00104 public void setOutputMessageSink(MessageSink outputMessageSink) { 00105 if (outputMessageSink == null) { 00106 super.setOutputMessageSink(new SimpleMessageSink()); 00107 } else { 00108 super.setOutputMessageSink(outputMessageSink); 00109 } 00110 } 00111 00116 public String getHostsNumber() { 00117 return this.hostNumber; 00118 } 00119 00124 public void setHostsNumber(String nodeNumber) { 00125 checkStarted(); 00126 if (nodeNumber != null) { 00127 this.hostNumber = nodeNumber; 00128 } 00129 } 00130 00134 public String getProcessId() { 00135 return "pbs_" + targetProcess.getProcessId(); 00136 } 00137 00141 public int getNodeNumber() { 00142 return (new Integer(getProcessorPerNodeNumber()).intValue()) * (new Integer(getHostsNumber()).intValue()); 00143 } 00144 00148 public UniversalProcess getFinalProcess() { 00149 checkStarted(); 00150 return targetProcess.getFinalProcess(); 00151 } 00152 00157 public void setInteractive(String interactive) { 00158 this.interactive = interactive; 00159 } 00160 00165 public void setScriptLocation(String location) { 00166 checkStarted(); 00167 // if (location != null) { 00168 this.scriptLocation = location; 00169 // } 00170 } 00171 00176 public void setOutputFile(String string) { 00177 outputFile = string; 00178 } 00179 00184 public void setBookingDuration(String d) { 00185 this.bookingDuration = d; 00186 } 00187 00191 public String getProcessorPerNodeNumber() { 00192 return this.processorPerNode; 00193 } 00194 00199 public void setProcessorPerNodeNumber(String processorPerNode) { 00200 checkStarted(); 00201 if (processorPerNode != null) { 00202 this.processorPerNode = processorPerNode; 00203 } 00204 } 00205 00210 public void setQueueName(String queueName) { 00211 checkStarted(); 00212 if (queueName == null) { 00213 throw new NullPointerException(); 00214 } 00215 this.queueName = queueName; 00216 } 00217 00223 public void setHostList(String hostList) { 00224 checkStarted(); 00225 this.hostList = hostList; 00226 } 00227 00232 public String getHostList() { 00233 return hostList; 00234 } 00235 00242 protected String parseHostname(String message) { 00243 String result = new String(); 00244 if (logger.isDebugEnabled()) { 00245 logger.debug("parseHostname() analyzing " + message); 00246 } 00247 java.util.StringTokenizer st = new java.util.StringTokenizer(message); 00248 if (st.countTokens() < 2) { 00249 return null; //at least two tokens 00250 } 00251 if (!":".equals(st.nextToken())) { 00252 return null; //should start with : 00253 } 00254 00255 while (st.hasMoreTokens()) { 00256 result += (st.nextToken()); 00257 result += " "; 00258 } 00259 return result; 00260 } 00261 00262 protected String internalBuildCommand() { 00263 return buildEnvironmentCommand(); // + buildPSubCommand(); 00264 } 00265 00266 protected void internalStartProcess(String commandToExecute) { 00267 //java does not seem to be able to deal with command 00268 //where there are quotation marks 00269 //thus we have to divide our command into tokens 00270 //and pass them to the runtime 00271 //tokens are separated by space or quotation marks 00272 StringTokenizer st = new StringTokenizer(commandToExecute, " \"", true); 00273 String token; 00274 ArrayList<String> al = new ArrayList<String>(); 00275 int quotationFound = 0; 00276 boolean commandFound = false; 00277 StringBuilder buff = new StringBuilder(); 00278 while (st.hasMoreTokens()) { 00279 token = (String) st.nextElement(); 00280 if (!commandFound) { 00281 if (token.equals("PROACTIVE_COMMAND=")) { 00282 quotationFound = 0; 00283 buff.append(token); 00284 commandFound = true; 00285 } else { 00286 if (!token.equals(" ")) { 00287 al.add(token); 00288 } 00289 } 00290 } else { 00291 //we have found the command, we are now looking for two quotation mark 00292 if (token.equals("\"")) { 00293 quotationFound++; 00294 buff.append(token); 00295 if (quotationFound == 2) { 00296 al.add(buff.toString()); 00297 commandFound = false; 00298 } 00299 } else { 00300 buff.append(token); 00301 } 00302 } 00303 } 00304 String[] command = al.toArray(new String[] { }); 00305 00306 try { 00307 externalProcess = Runtime.getRuntime().exec(command); 00308 java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader( 00309 externalProcess.getInputStream())); 00310 java.io.BufferedReader err = new java.io.BufferedReader(new java.io.InputStreamReader( 00311 externalProcess.getErrorStream())); 00312 java.io.BufferedWriter out = new java.io.BufferedWriter(new java.io.OutputStreamWriter( 00313 externalProcess.getOutputStream())); 00314 handleProcess(in, out, err); 00315 } catch (java.io.IOException e) { 00316 isFinished = true; 00317 //throw e; 00318 e.printStackTrace(); 00319 } 00320 } 00321 00322 protected String buildCommand() { 00323 StringBuilder qsubCommand = new StringBuilder(); 00324 qsubCommand.append(command_path).append(" "); 00325 if (interactive.equals("true")) { 00326 qsubCommand.append(" -I"); 00327 } 00328 00329 qsubCommand.append(buildResourceString()); 00330 if (outputFile != null) { 00331 qsubCommand.append(" -o ").append(outputFile).append(" "); 00332 } 00333 00334 //the parameters for the script are given as an 00335 //environment variable 00336 qsubCommand.append(" -v ").append("PROACTIVE_COMMAND=\" ") 00337 .append(targetProcess.getCommand()).append("\" "); 00338 qsubCommand.append(scriptLocation); 00339 00340 if (queueName != null) { 00341 qsubCommand.append(" -q " + queueName + " "); 00342 } 00343 00344 if (logger.isDebugEnabled()) { 00345 logger.debug("qsub command is " + qsubCommand.toString()); 00346 } 00347 00348 //System.out.println("PBSSubProcess.buildCommand() " + qsubCommand); 00349 return qsubCommand.toString(); 00350 } 00351 00352 protected String buildResourceString() { 00353 StringBuilder rs = new StringBuilder(); 00354 rs.append(" -l walltime=").append(bookingDuration).append(","); 00355 //to specify nodes and processor per nodes, the syntax is different from 00356 //other resources 00357 rs.append("nodes=").append(hostNumber).append(":"); 00358 rs.append("ppn=").append(processorPerNode); 00359 return rs.toString(); 00360 } 00361 }