00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.calcium.proactive;
00032
00033 import java.util.concurrent.Callable;
00034 import java.util.concurrent.ExecutorService;
00035 import java.util.concurrent.Executors;
00036
00037 import org.objectweb.proactive.ProActive;
00038 import org.objectweb.proactive.calcium.Interpreter;
00039 import org.objectweb.proactive.calcium.Skernel;
00040 import org.objectweb.proactive.calcium.Task;
00041 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00042 import org.objectweb.proactive.core.node.Node;
00043
00044 public class ProActiveThreadedManager extends AbstractProActiveManager {
00045
00046 ExecutorService threadPool;
00047
00048 public ProActiveThreadedManager(Node nodes[]){
00049 super(nodes);
00050 }
00051
00052 public ProActiveThreadedManager(VirtualNode vn){
00053 super(vn);
00054 }
00055
00056 public ProActiveThreadedManager(String descriptorPath, String virtualNodeName){
00057 super(descriptorPath,virtualNodeName);
00058 }
00059
00060 @Override
00061 public Skernel boot(Skernel skernel) {
00062 logger.info("ProActive skeleton manager is using "+nodes.length+" nodes");
00063 threadPool=Executors.newCachedThreadPool();
00064 try {
00065 for(int i=0;i<nodes.length;i++){
00066 Interpreter interp=(Interpreter)ProActive.newActive(Interpreter.class.getName(),null,nodes[i]);
00067 threadPool.submit(new ProActiveCallableInterpreter(skernel,interp));
00068 }
00069 } catch (Exception e) {
00070 logger.error("Error, unable to create interpreter active objects");
00071 e.printStackTrace();
00072 }
00073 return skernel;
00074 }
00075
00076 @Override
00077 public void shutdown() {
00078 super.shutdown();
00079 if(threadPool != null){
00080 threadPool.shutdownNow();
00081 }
00082 }
00083
00084 protected class ProActiveCallableInterpreter implements Callable<Task<?>>{
00085
00086 Interpreter interp;
00087 Skernel skernel;
00088
00089 public ProActiveCallableInterpreter(Skernel skernel, Interpreter interp){
00090 this.skernel=skernel;
00091 this.interp=interp;
00092 }
00093
00094 public Task<?> call() throws Exception {
00095
00096 Task<?> task = skernel.getReadyTask(0);
00097
00098 while(task!=null){
00099 task = interp.interpret(task);
00100
00101 ProActive.waitFor(task);
00102
00103 skernel.putTask(task);
00104
00105 task = skernel.getReadyTask(0);
00106 }
00107
00108 return task;
00109 }
00110 }
00111 }