00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.ft.servers.util;
00032
00033 import java.util.Hashtable;
00034
00035
00041 public class ActiveQueue extends Thread {
00042 private java.util.ArrayList<ActiveQueueJob> queue;
00043 private int counter;
00044 private boolean kill;
00045 private Hashtable<ActiveQueueJob, JobBarrier> barriers;
00046
00047 public ActiveQueue(String name) {
00048 queue = new java.util.ArrayList<ActiveQueueJob>();
00049 counter = 0;
00050 kill = false;
00051 barriers = new Hashtable<ActiveQueueJob, JobBarrier>();
00052 this.setName(name);
00053 }
00054
00055
00056
00057
00058
00063 public java.util.ArrayList<ActiveQueueJob> getQueue() {
00064 return queue;
00065 }
00066
00071 public synchronized void addJob(ActiveQueueJob j) {
00072 queue.add(j);
00073 counter++;
00074 notifyAll();
00075 }
00076
00083 public synchronized JobBarrier addJobWithBarrier(ActiveQueueJob j) {
00084 JobBarrier b = new JobBarrier();
00085 this.barriers.put(j, b);
00086 queue.add(j);
00087 counter++;
00088 notifyAll();
00089 return b;
00090 }
00091
00096 public synchronized ActiveQueueJob removeJob() {
00097 counter--;
00098 return (queue.remove(0));
00099 }
00100
00104 public synchronized void killMe() {
00105 kill = true;
00106 notifyAll();
00107 }
00108
00113 public void run() {
00114 while (true) {
00115
00116 waitForJob();
00117
00118 if (kill) {
00119 break;
00120 }
00121
00122
00123 ActiveQueueJob toDo = this.removeJob();
00124 if (toDo != null) {
00125 toDo.doTheJob();
00126
00127 JobBarrier b = (this.barriers.get(toDo));
00128 if (b != null) {
00129
00130 b.signalJobCompletion();
00131 this.barriers.remove(toDo);
00132 }
00133 }
00134 }
00135 }
00136
00137
00138 private synchronized void waitForJob() {
00139 try {
00140 while ((counter == 0) && !kill) {
00141 wait();
00142 }
00143 } catch (InterruptedException e) {
00144 e.printStackTrace();
00145 }
00146 }
00147 }