00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.process;
00032
00033 import org.objectweb.proactive.core.process.filetransfer.CopyProtocol;
00034 import org.objectweb.proactive.core.process.filetransfer.FileDependant;
00035 import org.objectweb.proactive.core.process.filetransfer.FileTransferWorkShop;
00036 import org.objectweb.proactive.core.util.RemoteProcessMessageLogger;
00037
00038
00039 public abstract class AbstractExternalProcessDecorator
00040 extends AbstractExternalProcess implements ExternalProcessDecorator {
00041 protected ExternalProcess targetProcess;
00042 private int compositionType = APPEND_TO_COMMAND_COMPOSITION;
00043
00044
00045
00046
00047 public AbstractExternalProcessDecorator() {
00048 super();
00049 }
00050
/**
 * Creates a decorator around {@code targetProcess}, delegating to the
 * two-argument constructor with {@code APPEND_TO_COMMAND_COMPOSITION}
 * as composition type.
 *
 * @param targetProcess the process to decorate
 */
public AbstractExternalProcessDecorator(ExternalProcess targetProcess) {
    this(targetProcess, APPEND_TO_COMMAND_COMPOSITION);
}
00054
/**
 * Creates a decorator around {@code targetProcess} with an explicit
 * composition type.
 *
 * <p>The target is installed through {@link #setTargetProcess(ExternalProcess)}
 * first; the composition type is stored afterwards.</p>
 *
 * @param targetProcess   the process to decorate; it is dereferenced by
 *                        {@code setTargetProcess}, so {@code null} fails there
 * @param compositionType one of the composition constants compared against
 *                        in {@code buildCommand()}
 */
public AbstractExternalProcessDecorator(ExternalProcess targetProcess,
        int compositionType) {
    // The superclass no-arg constructor is invoked implicitly.
    setTargetProcess(targetProcess);
    this.compositionType = compositionType;
}
00061
00062
00063
00064
00065 public ExternalProcess getTargetProcess() {
00066 return targetProcess;
00067 }
00068
00069 public void setTargetProcess(ExternalProcess targetProcess) {
00070 checkStarted();
00071 this.targetProcess = targetProcess;
00072 setInputMessageLogger(targetProcess.getInputMessageLogger());
00073 setErrorMessageLogger(targetProcess.getErrorMessageLogger());
00074 setOutputMessageSink(targetProcess.getOutputMessageSink());
00075 }
00076
00077 public int getCompositionType() {
00078 return compositionType;
00079 }
00080
00081 public void setCompositionType(int compositionType) {
00082 checkStarted();
00083 this.compositionType = compositionType;
00084 }
00085
00086
00087
00088
00089 protected void toString(StringBuilder sb) {
00090 super.toString(sb);
00091 sb.append(" ---- Target Process ----- \n");
00092 if (targetProcess == null) {
00093 sb.append(" NOT DEFINED \n");
00094 } else {
00095 sb.append(targetProcess.toString());
00096 }
00097 sb.append(" -------------- \n");
00098 }
00099
00100 protected String buildCommand() {
00101 if ((compositionType == SEND_TO_OUTPUT_STREAM_COMPOSITION) ||
00102 (compositionType == GIVE_COMMAND_AS_PARAMETER) ||
00103 (compositionType == COPY_FILE_AND_APPEND_COMMAND)) {
00104 return internalBuildCommand();
00105 } else {
00106 if (targetProcess != null) {
00107
00108
00109
00110 String targetCommand = targetProcess.getCommand();
00111 if (targetProcess.getCompositionType() == COPY_FILE_AND_APPEND_COMMAND) {
00112 handleCopyFile();
00113 }
00114 return internalBuildCommand() + targetCommand;
00115 } else {
00116 return internalBuildCommand();
00117 }
00118 }
00119 }
00120
/**
 * Builds the part of the command that belongs to this decorator itself.
 *
 * <p>{@code buildCommand()} returns this value as-is for the
 * non-appending composition types or when no target process is set, and
 * otherwise uses it as the prefix placed directly in front of the target
 * process' command (no separator is inserted by the caller).</p>
 *
 * @return this decorator's own command fragment
 */
00121 protected abstract String internalBuildCommand();
00122
00123 protected void internalStartProcess(String command)
00124 throws java.io.IOException {
00125 super.internalStartProcess(command);
00126 if (compositionType == SEND_TO_OUTPUT_STREAM_COMPOSITION) {
00127 try {
00128 Thread.sleep(3000);
00129 } catch (InterruptedException e) {
00130 }
00131
00132
00133 outputMessageSink.setMessage(targetProcess.getCommand());
00134 }
00135 }
00136
00137 protected void handleOutput(java.io.BufferedWriter out) {
00138 if (compositionType == SEND_TO_OUTPUT_STREAM_COMPOSITION) {
00139 if (outputMessageSink == null) {
00140 outputMessageSink = new SimpleMessageSink();
00141 }
00142 }
00143 super.handleOutput(out);
00144 }
00145
00146 protected void handleCopyFile() {
00147 FileTransferWorkShop ftw = new FileTransferWorkShop(getFileTransferDefaultCopyProtocol());
00148 ftw.setFileTransferStructureDstInfo("hostname", hostname);
00149 ftw.setFileTransferStructureDstInfo("username", username);
00150 try {
00151 ftw.addFileTransfer(((FileDependant) targetProcess).getFileTransfertDefiniton());
00152 } catch (ClassCastException e) {
00153 logger.error(
00154 "Unable to handle the file transfert dependant process");
00155 return;
00156 }
00157
00158 CopyProtocol cp = ftw.copyProtocolFactory(getFileTransferDefaultCopyProtocol());
00159 cp.startFileTransfer();
00160 }
00161
00162
00163
00164
00165
00169 public static class CompositeMessageLogger
00170 implements RemoteProcessMessageLogger, java.io.Serializable {
00171 private RemoteProcessMessageLogger messageLogger1;
00172 private RemoteProcessMessageLogger messageLogger2;
00173
00174 public CompositeMessageLogger(
00175 RemoteProcessMessageLogger messageLogger1,
00176 RemoteProcessMessageLogger messageLogger2) {
00177 this.messageLogger1 = messageLogger1;
00178 this.messageLogger2 = messageLogger2;
00179 }
00180
00181 public void log(String message) {
00182 messageLogger1.log(message);
00183 messageLogger2.log(message);
00184 }
00185
00186 public void log(Throwable t) {
00187 messageLogger1.log(t);
00188 messageLogger2.log(t);
00189 }
00190
00191 public void log(String message, Throwable t) {
00192 messageLogger1.log(message, t);
00193 messageLogger2.log(message, t);
00194 }
00195 }
00196
00200 public static class CompositeMessageSink implements MessageSink {
00201 private MessageSink messageSink1;
00202 private MessageSink messageSink2;
00203
00204 public CompositeMessageSink(MessageSink messageSink1,
00205 MessageSink messageSink2) {
00206 this.messageSink1 = messageSink1;
00207 this.messageSink2 = messageSink2;
00208 }
00209
00210 public synchronized String getMessage() {
00211 while ((!hasMessage()) && isActive()) {
00212 try {
00213 wait(1000);
00214 } catch (InterruptedException e) {
00215 }
00216 }
00217 if (messageSink1.hasMessage()) {
00218 return messageSink1.getMessage();
00219 } else if (messageSink2.hasMessage()) {
00220 return messageSink1.getMessage();
00221 }
00222 return null;
00223 }
00224
00225 public synchronized void setMessage(String messageToPost) {
00226 messageSink1.setMessage(messageToPost);
00227 notifyAll();
00228 }
00229
00230 public synchronized boolean hasMessage() {
00231 return messageSink1.hasMessage() || messageSink2.hasMessage();
00232 }
00233
00234 public synchronized boolean isActive() {
00235 return messageSink1.isActive() || messageSink2.isActive();
00236 }
00237 }
00238 }