00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.calcium;
00032
00033 import java.util.concurrent.Callable;
00034 import java.util.concurrent.ExecutorService;
00035 import java.util.concurrent.Executors;
00036
00045 public class MultiThreadedManager extends ResourceManager{
00046
00047 ExecutorService threadPool;
00048 int numThreads;
00049
00050 public MultiThreadedManager(int numThreads){
00051
00052 this.threadPool=Executors.newFixedThreadPool(numThreads);
00053 this.numThreads=numThreads;
00054 }
00055
00056 @Override
00057 public Skernel boot(Skernel skernel) {
00058
00059 for(int i=0;i<numThreads;i++){
00060 threadPool.submit(new CallableInterpreter(skernel));
00061 }
00062 return skernel;
00063 }
00064
00065 @Override
00066 public void shutdown(){
00067 threadPool.shutdownNow();
00068 }
00069
00074 protected class CallableInterpreter extends Interpreter implements Callable<Task<?>>{
00075
00076 Skernel skernel;
00077
00078 public CallableInterpreter(Skernel skernel){
00079 this.skernel=skernel;
00080 }
00081
00082 public Task<?> call() throws Exception {
00083
00084 Task<?> task = skernel.getReadyTask(DEFAULT_GET_READY_TASK_TIMEOUT);
00085
00086 while(task!=null){
00087 task = super.interpret(task);
00088 skernel.putTask(task);
00089 task = skernel.getReadyTask(DEFAULT_GET_READY_TASK_TIMEOUT);
00090 }
00091 return task;
00092 }
00093 }
00094 }