00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.process.oar;
00032
00033 import java.util.ArrayList;
00034 import java.util.regex.Matcher;
00035 import java.util.regex.Pattern;
00036
00037 import org.objectweb.proactive.core.process.AbstractExternalProcessDecorator;
00038 import org.objectweb.proactive.core.process.ExternalProcess;
00039 import org.objectweb.proactive.core.process.JVMProcessImpl;
00040 import org.objectweb.proactive.core.process.MessageSink;
00041 import org.objectweb.proactive.core.process.UniversalProcess;
00042
00043
00066 public class OARSubProcess extends AbstractExternalProcessDecorator {
00067 private static final String FILE_SEPARATOR = System.getProperty(
00068 "file.separator");
00069 public final static String DEFAULT_OARSUBPATH = "/usr/local/bin/oarsub";
00070 private static final String DEFAULT_SCRIPT_LOCATION = System.getProperty(
00071 "user.home") + FILE_SEPARATOR + "ProActive" + FILE_SEPARATOR +
00072 "scripts" + FILE_SEPARATOR + "unix" + FILE_SEPARATOR + "cluster" +
00073 FILE_SEPARATOR + "oarStartRuntime.sh ";
00074 protected static final String DEFAULT_HOSTS_NUMBER = "1";
00075 protected String hostNumber = DEFAULT_HOSTS_NUMBER;
00076 protected String weight = "2";
00077 protected String scriptLocation = DEFAULT_SCRIPT_LOCATION;
00078 protected String interactive = "false";
00079 protected int jobID;
00080 protected String queueName;
00081 protected String accessProtocol = "rsh";
00082 protected String resources;
00083
00084
00085 public OARSubProcess() {
00086 super();
00087 setCompositionType(GIVE_COMMAND_AS_PARAMETER);
00088 this.hostname = null;
00089 this.command_path = DEFAULT_OARSUBPATH;
00090 }
00091
00092 public OARSubProcess(ExternalProcess targetProcess) {
00093 super(targetProcess);
00094 this.hostname = null;
00095 this.command_path = DEFAULT_OARSUBPATH;
00096 }
00097
00098
00099
00100
00101
00102 public void setOutputMessageSink(MessageSink outputMessageSink) {
00103 if (outputMessageSink == null) {
00104 super.setOutputMessageSink(new SimpleMessageSink());
00105 } else {
00106 super.setOutputMessageSink(outputMessageSink);
00107 }
00108 }
00109
00113 public String getProcessId() {
00114 return "oar_" + targetProcess.getProcessId();
00115 }
00116
00120 public int getNodeNumber() {
00121 if (hostNumber.equals("all")) {
00122 return UniversalProcess.UNKNOWN_NODE_NUMBER;
00123 }
00124
00125 return (new Integer(hostNumber).intValue() * new Integer(weight).intValue());
00126 }
00127
00131 public UniversalProcess getFinalProcess() {
00132 checkStarted();
00133 return targetProcess.getFinalProcess();
00134 }
00135
00140 public void setScriptLocation(String location) {
00141 checkStarted();
00142
00143 this.scriptLocation = location;
00144
00145 }
00146
00152 public void setAccessProtocol(String accessProtocol) {
00153 this.accessProtocol = accessProtocol;
00154 }
00155
00161 public void setResources(String res) {
00162 checkStarted();
00163 if (res != null) {
00164 this.resources = res;
00165 parseRes(res);
00166 }
00167 }
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00186 public void setInteractive(String interactive) {
00187 this.interactive = interactive;
00188 }
00189
00195 public void setQueueName(String queueName) {
00196 checkStarted();
00197 if (queueName == null) {
00198 throw new NullPointerException();
00199 }
00200 this.queueName = queueName;
00201 }
00202
00203 protected String parseHostname(String message) {
00204
00205 String result = new String();
00206 if (logger.isDebugEnabled()) {
00207 logger.debug("parseHostname() analyzing " + message);
00208 }
00209 java.util.StringTokenizer st = new java.util.StringTokenizer(message);
00210 if (st.countTokens() < 2) {
00211 return null;
00212 }
00213 if (!":".equals(st.nextToken())) {
00214 return null;
00215 }
00216
00217 while (st.hasMoreTokens()) {
00218 result += (st.nextToken());
00219 result += " ";
00220 }
00221 return result;
00222 }
00223
00224
00225
00226
00227 protected void internalStartProcess(String commandToExecute)
00228 throws java.io.IOException {
00229 ArrayList<String> al = new ArrayList<String>();
00230
00231
00232
00233 Pattern p = Pattern.compile("(.*) .*(-c).*'(.*)'");
00234 Matcher m = p.matcher(command);
00235 if (!m.matches()) {
00236 System.err.println("Could not match command ");
00237 System.err.println(commandToExecute);
00238 }
00239 for (int i = 1; i <= m.groupCount(); i++) {
00240
00241 al.add(m.group(i));
00242 }
00243 String[] command = al.toArray(new String[] { });
00244
00245 try {
00246 externalProcess = Runtime.getRuntime().exec(command);
00247 java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(
00248 externalProcess.getInputStream()));
00249 java.io.BufferedReader err = new java.io.BufferedReader(new java.io.InputStreamReader(
00250 externalProcess.getErrorStream()));
00251 java.io.BufferedWriter out = new java.io.BufferedWriter(new java.io.OutputStreamWriter(
00252 externalProcess.getOutputStream()));
00253 handleProcess(in, out, err);
00254 } catch (java.io.IOException e) {
00255 isFinished = true;
00256
00257 e.printStackTrace();
00258 }
00259 }
00260
00266 protected String internalBuildCommand() {
00267 StringBuilder oarsubCommand = new StringBuilder();
00268 oarsubCommand.append(
00269 "/bin/sh -c 'echo for i in \\`cat \\$OAR_NODEFILE\\` \\; do " +
00270 accessProtocol + " \\$i ");
00271 oarsubCommand.append(targetProcess.getCommand());
00272 oarsubCommand.append(" \\& done \\; wait > ");
00273 oarsubCommand.append(scriptLocation).append(" ; ");
00274 oarsubCommand.append(command_path);
00275 oarsubCommand.append(" ");
00276 if (resources != null) {
00277 oarsubCommand.append("-l " + resources).append(" ");
00278 }
00279
00280
00281
00282
00283
00284 oarsubCommand.append(scriptLocation).append(" '");
00285 if (logger.isDebugEnabled()) {
00286 logger.debug("oarsub command is " + oarsubCommand.toString());
00287 }
00288 return oarsubCommand.toString();
00289 }
00290
00294 private void parseRes(String res) {
00295 String[] resTab = res.split(",");
00296 for (int i = 0; i < resTab.length; i++) {
00297 if (!(resTab[i].indexOf("nodes") < 0)) {
00298 hostNumber = resTab[i].substring(resTab[i].indexOf("=") + 1,
00299 resTab[i].length());
00300 }
00301 if (!(resTab[i].indexOf("weight") < 0)) {
00302 weight = resTab[i].substring(resTab[i].indexOf("=") + 1,
00303 resTab[i].length());
00304 }
00305 }
00306 }
00307
00308 public static void main(String[] args) {
00309 System.out.println("Testing OARSubProcess");
00310 JVMProcessImpl p = new JVMProcessImpl();
00311 OARSubProcess oar = new OARSubProcess(p);
00312 oar.setResources("nodes=2");
00313 System.out.println(oar.buildCommand());
00314 }
00315 }