00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.calcium.proactive;
00032 
00033 import java.util.concurrent.Callable;
00034 import java.util.concurrent.ExecutorService;
00035 import java.util.concurrent.Executors;
00036 
00037 import org.objectweb.proactive.ProActive;
00038 import org.objectweb.proactive.calcium.Interpreter;
00039 import org.objectweb.proactive.calcium.Skernel;
00040 import org.objectweb.proactive.calcium.Task;
00041 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00042 import org.objectweb.proactive.core.node.Node;
00043 
00044 public class ProActiveThreadedManager extends AbstractProActiveManager {
00045 
00046         ExecutorService threadPool;
00047 
00048         public ProActiveThreadedManager(Node nodes[]){          
00049                 super(nodes);
00050         }
00051         
00052         public ProActiveThreadedManager(VirtualNode vn){
00053                 super(vn);
00054         }
00055         
00056         public ProActiveThreadedManager(String descriptorPath, String virtualNodeName){
00057                 super(descriptorPath,virtualNodeName);
00058         }
00059 
00060         @Override
00061         public Skernel boot(Skernel skernel) {
00062                 logger.info("ProActive skeleton manager is using "+nodes.length+" nodes");
00063                 threadPool=Executors.newCachedThreadPool();
00064                 try {
00065                         for(int i=0;i<nodes.length;i++){
00066                                 Interpreter interp=(Interpreter)ProActive.newActive(Interpreter.class.getName(),null,nodes[i]);
00067                                 threadPool.submit(new ProActiveCallableInterpreter(skernel,interp));
00068                         }
00069                 } catch (Exception e) {
00070                         logger.error("Error, unable to create interpreter active objects");
00071                         e.printStackTrace();
00072                 }
00073                 return skernel;
00074         }
00075         
00076         @Override
00077         public void shutdown() {
00078                 super.shutdown();
00079                 if(threadPool != null){
00080                         threadPool.shutdownNow();
00081                 }
00082         }
00083         
00084         protected class ProActiveCallableInterpreter implements Callable<Task<?>>{
00085                         
00086                 Interpreter interp;
00087                 Skernel skernel;
00088                 
00089                 public ProActiveCallableInterpreter(Skernel skernel, Interpreter interp){
00090                         this.skernel=skernel;
00091                         this.interp=interp;
00092                 }
00093 
00094                 public Task<?> call() throws Exception {
00095                         
00096                         Task<?> task = skernel.getReadyTask(0);
00097                         
00098                         while(task!=null){
00099                                 task = interp.interpret(task);
00100                                 
00101                                 ProActive.waitFor(task); 
00102 
00103                                 skernel.putTask(task);
00104 
00105                                 task = skernel.getReadyTask(0);
00106                         }
00107 
00108                         return task;
00109                 }
00110         }
00111 }