00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.core.process;
00032 
00033 import org.objectweb.proactive.core.process.filetransfer.CopyProtocol;
00034 import org.objectweb.proactive.core.process.filetransfer.FileDependant;
00035 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00036 import org.objectweb.proactive.core.util.RemoteProcessMessageLogger;
00037 
00038 
00039 public abstract class AbstractExternalProcessDecorator
00040     extends AbstractExternalProcess implements ExternalProcessDecorator {
00041     protected ExternalProcess targetProcess;
00042     private int compositionType = APPEND_TO_COMMAND_COMPOSITION;
00043 
00044     
00045     
00046     
00047     public AbstractExternalProcessDecorator() {
00048         super();
00049     }
00050 
00051     public AbstractExternalProcessDecorator(ExternalProcess targetProcess) {
00052         this(targetProcess, APPEND_TO_COMMAND_COMPOSITION);
00053     }
00054 
00055     public AbstractExternalProcessDecorator(ExternalProcess targetProcess,
00056         int compositionType) {
00057         super();
00058         setTargetProcess(targetProcess);
00059         this.compositionType = compositionType;
00060     }
00061 
00062     
00063     
00064     
00065     public ExternalProcess getTargetProcess() {
00066         return targetProcess;
00067     }
00068 
00069     public void setTargetProcess(ExternalProcess targetProcess) {
00070         checkStarted();
00071         this.targetProcess = targetProcess;
00072         setInputMessageLogger(targetProcess.getInputMessageLogger());
00073         setErrorMessageLogger(targetProcess.getErrorMessageLogger());
00074         setOutputMessageSink(targetProcess.getOutputMessageSink());
00075     }
00076 
00077     public int getCompositionType() {
00078         return compositionType;
00079     }
00080 
00081     public void setCompositionType(int compositionType) {
00082         checkStarted();
00083         this.compositionType = compositionType;
00084     }
00085 
00086     
00087     
00088     
00089     protected void toString(StringBuilder sb) {
00090         super.toString(sb);
00091         sb.append(" ---- Target Process ----- \n");
00092         if (targetProcess == null) {
00093             sb.append(" NOT DEFINED \n");
00094         } else {
00095             sb.append(targetProcess.toString());
00096         }
00097         sb.append(" -------------- \n");
00098     }
00099 
00100     protected String buildCommand() {
00101         if ((compositionType == SEND_TO_OUTPUT_STREAM_COMPOSITION) ||
00102                 (compositionType == GIVE_COMMAND_AS_PARAMETER) ||
00103                 (compositionType == COPY_FILE_AND_APPEND_COMMAND)) {
00104             return internalBuildCommand();
00105         } else {
00106             if (targetProcess != null) {
00107                 
00108                 
00109                 
00110                 String targetCommand = targetProcess.getCommand();
00111                 if (targetProcess.getCompositionType() == COPY_FILE_AND_APPEND_COMMAND) {
00112                     handleCopyFile();
00113                 }
00114                 return internalBuildCommand() + targetCommand;
00115             } else {
00116                 return internalBuildCommand();
00117             }
00118         }
00119     }
00120 
00121     protected abstract String internalBuildCommand();
00122 
00123     protected void internalStartProcess(String command)
00124         throws java.io.IOException {
00125         super.internalStartProcess(command);
00126         if (compositionType == SEND_TO_OUTPUT_STREAM_COMPOSITION) {
00127             try {
00128                 Thread.sleep(3000);
00129             } catch (InterruptedException e) {
00130             }
00131 
00132             
00133             outputMessageSink.setMessage(targetProcess.getCommand());
00134         }
00135     }
00136 
00137     protected void handleOutput(java.io.BufferedWriter out) {
00138         if (compositionType == SEND_TO_OUTPUT_STREAM_COMPOSITION) {
00139             if (outputMessageSink == null) {
00140                 outputMessageSink = new SimpleMessageSink();
00141             }
00142         }
00143         super.handleOutput(out);
00144     }
00145 
00146     protected void handleCopyFile() {
00147         FileTransferWorkShop ftw = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00148         ftw.setFileTransferStructureDstInfo("hostname", hostname);
00149         ftw.setFileTransferStructureDstInfo("username", username);
00150         try {
00151             ftw.addFileTransfer(((FileDependant) targetProcess).getFileTransfertDefiniton());
00152         } catch (ClassCastException e) {
00153             logger.error(
00154                 "Unable to handle the file transfert dependant process");
00155             return;
00156         }
00157 
00158         CopyProtocol cp = ftw.copyProtocolFactory(getFileTransferDefaultCopyProtocol());
00159         cp.startFileTransfer();
00160     }
00161 
00162     
00163     
00164     
00165 
00169     public static class CompositeMessageLogger
00170         implements RemoteProcessMessageLogger, java.io.Serializable {
00171         private RemoteProcessMessageLogger messageLogger1;
00172         private RemoteProcessMessageLogger messageLogger2;
00173 
00174         public CompositeMessageLogger(
00175             RemoteProcessMessageLogger messageLogger1,
00176             RemoteProcessMessageLogger messageLogger2) {
00177             this.messageLogger1 = messageLogger1;
00178             this.messageLogger2 = messageLogger2;
00179         }
00180 
00181         public void log(String message) {
00182             messageLogger1.log(message);
00183             messageLogger2.log(message);
00184         }
00185 
00186         public void log(Throwable t) {
00187             messageLogger1.log(t);
00188             messageLogger2.log(t);
00189         }
00190 
00191         public void log(String message, Throwable t) {
00192             messageLogger1.log(message, t);
00193             messageLogger2.log(message, t);
00194         }
00195     } 
00196 
00200     public static class CompositeMessageSink implements MessageSink {
00201         private MessageSink messageSink1;
00202         private MessageSink messageSink2;
00203 
00204         public CompositeMessageSink(MessageSink messageSink1,
00205             MessageSink messageSink2) {
00206             this.messageSink1 = messageSink1;
00207             this.messageSink2 = messageSink2;
00208         }
00209 
00210         public synchronized String getMessage() {
00211             while ((!hasMessage()) && isActive()) {
00212                 try {
00213                     wait(1000);
00214                 } catch (InterruptedException e) {
00215                 }
00216             }
00217             if (messageSink1.hasMessage()) {
00218                 return messageSink1.getMessage();
00219             } else if (messageSink2.hasMessage()) {
00220                 return messageSink1.getMessage();
00221             }
00222             return null;
00223         }
00224 
00225         public synchronized void setMessage(String messageToPost) {
00226             messageSink1.setMessage(messageToPost);
00227             notifyAll();
00228         }
00229 
00230         public synchronized boolean hasMessage() {
00231             return messageSink1.hasMessage() || messageSink2.hasMessage();
00232         }
00233 
00234         public synchronized boolean isActive() {
00235             return messageSink1.isActive() || messageSink2.isActive();
00236         }
00237     } 
00238 }