00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.process;
00032
00033 import org.apache.log4j.Logger;
00034
00035 import org.objectweb.proactive.core.process.filetransfer.CopyProtocol;
00036 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00037 import org.objectweb.proactive.core.util.RemoteProcessMessageLogger;
00038 import org.objectweb.proactive.core.util.log.Loggers;
00039 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00040
00041 import java.io.IOException;
00042
00043
00044 public abstract class AbstractExternalProcess extends AbstractUniversalProcess
00045 implements ExternalProcess {
00046 protected static Logger clogger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_PROCESS);
00047 protected static Logger fileTransferLogger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_FILETRANSFER);
00048 protected static final boolean IS_WINDOWS_SYSTEM = System.getProperty(
00049 "os.name").toLowerCase().startsWith("win");
00050 protected Process externalProcess;
00051 private volatile boolean shouldRun = true;
00052 public static final int NO_COMPOSITION = 0;
00053 protected boolean closeStream = false;
00054 protected RemoteProcessMessageLogger inputMessageLogger;
00055 protected RemoteProcessMessageLogger errorMessageLogger;
00056 protected MessageSink outputMessageSink;
00057 private ThreadActivityMonitor inThreadMonitor;
00058 private ThreadActivityMonitor errThreadMonitor;
00059 private FileTransferWorkShop ftsDeploy = null;
00060 private FileTransferWorkShop ftsRetrieve = null;
00061 protected String FILE_TRANSFER_DEFAULT_PROTOCOL = "dummy";
00062
00063
00064
00065 private boolean requiresFileTransferDeployOnNodeCreation = false;
00066
00067
00068
00069
/**
 * Creates a process without any message logger or output sink.
 */
protected AbstractExternalProcess() {
    super();
}
00072
/**
 * Creates a process whose standard output and standard error are both
 * reported to the same logger; no output sink is configured.
 *
 * @param messageLogger logger used for both the input and the error stream
 */
public AbstractExternalProcess(RemoteProcessMessageLogger messageLogger) {
    // The two-logger constructor supplies the null output sink.
    this(messageLogger, messageLogger);
}
00076
/**
 * Creates a process with separate loggers for its standard output and
 * standard error, and no output sink.
 *
 * @param inputMessageLogger logger for lines read from the process output
 * @param errorMessageLogger logger for lines read from the process error stream
 */
public AbstractExternalProcess(RemoteProcessMessageLogger inputMessageLogger,
    RemoteProcessMessageLogger errorMessageLogger) {
    this(inputMessageLogger, errorMessageLogger, null);
}
00082
/**
 * Creates a process with the given loggers and output sink. Any of the
 * arguments may be null, in which case the matching handler thread is
 * simply not started (see handleInput, handleError and handleOutput).
 *
 * @param inputMessageLogger logger for lines read from the process output
 * @param errorMessageLogger logger for lines read from the process error stream
 * @param outputMessageSink source of the messages written to the process input
 */
