00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.filetransfer;
00032
00033 import java.io.File;
00034 import java.util.HashMap;
00035 import java.util.Vector;
00036
00037 import org.apache.log4j.Logger;
00038 import org.objectweb.proactive.ProActive;
00039 import org.objectweb.proactive.core.ProActiveException;
00040 import org.objectweb.proactive.core.util.log.Loggers;
00041 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00042
00043
00051 class FileForwarder {
00052 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00053
00054
00055 protected HashMap<File, Vector<FileTransferRequest>> servingRequests;
00056
00057
00058 protected HashMap<File, Vector<FileTransferRequest>> newRequests;
00059
00060
00061 protected HashMap<File, Vector<FileTransferRequest>> failedRequests;
00062 protected FileTransferService servingFTS;
00063 protected FileDispatcher dispatcher;
00064
00065 public FileForwarder(FileTransferService fts) {
00066 servingRequests = new HashMap<File, Vector<FileTransferRequest>>();
00067 newRequests = new HashMap<File, Vector<FileTransferRequest>>();
00068 failedRequests = new HashMap<File, Vector<FileTransferRequest>>();
00069 servingFTS = fts;
00070
00071 try {
00072 dispatcher = (FileDispatcher) ProActive.newActive(FileDispatcher.class.getName(),
00073 null);
00074 } catch (Exception e) {
00075 }
00076 }
00077
00084 public synchronized void requestFileTransfer(FileTransferRequest fti) {
00085 if (logger.isDebugEnabled()) {
00086 logger.debug("Setting file transfer request for:" + fti);
00087 }
00088 if (!newRequests.containsKey(fti.getSrcFile())) {
00089 newRequests.put(fti.getSrcFile(), new Vector<FileTransferRequest>());
00090 }
00091
00092 Vector<FileTransferRequest> requests = newRequests.get(fti.getSrcFile());
00093 requests.add(fti);
00094 }
00095
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118 protected synchronized void forward(File srcFile, FileBlock block) {
00119 if (!servingRequests.containsKey(srcFile)) {
00120 return;
00121 }
00122
00123 Vector<FileTransferRequest> forward = servingRequests.get(srcFile);
00124 for (int i = 0; i < forward.size(); i++) {
00125 FileTransferRequest ftr = forward.get(i);
00126 OperationStatus opStat = dispatcher.sendBlock(ftr.getDestinationFTS(),
00127 block, ftr.getDstFile());
00128
00129 if (opStat.hasException()) {
00130 forward.remove(i);
00131 ftr.setDstFuture(opStat);
00132 addToHash(failedRequests, srcFile, ftr);
00133 }
00134 }
00135 }
00136
00137 protected synchronized void forwardWithoutThrowingException(File srcFile,
00138 FileBlock block) {
00139 if (!servingRequests.containsKey(srcFile)) {
00140 return;
00141 }
00142
00143 Vector<FileTransferRequest> forward = servingRequests.get(srcFile);
00144 for (int i = 0; i < forward.size(); i++) {
00145 FileTransferRequest ftr = forward.get(i);
00146 dispatcher.sendBlockFileBlockWithoutThrowingException(ftr.getDestinationFTS(),
00147 block, ftr.getDstFile());
00148
00149
00150
00151
00152
00153
00154
00155 }
00156 }
00157
00158 protected synchronized void forward(File srcFile, FileBlock[] block) {
00159 if (!servingRequests.containsKey(srcFile)) {
00160 return;
00161 }
00162
00163 Vector<FileTransferRequest> forward = servingRequests.get(srcFile);
00164 for (int i = 0; i < forward.size(); i++) {
00165 FileTransferRequest ftr = forward.get(i);
00166 for (int j = 0; j < block.length; j++) {
00167 dispatcher.sendBlockFileBlockWithoutThrowingException(ftr.getDestinationFTS(),
00168 block[j], ftr.getDstFile());
00169 }
00170
00171
00172
00173
00174
00175
00176
00177
00178 }
00179 }
00180
00186 protected synchronized void handleNewRequests(File file) {
00187 if (!newRequests.containsKey(file)) {
00188 return;
00189 }
00190
00191 if (logger.isDebugEnabled()) {
00192 logger.debug("Sending already saved data for file: " +
00193 file.getAbsolutePath() + " " + file.length() + "[bytes]");
00194 }
00195
00196 servingFTS.closeWrite(file);
00197
00198 Vector<FileTransferRequest> requests = newRequests.remove(file);
00199 for (int i = 0; i < requests.size(); i++) {
00200 FileTransferRequest ftr = requests.get(i);
00201
00202 if (!ftr.getSrcFile().equals(file)) {
00203 ftr.setDstFuture(new OperationStatus(
00204 new ProActiveException(
00205 "Error when sending saved data. Source files do not match:" +
00206 ftr.getSrcFile().getAbsolutePath() + " != " +
00207 file.getAbsolutePath())));
00208 addToHash(failedRequests, file, ftr);
00209 continue;
00210 }
00211
00212
00213 if (file.length() > 0) {
00214 OperationStatus opRes = servingFTS.sendFile(ftr.getDestinationFTS(),
00215 ftr.getSrcFile(), ftr.getDstFile());
00216 ProActive.waitFor(opRes);
00217 if (opRes.hasException()) {
00218 ftr.setDstFuture(opRes);
00219 addToHash(failedRequests, file, ftr);
00220 continue;
00221 } else {
00222
00223 ftr.getDestinationFTS().canWrite(ftr.getDstFile(), true);
00224 }
00225 }
00226
00227 addToHash(servingRequests, file, ftr);
00228 }
00229
00230 servingFTS.getWritingBuffer(file, true);
00231 }
00232
00233 private void addToHash(HashMap<File, Vector<FileTransferRequest>> hash,
00234 File file, FileTransferRequest ftr) {
00235 if (!hash.containsKey(file)) {
00236 hash.put(file, new Vector<FileTransferRequest>());
00237 }
00238 Vector<FileTransferRequest> v = hash.get(file);
00239
00240 v.add(ftr);
00241 }
00242
00243 public synchronized void closeForwardingService(File srcFile) {
00244 closeForwardingService(srcFile, null);
00245 }
00246
00247 public synchronized void closeForwardingService(File srcFile, Exception e) {
00248 servingFTS.closeWrite(srcFile);
00249
00250 Vector requests = (Vector) this.servingRequests.remove(srcFile);
00251 if (requests == null) {
00252 return;
00253 }
00254
00255 Vector<OperationStatus> opStat = new Vector<OperationStatus>();
00256 for (int i = 0; i < requests.size(); i++) {
00257 FileTransferRequest ftr = (FileTransferRequest) requests.get(i);
00258 FileTransferService remoteFTS = ftr.getDestinationFTS();
00259
00260 if (e != null) {
00261 opStat.add(dispatcher.closeForwardingService(remoteFTS,
00262 ftr.getDstFile(), e));
00263 ftr.setDstFuture(new OperationStatus(e));
00264 addToHash(failedRequests, srcFile, ftr);
00265 } else {
00266 opStat.add(dispatcher.closeForwardingService(remoteFTS,
00267 ftr.getDstFile()));
00268 }
00269 }
00270 ProActive.waitForAll(opStat);
00271 }
00272
00273 public synchronized void clearNewRequests(File srcFile) {
00274 this.newRequests.remove(srcFile);
00275 }
00276
00283 public synchronized OperationStatus getFileTransferRequestStatus(
00284 FileTransferRequest ftr) {
00285
00286 Vector<FileTransferRequest> v = newRequests.get(ftr.getSrcFile());
00287 for (int i = 0; (v != null) && (i < v.size()); i++) {
00288 if (ftr.equals(v.get(i))) {
00289 v.remove(i);
00290 return new OperationStatus(true);
00291 }
00292 }
00293 if ((v != null) && v.isEmpty()) {
00294 newRequests.remove(ftr.getSrcFile());
00295 }
00296
00297
00298 v = failedRequests.get(ftr.getSrcFile());
00299 for (int i = 0; (v != null) && (i < v.size()); i++) {
00300 if (ftr.equals(v.get(i))) {
00301 v.remove(i);
00302 return ftr.getOperationFuture();
00303 }
00304 }
00305
00306 if ((v != null) && v.isEmpty()) {
00307 failedRequests.remove(ftr.getSrcFile());
00308 }
00309
00310 return new OperationStatus();
00311 }
00312 }