00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.core.filetransfer;
00032 
00033 import java.io.BufferedInputStream;
00034 import java.io.BufferedOutputStream;
00035 import java.io.File;
00036 import java.io.FileInputStream;
00037 import java.io.FileNotFoundException;
00038 import java.io.FileOutputStream;
00039 import java.io.IOException;
00040 import java.util.HashMap;
00041 import java.util.zip.Adler32;
00042 import java.util.zip.CheckedInputStream;
00043 
00044 import org.apache.log4j.Logger;
00045 
00046 import org.objectweb.proactive.ProActive;
00047 import org.objectweb.proactive.ProActiveInternalObject;
00048 import org.objectweb.proactive.core.ProActiveException;
00049 import org.objectweb.proactive.core.util.log.Loggers;
00050 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00051 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00052 import org.objectweb.proactive.core.util.wrapper.LongWrapper;
00053 
00054 
00064 public class FileTransferService implements ProActiveInternalObject {
00065     protected static Logger logger = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00066     public final static int DEFAULT_MAX_SIMULTANEOUS_BLOCKS = 8;
00067     public static final int DEFAULT_BUFFER_SIZE = 512 * 1024; 
00068     protected HashMap<File, BufferedInputStream> readBufferMap; 
00069     protected HashMap<File, BufferedOutputStream> writeBufferMap; 
00070     protected FileForwarder forwardFile; 
00071     
00072     public FileTransferService() {
00073         readBufferMap = new HashMap<File, BufferedInputStream>();
00074         writeBufferMap = new HashMap<File, BufferedOutputStream>();
00075         forwardFile = new FileForwarder(this);
00076     }
00077 
00078     public LongWrapper getFileLength(File f) {
00079         return new LongWrapper(f.length());
00080     }
00081 
00082     public BooleanWrapper canRead(File f) {
00083         return new BooleanWrapper(f.canRead());
00084     }
00085 
00086     public BooleanWrapper openWrite(File file) {
00087                 return new BooleanWrapper(getWritingBuffer(file, false) != null);
00088         }
00089     
00090         public BooleanWrapper canWrite(File file, boolean append) {
00091                 return new BooleanWrapper(getWritingBuffer(file, append) != null);
00092         }
00093         
00094         private BufferedOutputStream getWritingBuffer(File f) {
00095                 return getWritingBuffer(f, false);
00096         }
00097         
00098     protected synchronized BufferedOutputStream getWritingBuffer(File f, boolean append) {
00099         if (!writeBufferMap.containsKey(f)) {
00100             try {
00101                 BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(
00102                             f.getAbsolutePath(), append), DEFAULT_BUFFER_SIZE);
00103                 writeBufferMap.put(f, bos);
00104             } catch (FileNotFoundException e) {
00105                 logger.error("Unable to open file: " + f.getAbsolutePath());
00106                 return null;
00107             }
00108         }
00109 
00110         return writeBufferMap.get(f);
00111     }
00112 
00113     private synchronized BufferedInputStream getReadingBuffer(File f) {
00114         if (!readBufferMap.containsKey(f)) {
00115             try {
00116                 BufferedInputStream bis = new BufferedInputStream(new FileInputStream(
00117                             f.getAbsolutePath()), DEFAULT_BUFFER_SIZE);
00118                 readBufferMap.put(f, bis);
00119             } catch (FileNotFoundException e) {
00120                 logger.error("Unable to open file: " + f.getAbsolutePath());
00121                 return null;
00122             }
00123         }
00124 
00125         return readBufferMap.get(f);
00126     }
00127 
00128     public synchronized void closeRead(File f) {
00129         BufferedInputStream bis = readBufferMap.remove(f);
00130         try {
00131             if (bis != null) {
00132                 bis.close();
00133             }
00134         } catch (IOException e) {
00135         }
00136     }
00137 
00138     public synchronized BooleanWrapper closeWrite(File f) {
00139         BufferedOutputStream bos = writeBufferMap.remove(f);
00140         try {
00141             if (bos != null) {
00142                 bos.close();
00143             }
00144         } catch (IOException e) {
00145                 return new BooleanWrapper(false);
00146         }
00147         
00148         return new BooleanWrapper(true);
00149     }
00150 
00155     public void saveFileBlock(File dstFile, FileBlock block) throws IOException {
00156         
00157         BufferedOutputStream bos = getWritingBuffer(dstFile);
00158         block.saveCurrentBlock(bos);
00159     }
00160 
00161     public void saveFileBlockWithoutThrowingException(File dstFile, FileBlock block) {
00162         
00163         BufferedOutputStream bos = getWritingBuffer(dstFile);
00164         try {
00165                         block.saveCurrentBlock(bos);
00166                 } catch (IOException e) {
00167                         
00168                 }
00169     }
00170     
00171     public void saveFileBlockAndForward(File dstFile, FileBlock block) throws IOException {
00172         
00173         this.forwardFile.handleNewRequests(dstFile);
00174         this.forwardFile.forward(dstFile,block);
00175         
00176         BufferedOutputStream bos = getWritingBuffer(dstFile);
00177         block.saveCurrentBlock(bos);
00178     }
00179 
00180     public void saveFileBlockAndForwardWithoutThrowingException(File dstFile, FileBlock block) {
00181         
00182         this.forwardFile.handleNewRequests(dstFile);
00183         this.forwardFile.forward(dstFile,block);
00184         
00185         BufferedOutputStream bos = getWritingBuffer(dstFile);
00186         try {
00187                         block.saveCurrentBlock(bos);
00188                 } catch (IOException e) {
00189                         
00190                 }
00191     }
00192 
00200     public FileBlock getFileBlock(File file, long offset, int bsize){
00201 
00202         FileBlock newBlock = new FileBlock(offset, bsize);
00203         BufferedInputStream bis = getReadingBuffer(file);
00204         try {
00205                         newBlock.loadNextBlock(bis);
00206                 } catch (IOException e) {
00207                         newBlock.setException(e);
00208                 }
00209         return newBlock;
00210     }
00211 
00221     public OperationStatus sendFile(FileTransferService ftsRemote, File srcFile,File dstFile, int bsize, int numFlyingBlocks) {
00222         long init = System.currentTimeMillis();
00223         long numBlocks = 0;
00224         
00225         
00226         BufferedInputStream bis=getReadingBuffer(srcFile);
00227         if(bis==null){
00228             return new OperationStatus(new ProActiveException("Can not open for sending:" + srcFile.getAbsoluteFile()));
00229         }
00230         
00231         long totalNumBlocks = Math.round(Math.ceil((double) srcFile.length() / bsize));
00232         if (totalNumBlocks == 0) {
00233             closeRead(srcFile);
00234             return new OperationStatus(new ProActiveException("Can not send an empty File:" + srcFile.getAbsolutePath()));
00235         }
00236 
00237         BooleanWrapper bw = ftsRemote.openWrite(dstFile);
00238         if (bw.booleanValue() != true) {
00239                 closeRead(srcFile);
00240             return new OperationStatus(new ProActiveException("Unable to open remote file for writting: " +dstFile.getAbsolutePath()));
00241         }
00242 
00243         FileBlock fileBlock = new FileBlock(0, bsize);
00244         while (numBlocks < (totalNumBlocks - 1)) {
00245                 for (int i = 0; (i < numFlyingBlocks) && (numBlocks < (totalNumBlocks - 1)); i++) {
00246                         try{
00247                                 fileBlock.loadNextBlock(bis);
00248 
00249                                 if (i == (numFlyingBlocks - 1)) { 
00250                                         ftsRemote.saveFileBlockAndForward(dstFile,fileBlock);
00251                                 } else { 
00252                                         ftsRemote.saveFileBlockAndForwardWithoutThrowingException(dstFile,fileBlock); 
00253                                 }
00254                                 numBlocks++;
00255                         } catch (IOException e) {
00256                                 ftsRemote.closeForwardingService(dstFile,e); 
00257                                 return new OperationStatus(new ProActiveException("Can not send file block to:" + ProActive.getActiveObjectNodeUrl(ftsRemote),e));
00258                         }
00259                 }
00260         }
00261 
00262         
00263         try {
00264             fileBlock.loadNextBlock(bis);
00265             ftsRemote.saveFileBlock(dstFile,fileBlock);
00266             numBlocks++;
00267         } catch (IOException e) {
00268                 ftsRemote.closeForwardingService(dstFile,e); 
00269             return new OperationStatus(new ProActiveException("Can not send File to:" + ProActive.getActiveObjectNodeUrl(ftsRemote),e));
00270         }
00271 
00272         
00273         ProActive.waitFor(ftsRemote.closeWrite(dstFile));
00274         closeRead(srcFile);
00275         ftsRemote.closeForwardingService(dstFile); 
00276         
00277         if (logger.isDebugEnabled()) {
00278             long fin = System.currentTimeMillis();
00279             long delta = (fin - init);
00280             logger.debug("File "+dstFile.getAbsolutePath()+" sent using " + numBlocks + " blocks,  in: " +
00281                 delta + "[ms]");
00282         }
00283 
00284         return new OperationStatus();
00285     }
00286 
00287         public void closeForwardingService(File dstFile) {
00288                 this.forwardFile.closeForwardingService(dstFile);
00289         }
00290 
00291     public void closeForwardingService(File dstFile, Exception e) {
00292                 this.forwardFile.closeForwardingService(dstFile, e);
00293         }
00294 
00304         public OperationStatus receiveFile(FileTransferService ftsRemote, File srcFile, File dstFile, int bsize, int numFlyingBlocks) {
00305         long numBlocks = 0;
00306                 long init = System.currentTimeMillis();
00307 
00308         LongWrapper length = ftsRemote.getFileLength(srcFile);
00309         if (length.longValue() <= 0) {
00310             closeWrite(dstFile);
00311             OperationStatus fc = new OperationStatus(new Exception("Unable to open remote file for reading:" + srcFile.getAbsolutePath()));
00312             return fc;
00313         }
00314         
00315         long totalNumBlocks = Math.round(Math.ceil((double) length.longValue() / bsize));
00316         FileBlock[] flyingBlocks = new FileBlock[numFlyingBlocks];
00317         while (numBlocks < totalNumBlocks) {
00318             int i;
00319             
00320             for (i = 0; (i < flyingBlocks.length) && (numBlocks < totalNumBlocks); i++) {
00321                 flyingBlocks[i] = ftsRemote.getFileBlock(srcFile, bsize * numBlocks, bsize); 
00322                 numBlocks++;
00323             }
00324             
00325             forwardFile.handleNewRequests(dstFile);
00326             forwardFile.forward(dstFile,flyingBlocks);
00327             
00328             
00329             for (int j = 0; j < i; j++) {
00330                 if(flyingBlocks[j].hasException()){
00331                         Exception e= flyingBlocks[j].getException();
00332                         forwardFile.closeForwardingService(dstFile,e);
00333                         return new OperationStatus(e);
00334                 }
00335                 try {
00336                                         saveFileBlock(dstFile,flyingBlocks[j]);
00337                                 } catch (IOException e) {
00338                                         forwardFile.closeForwardingService(dstFile,e);
00339                                         return new OperationStatus(e);
00340                                 }
00341             }
00342         }
00343 
00344         
00345         ftsRemote.closeRead(srcFile); 
00346         closeWrite(dstFile);
00347         
00348         forwardFile.closeForwardingService(dstFile); 
00349 
00350         if (logger.isDebugEnabled()) {
00351             long fin = System.currentTimeMillis();
00352             long delta = (fin - init);
00353             logger.debug("File "+dstFile.getAbsolutePath()+" received using " + numBlocks + " blocks,  in: " +
00354                 delta + "[ms]");
00355         }
00356 
00357         return new OperationStatus();
00358     }
00359 
00360         
00361 
00362 
00363 
00364 
00365         
00366     public OperationStatus sendFile(FileTransferService remoteFTS, File srcFile,
00367         File dstFile) {
00368         return sendFile(remoteFTS, srcFile, dstFile,
00369             FileBlock.DEFAULT_BLOCK_SIZE, DEFAULT_MAX_SIMULTANEOUS_BLOCKS);
00370     }
00371     
00376     public void requestFileTransfer(FileTransferRequest fti) {
00377         forwardFile.requestFileTransfer(fti);
00378     }
00379 
00380     public OperationStatus getFileTransferRequestStatus(FileTransferRequest fti) {
00381         return forwardFile.getFileTransferRequestStatus(fti);
00382     }
00383 
00397     public OperationStatus submitFileTransferRequest(FileTransferService meFTS,
00398         FileTransferRequest ftr, OperationStatus parentOpStat) {
00399         FileTransferService srcFTS = ftr.getSourceFTS();
00400 
00401         OperationStatus opstat=null;
00402 
00403         
00404         if (ProActive.isAwaited(parentOpStat)) {
00405                 if(logger.isDebugEnabled()){
00406                         logger.debug("File transfer request will be handled using forwarding");
00407                 }
00408             srcFTS.requestFileTransfer(ftr); 
00409             ProActive.waitFor(parentOpStat);
00410             if(parentOpStat.hasException()){ 
00411                 srcFTS.getFileTransferRequestStatus(ftr); 
00412                 return parentOpStat; 
00413             }
00414             
00415             opstat = srcFTS.getFileTransferRequestStatus(ftr); 
00416             if(opstat.hasException()) return opstat; 
00417         }
00418 
00419         
00420         if (opstat ==null || opstat.isPending()) {
00421                 if(logger.isDebugEnabled()){
00422                         logger.debug("File transfer request will be handled directly");
00423                 }
00424         
00425                 String myNodeURL=ProActive.getActiveObjectNodeUrl(meFTS);
00426                 String srcNodeURL=ProActive.getActiveObjectNodeUrl(ftr.getSourceFTS());
00427 
00428                 
00429                 if(!myNodeURL.equals(srcNodeURL)){
00430                         try {
00431                                         srcFTS = FileTransferEngine.getFileTransferEngine().getFTS(srcNodeURL);
00432                                         opstat = srcFTS.sendFile(ftr.getDestinationFTS(), ftr.getSrcFile(),ftr.getDstFile());
00433                                         ftr.setSourceFTS(srcFTS);
00434                                 } catch (Exception e) {
00435                                         return new OperationStatus( new ProActiveException("Unable to create File Transfer Service on node: "+ srcNodeURL, e));
00436                                 }
00437                 }
00438                 else{ 
00439                         opstat=sendFile(ftr.getDestinationFTS(), ftr.getSrcFile(),ftr.getDstFile());
00440                         ftr.setSourceFTS(meFTS);
00441                 }
00442         }
00443         return opstat;
00444     }
00445 
00452     public long checkSum(File file) throws IOException {
00453         
00454         CheckedInputStream cis = new CheckedInputStream(new FileInputStream(
00455                     file.getAbsoluteFile()), new Adler32());
00456         byte[] tempBuf = new byte[1024 * 1024]; 
00457         while (cis.read(tempBuf) >= 0)
00458             ;
00459 
00460         return cis.getChecksum().getValue();
00461     }
00462     
00466     public String sayHello() {
00467         return "Hello World from " + getHostName() + " at " +
00468         new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date());
00469     }
00470 
00475     public String getHostName() {
00476         try {
00477             return java.net.InetAddress.getLocalHost().toString();
00478         } catch (Exception e) {
00479             return "unknown";
00480         }
00481     }
00482 }