00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.calcium;
00032
import java.io.Serializable;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Vector;

import org.apache.log4j.Logger;
import org.objectweb.proactive.calcium.exceptions.PanicException;
import org.objectweb.proactive.calcium.statistics.StatsGlobalImpl;
import org.objectweb.proactive.core.util.log.Loggers;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
00044
00057 public class Skernel implements Serializable{
00058 static Logger logger = ProActiveLogger.getLogger(Loggers.SKELETONS_KERNEL);
00059
00060
00061 private PriorityQueue<Task<?>> ready;
00062 private Hashtable<Task<?>,Task<?>> waiting;
00063 private Vector<Task<?>> results;
00064 private Hashtable<Task<?>,Task<?>> processing;
00065
00066 private PanicException panicException;
00067 private StatsGlobalImpl stats;
00068
00069 public Skernel(){
00070 this.ready= new PriorityQueue<Task<?>>();
00071 this.waiting=new Hashtable<Task<?>,Task<?>>();
00072 this.results = new Vector<Task<?>>();
00073 this.processing=new Hashtable<Task<?>,Task<?>>();
00074 this.stats=new StatsGlobalImpl();
00075 this.panicException=null;
00076 }
00077
00078 public synchronized Task<?> getResult() throws PanicException{
00079 while(results.size()<=0 && !isPaniqued()){
00080
00081 try {
00082 if(logger.isDebugEnabled()){
00083 logger.debug("Thread waiting for results:"+Thread.currentThread().getId());
00084 }
00085 wait();
00086 } catch (InterruptedException e) {
00087 e.printStackTrace();
00088 return null;
00089 }
00090 }
00091
00092 if(isPaniqued()) {
00093 notifyAll();
00094 throw panicException;
00095 }
00096
00097 Task<?> resultTask=results.remove(0);
00098 resultTask.getStats().exitResultsState();
00099
00100 stats.increaseSolvedTasks(resultTask);
00101
00102 if(resultTask.hasException()){
00103
00104 throw (RuntimeException)resultTask.getException();
00105 }
00106
00107 return resultTask;
00108 }
00109
00110
00111
00112
00113
00123 public synchronized Task<?> getReadyTask(long timeout){
00124 long lastinit = System.currentTimeMillis();
00125 timeout=timeout>0?timeout:0;
00126
00127 while(ready.isEmpty()){
00128
00129 try {
00130 if(logger.isDebugEnabled()){
00131 logger.debug("Waiting for ready task:"+Thread.currentThread().getId());
00132 }
00133 wait(timeout);
00134 long newinit=System.currentTimeMillis();
00135 timeout-=newinit-lastinit;
00136 lastinit=newinit;
00137 } catch (InterruptedException e) {
00138
00139
00140 return null;
00141 }
00142
00143 if(timeout <=0) return null;
00144 }
00145
00146 Task<?> task = ready.poll();
00147 task.getStats().exitReadyState();
00148
00149 processing.put(task,task);
00150 if(logger.isDebugEnabled()){
00151 logger.debug("Serving taskId="+task);
00152 }
00153
00154 return task;
00155 }
00156
00157 public synchronized void putTask(Task<?> task){
00158
00159
00160
00161
00162 Task<?> processingTask=processing.remove(task);
00163 if(processingTask==null && logger.isDebugEnabled()){
00164 logger.debug("Enqueing new taskId="+task);
00165 } else if( logger.isDebugEnabled()){
00166 logger.debug("Updating taskId="+task);
00167 }
00168
00169
00170 if(processingTask!=null && processingTask.isTainted()){
00171 if(logger.isDebugEnabled()){
00172 logger.debug("Dropping tainted taskId="+task);
00173 }
00174 return;
00175 }
00176
00177 task.getStats().exitProcessingState();
00178
00179 if(task.hasException()){
00180 updateExceptionedTask(task);
00181 }
00182 else if(task.isFinished()){
00183 updateFinishedTask(task);
00184 }
00185 else{
00186 updateTask(task);
00187 }
00188
00189 if(!ready.isEmpty()){
00190
00191 notifyAll();
00192 }
00193 }
00194
00195 public synchronized boolean isPaniqued(){
00196
00197 return this.panicException != null;
00198 }
00199
00200 private void updateExceptionedTask(Task<?> task){
00201
00202 if(logger.isDebugEnabled()){
00203 logger.debug("Updating Exceptioned Task taskId="+task);
00204 }
00205
00206 Exception e=task.getException();
00207
00208 if( e instanceof PanicException) {
00209 kernelPanic((PanicException)e);
00210 return;
00211 }
00212
00213 if( e instanceof RuntimeException){
00214 deleteTaskFamilyFromQueues(task);
00215 return;
00216 }
00217
00218
00219
00220
00221 if(task.isFinished()){
00222 String msg="Panic Error. Task with exceptions cannot be a finished task!";
00223 logger.error(msg);
00224 kernelPanic(new PanicException(msg));
00225 }
00226
00227 if(task.isRootTask()){
00228 deleteTaskFamilyFromQueues(task);
00229 return;
00230 }
00231
00232
00233 updateFinishedTask(task);
00234 }
00235
00236 private void updateFinishedTask(Task<?> task){
00237
00238 if(!task.isFinished()){
00239 String msg="Error, updating unfinished task as finished!";
00240 logger.debug(msg);
00241 kernelPanic(new PanicException(msg));
00242 return;
00243 }
00244
00245 if(logger.isDebugEnabled()){
00246 logger.debug("Updating Finished Task taskId="+task);
00247 }
00248
00249 task.markFinishTime();
00250
00251 if(task.isRootTask()){
00252 if(logger.isDebugEnabled()){
00253 logger.debug("Adding to results task="+task);
00254 }
00255 results.add(task);
00256 }
00257 else{
00258 stats.increaseSolvedTasks(task);
00259
00260 int parentId=task.getParentId();
00261 if(!this.waiting.containsKey(parentId)){
00262 logger.error("Error. Parent task id="+parentId+" is not waiting for child tasks");
00263 logger.error("Dropping task id="+task);
00264 return;
00265 }
00266
00267 Task<?> parent=waiting.get(parentId);
00268 if(!parent.setFinishedChild(task)){
00269 logger.error("Parent did not recognize child task. Dropping task id="+task);
00270 return;
00271 }
00272
00273
00274 if(parent.isReady()){
00275 if(logger.isDebugEnabled()){
00276 logger.debug("Parent taskId="+parent.getId() +" is ready");
00277 }
00278 if(waiting.remove(parent)==null){
00279 logger.error("Error, parent not waiting when it should have been.");
00280 }
00281 parent.getStats().exitWaitingState();
00282 ready.add(parent);
00283 }
00284 }
00285 }
00286
00287 private void updateTask(Task<?> task){
00288
00289
00290 if(task.hasReadyChildTask()){
00291 while(task.hasReadyChildTask()){
00292 Task<?> child=task.getReadyChild();
00293 if(logger.isDebugEnabled()){
00294 logger.debug("Child taskId="+child.getId() +" is ready");
00295 }
00296 ready.add(child);
00297 }
00298 if(logger.isDebugEnabled()){
00299 logger.debug("Parent Task taskId="+task.getId() +" is waiting");
00300 }
00301 waiting.put(task,task);
00302 return;
00303 }
00304 else{
00305 if(logger.isDebugEnabled()){
00306 logger.debug("Task taskId="+task.getId() +" is ready");
00307 }
00308 ready.add(task);
00309 }
00310 }
00311
00312
00313
00314
00315
00316
00317
00318 public synchronized boolean hasResults(){
00319 return !results.isEmpty();
00320 }
00321
00322 public synchronized boolean hasReadyTask(){
00323 return !ready.isEmpty();
00324 }
00325
00326 public synchronized int getReadyQueueLength() {
00327 return ready.size();
00328 }
00329
00330 public synchronized StatsGlobalImpl getStatsGlobal() {
00331 stats.setQueueLengths(ready.size(), processing.size(),
00332 waiting.size(), results.size());
00333 return stats;
00334 }
00335
00336 private void deleteTaskFamilyFromQueues(Task<?> blackSheepTask){
00337
00338
00339 Task<?> root;
00340 try {
00341 root = getRootTask(blackSheepTask);
00342 root.setException(blackSheepTask.getException());
00343 results.add(root);
00344 } catch (PanicException e) {
00345 kernelPanic(e);
00346 return;
00347 }
00348
00349
00350 for(Task<?> task:ready){
00351 if(task.getFamilyId()==blackSheepTask.getFamilyId()){
00352 ready.remove(task);
00353 task.getStats().exitReadyState();
00354 }
00355 }
00356
00357
00358 Enumeration<Task<?>> enumeration = waiting.elements();
00359 while(enumeration.hasMoreElements()){
00360 Task<?> task = enumeration.nextElement();
00361 if(task.getFamilyId()==blackSheepTask.getFamilyId()){
00362 waiting.remove(task);
00363 task.getStats().exitWaitingState();
00364 }
00365 }
00366
00367
00368 enumeration = processing.elements();
00369 while(enumeration.hasMoreElements()){
00370 Task<?> task = enumeration.nextElement();
00371 if(task.getFamilyId()==blackSheepTask.getFamilyId()){
00372 task.setTainted(true);
00373 }
00374 }
00375 }
00376
00393 private Task<?> getRootTask(Task<?> task) throws PanicException{
00394 if(task.isRootTask()) return task;
00395
00396 if(this.waiting.contains(task.getFamilyId())){
00397 Task<?> root = waiting.remove(task.getFamilyId());
00398 root.getStats().exitWaitingState();
00399 return root;
00400 }
00401
00402 if(this.processing.contains(task.getFamilyId())){
00403 throw new PanicException("Error, root taskId="
00404 +task.getFamilyId()+" found in processing queue");
00405 }
00406
00407 for(Task<?> r:ready){
00408 if(r.getId()==task.getFamilyId()){
00409 throw new PanicException("Error, root taskId="
00410 +task.getFamilyId()+" found in ready queue");
00411 }
00412 }
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423 return null;
00424 }
00425
00426 private void kernelPanic(PanicException e){
00427 logger.error("Kernel Panic:"+e.getCause());
00428 this.panicException=e;
00429
00430 notifyAll();
00431 }
00432 }