00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.calcium;
00032
00033 import java.util.Hashtable;
00034 import java.util.Vector;
00035
00036 import org.apache.log4j.Logger;
00037 import org.objectweb.proactive.calcium.exceptions.MuscleException;
00038 import org.objectweb.proactive.calcium.exceptions.PanicException;
00039 import org.objectweb.proactive.calcium.interfaces.Instruction;
00040 import org.objectweb.proactive.calcium.interfaces.Skeleton;
00041 import org.objectweb.proactive.calcium.statistics.Stats;
00042 import org.objectweb.proactive.core.util.log.Loggers;
00043 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00044
00045
00046 public class Stream<T>{
00047 static Logger logger = ProActiveLogger.getLogger(Loggers.SKELETONS_KERNEL);
00048
00049 private int streamId;
00050 private Facade facade;
00051 private Skeleton<T> skeleton;
00052 private int pendingTasks;
00053
00054 private Hashtable<T, Stats> taskStats;
00055 protected Stream(Facade facade, Skeleton<T> skeleton){
00056
00057 this.streamId=(int)(Math.random()*Integer.MAX_VALUE);
00058 this.skeleton=skeleton;
00059 this.facade=facade;
00060 this.taskStats = new Hashtable<T, Stats>();
00061 this.pendingTasks=0;
00062 }
00063
00068 public void input(T param){
00069 Vector<T> paramV= new Vector<T>(1);
00070 paramV.add(param);
00071 input(paramV);
00072 }
00073
00078 public void input(Vector<T> paramV){
00079
00080 Vector<Instruction<T>> v = (Vector<Instruction<T>>) skeleton.getInstructionStack();
00081
00082
00083 for(T param:paramV){
00084 Task<T> task = new Task<T>(param);
00085 task.setStack(v);
00086 task.setStreamId(streamId);
00087 facade.putTask(task);
00088 pendingTasks++;
00089 }
00090 }
00091
00103 @SuppressWarnings("unchecked")
00104 public T getResult() throws PanicException, MuscleException{
00105 if(pendingTasks==0) return null;
00106
00107 Task<T> task = (Task<T>) facade.getResult(streamId);
00108 T res= task.getObject();
00109
00110 taskStats.put(res,task.getStats());
00111 pendingTasks--;
00112
00113 return res;
00114 }
00115
00122 public Stats getStats(T res) {
00123 return this.taskStats.get(res);
00124 }
00125 }