00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.filetransfer;
00032
00033 import java.io.BufferedInputStream;
00034 import java.io.BufferedOutputStream;
00035 import java.io.File;
00036 import java.io.FileInputStream;
00037 import java.io.FileNotFoundException;
00038 import java.io.FileOutputStream;
00039 import java.io.IOException;
00040 import java.util.HashMap;
00041 import java.util.zip.Adler32;
00042 import java.util.zip.CheckedInputStream;
00043
00044 import org.apache.log4j.Logger;
00045
00046 import org.objectweb.proactive.ProActive;
00047 import org.objectweb.proactive.ProActiveInternalObject;
00048 import org.objectweb.proactive.core.ProActiveException;
00049 import org.objectweb.proactive.core.util.log.Loggers;
00050 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00051 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00052 import org.objectweb.proactive.core.util.wrapper.LongWrapper;
00053
00054
00064 public class FileTransferService implements ProActiveInternalObject {
00065 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00066 public final static int DEFAULT_MAX_SIMULTANEOUS_BLOCKS = 8;
00067 public static final int DEFAULT_BUFFER_SIZE = 512 * 1024;
00068 protected HashMap<File, BufferedInputStream> readBufferMap;
00069 protected HashMap<File, BufferedOutputStream> writeBufferMap;
00070 protected FileForwarder forwardFile;
00071
00072 public FileTransferService() {
00073 readBufferMap = new HashMap<File, BufferedInputStream>();
00074 writeBufferMap = new HashMap<File, BufferedOutputStream>();
00075 forwardFile = new FileForwarder(this);
00076 }
00077
00078 public LongWrapper getFileLength(File f) {
00079 return new LongWrapper(f.length());
00080 }
00081
00082 public BooleanWrapper canRead(File f) {
00083 return new BooleanWrapper(f.canRead());
00084 }
00085
00086 public BooleanWrapper openWrite(File file) {
00087 return new BooleanWrapper(getWritingBuffer(file, false) != null);
00088 }
00089
00090 public BooleanWrapper canWrite(File file, boolean append) {
00091 return new BooleanWrapper(getWritingBuffer(file, append) != null);
00092 }
00093
00094 private BufferedOutputStream getWritingBuffer(File f) {
00095 return getWritingBuffer(f, false);
00096 }
00097
00098 protected synchronized BufferedOutputStream getWritingBuffer(File f, boolean append) {
00099 if (!writeBufferMap.containsKey(f)) {
00100 try {
00101 BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(
00102 f.getAbsolutePath(), append), DEFAULT_BUFFER_SIZE);
00103 writeBufferMap.put(f, bos);
00104 } catch (FileNotFoundException e) {
00105 logger.error("Unable to open file: " + f.getAbsolutePath());
00106 return null;
00107 }
00108 }
00109
00110 return writeBufferMap.get(f);
00111 }
00112
00113 private synchronized BufferedInputStream getReadingBuffer(File f) {
00114 if (!readBufferMap.containsKey(f)) {
00115 try {
00116 BufferedInputStream bis = new BufferedInputStream(new FileInputStream(
00117 f.getAbsolutePath()), DEFAULT_BUFFER_SIZE);
00118 readBufferMap.put(f, bis);
00119 } catch (FileNotFoundException e) {
00120 logger.error("Unable to open file: " + f.getAbsolutePath());
00121 return null;
00122 }
00123 }
00124
00125 return readBufferMap.get(f);
00126 }
00127
00128 public synchronized void closeRead(File f) {
00129 BufferedInputStream bis = readBufferMap.remove(f);
00130 try {
00131 if (bis != null) {
00132 bis.close();
00133 }
00134 } catch (IOException e) {
00135 }
00136 }
00137
00138 public synchronized BooleanWrapper closeWrite(File f) {
00139 BufferedOutputStream bos = writeBufferMap.remove(f);
00140 try {
00141 if (bos != null) {
00142 bos.close();
00143 }
00144 } catch (IOException e) {
00145 return new BooleanWrapper(false);
00146 }
00147
00148 return new BooleanWrapper(true);
00149 }
00150
00155 public void saveFileBlock(File dstFile, FileBlock block) throws IOException {
00156
00157 BufferedOutputStream bos = getWritingBuffer(dstFile);
00158 block.saveCurrentBlock(bos);
00159 }
00160
00161 public void saveFileBlockWithoutThrowingException(File dstFile, FileBlock block) {
00162
00163 BufferedOutputStream bos = getWritingBuffer(dstFile);
00164 try {
00165 block.saveCurrentBlock(bos);
00166 } catch (IOException e) {
00167
00168 }
00169 }
00170
00171 public void saveFileBlockAndForward(File dstFile, FileBlock block) throws IOException {
00172
00173 this.forwardFile.handleNewRequests(dstFile);
00174 this.forwardFile.forward(dstFile,block);
00175
00176 BufferedOutputStream bos = getWritingBuffer(dstFile);
00177 block.saveCurrentBlock(bos);
00178 }
00179
00180 public void saveFileBlockAndForwardWithoutThrowingException(File dstFile, FileBlock block) {
00181
00182 this.forwardFile.handleNewRequests(dstFile);
00183 this.forwardFile.forward(dstFile,block);
00184
00185 BufferedOutputStream bos = getWritingBuffer(dstFile);
00186 try {
00187 block.saveCurrentBlock(bos);
00188 } catch (IOException e) {
00189
00190 }
00191 }
00192
00200 public FileBlock getFileBlock(File file, long offset, int bsize){
00201
00202 FileBlock newBlock = new FileBlock(offset, bsize);
00203 BufferedInputStream bis = getReadingBuffer(file);
00204 try {
00205 newBlock.loadNextBlock(bis);
00206 } catch (IOException e) {
00207 newBlock.setException(e);
00208 }
00209 return newBlock;
00210 }
00211
00221 public OperationStatus sendFile(FileTransferService ftsRemote, File srcFile,File dstFile, int bsize, int numFlyingBlocks) {
00222 long init = System.currentTimeMillis();
00223 long numBlocks = 0;
00224
00225
00226 BufferedInputStream bis=getReadingBuffer(srcFile);
00227 if(bis==null){
00228 return new OperationStatus(new ProActiveException("Can not open for sending:" + srcFile.getAbsoluteFile()));
00229 }
00230
00231 long totalNumBlocks = Math.round(Math.ceil((double) srcFile.length() / bsize));
00232 if (totalNumBlocks == 0) {
00233 closeRead(srcFile);
00234 return new OperationStatus(new ProActiveException("Can not send an empty File:" + srcFile.getAbsolutePath()));
00235 }
00236
00237 BooleanWrapper bw = ftsRemote.openWrite(dstFile);
00238 if (bw.booleanValue() != true) {
00239 closeRead(srcFile);
00240 return new OperationStatus(new ProActiveException("Unable to open remote file for writting: " +dstFile.getAbsolutePath()));
00241 }
00242
00243 FileBlock fileBlock = new FileBlock(0, bsize);
00244 while (numBlocks < (totalNumBlocks - 1)) {
00245 for (int i = 0; (i < numFlyingBlocks) && (numBlocks < (totalNumBlocks - 1)); i++) {
00246 try{
00247 fileBlock.loadNextBlock(bis);
00248
00249 if (i == (numFlyingBlocks - 1)) {
00250 ftsRemote.saveFileBlockAndForward(dstFile,fileBlock);
00251 } else {
00252 ftsRemote.saveFileBlockAndForwardWithoutThrowingException(dstFile,fileBlock);
00253 }
00254 numBlocks++;
00255 } catch (IOException e) {
00256 ftsRemote.closeForwardingService(dstFile,e);
00257 return new OperationStatus(new ProActiveException("Can not send file block to:" + ProActive.getActiveObjectNodeUrl(ftsRemote),e));
00258 }
00259 }
00260 }
00261
00262
00263 try {
00264 fileBlock.loadNextBlock(bis);
00265 ftsRemote.saveFileBlock(dstFile,fileBlock);
00266 numBlocks++;
00267 } catch (IOException e) {
00268 ftsRemote.closeForwardingService(dstFile,e);
00269 return new OperationStatus(new ProActiveException("Can not send File to:" + ProActive.getActiveObjectNodeUrl(ftsRemote),e));
00270 }
00271
00272
00273 ProActive.waitFor(ftsRemote.closeWrite(dstFile));
00274 closeRead(srcFile);
00275 ftsRemote.closeForwardingService(dstFile);
00276
00277 if (logger.isDebugEnabled()) {
00278 long fin = System.currentTimeMillis();
00279 long delta = (fin - init);
00280 logger.debug("File "+dstFile.getAbsolutePath()+" sent using " + numBlocks + " blocks, in: " +
00281 delta + "[ms]");
00282 }
00283
00284 return new OperationStatus();
00285 }
00286
00287 public void closeForwardingService(File dstFile) {
00288 this.forwardFile.closeForwardingService(dstFile);
00289 }
00290
00291 public void closeForwardingService(File dstFile, Exception e) {
00292 this.forwardFile.closeForwardingService(dstFile, e);
00293 }
00294
00304 public OperationStatus receiveFile(FileTransferService ftsRemote, File srcFile, File dstFile, int bsize, int numFlyingBlocks) {
00305 long numBlocks = 0;
00306 long init = System.currentTimeMillis();
00307
00308 LongWrapper length = ftsRemote.getFileLength(srcFile);
00309 if (length.longValue() <= 0) {
00310 closeWrite(dstFile);
00311 OperationStatus fc = new OperationStatus(new Exception("Unable to open remote file for reading:" + srcFile.getAbsolutePath()));
00312 return fc;
00313 }
00314
00315 long totalNumBlocks = Math.round(Math.ceil((double) length.longValue() / bsize));
00316 FileBlock[] flyingBlocks = new FileBlock[numFlyingBlocks];
00317 while (numBlocks < totalNumBlocks) {
00318 int i;
00319
00320 for (i = 0; (i < flyingBlocks.length) && (numBlocks < totalNumBlocks); i++) {
00321 flyingBlocks[i] = ftsRemote.getFileBlock(srcFile, bsize * numBlocks, bsize);
00322 numBlocks++;
00323 }
00324
00325 forwardFile.handleNewRequests(dstFile);
00326 forwardFile.forward(dstFile,flyingBlocks);
00327
00328
00329 for (int j = 0; j < i; j++) {
00330 if(flyingBlocks[j].hasException()){
00331 Exception e= flyingBlocks[j].getException();
00332 forwardFile.closeForwardingService(dstFile,e);
00333 return new OperationStatus(e);
00334 }
00335 try {
00336 saveFileBlock(dstFile,flyingBlocks[j]);
00337 } catch (IOException e) {
00338 forwardFile.closeForwardingService(dstFile,e);
00339 return new OperationStatus(e);
00340 }
00341 }
00342 }
00343
00344
00345 ftsRemote.closeRead(srcFile);
00346 closeWrite(dstFile);
00347
00348 forwardFile.closeForwardingService(dstFile);
00349
00350 if (logger.isDebugEnabled()) {
00351 long fin = System.currentTimeMillis();
00352 long delta = (fin - init);
00353 logger.debug("File "+dstFile.getAbsolutePath()+" received using " + numBlocks + " blocks, in: " +
00354 delta + "[ms]");
00355 }
00356
00357 return new OperationStatus();
00358 }
00359
00360
00361
00362
00363
00364
00365
00366 public OperationStatus sendFile(FileTransferService remoteFTS, File srcFile,
00367 File dstFile) {
00368 return sendFile(remoteFTS, srcFile, dstFile,
00369 FileBlock.DEFAULT_BLOCK_SIZE, DEFAULT_MAX_SIMULTANEOUS_BLOCKS);
00370 }
00371
00376 public void requestFileTransfer(FileTransferRequest fti) {
00377 forwardFile.requestFileTransfer(fti);
00378 }
00379
00380 public OperationStatus getFileTransferRequestStatus(FileTransferRequest fti) {
00381 return forwardFile.getFileTransferRequestStatus(fti);
00382 }
00383
00397 public OperationStatus submitFileTransferRequest(FileTransferService meFTS,
00398 FileTransferRequest ftr, OperationStatus parentOpStat) {
00399 FileTransferService srcFTS = ftr.getSourceFTS();
00400
00401 OperationStatus opstat=null;
00402
00403
00404 if (ProActive.isAwaited(parentOpStat)) {
00405 if(logger.isDebugEnabled()){
00406 logger.debug("File transfer request will be handled using forwarding");
00407 }
00408 srcFTS.requestFileTransfer(ftr);
00409 ProActive.waitFor(parentOpStat);
00410 if(parentOpStat.hasException()){
00411 srcFTS.getFileTransferRequestStatus(ftr);
00412 return parentOpStat;
00413 }
00414
00415 opstat = srcFTS.getFileTransferRequestStatus(ftr);
00416 if(opstat.hasException()) return opstat;
00417 }
00418
00419
00420 if (opstat ==null || opstat.isPending()) {
00421 if(logger.isDebugEnabled()){
00422 logger.debug("File transfer request will be handled directly");
00423 }
00424
00425 String myNodeURL=ProActive.getActiveObjectNodeUrl(meFTS);
00426 String srcNodeURL=ProActive.getActiveObjectNodeUrl(ftr.getSourceFTS());
00427
00428
00429 if(!myNodeURL.equals(srcNodeURL)){
00430 try {
00431 srcFTS = FileTransferEngine.getFileTransferEngine().getFTS(srcNodeURL);
00432 opstat = srcFTS.sendFile(ftr.getDestinationFTS(), ftr.getSrcFile(),ftr.getDstFile());
00433 ftr.setSourceFTS(srcFTS);
00434 } catch (Exception e) {
00435 return new OperationStatus( new ProActiveException("Unable to create File Transfer Service on node: "+ srcNodeURL, e));
00436 }
00437 }
00438 else{
00439 opstat=sendFile(ftr.getDestinationFTS(), ftr.getSrcFile(),ftr.getDstFile());
00440 ftr.setSourceFTS(meFTS);
00441 }
00442 }
00443 return opstat;
00444 }
00445
00452 public long checkSum(File file) throws IOException {
00453
00454 CheckedInputStream cis = new CheckedInputStream(new FileInputStream(
00455 file.getAbsoluteFile()), new Adler32());
00456 byte[] tempBuf = new byte[1024 * 1024];
00457 while (cis.read(tempBuf) >= 0)
00458 ;
00459
00460 return cis.getChecksum().getValue();
00461 }
00462
00466 public String sayHello() {
00467 return "Hello World from " + getHostName() + " at " +
00468 new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date());
00469 }
00470
00475 public String getHostName() {
00476 try {
00477 return java.net.InetAddress.getLocalHost().toString();
00478 } catch (Exception e) {
00479 return "unknown";
00480 }
00481 }
00482 }