00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.branchnbound.core.queue;
00032 
00033 import java.io.FileNotFoundException;
00034 import java.io.IOException;
00035 import java.io.InputStream;
00036 import java.io.ObjectInputStream;
00037 import java.io.ObjectOutputStream;
00038 import java.io.OutputStream;
00039 import java.util.Collection;
00040 import java.util.Vector;
00041 
00042 import org.objectweb.proactive.branchnbound.core.Result;
00043 import org.objectweb.proactive.branchnbound.core.Task;
00044 import org.objectweb.proactive.core.ProActiveRuntimeException;
00045 import org.objectweb.proactive.core.util.wrapper.BooleanMutableWrapper;
00046 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00047 import org.objectweb.proactive.core.util.wrapper.IntMutableWrapper;
00048 
00049 
00058 public class BasicQueueImpl extends TaskQueue {
00059     private static final String BCK_SEPARTOR = "End pending tasks backup -- Starting not started tasks backup";
00060     private Vector queue = new Vector();
00061     private int hungryLevel;
00062     private Task rootTaskFromBackup = null;
00063     private Vector pendingTasksFromBackup = new Vector();
00064     private Vector allResults = new Vector();
00065 
00069     public BasicQueueImpl() {
00070     }
00071 
00075     public void addAll(Collection tasks) {
00076         if (tasks.size() > 0) {
00077             queue.addAll(tasks);
00078             if (logger.isDebugEnabled()) {
00079                 logger.debug("Task provider just received and added " +
00080                     tasks.size());
00081             }
00082         }
00083     }
00084 
00088     public IntMutableWrapper size() {
00089         return new IntMutableWrapper(this.queue.size());
00090     }
00091 
00095     public BooleanMutableWrapper hasNext() {
00096         return new BooleanMutableWrapper(this.queue.size() > 0);
00097     }
00098 
00102     public Task next() {
00103         return (Task) this.queue.remove(0);
00104     }
00105 
00109     public void flushAll() {
00110         queue = new Vector();
00111         hungryLevel = 0;
00112         rootTaskFromBackup = null;
00113         pendingTasksFromBackup = new Vector();
00114         allResults = new Vector();
00115     }
00116 
00120     public BooleanWrapper isHungry() {
00121         if (logger.isDebugEnabled()) {
00122             logger.debug("Queue size is " + this.queue.size() +
00123                 " - Hungry level is " + this.hungryLevel);
00124         }
00125         return new BooleanWrapper(this.queue.size() <= this.hungryLevel);
00126     }
00127 
00131     public void setHungryLevel(int level) {
00132         this.hungryLevel = level;
00133     }
00134 
00138     public void backupTasks(Task rootTask, Vector pendingTasks,
00139         OutputStream backupOutputStream) {
00140         try {
00141             ObjectOutputStream oos = new ObjectOutputStream(backupOutputStream);
00142             oos.writeObject(rootTask);
00143             for (int j = 0; j < pendingTasks.size(); j++) {
00144                 oos.writeObject(pendingTasks.get(j));
00145             }
00146             oos.writeObject(BCK_SEPARTOR);
00147             for (int i = 0; i < this.queue.size(); i++) {
00148                 oos.writeObject(this.queue.get(i));
00149             }
00150             oos.close();
00151             backupOutputStream.close();
00152         } catch (FileNotFoundException e) {
00153             logger.warn("Backup tasks failed", e);
00154         } catch (IOException e) {
00155             logger.warn("Backup tasks failed", e);
00156         }
00157     }
00158 
00162     public void loadTasks(InputStream taskInputStream) {
00163         try {
00164             ObjectInputStream ois = new ObjectInputStream(taskInputStream);
00165             this.rootTaskFromBackup = (Task) ois.readObject();
00166             boolean separationReached = false;
00167             while (ois.available() > 0) {
00168                 Object read = ois.readObject();
00169                 if (!separationReached && read instanceof String &&
00170                         (((String) read).compareTo(BCK_SEPARTOR) == 0)) {
00171                     separationReached = true;
00172                 }
00173                 if (!separationReached) {
00174                     this.pendingTasksFromBackup.add(read);
00175                 } else {
00176                     this.queue.add(read);
00177                 }
00178             }
00179             ois.close();
00180             taskInputStream.close();
00181         } catch (Exception e) {
00182             logger.fatal("Failed to read tasks", e);
00183             throw new ProActiveRuntimeException(e);
00184         }
00185     }
00186 
00190     public Task getRootTaskFromBackup() {
00191         return this.rootTaskFromBackup;
00192     }
00193 
00197     public void addResult(Result result) {
00198         this.allResults.add(result);
00199     }
00200 
00204     public IntMutableWrapper howManyResults() {
00205         return new IntMutableWrapper(this.allResults.size());
00206     }
00207 
00211     public Collection getAllResults() {
00212         return this.allResults;
00213     }
00214 
00218     public void backupResults(OutputStream backupResultOutputStream) {
00219         try {
00220             ObjectOutputStream oos = new ObjectOutputStream(backupResultOutputStream);
00221             for (int i = 0; i < this.allResults.size(); i++) {
00222                 oos.writeObject(this.allResults.get(i));
00223             }
00224             oos.close();
00225             backupResultOutputStream.close();
00226         } catch (FileNotFoundException e) {
00227             logger.fatal("The file is not found", e);
00228         } catch (IOException e) {
00229             logger.warn("Problem I/O with the reulst backup", e);
00230         }
00231     }
00232 
00236     public void loadResults(InputStream backupResultInputStream) {
00237         try {
00238             ObjectInputStream ois = new ObjectInputStream(backupResultInputStream);
00239             while (ois.available() > 0) {
00240                 this.allResults.add(ois.readObject());
00241             }
00242             ois.close();
00243             backupResultInputStream.close();
00244         } catch (Exception e) {
00245             logger.fatal("Problem to read result file.");
00246             throw new ProActiveRuntimeException(e);
00247         }
00248     }
00249 
00253     public void addTask(Task t) {
00254         queue.add(t);
00255     }
00256 }