00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.scheduler.policy;
00032 
00033 import java.io.Serializable;
00034 import java.util.Collection;
00035 import java.util.HashMap;
00036 import java.util.Iterator;
00037 import java.util.Set;
00038 import java.util.Vector;
00039 
00040 import org.apache.log4j.Logger;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.RunActive;
00044 import org.objectweb.proactive.Service;
00045 import org.objectweb.proactive.core.node.Node;
00046 import org.objectweb.proactive.core.util.log.Loggers;
00047 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00048 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00049 import org.objectweb.proactive.scheduler.Agent;
00050 import org.objectweb.proactive.scheduler.DeployedTask;
00051 import org.objectweb.proactive.scheduler.GenericJob;
00052 import org.objectweb.proactive.scheduler.JobConstants;
00053 import org.objectweb.proactive.scheduler.Queue;
00054 import org.objectweb.proactive.scheduler.QueueFullException;
00055 import org.objectweb.proactive.scheduler.RessourceManager;
00056 import org.objectweb.proactive.scheduler.SchedulerConstants;
00057 
00058 
00067 public abstract class AbstractPolicy implements Serializable, RunActive,
00068     JobConstants, SchedulerConstants {
00069     private static Logger logger = ProActiveLogger.getLogger(Loggers.JOB_MANAGER);
00070     protected RessourceManager ressourceManager;
00071     protected Queue queue;
00072     protected HashMap deployedTasks;
00073 
00074     public AbstractPolicy() {
00075     }
00076 
00082     public AbstractPolicy(RessourceManager ressourceManager) {
00083         if (ressourceManager != null) {
00084             deployedTasks = new HashMap();
00085             queue = new Queue();
00086             this.ressourceManager = ressourceManager;
00087             logger.debug("job manager created");
00088         }
00089     }
00090 
00095     public void runActivity(Body body) {
00096         Service service = new Service(body);
00097 
00098         while (true) {
00099             this.execute();
00100 
00101             if (service.hasRequestToServe()) {
00102                 service.blockingServeOldest();
00103             }
00104 
00105             this.checkRunningTasks();
00106         }
00107     }
00108 
00114     public void checkRunningTasks() {
00115         if (!this.deployedTasks.isEmpty()) {
00116             Set runningProcessIds = this.deployedTasks.keySet();
00117             Iterator iterator = runningProcessIds.iterator();
00118 
00119             while (iterator.hasNext()) {
00120                 String processId = (String) iterator.next();
00121                 DeployedTask deployedTask = (DeployedTask) this.deployedTasks.get(processId);
00122 
00123                 if (!(deployedTask.isAlive()).booleanValue()) {
00124                     this.deployedTasks.remove(processId);
00125 
00126                     GenericJob job = deployedTask.getTaskDescription();
00127                     int ressourceNb = job.getRessourceNb() - 1;
00128                     System.out.println("Task '" + job.getClassName() +
00129                         "' finished ...");
00130                     System.out.println("Freeing " + ressourceNb +
00131                         ((ressourceNb > 1) ? " ressources ..." : " ressource ..."));
00132 
00133                     ressourceManager.freeNodes(job.getJobID(), true);
00134                     job.setJobStatus("finished");
00135                     logger.debug("job finished " + job.getJobID());
00136                     
00137                     break;
00138                 }
00139             }
00140         }
00141     }
00142 
00148     public BooleanWrapper sub(GenericJob job) {
00149         try {
00150             job.setJobStatus("queued");
00151             queue.put(job);
00152 
00153             logger.debug("job added to the queue");
00154 
00155             return new BooleanWrapper(true);
00156         } catch (QueueFullException e) {
00157             logger.error("Couldn't add job to the queue. Queue full ...");
00158             return new BooleanWrapper(false);
00159         }
00160     }
00161 
00167     public BooleanWrapper del(String jobId) {
00168         try {
00169             if (queue.remove(jobId) == null) {
00170                 this.deployedTasks.remove(jobId);
00171                 this.ressourceManager.freeNodes(jobId, false);
00172             }
00173             logger.debug("job deleted");
00174             return new BooleanWrapper(true);
00175         } catch (Exception e) {
00176             logger.error("couldn't delete job");
00177             return new BooleanWrapper(false);
00178         }
00179     }
00180 
00187     public Vector stat(String jobId) {
00188         Vector vector = new Vector();
00189         GenericJob jobDescription;
00190         if (jobId != null) {
00191             jobDescription = queue.get(jobId);
00192             if (jobDescription == null) {
00193                 DeployedTask deployedTask = (DeployedTask) this.deployedTasks.get(jobId);
00194                 if (deployedTask != null) {
00195                     jobDescription = deployedTask.getTaskDescription();
00196                 }
00197             }
00198 
00199             if (jobDescription != null) {
00200                 vector.add(jobDescription);
00201             }
00202         } else {
00203             vector.addAll(this.queue.values());
00204 
00205             Collection c = this.deployedTasks.values();
00206             Iterator iterator = c.iterator();
00207 
00208             while (iterator.hasNext()) {
00209                 DeployedTask deployedTask = (DeployedTask) iterator.next();
00210                 vector.add(deployedTask.getTaskDescription());
00211             }
00212         }
00213         logger.debug("job status evoqued");
00214         return vector;
00215     }
00216 
00224     abstract public boolean isToBeServed(GenericJob job1, GenericJob job2);
00225 
00230     public String nextTask() {
00231         
00232         String jobId = null;
00233         GenericJob job = null;
00234 
00235         
00236         String tmpId;
00237         GenericJob tmpTask;
00238 
00239         Set set = queue.keySet();
00240         Iterator iterator = set.iterator();
00241 
00242         
00243         if (iterator.hasNext()) {
00244             jobId = (String) iterator.next();
00245             job = (GenericJob) queue.get(jobId);
00246         }
00247 
00248         
00249         while (iterator.hasNext()) {
00250             tmpId = (String) iterator.next();
00251 
00252             tmpTask = (GenericJob) queue.get(tmpId);
00253 
00254             if (isToBeServed(tmpTask, job)) {
00255                 jobId = tmpId;
00256                 job = tmpTask;
00257             }
00258         }
00259         logger.debug("next task evoked");
00260         return jobId;
00261     }
00262 
00267     public HashMap getDeployedTasks() {
00268         return this.deployedTasks;
00269     }
00270 
00276     public void execute() {
00277         int ressourceNb;
00278         int minRessourceNb;
00279 
00280         if (!queue.isEmpty()) {
00281             String jobId = nextTask();
00282             GenericJob job = queue.get(jobId);
00283             ressourceNb = job.getRessourceNb();
00284             minRessourceNb = job.getMinNbOfNodes();
00285 
00286             if (!ressourceManager.isAvailable(ressourceNb).booleanValue()) {
00287                 if (!ressourceManager.isAvailable(minRessourceNb).booleanValue()) {
00288                     return;
00289                 } else {
00290                     ressourceNb = minRessourceNb;
00291                 }
00292             }
00293 
00294             
00295             
00296             queue.remove(jobId);
00297 
00298             try {
00299                 Node node = ressourceManager.reserveNodes(jobId, ressourceNb);
00300 
00301                 Agent agent = null;
00302                 String jobName = job.getClassName();
00303 
00304                 
00305                 
00306                 
00307                 
00308                 
00309                 
00310                 if (jobName != null) {
00311                     
00312                     String xmlPath = job.getXMLFullPath();
00313 
00314                     
00315 
00316 
00317 
00318 
00319 
00320 
00321 
00322                     HashMap systemProperties = new HashMap();
00323                     systemProperties.put(XML_DESC, xmlPath);
00324                     systemProperties.put(JOB_ID, jobId);
00325 
00326                     agent = (Agent) ProActive.newActive(Agent.class.getName(),
00327                             new Object[] { systemProperties }, node);
00328 
00329                     
00330                     
00331                     
00332                     
00333                     
00334                     
00335 
00336 
00337 
00338 
00339 
00340 
00341                     ProActive.newMain(jobName, job.getMainParameters(), node);
00342 
00343                     System.out.println("Starting '" + jobName + "' job ...");
00344                 }
00345 
00346                 System.out.println("Allocation of " + ressourceNb +
00347                     ((ressourceNb > 1) ? " ressources ..." : " ressource ..."));
00348                 job.setJobStatus("deployed");
00349                 this.deployedTasks.put(jobId, new DeployedTask(job, agent));
00350                 logger.debug("job deployed successfully");
00351             } catch (Exception e) {
00352                 logger.error("error executing job");
00353             }
00354         }
00355     }
00356 
00361     public void end() {
00362     }
00363 
00368     public Queue getQueue() {
00369         return this.queue;
00370     }
00371 }