org/objectweb/proactive/branchnbound/core/queue/BasicQueueImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.branchnbound.core.queue;
00032 
00033 import java.io.FileNotFoundException;
00034 import java.io.IOException;
00035 import java.io.InputStream;
00036 import java.io.ObjectInputStream;
00037 import java.io.ObjectOutputStream;
00038 import java.io.OutputStream;
00039 import java.util.Collection;
00040 import java.util.Vector;
00041 
00042 import org.objectweb.proactive.branchnbound.core.Result;
00043 import org.objectweb.proactive.branchnbound.core.Task;
00044 import org.objectweb.proactive.core.ProActiveRuntimeException;
00045 import org.objectweb.proactive.core.util.wrapper.BooleanMutableWrapper;
00046 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00047 import org.objectweb.proactive.core.util.wrapper.IntMutableWrapper;
00048 
00049 
00058 public class BasicQueueImpl extends TaskQueue {
00059     private static final String BCK_SEPARTOR = "End pending tasks backup -- Starting not started tasks backup";
00060     private Vector queue = new Vector();
00061     private int hungryLevel;
00062     private Task rootTaskFromBackup = null;
00063     private Vector pendingTasksFromBackup = new Vector();
00064     private Vector allResults = new Vector();
00065 
00069     public BasicQueueImpl() {
00070     }
00071 
00075     public void addAll(Collection tasks) {
00076         if (tasks.size() > 0) {
00077             queue.addAll(tasks);
00078             if (logger.isDebugEnabled()) {
00079                 logger.debug("Task provider just received and added " +
00080                     tasks.size());
00081             }
00082         }
00083     }
00084 
00088     public IntMutableWrapper size() {
00089         return new IntMutableWrapper(this.queue.size());
00090     }
00091 
00095     public BooleanMutableWrapper hasNext() {
00096         return new BooleanMutableWrapper(this.queue.size() > 0);
00097     }
00098 
00102     public Task next() {
00103         return (Task) this.queue.remove(0);
00104     }
00105 
00109     public void flushAll() {
00110         queue = new Vector();
00111         hungryLevel = 0;
00112         rootTaskFromBackup = null;
00113         pendingTasksFromBackup = new Vector();
00114         allResults = new Vector();
00115     }
00116 
00120     public BooleanWrapper isHungry() {
00121         if (logger.isDebugEnabled()) {
00122             logger.debug("Queue size is " + this.queue.size() +
00123                 " - Hungry level is " + this.hungryLevel);
00124         }
00125         return new BooleanWrapper(this.queue.size() <= this.hungryLevel);
00126     }
00127 
00131     public void setHungryLevel(int level) {
00132         this.hungryLevel = level;
00133     }
00134 
00138     public void backupTasks(Task rootTask, Vector pendingTasks,
00139         OutputStream backupOutputStream) {
00140         try {
00141             ObjectOutputStream oos = new ObjectOutputStream(backupOutputStream);
00142             oos.writeObject(rootTask);
00143             for (int j = 0; j < pendingTasks.size(); j++) {
00144                 oos.writeObject(pendingTasks.get(j));
00145             }
00146             oos.writeObject(BCK_SEPARTOR);
00147             for (int i = 0; i < this.queue.size(); i++) {
00148                 oos.writeObject(this.queue.get(i));
00149             }
00150             oos.close();
00151             backupOutputStream.close();
00152         } catch (FileNotFoundException e) {
00153             logger.warn("Backup tasks failed", e);
00154         } catch (IOException e) {
00155             logger.warn("Backup tasks failed", e);
00156         }
00157     }
00158 
00162     public void loadTasks(InputStream taskInputStream) {
00163         try {
00164             ObjectInputStream ois = new ObjectInputStream(taskInputStream);
00165             this.rootTaskFromBackup = (Task) ois.readObject();
00166             boolean separationReached = false;
00167             while (ois.available() > 0) {
00168                 Object read = ois.readObject();
00169                 if (!separationReached && read instanceof String &&
00170                         (((String) read).compareTo(BCK_SEPARTOR) == 0)) {
00171                     separationReached = true;
00172                 }
00173                 if (!separationReached) {
00174                     this.pendingTasksFromBackup.add(read);
00175                 } else {
00176                     this.queue.add(read);
00177                 }
00178             }
00179             ois.close();
00180             taskInputStream.close();
00181         } catch (Exception e) {
00182             logger.fatal("Failed to read tasks", e);
00183             throw new ProActiveRuntimeException(e);
00184         }
00185     }
00186 
00190     public Task getRootTaskFromBackup() {
00191         return this.rootTaskFromBackup;
00192     }
00193 
00197     public void addResult(Result result) {
00198         this.allResults.add(result);
00199     }
00200 
00204     public IntMutableWrapper howManyResults() {
00205         return new IntMutableWrapper(this.allResults.size());
00206     }
00207 
00211     public Collection getAllResults() {
00212         return this.allResults;
00213     }
00214 
00218     public void backupResults(OutputStream backupResultOutputStream) {
00219         try {
00220             ObjectOutputStream oos = new ObjectOutputStream(backupResultOutputStream);
00221             for (int i = 0; i < this.allResults.size(); i++) {
00222                 oos.writeObject(this.allResults.get(i));
00223             }
00224             oos.close();
00225             backupResultOutputStream.close();
00226         } catch (FileNotFoundException e) {
00227             logger.fatal("The file is not found", e);
00228         } catch (IOException e) {
00229             logger.warn("Problem I/O with the reulst backup", e);
00230         }
00231     }
00232 
00236     public void loadResults(InputStream backupResultInputStream) {
00237         try {
00238             ObjectInputStream ois = new ObjectInputStream(backupResultInputStream);
00239             while (ois.available() > 0) {
00240                 this.allResults.add(ois.readObject());
00241             }
00242             ois.close();
00243             backupResultInputStream.close();
00244         } catch (Exception e) {
00245             logger.fatal("Problem to read result file.");
00246             throw new ProActiveRuntimeException(e);
00247         }
00248     }
00249 
00253     public void addTask(Task t) {
00254         queue.add(t);
00255     }
00256 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1