public AbstractExternalProcess(RemoteProcessMessageLogger inputMessageLogger,
    RemoteProcessMessageLogger errorMessageLogger,
    MessageSink outputMessageSink) {
    super();
    this.outputMessageSink = outputMessageSink;
    this.errorMessageLogger = errorMessageLogger;
    this.inputMessageLogger = inputMessageLogger;
}
00091
00092
00093
00094
00095
00096
00097
00098 public void closeStream() {
00099 this.closeStream = true;
00100 }
00101
00102 public RemoteProcessMessageLogger getInputMessageLogger() {
00103 return inputMessageLogger;
00104 }
00105
00106 public RemoteProcessMessageLogger getErrorMessageLogger() {
00107 return errorMessageLogger;
00108 }
00109
00110 public MessageSink getOutputMessageSink() {
00111 return outputMessageSink;
00112 }
00113
00114 public void setInputMessageLogger(
00115 RemoteProcessMessageLogger inputMessageLogger) {
00116 checkStarted();
00117 this.inputMessageLogger = inputMessageLogger;
00118 }
00119
00120 public void setErrorMessageLogger(
00121 RemoteProcessMessageLogger errorMessageLogger) {
00122 checkStarted();
00123 this.errorMessageLogger = errorMessageLogger;
00124 }
00125
00126 public void setOutputMessageSink(MessageSink outputMessageSink) {
00127 checkStarted();
00128 this.outputMessageSink = outputMessageSink;
00129 }
00130
00131 public FileTransferWorkShop getFileTransferWorkShopDeploy() {
00132 if (ftsDeploy == null) {
00133 ftsDeploy = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00134 }
00135
00136 return ftsDeploy;
00137 }
00138
00139 public FileTransferWorkShop getFileTransferWorkShopRetrieve() {
00140 if (ftsRetrieve == null) {
00141 ftsRetrieve = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00142 }
00143
00144 return ftsRetrieve;
00145 }
00146
00147 public String getFileTransferDefaultCopyProtocol() {
00148 return FILE_TRANSFER_DEFAULT_PROTOCOL;
00149 }
00150
00151 public int getCompositionType() {
00152 return NO_COMPOSITION;
00153 }
00154
00155
00156
00157
00158 protected abstract String buildCommand();
00159
00160 protected String buildEnvironmentCommand() {
00161 if (environment == null) {
00162 return "";
00163 }
00164 if (IS_WINDOWS_SYSTEM) {
00165 return buildWindowsEnvironmentCommand();
00166 } else {
00167 return buildUnixEnvironmentCommand();
00168 }
00169 }
00170
00171 protected String buildWindowsEnvironmentCommand() {
00172 StringBuilder sb = new StringBuilder();
00173 for (int i = 0; i < environment.length; i++) {
00174 inputMessageLogger.log(" exporting variable " +
00175 environment[i]);
00176 sb.append("set ");
00177 sb.append(environment[i]);
00178 sb.append(" ; ");
00179 }
00180 return sb.toString();
00181 }
00182
00183 protected String buildUnixEnvironmentCommand() {
00184 StringBuilder sb = new StringBuilder();
00185 for (int i = 0; i < environment.length; i++) {
00186 inputMessageLogger.log(" exporting variable " +
00187 environment[i]);
00188 sb.append("export ");
00189 sb.append(environment[i]);
00190 sb.append(" ; ");
00191 }
00192 return sb.toString();
00193 }
00194
00195 protected void internalStartProcess(String commandToExecute)
00196 throws java.io.IOException {
00197 try {
00198 shouldRun = true;
00199 externalProcess = Runtime.getRuntime().exec(commandToExecute);
00200 java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(
00201 externalProcess.getInputStream()));
00202 java.io.BufferedReader err = new java.io.BufferedReader(new java.io.InputStreamReader(
00203 externalProcess.getErrorStream()));
00204 java.io.BufferedWriter out = new java.io.BufferedWriter(new java.io.OutputStreamWriter(
00205 externalProcess.getOutputStream()));
00206 handleProcess(in, out, err);
00207 } catch (java.io.IOException e) {
00208 isFinished = true;
00209 throw e;
00210 }
00211 }
00212
00213 protected void internalStopProcess() {
00214 shouldRun = false;
00215
00216 if (externalProcess != null) {
00217 externalProcess.destroy();
00218 }
00219 if (outputMessageSink != null) {
00220 outputMessageSink.setMessage(null);
00221 }
00222 }
00223
00224 protected int internalWaitFor() throws InterruptedException {
00225 return externalProcess.waitFor();
00226 }
00227
00228 protected int internalExitValue() throws IllegalThreadStateException {
00229 return externalProcess.exitValue();
00230 }
00231
00235 protected void internalStartFileTransfer(FileTransferWorkShop fts) {
00236 CopyProtocol[] copyProtocol = fts.getCopyProtocols();
00237 boolean success = false;
00238
00239 long beginning = System.currentTimeMillis();
00240 long end = beginning;
00241
00242
00243
00244
00245
00246
00247
00248 if (!fts.check()) {
00249 return;
00250 }
00251
00252
00253
00254 for (int i = 0; (i < copyProtocol.length) && !success; i++) {
00255 fileTransferLogger.info("Trying copyprotocol: " +
00256 copyProtocol[i].getProtocolName());
00257 if (!copyProtocol[i].checkProtocol()) {
00258 logger.error("Protocol check failed");
00259 continue;
00260 }
00261
00262
00263
00264
00265 if (copyProtocol[i].getProtocolName().equalsIgnoreCase("pftp")) {
00266 if (fileTransferLogger.isDebugEnabled()) {
00267 fileTransferLogger.debug(
00268 "ProActive File Transfer will be used later on.");
00269 }
00270
00271
00272 success = true;
00273 requiresFileTransferDeployOnNodeCreation = true;
00274 }
00275
00276 else if (copyProtocol[i].isDefaultProtocol() &&
00277 copyProtocol[i].isDummyProtocol()) {
00278 if (fileTransferLogger.isDebugEnabled()) {
00279 fileTransferLogger.debug(
00280 "Trying protocol internal filetransfer");
00281 }
00282
00283 success = internalFileTransferDefaultProtocol();
00284 }
00285
00286 else {
00287 success = copyProtocol[i].startFileTransfer();
00288 }
00289 }
00290
00291 end = System.currentTimeMillis();
00292
00293 if (fileTransferLogger.isDebugEnabled()) {
00294 fileTransferLogger.debug("FileTransfer spent:" + (end - beginning) +
00295 "[ms]");
00296 }
00297
00298 if (!success) {
00299 fileTransferLogger.info("FileTransfer faild");
00300 }
00301 }
00302
00308 protected boolean internalFileTransferDefaultProtocol() {
00309
00310 return false;
00311 }
00312
00316 public boolean isRequiredFileTransferDeployOnNodeCreation() {
00317 return requiresFileTransferDeployOnNodeCreation;
00318 }
00319
00320 protected void handleProcess(java.io.BufferedReader in,
00321 java.io.BufferedWriter out, java.io.BufferedReader err) {
00322 if (closeStream) {
00323 try {
00324
00325
00326 Thread.sleep(200);
00327 out.close();
00328 err.close();
00329 in.close();
00330 } catch (IOException e) {
00331
00332 e.printStackTrace();
00333 } catch (InterruptedException e) {
00334
00335 e.printStackTrace();
00336 }
00337 } else {
00338 handleInput(in);
00339 handleOutput(out);
00340 handleError(err);
00341 }
00342 }
00343
00344 protected void handleInput(java.io.BufferedReader in) {
00345 if (inputMessageLogger == null) {
00346 return;
00347 }
00348 inThreadMonitor = new ThreadActivityMonitor();
00349 Runnable r = new ProcessInputHandler(in, inputMessageLogger,
00350 inThreadMonitor);
00351 Thread t = new Thread(r, "IN -> " + getShortName(getCommand(), 20));
00352 t.start();
00353 }
00354
00355 protected void handleError(java.io.BufferedReader err) {
00356 if (errorMessageLogger == null) {
00357 return;
00358 }
00359 errThreadMonitor = new ThreadActivityMonitor();
00360 Runnable r = new ProcessInputHandler(err, errorMessageLogger,
00361 errThreadMonitor);
00362 Thread t = new Thread(r, "ERR -> " + getShortName(getCommand(), 20));
00363 t.start();
00364 }
00365
00366 protected void handleOutput(java.io.BufferedWriter out) {
00367 if (outputMessageSink == null) {
00368 return;
00369 }
00370
00371
00372 Runnable r = new ProcessOutputHandler(out, outputMessageSink);
00373 Thread t = new Thread(r, "OUT -> " + getShortName(getCommand(), 20));
00374 t.start();
00375 }
00376
00377
00378
00379
00380 private final String getShortName(String name, int length) {
00381 return name.substring(0, Math.min(name.length(), length));
00382 }
00383
00384 private final void waitForMonitoredThread() {
00385 do {
00386 try {
00387 Thread.sleep(300);
00388 } catch (InterruptedException e) {
00389 }
00390 } while (errThreadMonitor.isActive() || inThreadMonitor.isActive());
00391 }
00392
00393 private void writeObject(java.io.ObjectOutputStream out)
00394 throws java.io.IOException {
00395 if (isStarted) {
00396
00397
00398 externalProcess = null;
00399 }
00400 out.defaultWriteObject();
00401 }
00402
00403
00404
00405
00406 private static class ThreadActivityMonitor implements java.io.Serializable {
00407 private boolean isActive;
00408
00409 public boolean isActive() {
00410 return isActive;
00411 }
00412
00413 public void setActive(boolean b) {
00414 isActive = b;
00415 }
00416 }
00417
00421 public static class StandardOutputMessageLogger
00422 implements RemoteProcessMessageLogger, java.io.Serializable {
00423 public StandardOutputMessageLogger() {
00424
00425 }
00426
00427 public void log(String message) {
00428 messageLogger.info(message);
00429 }
00430
00431 public void log(Throwable t) {
00432 t.printStackTrace();
00433 }
00434
00435 public void log(String message, Throwable t) {
00436 messageLogger.info(message);
00437 t.printStackTrace();
00438 }
00439 }
00440
00441
00442
00446 public static class NullMessageLogger implements RemoteProcessMessageLogger,
00447 java.io.Serializable {
00448 public NullMessageLogger() {
00449 }
00450
00451 public void log(String message) {
00452 }
00453
00454 public void log(Throwable t) {
00455 }
00456
00457 public void log(String message, Throwable t) {
00458 }
00459 }
00460
00461
00462
00466 public static class SimpleMessageSink implements MessageSink,
00467 java.io.Serializable {
00468 private String message;
00469 private boolean isActive = true;
00470
00471 public synchronized String getMessage() {
00472 if (!isActive) {
00473 return null;
00474 }
00475 while ((message == null) && isActive) {
00476 try {
00477 wait();
00478 } catch (InterruptedException e) {
00479 }
00480 }
00481 String messageToSend = message;
00482 message = null;
00483 notifyAll();
00484 return messageToSend;
00485 }
00486
00487 public synchronized void setMessage(String messageToPost) {
00488 if (!isActive) {
00489 return;
00490 }
00491 while ((message != null) && isActive) {
00492 try {
00493 wait();
00494 } catch (InterruptedException e) {
00495 }
00496 }
00497 if (messageToPost == null) {
00498 isActive = false;
00499 }
00500 this.message = messageToPost;
00501 notifyAll();
00502 }
00503
00504 public synchronized boolean hasMessage() {
00505 return message != null;
00506 }
00507
00508 public synchronized boolean isActive() {
00509 return isActive;
00510 }
00511 }
00512
00513
00514
00519 protected class ProcessInputHandler implements Runnable {
00520 private java.io.BufferedReader in;
00521 private RemoteProcessMessageLogger logger;
00522 private ThreadActivityMonitor threadMonitor;
00523
00524 public ProcessInputHandler(java.io.BufferedReader in,
00525 RemoteProcessMessageLogger logger,
00526 ThreadActivityMonitor threadMonitor) {
00527 this.in = in;
00528 this.logger = logger;
00529 this.threadMonitor = threadMonitor;
00530 }
00531
00532 public void run() {
00533 if (AbstractExternalProcess.clogger.isDebugEnabled()) {
00534 AbstractExternalProcess.clogger.debug("Process started Thread=" +
00535 Thread.currentThread().getName());
00536 }
00537
00538
00539 try {
00540 while (shouldRun) {
00541 threadMonitor.setActive(false);
00542
00543
00544
00545
00546 String s = in.readLine();
00547 if (AbstractExternalProcess.clogger.isDebugEnabled()) {
00548
00549
00550 AbstractExternalProcess.clogger.debug(s);
00551 }
00552
00553
00554 threadMonitor.setActive(true);
00555 if (s == null) {
00556 break;
00557 } else {
00558 logger.log(s);
00559 }
00560 }
00561 } catch (java.io.IOException e) {
00562 logger.log(e);
00563 } finally {
00564 isFinished = true;
00565 threadMonitor.setActive(false);
00566 try {
00567 in.close();
00568 } catch (java.io.IOException e) {
00569 e.printStackTrace();
00570 }
00571 logger.log("Process finished Thread=" +
00572 Thread.currentThread().getName());
00573 }
00574 }
00575 }
00576
00577
00578
00583 protected class ProcessOutputHandler implements Runnable {
00584 private java.io.BufferedWriter out;
00585 private MessageSink messageSink;
00586
00587 public ProcessOutputHandler(java.io.BufferedWriter out,
00588 MessageSink messageSink) {
00589 this.out = out;
00590 this.messageSink = messageSink;
00591 }
00592
00593 public void run() {
00594 try {
00595 while (shouldRun) {
00596 waitForMonitoredThread();
00597
00598 String message = messageSink.getMessage();
00599 if (message == null) {
00600 break;
00601 }
00602 try {
00603 out.write(message);
00604 out.newLine();
00605 out.flush();
00606
00607 } catch (java.io.IOException e) {
00608 break;
00609 }
00610 message = null;
00611 }
00612 } finally {
00613 isFinished = true;
00614 waitForMonitoredThread();
00615 try {
00616 out.close();
00617 } catch (java.io.IOException e) {
00618 e.printStackTrace();
00619 }
00620 }
00621 }
00622 }
00623
00624
00625 }