org/objectweb/proactive/core/filetransfer/FileTransferService.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.filetransfer;
00032 
00033 import java.io.BufferedInputStream;
00034 import java.io.BufferedOutputStream;
00035 import java.io.File;
00036 import java.io.FileInputStream;
00037 import java.io.FileNotFoundException;
00038 import java.io.FileOutputStream;
00039 import java.io.IOException;
00040 import java.util.HashMap;
00041 import java.util.zip.Adler32;
00042 import java.util.zip.CheckedInputStream;
00043 
00044 import org.apache.log4j.Logger;
00045 
00046 import org.objectweb.proactive.ProActive;
00047 import org.objectweb.proactive.ProActiveInternalObject;
00048 import org.objectweb.proactive.core.ProActiveException;
00049 import org.objectweb.proactive.core.util.log.Loggers;
00050 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00051 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00052 import org.objectweb.proactive.core.util.wrapper.LongWrapper;
00053 
00054 
00064 public class FileTransferService implements ProActiveInternalObject {
00065     protected static Logger logger = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00066     public final static int DEFAULT_MAX_SIMULTANEOUS_BLOCKS = 8;
00067     public static final int DEFAULT_BUFFER_SIZE = 512 * 1024; //Bytes
00068     protected HashMap<File, BufferedInputStream> readBufferMap; //Map for storing the opened reading sockets
00069     protected HashMap<File, BufferedOutputStream> writeBufferMap; //Map for storing the opened output sockets
00070     protected FileForwarder forwardFile; 
00071     
00072     public FileTransferService() {
00073         readBufferMap = new HashMap<File, BufferedInputStream>();
00074         writeBufferMap = new HashMap<File, BufferedOutputStream>();
00075         forwardFile = new FileForwarder(this);
00076     }
00077 
00078     public LongWrapper getFileLength(File f) {
00079         return new LongWrapper(f.length());
00080     }
00081 
00082     public BooleanWrapper canRead(File f) {
00083         return new BooleanWrapper(f.canRead());
00084     }
00085 
00086     public BooleanWrapper openWrite(File file) {
00087                 return new BooleanWrapper(getWritingBuffer(file, false) != null);
00088         }
00089     
00090         public BooleanWrapper canWrite(File file, boolean append) {
00091                 return new BooleanWrapper(getWritingBuffer(file, append) != null);
00092         }
00093         
00094         private BufferedOutputStream getWritingBuffer(File f) {
00095                 return getWritingBuffer(f, false);
00096         }
00097         
00098     protected synchronized BufferedOutputStream getWritingBuffer(File f, boolean append) {
00099         if (!writeBufferMap.containsKey(f)) {
00100             try {
00101                 BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(
00102                             f.getAbsolutePath(), append), DEFAULT_BUFFER_SIZE);
00103                 writeBufferMap.put(f, bos);
00104             } catch (FileNotFoundException e) {
00105                 logger.error("Unable to open file: " + f.getAbsolutePath());
00106                 return null;
00107             }
00108         }
00109 
00110         return writeBufferMap.get(f);
00111     }
00112 
00113     private synchronized BufferedInputStream getReadingBuffer(File f) {
00114         if (!readBufferMap.containsKey(f)) {
00115             try {
00116                 BufferedInputStream bis = new BufferedInputStream(new FileInputStream(
00117                             f.getAbsolutePath()), DEFAULT_BUFFER_SIZE);
00118                 readBufferMap.put(f, bis);
00119             } catch (FileNotFoundException e) {
00120                 logger.error("Unable to open file: " + f.getAbsolutePath());
00121                 return null;
00122             }
00123         }
00124 
00125         return readBufferMap.get(f);
00126     }
00127 
00128     public synchronized void closeRead(File f) {
00129         BufferedInputStream bis = readBufferMap.remove(f);
00130         try {
00131             if (bis != null) {
00132                 bis.close();
00133             }
00134         } catch (IOException e) {
00135         }
00136     }
00137 
00138     public synchronized BooleanWrapper closeWrite(File f) {
00139         BufferedOutputStream bos = writeBufferMap.remove(f);
00140         try {
00141             if (bos != null) {
00142                 bos.close();
00143             }
00144         } catch (IOException e) {
00145                 return new BooleanWrapper(false);
00146         }
00147         
00148         return new BooleanWrapper(true);
00149     }
00150 
00155     public void saveFileBlock(File dstFile, FileBlock block) throws IOException {
00156         
00157         BufferedOutputStream bos = getWritingBuffer(dstFile);
00158         block.saveCurrentBlock(bos);
00159     }
00160 
00161     public void saveFileBlockWithoutThrowingException(File dstFile, FileBlock block) {
00162         
00163         BufferedOutputStream bos = getWritingBuffer(dstFile);
00164         try {
00165                         block.saveCurrentBlock(bos);
00166                 } catch (IOException e) {
00167                         //ignore exception
00168                 }
00169     }
00170     
00171     public void saveFileBlockAndForward(File dstFile, FileBlock block) throws IOException {
00172         
00173         this.forwardFile.handleNewRequests(dstFile);
00174         this.forwardFile.forward(dstFile,block);
00175         
00176         BufferedOutputStream bos = getWritingBuffer(dstFile);
00177         block.saveCurrentBlock(bos);
00178     }
00179 
00180     public void saveFileBlockAndForwardWithoutThrowingException(File dstFile, FileBlock block) {
00181         
00182         this.forwardFile.handleNewRequests(dstFile);
00183         this.forwardFile.forward(dstFile,block);
00184         
00185         BufferedOutputStream bos = getWritingBuffer(dstFile);
00186         try {
00187                         block.saveCurrentBlock(bos);
00188                 } catch (IOException e) {
00189                         //ignore exception
00190                 }
00191     }
00192 
00200     public FileBlock getFileBlock(File file, long offset, int bsize){
00201 
00202         FileBlock newBlock = new FileBlock(offset, bsize);
00203         BufferedInputStream bis = getReadingBuffer(file);
00204         try {
00205                         newBlock.loadNextBlock(bis);
00206                 } catch (IOException e) {
00207                         newBlock.setException(e);
00208                 }
00209         return newBlock;
00210     }
00211 
00221     public OperationStatus sendFile(FileTransferService ftsRemote, File srcFile,File dstFile, int bsize, int numFlyingBlocks) {
00222         long init = System.currentTimeMillis();
00223         long numBlocks = 0;
00224         
00225         //Open the local reading buffer
00226         BufferedInputStream bis=getReadingBuffer(srcFile);
00227         if(bis==null){
00228             return new OperationStatus(new ProActiveException("Can not open for sending:" + srcFile.getAbsoluteFile()));
00229         }
00230         
00231         long totalNumBlocks = Math.round(Math.ceil((double) srcFile.length() / bsize));
00232         if (totalNumBlocks == 0) {
00233             closeRead(srcFile);
00234             return new OperationStatus(new ProActiveException("Can not send an empty File:" + srcFile.getAbsolutePath()));
00235         }
00236 
00237         BooleanWrapper bw = ftsRemote.openWrite(dstFile);
00238         if (bw.booleanValue() != true) {
00239                 closeRead(srcFile);
00240             return new OperationStatus(new ProActiveException("Unable to open remote file for writting: " +dstFile.getAbsolutePath()));
00241         }
00242 
00243         FileBlock fileBlock = new FileBlock(0, bsize);
00244         while (numBlocks < (totalNumBlocks - 1)) {
00245                 for (int i = 0; (i < numFlyingBlocks) && (numBlocks < (totalNumBlocks - 1)); i++) {
00246                         try{
00247                                 fileBlock.loadNextBlock(bis);
00248 
00249                                 if (i == (numFlyingBlocks - 1)) { //rendevouz the burst, so the remote AO will not be drowned
00250                                         ftsRemote.saveFileBlockAndForward(dstFile,fileBlock);
00251                                 } else { //simply burst
00252                                         ftsRemote.saveFileBlockAndForwardWithoutThrowingException(dstFile,fileBlock); //remote (async) invocation
00253                                 }
00254                                 numBlocks++;
00255                         } catch (IOException e) {
00256                                 ftsRemote.closeForwardingService(dstFile,e); 
00257                                 return new OperationStatus(new ProActiveException("Can not send file block to:" + ProActive.getActiveObjectNodeUrl(ftsRemote),e));
00258                         }
00259                 }
00260         }
00261 
00262         //Handle a rendevouz with last block here!
00263         try {
00264             fileBlock.loadNextBlock(bis);
00265             ftsRemote.saveFileBlock(dstFile,fileBlock);
00266             numBlocks++;
00267         } catch (IOException e) {
00268                 ftsRemote.closeForwardingService(dstFile,e); 
00269             return new OperationStatus(new ProActiveException("Can not send File to:" + ProActive.getActiveObjectNodeUrl(ftsRemote),e));
00270         }
00271 
00272         //Close the remote/local buffers
00273         ProActive.waitFor(ftsRemote.closeWrite(dstFile));
00274         closeRead(srcFile);
00275         ftsRemote.closeForwardingService(dstFile); //sync-call
00276         
00277         if (logger.isDebugEnabled()) {
00278             long fin = System.currentTimeMillis();
00279             long delta = (fin - init);
00280             logger.debug("File "+dstFile.getAbsolutePath()+" sent using " + numBlocks + " blocks,  in: " +
00281                 delta + "[ms]");
00282         }
00283 
00284         return new OperationStatus();
00285     }
00286 
00287         public void closeForwardingService(File dstFile) {
00288                 this.forwardFile.closeForwardingService(dstFile);
00289         }
00290 
00291     public void closeForwardingService(File dstFile, Exception e) {
00292                 this.forwardFile.closeForwardingService(dstFile, e);
00293         }
00294 
00304         public OperationStatus receiveFile(FileTransferService ftsRemote, File srcFile, File dstFile, int bsize, int numFlyingBlocks) {
00305         long numBlocks = 0;
00306                 long init = System.currentTimeMillis();
00307 
00308         LongWrapper length = ftsRemote.getFileLength(srcFile);
00309         if (length.longValue() <= 0) {
00310             closeWrite(dstFile);
00311             OperationStatus fc = new OperationStatus(new Exception("Unable to open remote file for reading:" + srcFile.getAbsolutePath()));
00312             return fc;
00313         }
00314         
00315         long totalNumBlocks = Math.round(Math.ceil((double) length.longValue() / bsize));
00316         FileBlock[] flyingBlocks = new FileBlock[numFlyingBlocks];
00317         while (numBlocks < totalNumBlocks) {
00318             int i;
00319             
00320             for (i = 0; (i < flyingBlocks.length) && (numBlocks < totalNumBlocks); i++) {
00321                 flyingBlocks[i] = ftsRemote.getFileBlock(srcFile, bsize * numBlocks, bsize); //async call
00322                 numBlocks++;
00323             }
00324             
00325             forwardFile.handleNewRequests(dstFile);
00326             forwardFile.forward(dstFile,flyingBlocks);
00327             
00328             //here we sync (wait-by-necessity)                
00329             for (int j = 0; j < i; j++) {
00330                 if(flyingBlocks[j].hasException()){
00331                         Exception e= flyingBlocks[j].getException();
00332                         forwardFile.closeForwardingService(dstFile,e);
00333                         return new OperationStatus(e);
00334                 }
00335                 try {
00336                                         saveFileBlock(dstFile,flyingBlocks[j]);
00337                                 } catch (IOException e) {
00338                                         forwardFile.closeForwardingService(dstFile,e);
00339                                         return new OperationStatus(e);
00340                                 }
00341             }
00342         }
00343 
00344         //close remote and local buffers
00345         ftsRemote.closeRead(srcFile); //async-call
00346         closeWrite(dstFile);
00347         
00348         forwardFile.closeForwardingService(dstFile); //sync-call
00349 
00350         if (logger.isDebugEnabled()) {
00351             long fin = System.currentTimeMillis();
00352             long delta = (fin - init);
00353             logger.debug("File "+dstFile.getAbsolutePath()+" received using " + numBlocks + " blocks,  in: " +
00354                 delta + "[ms]");
00355         }
00356 
00357         return new OperationStatus();
00358     }
00359 
00360         /*
00361     public void putInThePool(FileTransferService me) {
00362         FileTransferEngine.getFileTransferEngine().putFTS(me);
00363     }
00364     */
00365         
00366     public OperationStatus sendFile(FileTransferService remoteFTS, File srcFile,
00367         File dstFile) {
00368         return sendFile(remoteFTS, srcFile, dstFile,
00369             FileBlock.DEFAULT_BLOCK_SIZE, DEFAULT_MAX_SIMULTANEOUS_BLOCKS);
00370     }
00371     
00376     public void requestFileTransfer(FileTransferRequest fti) {
00377         forwardFile.requestFileTransfer(fti);
00378     }
00379 
00380     public OperationStatus getFileTransferRequestStatus(FileTransferRequest fti) {
00381         return forwardFile.getFileTransferRequestStatus(fti);
00382     }
00383 
00397     public OperationStatus submitFileTransferRequest(FileTransferService meFTS,
00398         FileTransferRequest ftr, OperationStatus parentOpStat) {
00399         FileTransferService srcFTS = ftr.getSourceFTS();
00400 
00401         OperationStatus opstat=null;
00402 
00403         //if the future is allready here, don't bother making a request for the file transfer
00404         if (ProActive.isAwaited(parentOpStat)) {
00405                 if(logger.isDebugEnabled()){
00406                         logger.debug("File transfer request will be handled using forwarding");
00407                 }
00408             srcFTS.requestFileTransfer(ftr); //immediate service call
00409             ProActive.waitFor(parentOpStat);//we wait for the original file transfer to finish
00410             if(parentOpStat.hasException()){ //if the original file transfer had an error we propagate it
00411                 srcFTS.getFileTransferRequestStatus(ftr); //we need to remove the desubmit the ftr
00412                 return parentOpStat; 
00413             }
00414             
00415             opstat = srcFTS.getFileTransferRequestStatus(ftr); //immediate service call
00416             if(opstat.hasException()) return opstat; //error was encountered
00417         }
00418 
00419         //If the file was not handled by the forwarding AO, then we take care of it here.
00420         if (opstat ==null || opstat.isPending()) {
00421                 if(logger.isDebugEnabled()){
00422                         logger.debug("File transfer request will be handled directly");
00423                 }
00424         
00425                 String myNodeURL=ProActive.getActiveObjectNodeUrl(meFTS);
00426                 String srcNodeURL=ProActive.getActiveObjectNodeUrl(ftr.getSourceFTS());
00427 
00428                 //If the file is not here, we get a FTS AO in the source node.
00429                 if(!myNodeURL.equals(srcNodeURL)){
00430                         try {
00431                                         srcFTS = FileTransferEngine.getFileTransferEngine().getFTS(srcNodeURL);
00432                                         opstat = srcFTS.sendFile(ftr.getDestinationFTS(), ftr.getSrcFile(),ftr.getDstFile());
00433                                         ftr.setSourceFTS(srcFTS);
00434                                 } catch (Exception e) {
00435                                         return new OperationStatus( new ProActiveException("Unable to create File Transfer Service on node: "+ srcNodeURL, e));
00436                                 }
00437                 }
00438                 else{ //If the file is in this node, we send it from this Active Object
00439                         opstat=sendFile(ftr.getDestinationFTS(), ftr.getSrcFile(),ftr.getDstFile());
00440                         ftr.setSourceFTS(meFTS);
00441                 }
00442         }
00443         return opstat;
00444     }
00445 
00452     public long checkSum(File file) throws IOException {
00453         // Compute Adler-32 checksum
00454         CheckedInputStream cis = new CheckedInputStream(new FileInputStream(
00455                     file.getAbsoluteFile()), new Adler32());
00456         byte[] tempBuf = new byte[1024 * 1024]; //1MB loops
00457         while (cis.read(tempBuf) >= 0)
00458             ;
00459 
00460         return cis.getChecksum().getValue();
00461     }
00462     
00466     public String sayHello() {
00467         return "Hello World from " + getHostName() + " at " +
00468         new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date());
00469     }
00470 
00475     public String getHostName() {
00476         try {
00477             return java.net.InetAddress.getLocalHost().toString();
00478         } catch (Exception e) {
00479             return "unknown";
00480         }
00481     }
00482 }

Generated on Mon Jan 22 15:16:08 2007 for ProActive by  doxygen 1.5.1