00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.branchnbound.core.queue;
00032
00033 import java.io.FileNotFoundException;
00034 import java.io.IOException;
00035 import java.io.InputStream;
00036 import java.io.ObjectInputStream;
00037 import java.io.ObjectOutputStream;
00038 import java.util.Collection;
00039 import java.util.Vector;
00040
00041 import org.objectweb.proactive.ProActive;
00042 import org.objectweb.proactive.branchnbound.core.Result;
00043 import org.objectweb.proactive.branchnbound.core.Task;
00044 import org.objectweb.proactive.core.ProActiveRuntimeException;
00045 import org.objectweb.proactive.core.util.wrapper.BooleanMutableWrapper;
00046 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00047 import org.objectweb.proactive.core.util.wrapper.IntMutableWrapper;
00048
00049
00057 public class LargerQueueImpl extends TaskQueue {
00058 private static final String BCK_SEPARTOR = "End pending tasks backup -- Starting not started tasks backup";
00059 private Vector<Collection> queue = new Vector<Collection>();
00060 private int size = 0;
00061 private int hungryLevel;
00062 private int current = 0;
00063 private Vector<Object> pendingTasksFromBackup = new Vector<Object>();
00064 private Task rootTaskFromBackup = null;
00065 private Vector<Object> allResults = new Vector<Object>();
00066
00070 public LargerQueueImpl() {
00071 }
00072
00076 public void addAll(Collection tasks) {
00077 tasks = (Collection) ProActive.getFutureValue(tasks);
00078 if (tasks.size() > 0) {
00079 this.queue.add(tasks);
00080 this.size += tasks.size();
00081 if (logger.isDebugEnabled()) {
00082 logger.debug("Task provider just received and added " +
00083 tasks.size());
00084 }
00085 }
00086 }
00087
00091 public IntMutableWrapper size() {
00092 return new IntMutableWrapper(this.size);
00093 }
00094
00098 public BooleanMutableWrapper hasNext() {
00099 return new BooleanMutableWrapper(this.size > 0);
00100 }
00101
00105 public Task next() {
00106 if (this.size == 0) {
00107 throw new RuntimeException("No more elements");
00108 }
00109 if (current >= this.queue.size()) {
00110 current = 0;
00111 }
00112 Vector subTasks = (Vector) this.queue.get(current);
00113 if (subTasks.size() == 0) {
00114 this.queue.remove(current);
00115 current++;
00116 return this.next();
00117 }
00118 this.size--;
00119 return (Task) subTasks.remove(0);
00120 }
00121
00125 public void flushAll() {
00126 queue = new Vector<Collection>();
00127 size = 0;
00128 hungryLevel = 0;
00129 current = 0;
00130 pendingTasksFromBackup = new Vector<Object>();
00131 rootTaskFromBackup = null;
00132 allResults = new Vector<Object>();
00133 }
00134
00138 public BooleanWrapper isHungry() {
00139 if (logger.isDebugEnabled()) {
00140 logger.debug("Queue size is " + this.size + " - Hungry level is " +
00141 hungryLevel);
00142 }
00143 return new BooleanWrapper(this.size <= hungryLevel);
00144 }
00145
00149 public void setHungryLevel(int level) {
00150 this.hungryLevel = level;
00151 }
00152
00156 public void backupTasks(Task rootTask, Vector pendingTasks,
00157 java.io.OutputStream backupOutputStream) {
00158 try {
00159 ObjectOutputStream oos = new ObjectOutputStream(backupOutputStream);
00160 oos.writeObject(rootTask);
00161 for (int i = 0; i < pendingTasks.size(); i++) {
00162 oos.writeObject(pendingTasks.get(i));
00163 }
00164 oos.writeObject(BCK_SEPARTOR);
00165 oos.writeObject(this.queue);
00166 oos.close();
00167 backupOutputStream.close();
00168 } catch (FileNotFoundException e) {
00169 logger.warn("Backup tasks failed", e);
00170 } catch (IOException e) {
00171 logger.warn("Backup tasks failed", e);
00172 }
00173 }
00174
00178 public void loadTasks(InputStream taskInputStream) {
00179 try {
00180 ObjectInputStream ois = new ObjectInputStream(taskInputStream);
00181 this.rootTaskFromBackup = (Task) ois.readObject();
00182 boolean separationReached = false;
00183 while (!separationReached) {
00184 Object read = ois.readObject();
00185 if (!separationReached && read instanceof String &&
00186 (((String) read).compareTo(BCK_SEPARTOR) == 0)) {
00187 separationReached = true;
00188 continue;
00189 }
00190 this.pendingTasksFromBackup.add(read);
00191 }
00192 this.queue = (Vector<Collection>) ois.readObject();
00193
00194 ois.close();
00195 taskInputStream.close();
00196 } catch (Exception e) {
00197 logger.fatal("Failed to read tasks", e);
00198 throw new ProActiveRuntimeException(e);
00199 }
00200 }
00201
00205 public Task getRootTaskFromBackup() {
00206 return this.rootTaskFromBackup;
00207 }
00208
00212 public void addResult(Result result) {
00213 this.allResults.add(result);
00214 }
00215
00219 public IntMutableWrapper howManyResults() {
00220 return new IntMutableWrapper(this.allResults.size());
00221 }
00222
00226 public Collection<Object> getAllResults() {
00227 return this.allResults;
00228 }
00229
00233 public void backupResults(java.io.OutputStream backupOutputStream) {
00234 try {
00235 ObjectOutputStream oos = new ObjectOutputStream(backupOutputStream);
00236 for (int i = 0; i < this.allResults.size(); i++) {
00237 oos.writeObject(this.allResults.get(i));
00238 }
00239 oos.close();
00240 backupOutputStream.close();
00241 } catch (FileNotFoundException e) {
00242 logger.fatal("The file is not found", e);
00243 } catch (IOException e) {
00244 logger.warn("Problem I/O with the reulst backup", e);
00245 }
00246 }
00247
00251 public void loadResults(InputStream backupResultInputStream) {
00252 try {
00253 ObjectInputStream ois = new ObjectInputStream(backupResultInputStream);
00254 while (ois.available() > 0) {
00255 this.allResults.add(ois.readObject());
00256 }
00257 ois.close();
00258 backupResultInputStream.close();
00259 } catch (Exception e) {
00260 logger.fatal("Problem to read result file.");
00261 throw new ProActiveRuntimeException(e);
00262 }
00263 }
00264
00268 public void addTask(Task t) {
00269 Vector<Task> v = new Vector<Task>();
00270 v.add(t);
00271 this.queue.add(v);
00272 size++;
00273 }
00274 }