00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.branchnbound.core.queue;
00032
00033 import java.io.FileNotFoundException;
00034 import java.io.IOException;
00035 import java.io.InputStream;
00036 import java.io.ObjectInputStream;
00037 import java.io.ObjectOutputStream;
00038 import java.io.OutputStream;
00039 import java.util.Collection;
00040 import java.util.Vector;
00041
00042 import org.objectweb.proactive.branchnbound.core.Result;
00043 import org.objectweb.proactive.branchnbound.core.Task;
00044 import org.objectweb.proactive.core.ProActiveRuntimeException;
00045 import org.objectweb.proactive.core.util.wrapper.BooleanMutableWrapper;
00046 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00047 import org.objectweb.proactive.core.util.wrapper.IntMutableWrapper;
00048
00049
00058 public class BasicQueueImpl extends TaskQueue {
00059 private static final String BCK_SEPARTOR = "End pending tasks backup -- Starting not started tasks backup";
00060 private Vector queue = new Vector();
00061 private int hungryLevel;
00062 private Task rootTaskFromBackup = null;
00063 private Vector pendingTasksFromBackup = new Vector();
00064 private Vector allResults = new Vector();
00065
00069 public BasicQueueImpl() {
00070 }
00071
00075 public void addAll(Collection tasks) {
00076 if (tasks.size() > 0) {
00077 queue.addAll(tasks);
00078 if (logger.isDebugEnabled()) {
00079 logger.debug("Task provider just received and added " +
00080 tasks.size());
00081 }
00082 }
00083 }
00084
00088 public IntMutableWrapper size() {
00089 return new IntMutableWrapper(this.queue.size());
00090 }
00091
00095 public BooleanMutableWrapper hasNext() {
00096 return new BooleanMutableWrapper(this.queue.size() > 0);
00097 }
00098
00102 public Task next() {
00103 return (Task) this.queue.remove(0);
00104 }
00105
00109 public void flushAll() {
00110 queue = new Vector();
00111 hungryLevel = 0;
00112 rootTaskFromBackup = null;
00113 pendingTasksFromBackup = new Vector();
00114 allResults = new Vector();
00115 }
00116
00120 public BooleanWrapper isHungry() {
00121 if (logger.isDebugEnabled()) {
00122 logger.debug("Queue size is " + this.queue.size() +
00123 " - Hungry level is " + this.hungryLevel);
00124 }
00125 return new BooleanWrapper(this.queue.size() <= this.hungryLevel);
00126 }
00127
00131 public void setHungryLevel(int level) {
00132 this.hungryLevel = level;
00133 }
00134
00138 public void backupTasks(Task rootTask, Vector pendingTasks,
00139 OutputStream backupOutputStream) {
00140 try {
00141 ObjectOutputStream oos = new ObjectOutputStream(backupOutputStream);
00142 oos.writeObject(rootTask);
00143 for (int j = 0; j < pendingTasks.size(); j++) {
00144 oos.writeObject(pendingTasks.get(j));
00145 }
00146 oos.writeObject(BCK_SEPARTOR);
00147 for (int i = 0; i < this.queue.size(); i++) {
00148 oos.writeObject(this.queue.get(i));
00149 }
00150 oos.close();
00151 backupOutputStream.close();
00152 } catch (FileNotFoundException e) {
00153 logger.warn("Backup tasks failed", e);
00154 } catch (IOException e) {
00155 logger.warn("Backup tasks failed", e);
00156 }
00157 }
00158
00162 public void loadTasks(InputStream taskInputStream) {
00163 try {
00164 ObjectInputStream ois = new ObjectInputStream(taskInputStream);
00165 this.rootTaskFromBackup = (Task) ois.readObject();
00166 boolean separationReached = false;
00167 while (ois.available() > 0) {
00168 Object read = ois.readObject();
00169 if (!separationReached && read instanceof String &&
00170 (((String) read).compareTo(BCK_SEPARTOR) == 0)) {
00171 separationReached = true;
00172 }
00173 if (!separationReached) {
00174 this.pendingTasksFromBackup.add(read);
00175 } else {
00176 this.queue.add(read);
00177 }
00178 }
00179 ois.close();
00180 taskInputStream.close();
00181 } catch (Exception e) {
00182 logger.fatal("Failed to read tasks", e);
00183 throw new ProActiveRuntimeException(e);
00184 }
00185 }
00186
00190 public Task getRootTaskFromBackup() {
00191 return this.rootTaskFromBackup;
00192 }
00193
00197 public void addResult(Result result) {
00198 this.allResults.add(result);
00199 }
00200
00204 public IntMutableWrapper howManyResults() {
00205 return new IntMutableWrapper(this.allResults.size());
00206 }
00207
00211 public Collection getAllResults() {
00212 return this.allResults;
00213 }
00214
00218 public void backupResults(OutputStream backupResultOutputStream) {
00219 try {
00220 ObjectOutputStream oos = new ObjectOutputStream(backupResultOutputStream);
00221 for (int i = 0; i < this.allResults.size(); i++) {
00222 oos.writeObject(this.allResults.get(i));
00223 }
00224 oos.close();
00225 backupResultOutputStream.close();
00226 } catch (FileNotFoundException e) {
00227 logger.fatal("The file is not found", e);
00228 } catch (IOException e) {
00229 logger.warn("Problem I/O with the reulst backup", e);
00230 }
00231 }
00232
00236 public void loadResults(InputStream backupResultInputStream) {
00237 try {
00238 ObjectInputStream ois = new ObjectInputStream(backupResultInputStream);
00239 while (ois.available() > 0) {
00240 this.allResults.add(ois.readObject());
00241 }
00242 ois.close();
00243 backupResultInputStream.close();
00244 } catch (Exception e) {
00245 logger.fatal("Problem to read result file.");
00246 throw new ProActiveRuntimeException(e);
00247 }
00248 }
00249
00253 public void addTask(Task t) {
00254 queue.add(t);
00255 }
00256 }