00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.core.process;
00032 
00033 import org.apache.log4j.Logger;
00034 
00035 import org.objectweb.proactive.core.process.filetransfer.CopyProtocol;
00036 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00037 import org.objectweb.proactive.core.util.RemoteProcessMessageLogger;
00038 import org.objectweb.proactive.core.util.log.Loggers;
00039 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00040 
00041 import java.io.IOException;
00042 
00043 
00044 public abstract class AbstractExternalProcess extends AbstractUniversalProcess
00045     implements ExternalProcess {
    /** Logger obtained for the {@code Loggers.DEPLOYMENT_PROCESS} category. */
    protected static Logger clogger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_PROCESS);
    /** Logger obtained for the {@code Loggers.DEPLOYMENT_FILETRANSFER} category. */
    protected static Logger fileTransferLogger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_FILETRANSFER);
    /** True when the {@code os.name} system property starts with "win" (case-insensitive). */
    protected static final boolean IS_WINDOWS_SYSTEM = System.getProperty(
            "os.name").toLowerCase().startsWith("win");
    /** Handle of the launched OS process; assigned in internalStartProcess, nulled in writeObject. */
    protected Process externalProcess;
    /** Polled by the stream handler threads; cleared by internalStopProcess. */
    private volatile boolean shouldRun = true;
    /** Value returned by {@link #getCompositionType()}. */
    public static final int NO_COMPOSITION = 0;
    /** When true, handleProcess closes the three process streams instead of starting handlers. */
    protected boolean closeStream = false;
    /** Receives lines read from the process standard output; may be null. */
    protected RemoteProcessMessageLogger inputMessageLogger;
    /** Receives lines read from the process standard error; may be null. */
    protected RemoteProcessMessageLogger errorMessageLogger;
    /** Source of the lines written to the process standard input; may be null. */
    protected MessageSink outputMessageSink;
    /** Activity flag of the standard-output reader thread; created in handleInput. */
    private ThreadActivityMonitor inThreadMonitor;
    /** Activity flag of the standard-error reader thread; created in handleError. */
    private ThreadActivityMonitor errThreadMonitor;
    /** Lazily created by getFileTransferWorkShopDeploy. */
    private FileTransferWorkShop ftsDeploy = null;
    /** Lazily created by getFileTransferWorkShopRetrieve. */
    private FileTransferWorkShop ftsRetrieve = null;
    /** Protocol name returned by getFileTransferDefaultCopyProtocol. */
    protected String FILE_TRANSFER_DEFAULT_PROTOCOL = "dummy";

    
    
    /** Set to true by internalStartFileTransfer when the "pftp" protocol is selected. */
    private boolean requiresFileTransferDeployOnNodeCreation = false;
