org/objectweb/proactive/core/filetransfer/FileForwarder.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.filetransfer;
00032 
00033 import java.io.File;
00034 import java.util.HashMap;
00035 import java.util.Vector;
00036 
00037 import org.apache.log4j.Logger;
00038 import org.objectweb.proactive.ProActive;
00039 import org.objectweb.proactive.core.ProActiveException;
00040 import org.objectweb.proactive.core.util.log.Loggers;
00041 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00042 
00043 
00051 class FileForwarder {
00052     protected static Logger logger = ProActiveLogger.getLogger(Loggers.FILETRANSFER);
00053 
00054     //  Map of Vectors of FileTransferService holding remote FTS to send the file (key of the hash)
00055     protected HashMap<File, Vector<FileTransferRequest>> servingRequests;
00056 
00057     //  Map of Vectors of FileTransferService holding new remote FTS to send the file (key of the hash)
00058     protected HashMap<File, Vector<FileTransferRequest>> newRequests;
00059 
00060     //  Map of Vectors of failed FileTransferRequests
00061     protected HashMap<File, Vector<FileTransferRequest>> failedRequests;
00062     protected FileTransferService servingFTS;
00063     protected FileDispatcher dispatcher;
00064 
00065     public FileForwarder(FileTransferService fts) {
00066         servingRequests = new HashMap<File, Vector<FileTransferRequest>>();
00067         newRequests = new HashMap<File, Vector<FileTransferRequest>>();
00068         failedRequests = new HashMap<File, Vector<FileTransferRequest>>();
00069         servingFTS = fts; //direct reference to the object (not a stub)
00070 
00071         try { //TODO handle the exception
00072             dispatcher = (FileDispatcher) ProActive.newActive(FileDispatcher.class.getName(),
00073                     null);
00074         } catch (Exception e) {
00075         }
00076     }
00077 
00084     public synchronized void requestFileTransfer(FileTransferRequest fti) {
00085         if (logger.isDebugEnabled()) {
00086             logger.debug("Setting file transfer request for:" + fti);
00087         }
00088         if (!newRequests.containsKey(fti.getSrcFile())) {
00089             newRequests.put(fti.getSrcFile(), new Vector<FileTransferRequest>());
00090         }
00091 
00092         Vector<FileTransferRequest> requests = newRequests.get(fti.getSrcFile());
00093         requests.add(fti);
00094     }
00095 
00102     /*
00103     private synchronized void forward(FileTransferService dispatcherFile, File srcFile, FileBlock[] blocks) {
00104         if (!servingRequests.containsKey(srcFile) || blocks.length<=0) {
00105             return; //nothing to do
00106         }
00107 
00108         Vector forward = (Vector) servingRequests.get(srcFile);
00109 
00110         for (int i = 0; i < forward.size(); i++) {
00111             FileTransferRequest fti = (FileTransferRequest) forward.get(i);
00112             for (int j = 0; j < blocks.length; j++) {
00113                     dispatcherFile.sendBlock(fti.getDestinationFTS(), blocks[j],fti.getDstFile().getAbsolutePath()); //async call
00114             }
00115         }
00116     }
00117     */
00118     protected synchronized void forward(File srcFile, FileBlock block) {
00119         if (!servingRequests.containsKey(srcFile)) {
00120             return; //nothing to do
00121         }
00122 
00123         Vector<FileTransferRequest> forward = servingRequests.get(srcFile);
00124         for (int i = 0; i < forward.size(); i++) {
00125             FileTransferRequest ftr = forward.get(i);
00126             OperationStatus opStat = dispatcher.sendBlock(ftr.getDestinationFTS(),
00127                     block, ftr.getDstFile()); //async call
00128 
00129             if (opStat.hasException()) { //call failed
00130                 forward.remove(i);
00131                 ftr.setDstFuture(opStat);
00132                 addToHash(failedRequests, srcFile, ftr);
00133             }
00134         }
00135     }
00136 
00137     protected synchronized void forwardWithoutThrowingException(File srcFile,
00138         FileBlock block) {
00139         if (!servingRequests.containsKey(srcFile)) {
00140             return; //nothing to do
00141         }
00142 
00143         Vector<FileTransferRequest> forward = servingRequests.get(srcFile);
00144         for (int i = 0; i < forward.size(); i++) {
00145             FileTransferRequest ftr = forward.get(i);
00146             dispatcher.sendBlockFileBlockWithoutThrowingException(ftr.getDestinationFTS(),
00147                 block, ftr.getDstFile()); //async call
00148                                           /*
00149             if(opStat.hasException()){ //call failed
00150             forward.remove(i);
00151             ftr.setDstFuture(opStat);
00152             addToHash(failedRequests,srcFile, ftr);
00153             }
00154             */
00155         }
00156     }
00157 
00158     protected synchronized void forward(File srcFile, FileBlock[] block) {
00159         if (!servingRequests.containsKey(srcFile)) {
00160             return; //nothing to do
00161         }
00162 
00163         Vector<FileTransferRequest> forward = servingRequests.get(srcFile);
00164         for (int i = 0; i < forward.size(); i++) {
00165             FileTransferRequest ftr = forward.get(i);
00166             for (int j = 0; j < block.length; j++) {
00167                 dispatcher.sendBlockFileBlockWithoutThrowingException(ftr.getDestinationFTS(),
00168                     block[j], ftr.getDstFile()); //async call
00169             }
00170 
00171             /*
00172             if(opStat.hasException()){ //call failed
00173                     forward.remove(i);
00174                     ftr.setDstFuture(opStat);
00175                     addToHash(failedRequests,srcFile, ftr);
00176             }
00177             */
00178         }
00179     }
00180 
00186     protected synchronized void handleNewRequests(File file) {
00187         if (!newRequests.containsKey(file)) {
00188             return; //nothing to do
00189         }
00190 
00191         if (logger.isDebugEnabled()) {
00192             logger.debug("Sending already saved data for file: " +
00193                 file.getAbsolutePath() + " " + file.length() + "[bytes]");
00194         }
00195 
00196         servingFTS.closeWrite(file); //close the buffer to generate a flush
00197 
00198         Vector<FileTransferRequest> requests = newRequests.remove(file);
00199         for (int i = 0; i < requests.size(); i++) {
00200             FileTransferRequest ftr = requests.get(i);
00201 
00202             if (!ftr.getSrcFile().equals(file)) {
00203                 ftr.setDstFuture(new OperationStatus(
00204                         new ProActiveException(
00205                             "Error when sending saved data. Source files do not match:" +
00206                             ftr.getSrcFile().getAbsolutePath() + " != " +
00207                             file.getAbsolutePath())));
00208                 addToHash(failedRequests, file, ftr);
00209                 continue;
00210             }
00211 
00212             //Send the current file
00213             if (file.length() > 0) {
00214                 OperationStatus opRes = servingFTS.sendFile(ftr.getDestinationFTS(),
00215                         ftr.getSrcFile(), ftr.getDstFile());
00216                 ProActive.waitFor(opRes); //wait for the send to finish (possibly with errors)
00217                 if (opRes.hasException()) {
00218                     ftr.setDstFuture(opRes); //Update the future with the error.
00219                     addToHash(failedRequests, file, ftr);
00220                     continue;
00221                 } else {
00222                     //Open the remote buffer for further appending
00223                     ftr.getDestinationFTS().canWrite(ftr.getDstFile(), true);
00224                 }
00225             }
00226 
00227             addToHash(servingRequests, file, ftr);
00228         }
00229 
00230         servingFTS.getWritingBuffer(file, true); //open the writting buffer for appending
00231     }
00232 
00233     private void addToHash(HashMap<File, Vector<FileTransferRequest>> hash,
00234         File file, FileTransferRequest ftr) {
00235         if (!hash.containsKey(file)) {
00236             hash.put(file, new Vector<FileTransferRequest>());
00237         }
00238         Vector<FileTransferRequest> v = hash.get(file);
00239 
00240         v.add(ftr);
00241     }
00242 
00243     public synchronized void closeForwardingService(File srcFile) {
00244         closeForwardingService(srcFile, null);
00245     }
00246 
00247     public synchronized void closeForwardingService(File srcFile, Exception e) {
00248         servingFTS.closeWrite(srcFile);
00249 
00250         Vector requests = (Vector) this.servingRequests.remove(srcFile);
00251         if (requests == null) {
00252             return;
00253         }
00254 
00255         Vector<OperationStatus> opStat = new Vector<OperationStatus>();
00256         for (int i = 0; i < requests.size(); i++) {
00257             FileTransferRequest ftr = (FileTransferRequest) requests.get(i);
00258             FileTransferService remoteFTS = ftr.getDestinationFTS();
00259 
00260             if (e != null) {
00261                 opStat.add(dispatcher.closeForwardingService(remoteFTS,
00262                         ftr.getDstFile(), e));
00263                 ftr.setDstFuture(new OperationStatus(e));
00264                 addToHash(failedRequests, srcFile, ftr);
00265             } else {
00266                 opStat.add(dispatcher.closeForwardingService(remoteFTS,
00267                         ftr.getDstFile()));
00268             }
00269         }
00270         ProActive.waitForAll(opStat);
00271     }
00272 
00273     public synchronized void clearNewRequests(File srcFile) {
00274         this.newRequests.remove(srcFile);
00275     }
00276 
00283     public synchronized OperationStatus getFileTransferRequestStatus(
00284         FileTransferRequest ftr) {
00285         //check if the operation was never performed
00286         Vector<FileTransferRequest> v = newRequests.get(ftr.getSrcFile());
00287         for (int i = 0; (v != null) && (i < v.size()); i++) {
00288             if (ftr.equals(v.get(i))) {
00289                 v.remove(i);
00290                 return new OperationStatus(true);
00291             }
00292         }
00293         if ((v != null) && v.isEmpty()) {
00294             newRequests.remove(ftr.getSrcFile());
00295         }
00296 
00297         //check if the operation encountered errors
00298         v = failedRequests.get(ftr.getSrcFile());
00299         for (int i = 0; (v != null) && (i < v.size()); i++) {
00300             if (ftr.equals(v.get(i))) {
00301                 v.remove(i);
00302                 return ftr.getOperationFuture();
00303             }
00304         }
00305 
00306         if ((v != null) && v.isEmpty()) {
00307             failedRequests.remove(ftr.getSrcFile());
00308         }
00309 
00310         return new OperationStatus();
00311     }
00312 }

Generated on Mon Jan 22 15:16:08 2007 for ProActive by  doxygen 1.5.1