org/objectweb/proactive/calcium/Skernel.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.calcium;
00032 
00033 import java.io.Serializable;
00034 import java.util.Enumeration;
00035 import java.util.Hashtable;
00036 import java.util.PriorityQueue;
00037 import java.util.Vector;
00038 
00039 import org.apache.log4j.Logger;
00040 import org.objectweb.proactive.calcium.exceptions.PanicException;
00041 import org.objectweb.proactive.calcium.statistics.StatsGlobalImpl;
00042 import org.objectweb.proactive.core.util.log.Loggers;
00043 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00044 
00057 public class Skernel implements Serializable{
00058         static Logger logger = ProActiveLogger.getLogger(Loggers.SKELETONS_KERNEL);
00059         
00060         //State Queues
00061         private PriorityQueue<Task<?>> ready; //Tasks ready for execution
00062         private Hashtable<Task<?>,Task<?>> waiting; //Tasks waiting for subtasks completition
00063         private Vector<Task<?>> results; //Finished root-tasks
00064         private Hashtable<Task<?>,Task<?>> processing; //Tasks being processed at this moment
00065         
00066         private PanicException panicException;  //In case the system colapses
00067         private StatsGlobalImpl stats;  //Statistics
00068         
00069         public Skernel(){
00070                 this.ready= new PriorityQueue<Task<?>>();
00071                 this.waiting=new Hashtable<Task<?>,Task<?>>();
00072                 this.results = new Vector<Task<?>>();
00073                 this.processing=new Hashtable<Task<?>,Task<?>>();
00074                 this.stats=new StatsGlobalImpl();
00075                 this.panicException=null;
00076         }
00077         
00078         public synchronized Task<?> getResult() throws PanicException{
00079                 while(results.size()<=0 && !isPaniqued()){
00080 
00081                         try {
00082                                 if(logger.isDebugEnabled()){
00083                                         logger.debug("Thread waiting for results:"+Thread.currentThread().getId());
00084                                 }
00085                                 wait();
00086                         } catch (InterruptedException e) {
00087                                 e.printStackTrace();
00088                                 return null;
00089                         }
00090                 }
00091                 
00092                 if(isPaniqued()) {
00093                         notifyAll();
00094                         throw panicException;
00095                 }
00096                         
00097                 Task<?> resultTask=results.remove(0);
00098                 resultTask.getStats().exitResultsState();
00099                 
00100                 stats.increaseSolvedTasks(resultTask);
00101                 
00102                 if(resultTask.hasException()){
00103                         //Only runtime exception can be found here
00104                         throw (RuntimeException)resultTask.getException(); 
00105                 }
00106                 
00107                 return resultTask;
00108         }
00109 /*
00110         public synchronized Task<?> getReadyTask(){
00111                 return getReadyTask(0);
00112         }
00113         */
00123         public synchronized Task<?> getReadyTask(long timeout){
00124                 long lastinit = System.currentTimeMillis();
00125                 timeout=timeout>0?timeout:0; //if timeout<0 => timeout=0
00126                 
00127                 while(ready.isEmpty()){
00128 
00129                         try {
00130                                 if(logger.isDebugEnabled()){
00131                                         logger.debug("Waiting for ready task:"+Thread.currentThread().getId());
00132                                 }
00133                                 wait(timeout);
00134                                 long newinit=System.currentTimeMillis();
00135                                 timeout-=newinit-lastinit;
00136                                 lastinit=newinit;
00137                         } catch (InterruptedException e) {
00138                                 //logger.error("Error while waiting for ready task");
00139                                 //e.printStackTrace();
00140                                 return null;
00141                         }
00142                         
00143                         if(timeout <=0) return null;
00144                 }
00145                         
00146                 Task<?> task = ready.poll(); //get the highest priority task
00147                 task.getStats().exitReadyState();
00148 
00149                 processing.put(task,task);
00150                 if(logger.isDebugEnabled()){
00151                         logger.debug("Serving taskId="+task);
00152                 }
00153 
00154                 return task;
00155         }
00156         
00157         public synchronized void putTask(Task<?> task){
00158                 
00159                 //TODO think about throwing an exception here!
00160                 //if(isPaniqued()) throw panicException;
00161                 
00162                 Task<?> processingTask=processing.remove(task);
00163                 if(processingTask==null && logger.isDebugEnabled()){
00164                         logger.debug("Enqueing new taskId="+task);
00165                 } else if( logger.isDebugEnabled()){
00166                         logger.debug("Updating taskId="+task);
00167                 }
00168                 
00169                 //The task can be tainted by exceptions on other family members
00170                 if(processingTask!=null && processingTask.isTainted()){
00171                         if(logger.isDebugEnabled()){
00172                                 logger.debug("Dropping tainted taskId="+task);
00173                         }
00174                         return;
00175                 }
00176                 
00177                 task.getStats().exitProcessingState();
00178                 
00179                 if(task.hasException()){
00180                         updateExceptionedTask(task);
00181                 }
00182                 else if(task.isFinished()){
00183                         updateFinishedTask(task);
00184                 }
00185                 else{
00186                         updateTask(task);
00187                 }
00188                 
00189                 if(!ready.isEmpty()){
00190                 //if(!ready.isEmpty() || results.size()!=0){
00191                         notifyAll();
00192                 }
00193         }
00194         
00195         public synchronized boolean isPaniqued(){
00196                 
00197                 return this.panicException != null;
00198         }
00199         
00200         private void updateExceptionedTask(Task<?> task){
00201                 
00202                 if(logger.isDebugEnabled()){
00203                         logger.debug("Updating Exceptioned Task taskId="+task);
00204                 }
00205                 
00206                 Exception e=task.getException();
00207                 
00208                 if( e instanceof PanicException) {
00209                         kernelPanic((PanicException)e);         //Panic exception
00210                         return;
00211                 }
00212                                 
00213                 if( e instanceof RuntimeException){     //Fatal Exception
00214                         deleteTaskFamilyFromQueues(task);
00215                         return;
00216                 }
00217 
00218                 //TODO handle Scheduling Exceptions
00219                 
00220                 //Else: handle regular exceptions
00221                 if(task.isFinished()){
00222                         String msg="Panic Error. Task with exceptions cannot be a finished task!";
00223                         logger.error(msg);
00224                         kernelPanic(new PanicException(msg));
00225                 }
00226                 
00227                 if(task.isRootTask()){  //if its a root task then thats all folks
00228                         deleteTaskFamilyFromQueues(task);
00229                         return;
00230                 }
00231                 
00232                 //if its a child task, we update it as a finished task
00233                 updateFinishedTask(task); 
00234         }
00235         
00236         private void updateFinishedTask(Task<?> task){
00237                 
00238                 if(!task.isFinished()){
00239                         String msg="Error, updating unfinished task as finished!";
00240                         logger.debug(msg);
00241                         kernelPanic(new PanicException(msg));
00242                         return;
00243                 }
00244                 
00245                 if(logger.isDebugEnabled()){
00246                         logger.debug("Updating Finished Task taskId="+task);
00247                 }
00248                 
00249                 task.markFinishTime();
00250                 
00251                 if(task.isRootTask()){ //Task finished
00252                         if(logger.isDebugEnabled()){
00253                                 logger.debug("Adding to results task="+task);
00254                         }
00255                         results.add(task);
00256                 }
00257                 else{ //task is a subtask
00258                         stats.increaseSolvedTasks(task);
00259                         
00260                         int parentId=task.getParentId();
00261                         if(!this.waiting.containsKey(parentId)){
00262                                 logger.error("Error. Parent task id="+parentId+" is not waiting for child tasks");
00263                                 logger.error("Dropping task id="+task);
00264                                 return;
00265                         }
00266                         
00267                         Task<?> parent=waiting.get(parentId);
00268                         if(!parent.setFinishedChild(task)){
00269                                 logger.error("Parent did not recognize child task. Dropping task id="+task);
00270                                 return;
00271                         }
00272                         
00273                         //If this was the last subtask, then the parent is ready for execution
00274                         if(parent.isReady()){
00275                                 if(logger.isDebugEnabled()){
00276                                         logger.debug("Parent taskId="+parent.getId() +" is ready");
00277                                 }
00278                                 if(waiting.remove(parent)==null){
00279                                         logger.error("Error, parent not waiting when it should have been.");
00280                                 }
00281                                 parent.getStats().exitWaitingState();
00282                                 ready.add(parent);
00283                         }
00284                 }
00285         }//if its a child task, we update it as a finished task
00286         
00287         private void updateTask(Task<?> task){
00288                 
00289                 //logger.debug("Unfinished taskId="+task);
00290                 if(task.hasReadyChildTask()){
00291                         while(task.hasReadyChildTask()){
00292                                 Task<?> child=task.getReadyChild();
00293                                 if(logger.isDebugEnabled()){
00294                                         logger.debug("Child taskId="+child.getId() +" is ready");
00295                                 }
00296                                 ready.add(child); //child will have more priority than uncles
00297                         }
00298                         if(logger.isDebugEnabled()){
00299                                 logger.debug("Parent Task taskId="+task.getId() +" is waiting");
00300                         }                               
00301                         waiting.put(task,task); //the parent task will wait for it's subtasks
00302                         return;
00303                 }
00304                 else{
00305                         if(logger.isDebugEnabled()){
00306                                 logger.debug("Task taskId="+task.getId() +" is ready");
00307                         }
00308                         ready.add(task);
00309                 }
00310         }//method
00311         
00312         /*
00313         public synchronized boolean isFinished(){
00314 
00315                 return ready.isEmpty() && processing.isEmpty();
00316         } 
00317     */
00318         public synchronized boolean hasResults(){
00319                 return !results.isEmpty();
00320         }
00321 
00322         public synchronized boolean hasReadyTask(){
00323                 return !ready.isEmpty();
00324         }
00325         
00326         public synchronized int getReadyQueueLength() {
00327                 return ready.size();
00328         }
00329         
00330         public synchronized StatsGlobalImpl getStatsGlobal() {
00331                 stats.setQueueLengths(ready.size(), processing.size(), 
00332                                                                 waiting.size(), results.size());
00333                 return stats;
00334         }
00335         
00336         private void deleteTaskFamilyFromQueues(Task<?> blackSheepTask){
00337                 
00338                 //1. Put the root tasks in the results queue
00339                 Task<?> root;
00340                 try {
00341                         root = getRootTask(blackSheepTask);
00342                         root.setException(blackSheepTask.getException());
00343                         results.add(root);
00344                 } catch (PanicException e) {
00345                         kernelPanic(e); //panic if can not get root task
00346                         return;
00347                 }
00348                 
00349                 //2. Delete ready family tasks
00350                 for(Task<?> task:ready){
00351                         if(task.getFamilyId()==blackSheepTask.getFamilyId()){
00352                                 ready.remove(task);
00353                                 task.getStats().exitReadyState();
00354                         }
00355                 }
00356                 
00357                 //3. Delete waiting family tasks
00358                 Enumeration<Task<?>> enumeration = waiting.elements();
00359                 while(enumeration.hasMoreElements()){
00360                         Task<?> task = enumeration.nextElement();
00361                         if(task.getFamilyId()==blackSheepTask.getFamilyId()){
00362                                 waiting.remove(task);
00363                                 task.getStats().exitWaitingState();
00364                         }
00365                 }
00366                 
00367                 //4. Mark family tasks in the processing queue as tainted.
00368                 enumeration = processing.elements();
00369                 while(enumeration.hasMoreElements()){
00370                         Task<?> task = enumeration.nextElement();
00371                         if(task.getFamilyId()==blackSheepTask.getFamilyId()){
00372                                 task.setTainted(true);
00373                         }
00374                 }
00375         }
00376         
00393         private Task<?> getRootTask(Task<?> task) throws PanicException{
00394                 if(task.isRootTask()) return task;
00395                 
00396                 if(this.waiting.contains(task.getFamilyId())){
00397                         Task<?> root = waiting.remove(task.getFamilyId());
00398                         root.getStats().exitWaitingState();
00399                         return root;
00400                 }
00401                 
00402                 if(this.processing.contains(task.getFamilyId())){
00403                         throw new PanicException("Error, root taskId="
00404                                         +task.getFamilyId()+" found in processing queue");
00405                 }
00406                 
00407                 for(Task<?> r:ready){
00408                         if(r.getId()==task.getFamilyId()){
00409                                 throw new PanicException("Error, root taskId="
00410                                                 +task.getFamilyId()+" found in ready queue");
00411                         }
00412                 }
00413                 
00414                 /*
00415                 for(Task<?> r:results){
00416                         if(r.getId()==task.getFamilyId()){
00417                                 throw new PanicException("Error, root taskId="
00418                                                 +task.getFamilyId()+" found in results queue");
00419                         }       
00420                 }
00421                 */
00422                 
00423                 return null;
00424         }
00425         
00426         private void kernelPanic(PanicException e){
00427                 logger.error("Kernel Panic:"+e.getCause());
00428                 this.panicException=e;
00429                 
00430                 notifyAll();
00431         }
00432 }//class

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1