00066 
00067     
00068     
00069     
00070     protected AbstractExternalProcess() {
00071     }
00072 
00073     public AbstractExternalProcess(RemoteProcessMessageLogger messageLogger) {
00074         this(messageLogger, messageLogger, null);
00075     }
00076 
00077     public AbstractExternalProcess(
00078         RemoteProcessMessageLogger inputMessageLogger,
00079         RemoteProcessMessageLogger errorMessageLogger) {
00080         this(inputMessageLogger, errorMessageLogger, null);
00081     }
00082 
00083     public AbstractExternalProcess(
00084         RemoteProcessMessageLogger inputMessageLogger,
00085         RemoteProcessMessageLogger errorMessageLogger,
00086         MessageSink outputMessageSink) {
00087         this.inputMessageLogger = inputMessageLogger;
00088         this.errorMessageLogger = errorMessageLogger;
00089         this.outputMessageSink = outputMessageSink;
00090     }
00091 
00092     
00093     
00094     
00095     
00096     
00097     
00098     public void closeStream() {
00099         this.closeStream = true;
00100     }
00101 
00102     public RemoteProcessMessageLogger getInputMessageLogger() {
00103         return inputMessageLogger;
00104     }
00105 
00106     public RemoteProcessMessageLogger getErrorMessageLogger() {
00107         return errorMessageLogger;
00108     }
00109 
00110     public MessageSink getOutputMessageSink() {
00111         return outputMessageSink;
00112     }
00113 
00114     public void setInputMessageLogger(
00115         RemoteProcessMessageLogger inputMessageLogger) {
00116         checkStarted();
00117         this.inputMessageLogger = inputMessageLogger;
00118     }
00119 
00120     public void setErrorMessageLogger(
00121         RemoteProcessMessageLogger errorMessageLogger) {
00122         checkStarted();
00123         this.errorMessageLogger = errorMessageLogger;
00124     }
00125 
00126     public void setOutputMessageSink(MessageSink outputMessageSink) {
00127         checkStarted();
00128         this.outputMessageSink = outputMessageSink;
00129     }
00130 
00131     public FileTransferWorkShop getFileTransferWorkShopDeploy() {
00132         if (ftsDeploy == null) {
00133             ftsDeploy = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00134         }
00135 
00136         return ftsDeploy;
00137     }
00138 
00139     public FileTransferWorkShop getFileTransferWorkShopRetrieve() {
00140         if (ftsRetrieve == null) {
00141             ftsRetrieve = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00142         }
00143 
00144         return ftsRetrieve;
00145     }
00146 
00147     public String getFileTransferDefaultCopyProtocol() {
00148         return FILE_TRANSFER_DEFAULT_PROTOCOL;
00149     }
00150 
00151     public int getCompositionType() {
00152         return NO_COMPOSITION;
00153     }
00154 
00155     
00156     
00157     
00158     protected abstract String buildCommand();
00159 
00160     protected String buildEnvironmentCommand() {
00161         if (environment == null) {
00162             return "";
00163         }
00164         if (IS_WINDOWS_SYSTEM) {
00165             return buildWindowsEnvironmentCommand();
00166         } else {
00167             return buildUnixEnvironmentCommand();
00168         }
00169     }
00170 
00171     protected String buildWindowsEnvironmentCommand() {
00172         StringBuilder sb = new StringBuilder();
00173         for (int i = 0; i < environment.length; i++) {
00174             inputMessageLogger.log("      exporting variable " +
00175                 environment[i]);
00176             sb.append("set ");
00177             sb.append(environment[i]);
00178             sb.append(" ; ");
00179         }
00180         return sb.toString();
00181     }
00182 
00183     protected String buildUnixEnvironmentCommand() {
00184         StringBuilder sb = new StringBuilder();
00185         for (int i = 0; i < environment.length; i++) {
00186             inputMessageLogger.log("      exporting variable " +
00187                 environment[i]);
00188             sb.append("export ");
00189             sb.append(environment[i]);
00190             sb.append(" ; ");
00191         }
00192         return sb.toString();
00193     }
00194 
00195     protected void internalStartProcess(String commandToExecute)
00196         throws java.io.IOException {
00197         try {
00198             shouldRun = true;
00199             externalProcess = Runtime.getRuntime().exec(commandToExecute);
00200             java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(
00201                         externalProcess.getInputStream()));
00202             java.io.BufferedReader err = new java.io.BufferedReader(new java.io.InputStreamReader(
00203                         externalProcess.getErrorStream()));
00204             java.io.BufferedWriter out = new java.io.BufferedWriter(new java.io.OutputStreamWriter(
00205                         externalProcess.getOutputStream()));
00206             handleProcess(in, out, err);
00207         } catch (java.io.IOException e) {
00208             isFinished = true;
00209             throw e;
00210         }
00211     }
00212 
00213     protected void internalStopProcess() {
00214         shouldRun = false;
00215 
00216         if (externalProcess != null) {
00217             externalProcess.destroy();
00218         }
00219         if (outputMessageSink != null) {
00220             outputMessageSink.setMessage(null);
00221         }
00222     }
00223 
00224     protected int internalWaitFor() throws InterruptedException {
00225         return externalProcess.waitFor();
00226     }
00227 
00228     protected int internalExitValue() throws IllegalThreadStateException {
00229         return externalProcess.exitValue();
00230     }
00231 
00235     protected void internalStartFileTransfer(FileTransferWorkShop fts) {
00236         CopyProtocol[] copyProtocol = fts.getCopyProtocols();
00237         boolean success = false;
00238 
00239         long beginning = System.currentTimeMillis();
00240         long end = beginning;
00241 
00242         
00243 
00244 
00245 
00246 
00247 
00248         if (!fts.check()) {
00249             return; 
00250         }
00251 
00252         
00253 
00254         for (int i = 0; (i < copyProtocol.length) && !success; i++) {
00255             fileTransferLogger.info("Trying copyprotocol: " +
00256                 copyProtocol[i].getProtocolName());
00257             if (!copyProtocol[i].checkProtocol()) {
00258                 logger.error("Protocol check failed");
00259                 continue;
00260             }
00261 
00262             
00263             
00264             
00265             if (copyProtocol[i].getProtocolName().equalsIgnoreCase("pftp")) {
00266                 if (fileTransferLogger.isDebugEnabled()) {
00267                     fileTransferLogger.debug(
00268                         "ProActive File Transfer will be used later on.");
00269                 }
00270 
00271                 
00272                 success = true;
00273                 requiresFileTransferDeployOnNodeCreation = true;
00274             }
00275             
00276             else if (copyProtocol[i].isDefaultProtocol() &&
00277                     copyProtocol[i].isDummyProtocol()) {
00278                 if (fileTransferLogger.isDebugEnabled()) {
00279                     fileTransferLogger.debug(
00280                         "Trying protocol internal filetransfer");
00281                 }
00282 
00283                 success = internalFileTransferDefaultProtocol();
00284             }
00285             
00286             else {
00287                 success = copyProtocol[i].startFileTransfer();
00288             }
00289         }
00290 
00291         end = System.currentTimeMillis();
00292 
00293         if (fileTransferLogger.isDebugEnabled()) {
00294             fileTransferLogger.debug("FileTransfer spent:" + (end - beginning) +
00295                 "[ms]");
00296         }
00297 
00298         if (!success) {
00299             fileTransferLogger.info("FileTransfer faild");
00300         }
00301     }
00302 
00308     protected boolean internalFileTransferDefaultProtocol() {
00309         
00310         return false;
00311     }
00312 
00316     public boolean isRequiredFileTransferDeployOnNodeCreation() {
00317         return requiresFileTransferDeployOnNodeCreation;
00318     }
00319 
00320     protected void handleProcess(java.io.BufferedReader in,
00321         java.io.BufferedWriter out, java.io.BufferedReader err) {
00322         if (closeStream) {
00323             try {
00324                 
00325                 
00326                 Thread.sleep(200);
00327                 out.close();
00328                 err.close();
00329                 in.close();
00330             } catch (IOException e) {
00331                 
00332                 e.printStackTrace();
00333             } catch (InterruptedException e) {
00334                 
00335                 e.printStackTrace();
00336             }
00337         } else {
00338             handleInput(in);
00339             handleOutput(out);
00340             handleError(err);
00341         }
00342     }
00343 
00344     protected void handleInput(java.io.BufferedReader in) {
00345         if (inputMessageLogger == null) {
00346             return;
00347         }
00348         inThreadMonitor = new ThreadActivityMonitor();
00349         Runnable r = new ProcessInputHandler(in, inputMessageLogger,
00350                 inThreadMonitor);
00351         Thread t = new Thread(r, "IN -> " + getShortName(getCommand(), 20));
00352         t.start();
00353     }
00354 
00355     protected void handleError(java.io.BufferedReader err) {
00356         if (errorMessageLogger == null) {
00357             return;
00358         }
00359         errThreadMonitor = new ThreadActivityMonitor();
00360         Runnable r = new ProcessInputHandler(err, errorMessageLogger,
00361                 errThreadMonitor);
00362         Thread t = new Thread(r, "ERR -> " + getShortName(getCommand(), 20));
00363         t.start();
00364     }
00365 
00366     protected void handleOutput(java.io.BufferedWriter out) {
00367         if (outputMessageSink == null) {
00368             return;
00369         }
00370 
00371         
00372         Runnable r = new ProcessOutputHandler(out, outputMessageSink);
00373         Thread t = new Thread(r, "OUT -> " + getShortName(getCommand(), 20));
00374         t.start();
00375     }
00376 
00377     
00378     
00379     
00380     private final String getShortName(String name, int length) {
00381         return name.substring(0, Math.min(name.length(), length));
00382     }
00383 
00384     private final void waitForMonitoredThread() {
00385         do {
00386             try {
00387                 Thread.sleep(300);
00388             } catch (InterruptedException e) {
00389             }
00390         } while (errThreadMonitor.isActive() || inThreadMonitor.isActive());
00391     }
00392 
00393     private void writeObject(java.io.ObjectOutputStream out)
00394         throws java.io.IOException {
00395         if (isStarted) {
00396             
00397             
00398             externalProcess = null;
00399         }
00400         out.defaultWriteObject();
00401     }
00402 
00403     
00404     
00405     
00406     private static class ThreadActivityMonitor implements java.io.Serializable {
00407         private boolean isActive;
00408 
00409         public boolean isActive() {
00410             return isActive;
00411         }
00412 
00413         public void setActive(boolean b) {
00414             isActive = b;
00415         }
00416     }
00417 
00421     public static class StandardOutputMessageLogger
00422         implements RemoteProcessMessageLogger, java.io.Serializable {
00423         public StandardOutputMessageLogger() {
00424             
00425         }
00426 
00427         public void log(String message) {
00428             messageLogger.info(message);
00429         }
00430 
00431         public void log(Throwable t) {
00432             t.printStackTrace();
00433         }
00434 
00435         public void log(String message, Throwable t) {
00436             messageLogger.info(message);
00437             t.printStackTrace();
00438         }
00439     }
00440 
00441     
00442 
00446     public static class NullMessageLogger implements RemoteProcessMessageLogger,
00447         java.io.Serializable {
00448         public NullMessageLogger() {
00449         }
00450 
00451         public void log(String message) {
00452         }
00453 
00454         public void log(Throwable t) {
00455         }
00456 
00457         public void log(String message, Throwable t) {
00458         }
00459     }
00460 
00461     
00462 
00466     public static class SimpleMessageSink implements MessageSink,
00467         java.io.Serializable {
00468         private String message;
00469         private boolean isActive = true;
00470 
00471         public synchronized String getMessage() {
00472             if (!isActive) {
00473                 return null;
00474             }
00475             while ((message == null) && isActive) {
00476                 try {
00477                     wait();
00478                 } catch (InterruptedException e) {
00479                 }
00480             }
00481             String messageToSend = message;
00482             message = null;
00483             notifyAll();
00484             return messageToSend;
00485         }
00486 
00487         public synchronized void setMessage(String messageToPost) {
00488             if (!isActive) {
00489                 return;
00490             }
00491             while ((message != null) && isActive) {
00492                 try {
00493                     wait();
00494                 } catch (InterruptedException e) {
00495                 }
00496             }
00497             if (messageToPost == null) {
00498                 isActive = false;
00499             }
00500             this.message = messageToPost;
00501             notifyAll();
00502         }
00503 
00504         public synchronized boolean hasMessage() {
00505             return message != null;
00506         }
00507 
00508         public synchronized boolean isActive() {
00509             return isActive;
00510         }
00511     }
00512 
00513     
00514 
00519     protected class ProcessInputHandler implements Runnable {
00520         private java.io.BufferedReader in;
00521         private RemoteProcessMessageLogger logger;
00522         private ThreadActivityMonitor threadMonitor;
00523 
00524         public ProcessInputHandler(java.io.BufferedReader in,
00525             RemoteProcessMessageLogger logger,
00526             ThreadActivityMonitor threadMonitor) {
00527             this.in = in;
00528             this.logger = logger;
00529             this.threadMonitor = threadMonitor;
00530         }
00531 
00532         public void run() {
00533             if (AbstractExternalProcess.clogger.isDebugEnabled()) {
00534                 AbstractExternalProcess.clogger.debug("Process started Thread=" +
00535                     Thread.currentThread().getName());
00536             }
00537 
00538             
00539             try {
00540                 while (shouldRun) {
00541                     threadMonitor.setActive(false);
00542                     
00543                     
00544                     
00545                     
00546                     String s = in.readLine();
00547                     if (AbstractExternalProcess.clogger.isDebugEnabled()) {
00548                         
00549                         
00550                         AbstractExternalProcess.clogger.debug(s);
00551                     }
00552 
00553                     
00554                     threadMonitor.setActive(true);
00555                     if (s == null) {
00556                         break;
00557                     } else {
00558                         logger.log(s);
00559                     }
00560                 }
00561             } catch (java.io.IOException e) {
00562                 logger.log(e);
00563             } finally {
00564                 isFinished = true;
00565                 threadMonitor.setActive(false);
00566                 try {
00567                     in.close();
00568                 } catch (java.io.IOException e) {
00569                     e.printStackTrace();
00570                 }
00571                 logger.log("Process finished Thread=" +
00572                     Thread.currentThread().getName());
00573             }
00574         }
00575     }
00576 
00577     
00578 
00583     protected class ProcessOutputHandler implements Runnable {
00584         private java.io.BufferedWriter out;
00585         private MessageSink messageSink;
00586 
00587         public ProcessOutputHandler(java.io.BufferedWriter out,
00588             MessageSink messageSink) {
00589             this.out = out;
00590             this.messageSink = messageSink;
00591         }
00592 
00593         public void run() {
00594             try {
00595                 while (shouldRun) {
00596                     waitForMonitoredThread();
00597                     
00598                     String message = messageSink.getMessage();
00599                     if (message == null) {
00600                         break;
00601                     }
00602                     try {
00603                         out.write(message);
00604                         out.newLine();
00605                         out.flush();
00606                         
00607                     } catch (java.io.IOException e) {
00608                         break;
00609                     }
00610                     message = null;
00611                 }
00612             } finally {
00613                 isFinished = true;
00614                 waitForMonitoredThread();
00615                 try {
00616                     out.close();
00617                 } catch (java.io.IOException e) {
00618                     e.printStackTrace();
00619                 }
00620             }
00621         }
00622     }
00623 
00624     
00625 }