org/objectweb/proactive/branchnbound/core/queue/LargerQueueImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.branchnbound.core.queue;
00032 
00033 import java.io.FileNotFoundException;
00034 import java.io.IOException;
00035 import java.io.InputStream;
00036 import java.io.ObjectInputStream;
00037 import java.io.ObjectOutputStream;
00038 import java.util.Collection;
00039 import java.util.Vector;
00040 
00041 import org.objectweb.proactive.ProActive;
00042 import org.objectweb.proactive.branchnbound.core.Result;
00043 import org.objectweb.proactive.branchnbound.core.Task;
00044 import org.objectweb.proactive.core.ProActiveRuntimeException;
00045 import org.objectweb.proactive.core.util.wrapper.BooleanMutableWrapper;
00046 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00047 import org.objectweb.proactive.core.util.wrapper.IntMutableWrapper;
00048 
00049 
00057 public class LargerQueueImpl extends TaskQueue {
00058     private static final String BCK_SEPARTOR = "End pending tasks backup -- Starting not started tasks backup";
00059     private Vector<Collection> queue = new Vector<Collection>();
00060     private int size = 0;
00061     private int hungryLevel;
00062     private int current = 0;
00063     private Vector<Object> pendingTasksFromBackup = new Vector<Object>();
00064     private Task rootTaskFromBackup = null;
00065     private Vector<Object> allResults = new Vector<Object>();
00066 
00070     public LargerQueueImpl() {
00071     }
00072 
00076     public void addAll(Collection tasks) {
00077         tasks = (Collection) ProActive.getFutureValue(tasks);
00078         if (tasks.size() > 0) {
00079             this.queue.add(tasks);
00080             this.size += tasks.size();
00081             if (logger.isDebugEnabled()) {
00082                 logger.debug("Task provider just received and added " +
00083                     tasks.size());
00084             }
00085         }
00086     }
00087 
00091     public IntMutableWrapper size() {
00092         return new IntMutableWrapper(this.size);
00093     }
00094 
00098     public BooleanMutableWrapper hasNext() {
00099         return new BooleanMutableWrapper(this.size > 0);
00100     }
00101 
00105     public Task next() {
00106         if (this.size == 0) {
00107             throw new RuntimeException("No more elements");
00108         }
00109         if (current >= this.queue.size()) {
00110             current = 0;
00111         }
00112         Vector subTasks = (Vector) this.queue.get(current);
00113         if (subTasks.size() == 0) {
00114             this.queue.remove(current);
00115             current++;
00116             return this.next();
00117         }
00118         this.size--;
00119         return (Task) subTasks.remove(0);
00120     }
00121 
00125     public void flushAll() {
00126         queue = new Vector<Collection>();
00127         size = 0;
00128         hungryLevel = 0;
00129         current = 0;
00130         pendingTasksFromBackup = new Vector<Object>();
00131         rootTaskFromBackup = null;
00132         allResults = new Vector<Object>();
00133     }
00134 
00138     public BooleanWrapper isHungry() {
00139         if (logger.isDebugEnabled()) {
00140             logger.debug("Queue size is " + this.size + " - Hungry level is " +
00141                 hungryLevel);
00142         }
00143         return new BooleanWrapper(this.size <= hungryLevel);
00144     }
00145 
00149     public void setHungryLevel(int level) {
00150         this.hungryLevel = level;
00151     }
00152 
00156     public void backupTasks(Task rootTask, Vector pendingTasks,
00157         java.io.OutputStream backupOutputStream) {
00158         try {
00159             ObjectOutputStream oos = new ObjectOutputStream(backupOutputStream);
00160             oos.writeObject(rootTask);
00161             for (int i = 0; i < pendingTasks.size(); i++) {
00162                 oos.writeObject(pendingTasks.get(i));
00163             }
00164             oos.writeObject(BCK_SEPARTOR);
00165             oos.writeObject(this.queue);
00166             oos.close();
00167             backupOutputStream.close();
00168         } catch (FileNotFoundException e) {
00169             logger.warn("Backup tasks failed", e);
00170         } catch (IOException e) {
00171             logger.warn("Backup tasks failed", e);
00172         }
00173     }
00174 
00178     public void loadTasks(InputStream taskInputStream) {
00179         try {
00180             ObjectInputStream ois = new ObjectInputStream(taskInputStream);
00181             this.rootTaskFromBackup = (Task) ois.readObject();
00182             boolean separationReached = false;
00183             while (!separationReached) {
00184                 Object read = ois.readObject();
00185                 if (!separationReached && read instanceof String &&
00186                         (((String) read).compareTo(BCK_SEPARTOR) == 0)) {
00187                     separationReached = true;
00188                     continue;
00189                 }
00190                 this.pendingTasksFromBackup.add(read);
00191             }
00192             this.queue = (Vector<Collection>) ois.readObject();
00193 
00194             ois.close();
00195             taskInputStream.close();
00196         } catch (Exception e) {
00197             logger.fatal("Failed to read tasks", e);
00198             throw new ProActiveRuntimeException(e);
00199         }
00200     }
00201 
00205     public Task getRootTaskFromBackup() {
00206         return this.rootTaskFromBackup;
00207     }
00208 
00212     public void addResult(Result result) {
00213         this.allResults.add(result);
00214     }
00215 
00219     public IntMutableWrapper howManyResults() {
00220         return new IntMutableWrapper(this.allResults.size());
00221     }
00222 
00226     public Collection<Object> getAllResults() {
00227         return this.allResults;
00228     }
00229 
00233     public void backupResults(java.io.OutputStream backupOutputStream) {
00234         try {
00235             ObjectOutputStream oos = new ObjectOutputStream(backupOutputStream);
00236             for (int i = 0; i < this.allResults.size(); i++) {
00237                 oos.writeObject(this.allResults.get(i));
00238             }
00239             oos.close();
00240             backupOutputStream.close();
00241         } catch (FileNotFoundException e) {
00242             logger.fatal("The file is not found", e);
00243         } catch (IOException e) {
00244             logger.warn("Problem I/O with the reulst backup", e);
00245         }
00246     }
00247 
00251     public void loadResults(InputStream backupResultInputStream) {
00252         try {
00253             ObjectInputStream ois = new ObjectInputStream(backupResultInputStream);
00254             while (ois.available() > 0) {
00255                 this.allResults.add(ois.readObject());
00256             }
00257             ois.close();
00258             backupResultInputStream.close();
00259         } catch (Exception e) {
00260             logger.fatal("Problem to read result file.");
00261             throw new ProActiveRuntimeException(e);
00262         }
00263     }
00264 
00268     public void addTask(Task t) {
00269         Vector<Task> v = new Vector<Task>();
00270         v.add(t);
00271         this.queue.add(v);
00272         size++;
00273     }
00274 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1