00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.calcium;
00032
00033 import java.io.Serializable;
00034 import java.util.Hashtable;
00035 import java.util.Vector;
00036
00037 import org.objectweb.proactive.ProActive;
00038 import org.objectweb.proactive.calcium.Skernel;
00039 import org.objectweb.proactive.calcium.exceptions.PanicException;
00040
00050 public class Facade {
00051
00052 private Skernel skernel;
00053 private TaskStreamQueue results;
00054
00055 public Facade(Skernel skernel){
00056 this.skernel=skernel;
00057 this.results = new TaskStreamQueue();
00058 }
00059
00060 public synchronized void putTask(Task<?> task){
00061 skernel.putTask(task);
00062 }
00063
00064 public void boot(Skernel skernel){
00065 this.skernel=skernel;
00066 }
00067
00068 @SuppressWarnings("unchecked")
00069 public synchronized <T> Task<T> getResult(int streamId) throws PanicException{
00070
00071
00072 try {
00073 while(results.isEmpty(streamId)){
00074 wait(1000);
00075 loadResultsFromSkernel();
00076 }
00077 } catch (InterruptedException e) {
00078 e.printStackTrace();
00079 return null;
00080 }
00081
00082 return (Task<T>)results.get(streamId);
00083 }
00084
00085 private synchronized void loadResultsFromSkernel() throws PanicException{
00086 while(skernel.hasResults()){
00087 Task<?> taskResult = (Task<?>) skernel.getResult();
00088
00089
00090
00091 taskResult=(Task<?>)ProActive.getFutureValue(taskResult);
00092 results.put(taskResult);
00093 }
00094 }
00095
00103 class TaskStreamQueue implements Serializable{
00104
00105 Hashtable<Integer, Vector<Task<?>>> results;
00106 int size;
00107
00108 public TaskStreamQueue(){
00109 results = new Hashtable<Integer, Vector<Task<?>>>();
00110 size=0;
00111 }
00112
00118 public synchronized void put(Task<?> task){
00119 int streamId=task.getStreamId();
00120
00121 if(!results.containsKey(streamId)){
00122 results.put(streamId, new Vector<Task<?>>());
00123 }
00124
00125 Vector<Task<?>> vector=results.get(streamId);
00126
00127 vector.add(task);
00128 size++;
00129 }
00130
00138 public synchronized Task<?> get(int streamId){
00139 if(isEmpty(streamId)) return null;
00140
00141 Vector<Task<?>> vector=results.get(streamId);
00142 if(vector.size()==0) return null;
00143 if(vector.size()==1) results.remove(streamId);
00144 size--;
00145 return vector.remove(0);
00146 }
00147
00154 public synchronized boolean isEmpty(int streamId){
00155 return !results.containsKey(streamId);
00156 }
00157
00161 public synchronized int size(){
00162 return size;
00163 }
00164
00170 public synchronized int size(int streamId){
00171 if(isEmpty(streamId)) return 0;
00172 Vector<Task<?>> vector=results.get(streamId);
00173 return vector.size();
00174 }
00175 }
00176 }