org/objectweb/proactive/core/process/AbstractExternalProcess.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.process;
00032 
00033 import org.apache.log4j.Logger;
00034 
00035 import org.objectweb.proactive.core.process.filetransfer.CopyProtocol;
00036 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00037 import org.objectweb.proactive.core.util.RemoteProcessMessageLogger;
00038 import org.objectweb.proactive.core.util.log.Loggers;
00039 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00040 
00041 import java.io.IOException;
00042 
00043 
00044 public abstract class AbstractExternalProcess extends AbstractUniversalProcess
00045     implements ExternalProcess {
00046     protected static Logger clogger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_PROCESS);
00047     protected static Logger fileTransferLogger = ProActiveLogger.getLogger(Loggers.DEPLOYMENT_FILETRANSFER);
00048     protected static final boolean IS_WINDOWS_SYSTEM = System.getProperty(
00049             "os.name").toLowerCase().startsWith("win");
00050     protected Process externalProcess;
00051     private volatile boolean shouldRun = true;
00052     public static final int NO_COMPOSITION = 0;
00053     protected boolean closeStream = false;
00054     protected RemoteProcessMessageLogger inputMessageLogger;
00055     protected RemoteProcessMessageLogger errorMessageLogger;
00056     protected MessageSink outputMessageSink;
00057     private ThreadActivityMonitor inThreadMonitor;
00058     private ThreadActivityMonitor errThreadMonitor;
00059     private FileTransferWorkShop ftsDeploy = null;
00060     private FileTransferWorkShop ftsRetrieve = null;
00061     protected String FILE_TRANSFER_DEFAULT_PROTOCOL = "dummy";
00062 
00063     //Used to determine if a File Transfer is required to the Nodes deployed from
00064     //the VirtualNode. @see VirtualNodeImpl
00065     private boolean requiresFileTransferDeployOnNodeCreation = false;
00066 
00067     //
00068     // -- CONSTRUCTORS -----------------------------------------------
00069     //
00070     protected AbstractExternalProcess() {
00071     }
00072 
00073     public AbstractExternalProcess(RemoteProcessMessageLogger messageLogger) {
00074         this(messageLogger, messageLogger, null);
00075     }
00076 
00077     public AbstractExternalProcess(
00078         RemoteProcessMessageLogger inputMessageLogger,
00079         RemoteProcessMessageLogger errorMessageLogger) {
00080         this(inputMessageLogger, errorMessageLogger, null);
00081     }
00082 
00083     public AbstractExternalProcess(
00084         RemoteProcessMessageLogger inputMessageLogger,
00085         RemoteProcessMessageLogger errorMessageLogger,
00086         MessageSink outputMessageSink) {
00087         this.inputMessageLogger = inputMessageLogger;
00088         this.errorMessageLogger = errorMessageLogger;
00089         this.outputMessageSink = outputMessageSink;
00090     }
00091 
00092     //
00093     // -- PUBLIC METHODS -----------------------------------------------
00094     //
00095     //
00096     // -- implements ExternalProcess -----------------------------------------------
00097     //
00098     public void closeStream() {
00099         this.closeStream = true;
00100     }
00101 
00102     public RemoteProcessMessageLogger getInputMessageLogger() {
00103         return inputMessageLogger;
00104     }
00105 
00106     public RemoteProcessMessageLogger getErrorMessageLogger() {
00107         return errorMessageLogger;
00108     }
00109 
00110     public MessageSink getOutputMessageSink() {
00111         return outputMessageSink;
00112     }
00113 
00114     public void setInputMessageLogger(
00115         RemoteProcessMessageLogger inputMessageLogger) {
00116         checkStarted();
00117         this.inputMessageLogger = inputMessageLogger;
00118     }
00119 
00120     public void setErrorMessageLogger(
00121         RemoteProcessMessageLogger errorMessageLogger) {
00122         checkStarted();
00123         this.errorMessageLogger = errorMessageLogger;
00124     }
00125 
00126     public void setOutputMessageSink(MessageSink outputMessageSink) {
00127         checkStarted();
00128         this.outputMessageSink = outputMessageSink;
00129     }
00130 
00131     public FileTransferWorkShop getFileTransferWorkShopDeploy() {
00132         if (ftsDeploy == null) {
00133             ftsDeploy = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00134         }
00135 
00136         return ftsDeploy;
00137     }
00138 
00139     public FileTransferWorkShop getFileTransferWorkShopRetrieve() {
00140         if (ftsRetrieve == null) {
00141             ftsRetrieve = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00142         }
00143 
00144         return ftsRetrieve;
00145     }
00146 
00147     public String getFileTransferDefaultCopyProtocol() {
00148         return FILE_TRANSFER_DEFAULT_PROTOCOL;
00149     }
00150 
00151     public int getCompositionType() {
00152         return NO_COMPOSITION;
00153     }
00154 
00155     //
00156     // -- PROTECTED METHODS -----------------------------------------------
00157     //
00158     protected abstract String buildCommand();
00159 
00160     protected String buildEnvironmentCommand() {
00161         if (environment == null) {
00162             return "";
00163         }
00164         if (IS_WINDOWS_SYSTEM) {
00165             return buildWindowsEnvironmentCommand();
00166         } else {
00167             return buildUnixEnvironmentCommand();
00168         }
00169     }
00170 
00171     protected String buildWindowsEnvironmentCommand() {
00172         StringBuilder sb = new StringBuilder();
00173         for (int i = 0; i < environment.length; i++) {
00174             inputMessageLogger.log("      exporting variable " +
00175                 environment[i]);
00176             sb.append("set ");
00177             sb.append(environment[i]);
00178             sb.append(" ; ");
00179         }
00180         return sb.toString();
00181     }
00182 
00183     protected String buildUnixEnvironmentCommand() {
00184         StringBuilder sb = new StringBuilder();
00185         for (int i = 0; i < environment.length; i++) {
00186             inputMessageLogger.log("      exporting variable " +
00187                 environment[i]);
00188             sb.append("export ");
00189             sb.append(environment[i]);
00190             sb.append(" ; ");
00191         }
00192         return sb.toString();
00193     }
00194 
00195     protected void internalStartProcess(String commandToExecute)
00196         throws java.io.IOException {
00197         try {
00198             shouldRun = true;
00199             externalProcess = Runtime.getRuntime().exec(commandToExecute);
00200             java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(
00201                         externalProcess.getInputStream()));
00202             java.io.BufferedReader err = new java.io.BufferedReader(new java.io.InputStreamReader(
00203                         externalProcess.getErrorStream()));
00204             java.io.BufferedWriter out = new java.io.BufferedWriter(new java.io.OutputStreamWriter(
00205                         externalProcess.getOutputStream()));
00206             handleProcess(in, out, err);
00207         } catch (java.io.IOException e) {
00208             isFinished = true;
00209             throw e;
00210         }
00211     }
00212 
00213     protected void internalStopProcess() {
00214         shouldRun = false;
00215 
00216         if (externalProcess != null) {
00217             externalProcess.destroy();
00218         }
00219         if (outputMessageSink != null) {
00220             outputMessageSink.setMessage(null);
00221         }
00222     }
00223 
00224     protected int internalWaitFor() throws InterruptedException {
00225         return externalProcess.waitFor();
00226     }
00227 
00228     protected int internalExitValue() throws IllegalThreadStateException {
00229         return externalProcess.exitValue();
00230     }
00231 
00235     protected void internalStartFileTransfer(FileTransferWorkShop fts) {
00236         CopyProtocol[] copyProtocol = fts.getCopyProtocols();
00237         boolean success = false;
00238 
00239         long beginning = System.currentTimeMillis();
00240         long end = beginning;
00241 
00242         /*
00243            if (fileTransferLogger.isDebugEnabled()) {
00244                fileTransferLogger.debug(
00245                    "Using the following FileTransferWorkShop:\n" + fts);
00246            }
00247          */
00248         if (!fts.check()) {
00249             return; //No files to transfer or some error.
00250         }
00251 
00252         /* Try all the protocols for this FileTransferStructure
00253          * until one of them is successful */
00254         for (int i = 0; (i < copyProtocol.length) && !success; i++) {
00255             fileTransferLogger.info("Trying copyprotocol: " +
00256                 copyProtocol[i].getProtocolName());
00257             if (!copyProtocol[i].checkProtocol()) {
00258                 logger.error("Protocol check failed");
00259                 continue;
00260             }
00261 
00262             //If ProActive File Transfer will be used then we stop trying
00263             //more protocols. The file transfer will take place when
00264             //the nodes register back to this runtime, in the VirtualNodeImpl class.
00265             if (copyProtocol[i].getProtocolName().equalsIgnoreCase("pftp")) {
00266                 if (fileTransferLogger.isDebugEnabled()) {
00267                     fileTransferLogger.debug(
00268                         "ProActive File Transfer will be used later on.");
00269                 }
00270 
00271                 //TODO transfer this info to the virtual node level
00272                 success = true;
00273                 requiresFileTransferDeployOnNodeCreation = true;
00274             }
00275             //it's an internal protocol
00276             else if (copyProtocol[i].isDefaultProtocol() &&
00277                     copyProtocol[i].isDummyProtocol()) {
00278                 if (fileTransferLogger.isDebugEnabled()) {
00279                     fileTransferLogger.debug(
00280                         "Trying protocol internal filetransfer");
00281                 }
00282 
00283                 success = internalFileTransferDefaultProtocol();
00284             }
00285             //else simply try to start the external copy protocol
00286             else {
00287                 success = copyProtocol[i].startFileTransfer();
00288             }
00289         }
00290 
00291         end = System.currentTimeMillis();
00292 
00293         if (fileTransferLogger.isDebugEnabled()) {
00294             fileTransferLogger.debug("FileTransfer spent:" + (end - beginning) +
00295                 "[ms]");
00296         }
00297 
00298         if (!success) {
00299             fileTransferLogger.info("FileTransfer faild");
00300         }
00301     }
00302 
00308     protected boolean internalFileTransferDefaultProtocol() {
00309         //The default is false, to keep on trying the protocols
00310         return false;
00311     }
00312 
00316     public boolean isRequiredFileTransferDeployOnNodeCreation() {
00317         return requiresFileTransferDeployOnNodeCreation;
00318     }
00319 
00320     protected void handleProcess(java.io.BufferedReader in,
00321         java.io.BufferedWriter out, java.io.BufferedReader err) {
00322         if (closeStream) {
00323             try {
00324                 //the sleep might be needed for processes that fail if
00325                 // we close the in/err too early
00326                 Thread.sleep(200);
00327                 out.close();
00328                 err.close();
00329                 in.close();
00330             } catch (IOException e) {
00331                 // TODO Auto-generated catch block
00332                 e.printStackTrace();
00333             } catch (InterruptedException e) {
00334                 // TODO Auto-generated catch block
00335                 e.printStackTrace();
00336             }
00337         } else {
00338             handleInput(in);
00339             handleOutput(out);
00340             handleError(err);
00341         }
00342     }
00343 
00344     protected void handleInput(java.io.BufferedReader in) {
00345         if (inputMessageLogger == null) {
00346             return;
00347         }
00348         inThreadMonitor = new ThreadActivityMonitor();
00349         Runnable r = new ProcessInputHandler(in, inputMessageLogger,
00350                 inThreadMonitor);
00351         Thread t = new Thread(r, "IN -> " + getShortName(getCommand(), 20));
00352         t.start();
00353     }
00354 
00355     protected void handleError(java.io.BufferedReader err) {
00356         if (errorMessageLogger == null) {
00357             return;
00358         }
00359         errThreadMonitor = new ThreadActivityMonitor();
00360         Runnable r = new ProcessInputHandler(err, errorMessageLogger,
00361                 errThreadMonitor);
00362         Thread t = new Thread(r, "ERR -> " + getShortName(getCommand(), 20));
00363         t.start();
00364     }
00365 
00366     protected void handleOutput(java.io.BufferedWriter out) {
00367         if (outputMessageSink == null) {
00368             return;
00369         }
00370 
00371         //System.out.println("my output sink :"+outputMessageSink.toString());
00372         Runnable r = new ProcessOutputHandler(out, outputMessageSink);
00373         Thread t = new Thread(r, "OUT -> " + getShortName(getCommand(), 20));
00374         t.start();
00375     }
00376 
00377     //
00378     // -- PRIVATE METHODS -----------------------------------------------
00379     //
00380     private final String getShortName(String name, int length) {
00381         return name.substring(0, Math.min(name.length(), length));
00382     }
00383 
00384     private final void waitForMonitoredThread() {
00385         do {
00386             try {
00387                 Thread.sleep(300);
00388             } catch (InterruptedException e) {
00389             }
00390         } while (errThreadMonitor.isActive() || inThreadMonitor.isActive());
00391     }
00392 
00393     private void writeObject(java.io.ObjectOutputStream out)
00394         throws java.io.IOException {
00395         if (isStarted) {
00396             //if the process is started, we have to remove the external process
00397             // which is now not Serializable:UnixProcess or WindowsProcess
00398             externalProcess = null;
00399         }
00400         out.defaultWriteObject();
00401     }
00402 
00403     //
00404     // -- INNER CLASSES -----------------------------------------------
00405     //
00406     private static class ThreadActivityMonitor implements java.io.Serializable {
00407         private boolean isActive;
00408 
00409         public boolean isActive() {
00410             return isActive;
00411         }
00412 
00413         public void setActive(boolean b) {
00414             isActive = b;
00415         }
00416     }
00417 
00421     public static class StandardOutputMessageLogger
00422         implements RemoteProcessMessageLogger, java.io.Serializable {
00423         public StandardOutputMessageLogger() {
00424             //messageLogger.addAppender(new ConsoleAppender(new PatternLayout("%-5p %m %n")));
00425         }
00426 
00427         public void log(String message) {
00428             messageLogger.info(message);
00429         }
00430 
00431         public void log(Throwable t) {
00432             t.printStackTrace();
00433         }
00434 
00435         public void log(String message, Throwable t) {
00436             messageLogger.info(message);
00437             t.printStackTrace();
00438         }
00439     }
00440 
00441     // end inner class StandardOutputMessageLogger
00442 
00446     public static class NullMessageLogger implements RemoteProcessMessageLogger,
00447         java.io.Serializable {
00448         public NullMessageLogger() {
00449         }
00450 
00451         public void log(String message) {
00452         }
00453 
00454         public void log(Throwable t) {
00455         }
00456 
00457         public void log(String message, Throwable t) {
00458         }
00459     }
00460 
00461     // end inner class NullMessageLogger
00462 
00466     public static class SimpleMessageSink implements MessageSink,
00467         java.io.Serializable {
00468         private String message;
00469         private boolean isActive = true;
00470 
00471         public synchronized String getMessage() {
00472             if (!isActive) {
00473                 return null;
00474             }
00475             while ((message == null) && isActive) {
00476                 try {
00477                     wait();
00478                 } catch (InterruptedException e) {
00479                 }
00480             }
00481             String messageToSend = message;
00482             message = null;
00483             notifyAll();
00484             return messageToSend;
00485         }
00486 
00487         public synchronized void setMessage(String messageToPost) {
00488             if (!isActive) {
00489                 return;
00490             }
00491             while ((message != null) && isActive) {
00492                 try {
00493                     wait();
00494                 } catch (InterruptedException e) {
00495                 }
00496             }
00497             if (messageToPost == null) {
00498                 isActive = false;
00499             }
00500             this.message = messageToPost;
00501             notifyAll();
00502         }
00503 
00504         public synchronized boolean hasMessage() {
00505             return message != null;
00506         }
00507 
00508         public synchronized boolean isActive() {
00509             return isActive;
00510         }
00511     }
00512 
00513     // end inner class SimpleMessageSink
00514 
00519     protected class ProcessInputHandler implements Runnable {
00520         private java.io.BufferedReader in;
00521         private RemoteProcessMessageLogger logger;
00522         private ThreadActivityMonitor threadMonitor;
00523 
00524         public ProcessInputHandler(java.io.BufferedReader in,
00525             RemoteProcessMessageLogger logger,
00526             ThreadActivityMonitor threadMonitor) {
00527             this.in = in;
00528             this.logger = logger;
00529             this.threadMonitor = threadMonitor;
00530         }
00531 
00532         public void run() {
00533             if (AbstractExternalProcess.clogger.isDebugEnabled()) {
00534                 AbstractExternalProcess.clogger.debug("Process started Thread=" +
00535                     Thread.currentThread().getName());
00536             }
00537 
00538             //
00539             try {
00540                 while (shouldRun) {
00541                     threadMonitor.setActive(false);
00542                     //                    if (AbstractExternalProcess.clogger.isDebugEnabled()) {
00543                     //                                          AbstractExternalProcess.clogger.debug(
00544                     //                            "ProcessInputHandler before readLine()");
00545                     //                    }
00546                     String s = in.readLine();
00547                     if (AbstractExternalProcess.clogger.isDebugEnabled()) {
00548                         //                        AbstractExternalProcess.clogger.debug(
00549                         //                            "ProcessInputHandler after readLine() s=" + s);
00550                         AbstractExternalProcess.clogger.debug(s);
00551                     }
00552 
00553                     //
00554                     threadMonitor.setActive(true);
00555                     if (s == null) {
00556                         break;
00557                     } else {
00558                         logger.log(s);
00559                     }
00560                 }
00561             } catch (java.io.IOException e) {
00562                 logger.log(e);
00563             } finally {
00564                 isFinished = true;
00565                 threadMonitor.setActive(false);
00566                 try {
00567                     in.close();
00568                 } catch (java.io.IOException e) {
00569                     e.printStackTrace();
00570                 }
00571                 logger.log("Process finished Thread=" +
00572                     Thread.currentThread().getName());
00573             }
00574         }
00575     }
00576 
00577     // end inner class ProcessInputHandler
00578 
00583     protected class ProcessOutputHandler implements Runnable {
00584         private java.io.BufferedWriter out;
00585         private MessageSink messageSink;
00586 
00587         public ProcessOutputHandler(java.io.BufferedWriter out,
00588             MessageSink messageSink) {
00589             this.out = out;
00590             this.messageSink = messageSink;
00591         }
00592 
00593         public void run() {
00594             try {
00595                 while (shouldRun) {
00596                     waitForMonitoredThread();
00597                     //System.out.println("ProcessOutputHandler before getMessage()");
00598                     String message = messageSink.getMessage();
00599                     if (message == null) {
00600                         break;
00601                     }
00602                     try {
00603                         out.write(message);
00604                         out.newLine();
00605                         out.flush();
00606                         //System.out.println("ProcessOutputHandler writing "+message);
00607                     } catch (java.io.IOException e) {
00608                         break;
00609                     }
00610                     message = null;
00611                 }
00612             } finally {
00613                 isFinished = true;
00614                 waitForMonitoredThread();
00615                 try {
00616                     out.close();
00617                 } catch (java.io.IOException e) {
00618                     e.printStackTrace();
00619                 }
00620             }
00621         }
00622     }
00623 
00624     // end inner class ProcessOutputHandler
00625 }

Generated on Mon Jan 22 15:16:08 2007 for ProActive by  doxygen 1.5.1