00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.ext.scilab.monitor;
00032
00033 import java.io.File;
00034 import java.io.IOException;
00035 import java.io.Serializable;
00036 import java.util.ArrayList;
00037 import java.util.HashMap;
00038
00039 import javasci.SciData;
00040
00041 import org.apache.log4j.Logger;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.core.util.log.Loggers;
00044 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00045 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00046 import org.objectweb.proactive.ext.scilab.SciDeployEngine;
00047 import org.objectweb.proactive.ext.scilab.SciEngine;
00048 import org.objectweb.proactive.ext.scilab.SciResult;
00049 import org.objectweb.proactive.ext.scilab.SciTask;
00050
00060 public class ScilabService implements Serializable{
00061
00062 private HashMap<String, SciEngineInfo> mapEngine;
00063 private ArrayList<String> listIdEngineFree;
00064
00065 private ArrayList<SciTaskInfo> listTaskWait;
00066 private HashMap<String, SciTaskInfo> mapTaskRun;
00067 private HashMap<String, SciTaskInfo> mapTaskEnd;
00068
00069 private long countIdTask;
00070 private long countIdEngine;
00071
00072 private SciEventSource taskObservable;
00073 private SciEventSource engineObservable;
00074
00075 private static Logger logger = ProActiveLogger.getLogger(Loggers.SCILAB_SERVICE);
00076
00080 public ScilabService() {
00081 this.mapEngine = new HashMap<String, SciEngineInfo>();
00082 this.listIdEngineFree = new ArrayList<String>();
00083
00084 this.listTaskWait = new ArrayList<SciTaskInfo>();
00085 this.mapTaskRun = new HashMap<String, SciTaskInfo>();
00086 this.mapTaskEnd = new HashMap<String, SciTaskInfo>();
00087
00088 this.taskObservable = new SciEventSource();
00089 this.engineObservable = new SciEventSource();
00090
00091 (new Thread(){
00092 public void run(){
00093 executeTasks();
00094 }
00095 }).start();
00096
00097
00098 (new Thread(){
00099 public void run(){
00100 retrieveResults();
00101 }
00102 }).start();
00103 }
00104
00113 public synchronized int deployEngine(String nameVirtualNode, String pathDescriptor, String arrayIdEngine[]){
00114 logger.debug("->ScilabService In:deployEngine:" + nameVirtualNode);
00115 HashMap mapNewEngine = SciDeployEngine.deploy(nameVirtualNode, pathDescriptor, arrayIdEngine);
00116 SciEngine sciEngine;
00117 String idEngine;
00118 BooleanWrapper isActivate;
00119
00120 for(int i=0; i<arrayIdEngine.length; i++){
00121 idEngine = arrayIdEngine[i];
00122 sciEngine = (SciEngine)mapNewEngine.get(idEngine);
00123
00124 if(sciEngine == null)
00125 continue;
00126
00127 isActivate = sciEngine.activate();
00128 mapEngine.put(idEngine, new SciEngineInfo(idEngine, sciEngine, isActivate));
00129 }
00130
00131 this.listIdEngineFree.addAll(mapNewEngine.keySet());
00132 this.engineObservable.fireSciEvent(null);
00133 notifyAll();
00134 return mapNewEngine.size();
00135 }
00136
00137 public synchronized int deployEngine(String nameVirtualNode, String pathDescriptor){
00138 long countTmp = this.countIdEngine;
00139 int nbEngine = SciDeployEngine.getNbMappedNodes(nameVirtualNode, pathDescriptor);
00140 String arrayIdEngine[] = new String[nbEngine];
00141 for(int i=0; i<arrayIdEngine.length; i++){
00142 arrayIdEngine[i] = "Engine" + countTmp++;
00143 }
00144
00145 nbEngine = this.deployEngine(nameVirtualNode, pathDescriptor, arrayIdEngine);
00146 this.countIdEngine += nbEngine;
00147 return nbEngine;
00148 }
00149
00150 public synchronized int deployEngine(String nameVirtualNode, String pathDescriptor, int nbEngine){
00151 long countTmp = this.countIdEngine;
00152 String arrayIdEngine[] = new String[nbEngine];
00153 for(int i=0; i<arrayIdEngine.length; i++){
00154 arrayIdEngine[i] = "Engine" + countTmp++;
00155 }
00156
00157 nbEngine = this.deployEngine(nameVirtualNode, pathDescriptor, arrayIdEngine);
00158 this.countIdEngine += nbEngine;
00159 return nbEngine;
00160 }
00161
00167 public synchronized void sendTask(SciTask sciTask){
00168 logger.debug("->ScilabService In:sendTask:" + sciTask.getId());
00169 SciTaskInfo sciTaskInfo = new SciTaskInfo(sciTask);
00170 sciTaskInfo.setState(SciTaskInfo.PENDING);
00171 this.listTaskWait.add(sciTaskInfo);
00172 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00173 notifyAll();
00174 }
00175
00176 public synchronized void sendTask(String pathScript, String jobInit, String dataOut[]) throws IOException{
00177 this.sendTask(pathScript, jobInit, dataOut, SciTaskInfo.NORMAL);
00178 }
00179
00180 public synchronized void sendTask(String pathScript, String jobInit, String dataOut[], int Priority) throws IOException{
00181 logger.debug("->ScilabService In:sendTask");
00182
00183
00184 SciTask sciTask = new SciTask("Task" + this.countIdTask++);
00185
00186 File f = new File(pathScript);
00187
00188 sciTask.setJob(f);
00189 sciTask.setJobInit(jobInit);
00190
00191 for(int i=0; i< dataOut.length; i++){
00192 logger.debug("->ScilabService DataOut:sendTask:" + dataOut[i]);
00193 if(dataOut[i].trim().equals("")){
00194 continue;
00195 }
00196 sciTask.addDataOut(new SciData(dataOut[i]));
00197 }
00198
00199 SciTaskInfo sciTaskInfo = new SciTaskInfo(sciTask);
00200 sciTaskInfo.setFileScript(f);
00201
00202 sciTaskInfo.setState(SciTaskInfo.PENDING);
00203 sciTaskInfo.setPriority(Priority);
00204
00205 this.listTaskWait.add(sciTaskInfo);
00206 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00207 notifyAll();
00208
00209 }
00210
00215 public synchronized void killTask(String idTask){
00216 logger.debug("->ScilabService In:killTask:" + idTask);
00217
00218 SciTaskInfo sciTaskInfo = this.mapTaskRun.remove(idTask);
00219
00220 if(sciTaskInfo == null){
00221 return;
00222 }
00223
00224 String idEngine = sciTaskInfo.getIdEngine();
00225 SciEngineInfo sciEngineInfo = mapEngine.get(idEngine);
00226
00227
00228 SciEngine sciEngine = sciEngineInfo.getSciEngine();
00229 sciEngine.killWorker();
00230
00231 BooleanWrapper isActivate = sciEngine.activate();
00232 sciEngineInfo.setIsActivate(isActivate);
00233 sciEngineInfo.setIdCurrentTask(null);
00234
00235 sciTaskInfo.setState(SciTaskInfo.KILLED);
00236
00237 this.listIdEngineFree.add(idEngine);
00238 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00239 }
00240
00245 public synchronized void restartEngine(String idEngine){
00246 logger.debug("->ScilabService In:restartEngine:" + idEngine);
00247
00248 SciEngineInfo sciEngineInfo = this.mapEngine.get(idEngine);
00249
00250 if(sciEngineInfo == null){
00251 return;
00252 }
00253
00254 String idTask = sciEngineInfo.getIdCurrentTask();
00255 SciTaskInfo sciTaskInfo;
00256
00257 if(idTask != null){
00258 sciTaskInfo = this.mapTaskRun.remove(idTask);
00259 sciTaskInfo.setState(SciTaskInfo.KILLED);
00260 sciEngineInfo.setIdCurrentTask(null);
00261 this.listIdEngineFree.add(idEngine);
00262 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00263 }
00264
00265 SciEngine sciEngine = sciEngineInfo.getSciEngine();
00266 sciEngine.killWorker();
00267 BooleanWrapper isActivate = sciEngine.activate();
00268 sciEngineInfo.setIsActivate(isActivate);
00269 }
00270
00271
00276 public synchronized void cancelTask(String idTask){
00277 logger.debug("->ScilabService In:cancelTask:" + idTask);
00278 SciTaskInfo sciTaskInfo;
00279 for(int i=0; i<this.listTaskWait.size(); i++){
00280 sciTaskInfo = this.listTaskWait.get(i);
00281
00282 if(idTask.equals(sciTaskInfo.getIdTask())){
00283 this.listTaskWait.remove(i);
00284 sciTaskInfo.setState(SciTaskInfo.CANCELLED);
00285 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00286 break;
00287 }
00288 }
00289 }
00290
00295 public synchronized void removeTask(String idTask){
00296 logger.debug("->ScilabService In:removeTask:" + idTask);
00297 SciTaskInfo sciTaskInfo = this.mapTaskEnd.remove(idTask);
00298 sciTaskInfo.setState(SciTaskInfo.REMOVED);
00299 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00300 }
00301
00302
00303 private synchronized void retrieveResults(){
00304 logger.debug("->ScilabService In:retrieveResult");
00305 SciResult sciResult;
00306 SciTaskInfo sciTaskInfo;
00307 String idEngine;
00308 SciEngineInfo sciEngineInfo;
00309 Object keys[];
00310
00311 while (true) {
00312 keys= this.mapTaskRun.keySet().toArray();
00313 for (int i = 0; i < keys.length; i++) {
00314 sciTaskInfo = this.mapTaskRun.get(keys[i]);
00315 sciResult = sciTaskInfo.getSciResult();
00316 if (!ProActive.isAwaited(sciResult)) {
00317 logger.debug("->ScilabService loop:retrieveResult:" + keys[i]);
00318
00319 this.mapTaskRun.remove(keys[i]);
00320 sciTaskInfo.setState(sciResult.getState());
00321
00322 idEngine = sciTaskInfo.getIdEngine();
00323 sciEngineInfo = mapEngine.get(idEngine);
00324
00325 sciEngineInfo.setIdCurrentTask(null);
00326 this.listIdEngineFree.add(idEngine);
00327 sciTaskInfo.setDateEnd();
00328 this.mapTaskEnd.put(sciTaskInfo.getIdTask(), sciTaskInfo);
00329 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00330 notifyAll();
00331 }
00332 }
00333 try {
00334 wait(1000);
00335 } catch (InterruptedException e) {
00336 }
00337 }
00338 }
00339
00340 private SciTaskInfo getNextTask(){
00341 logger.debug("->ScilabService loop:getNextTask");
00342 SciTaskInfo sciTaskInfo;
00343
00344 if (this.listTaskWait.size() == 0) return null;
00345
00346 for(int i=0; i<this.listTaskWait.size(); i++){
00347 sciTaskInfo = this.listTaskWait.get(i);
00348
00349 if(sciTaskInfo.getPriority() == SciTaskInfo.HIGH){
00350 return this.listTaskWait.remove(i);
00351 }
00352 }
00353
00354 for(int i=0; i<this.listTaskWait.size(); i++){
00355 sciTaskInfo = this.listTaskWait.get(i);
00356
00357 if(sciTaskInfo.getPriority() == SciTaskInfo.NORMAL){
00358 return this.listTaskWait.remove(i);
00359 }
00360 }
00361
00362 return this.listTaskWait.remove(0);
00363 }
00364
00365
00366 private SciEngineInfo getNextEngine(){
00367 logger.debug("->ScilabService loop:getNextEngine");
00368 String idEngine;
00369 SciEngineInfo sciEngineInfo;
00370 BooleanWrapper isActivate;
00371 int i = 0;
00372 int count = this.listIdEngineFree.size();
00373
00374 while (i<count){
00375 idEngine = this.listIdEngineFree.remove(0);
00376 sciEngineInfo = mapEngine.get(idEngine);
00377 isActivate = sciEngineInfo.getIsActivate();
00378 logger.debug("->ScilabService test0:getNextEngine:" + idEngine);
00379 if(ProActive.isAwaited(isActivate)){
00380 logger.debug("->ScilabService test1:getNextEngine:" + idEngine);
00381 this.listIdEngineFree.add(idEngine);
00382 }
00383 else if(isActivate.booleanValue()){
00384 logger.debug("->ScilabService test2:getNextEngine:" + idEngine);
00385 return sciEngineInfo;
00386 }else{
00387 logger.debug("->ScilabService test3:getNextEngine:" + idEngine);
00388 this.listIdEngineFree.add(idEngine);
00389 SciEngine sciEngine = sciEngineInfo.getSciEngine();
00390 sciEngineInfo.setIsActivate(sciEngine.activate());
00391 }
00392 i++;
00393 }
00394 return null;
00395 }
00396
00397
00398 private synchronized void executeTasks(){
00399 logger.debug("->ScilabService In:executeTasks");
00400
00401 SciTaskInfo sciTaskInfo;
00402 SciEngineInfo sciEngineInfo;
00403
00404
00405 while(true){
00406 if (this.listTaskWait.size() == 0) {
00407 try {
00408 logger.debug("->ScilabService test0:executeTask");
00409 wait();
00410 } catch (InterruptedException e) {
00411 }
00412
00413 continue;
00414 }
00415
00416 sciEngineInfo = this.getNextEngine();
00417 if(sciEngineInfo == null){
00418 try {
00419 logger.debug("->ScilabService test1:executeTask");
00420 wait(1000);
00421 } catch (InterruptedException e) {
00422 }
00423 continue;
00424 }
00425
00426 sciTaskInfo = this.getNextTask();
00427 if(sciTaskInfo == null){
00428 try {
00429 logger.debug("->ScilabService test2:executeTask");
00430 wait(1000);
00431 } catch (InterruptedException e) {
00432 }
00433 continue;
00434 }
00435
00436 this.executeTask(sciEngineInfo, sciTaskInfo);
00437 }
00438 }
00439
00440 private synchronized void executeTask(SciEngineInfo sciEngineInfo, SciTaskInfo sciTaskInfo) {
00441 logger.debug("->ScilabService In:executeTask");
00442 SciResult sciResult;
00443 SciEngine sciEngine;
00444
00445 sciEngineInfo.setIdCurrentTask(sciTaskInfo.getIdTask());
00446
00447 sciEngine = sciEngineInfo.getSciEngine();
00448 sciTaskInfo.setIdEngine(sciEngineInfo.getIdEngine());
00449 sciTaskInfo.setState(SciTaskInfo.RUNNING);
00450 sciResult = sciEngine.execute(sciTaskInfo.getSciTask());
00451
00452 sciTaskInfo.setSciResult(sciResult);
00453 this.mapTaskRun.put(sciTaskInfo.getIdTask(), sciTaskInfo);
00454 this.taskObservable.fireSciEvent(new SciEvent(sciTaskInfo));
00455 notifyAll();
00456 }
00457
00458
00459 public synchronized void addEventListenerTask(SciEventListener evtListener){
00460 taskObservable.addSciEventListener(evtListener);
00461 }
00462
00463 public synchronized void addEventListenerEngine(SciEventListener evtListener){
00464 engineObservable.addSciEventListener(evtListener);
00465 }
00466
00467 public synchronized void removeEventListenerTask(SciEventListener evtListener){
00468 taskObservable.removeSciEventListener(evtListener);
00469 }
00470
00471 public synchronized void removeAllEventListenerTask(){
00472 taskObservable = null;
00473 taskObservable = new SciEventSource();
00474 }
00475
00476 public synchronized void removeAllEventListenerEngine(){
00477 engineObservable = null;
00478 engineObservable = new SciEventSource();
00479 }
00480
00481 public synchronized void removeEventListenerEngine(SciEventListener evtListener){
00482 engineObservable.removeSciEventListener(evtListener);
00483 }
00484
00489 public synchronized SciTaskInfo getTaskEnd(String idTask){
00490 return mapTaskEnd.get(idTask);
00491 }
00492
00496 public synchronized int getNbEngine(){
00497 return mapEngine.size();
00498 }
00499
00504 public synchronized HashMap<String, SciTaskInfo> getMapTaskEnd() {
00505 return (HashMap<String, SciTaskInfo>) mapTaskEnd.clone();
00506 }
00507
00508
00512 public synchronized HashMap<String, SciTaskInfo> getMapTaskRun() {
00513 return (HashMap<String, SciTaskInfo>) mapTaskRun.clone();
00514 }
00515
00519 public synchronized HashMap<String, SciEngineInfo> getMapEngine() {
00520 return (HashMap<String, SciEngineInfo>) mapEngine.clone();
00521 }
00522
00526 public synchronized ArrayList<SciTaskInfo> getListTaskWait() {
00527 return (ArrayList<SciTaskInfo>) listTaskWait.clone();
00528 }
00529
00534 public synchronized void exit(){
00535 logger.debug("->ScilabService In:exit");
00536 SciEngineInfo sciEngineInfo;
00537 SciEngine sciEngine;
00538
00539 Object keys[] = mapEngine.keySet().toArray();
00540 for(int i=0; i<keys.length; i++){
00541 sciEngineInfo = mapEngine.get(keys[i]);
00542 sciEngine = sciEngineInfo.getSciEngine();
00543 try{
00544 sciEngine.exit();
00545 }catch(RuntimeException e ){
00546
00547 }
00548 }
00549 }
00550 }