00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.scheduler.policy;
00032
00033 import java.io.Serializable;
00034 import java.util.Collection;
00035 import java.util.HashMap;
00036 import java.util.Iterator;
00037 import java.util.Set;
00038 import java.util.Vector;
00039
00040 import org.apache.log4j.Logger;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.RunActive;
00044 import org.objectweb.proactive.Service;
00045 import org.objectweb.proactive.core.node.Node;
00046 import org.objectweb.proactive.core.util.log.Loggers;
00047 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00048 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00049 import org.objectweb.proactive.scheduler.Agent;
00050 import org.objectweb.proactive.scheduler.DeployedTask;
00051 import org.objectweb.proactive.scheduler.GenericJob;
00052 import org.objectweb.proactive.scheduler.JobConstants;
00053 import org.objectweb.proactive.scheduler.Queue;
00054 import org.objectweb.proactive.scheduler.QueueFullException;
00055 import org.objectweb.proactive.scheduler.RessourceManager;
00056 import org.objectweb.proactive.scheduler.SchedulerConstants;
00057
00058
00067 public abstract class AbstractPolicy implements Serializable, RunActive,
00068 JobConstants, SchedulerConstants {
00069 private static Logger logger = ProActiveLogger.getLogger(Loggers.JOB_MANAGER);
00070 protected RessourceManager ressourceManager;
00071 protected Queue queue;
00072 protected HashMap deployedTasks;
00073
00074 public AbstractPolicy() {
00075 }
00076
00082 public AbstractPolicy(RessourceManager ressourceManager) {
00083 if (ressourceManager != null) {
00084 deployedTasks = new HashMap();
00085 queue = new Queue();
00086 this.ressourceManager = ressourceManager;
00087 logger.debug("job manager created");
00088 }
00089 }
00090
00095 public void runActivity(Body body) {
00096 Service service = new Service(body);
00097
00098 while (true) {
00099 this.execute();
00100
00101 if (service.hasRequestToServe()) {
00102 service.blockingServeOldest();
00103 }
00104
00105 this.checkRunningTasks();
00106 }
00107 }
00108
00114 public void checkRunningTasks() {
00115 if (!this.deployedTasks.isEmpty()) {
00116 Set runningProcessIds = this.deployedTasks.keySet();
00117 Iterator iterator = runningProcessIds.iterator();
00118
00119 while (iterator.hasNext()) {
00120 String processId = (String) iterator.next();
00121 DeployedTask deployedTask = (DeployedTask) this.deployedTasks.get(processId);
00122
00123 if (!(deployedTask.isAlive()).booleanValue()) {
00124 this.deployedTasks.remove(processId);
00125
00126 GenericJob job = deployedTask.getTaskDescription();
00127 int ressourceNb = job.getRessourceNb() - 1;
00128 System.out.println("Task '" + job.getClassName() +
00129 "' finished ...");
00130 System.out.println("Freeing " + ressourceNb +
00131 ((ressourceNb > 1) ? " ressources ..." : " ressource ..."));
00132
00133 ressourceManager.freeNodes(job.getJobID(), true);
00134 job.setJobStatus("finished");
00135 logger.debug("job finished " + job.getJobID());
00136
00137 break;
00138 }
00139 }
00140 }
00141 }
00142
00148 public BooleanWrapper sub(GenericJob job) {
00149 try {
00150 job.setJobStatus("queued");
00151 queue.put(job);
00152
00153 logger.debug("job added to the queue");
00154
00155 return new BooleanWrapper(true);
00156 } catch (QueueFullException e) {
00157 logger.error("Couldn't add job to the queue. Queue full ...");
00158 return new BooleanWrapper(false);
00159 }
00160 }
00161
00167 public BooleanWrapper del(String jobId) {
00168 try {
00169 if (queue.remove(jobId) == null) {
00170 this.deployedTasks.remove(jobId);
00171 this.ressourceManager.freeNodes(jobId, false);
00172 }
00173 logger.debug("job deleted");
00174 return new BooleanWrapper(true);
00175 } catch (Exception e) {
00176 logger.error("couldn't delete job");
00177 return new BooleanWrapper(false);
00178 }
00179 }
00180
00187 public Vector stat(String jobId) {
00188 Vector vector = new Vector();
00189 GenericJob jobDescription;
00190 if (jobId != null) {
00191 jobDescription = queue.get(jobId);
00192 if (jobDescription == null) {
00193 DeployedTask deployedTask = (DeployedTask) this.deployedTasks.get(jobId);
00194 if (deployedTask != null) {
00195 jobDescription = deployedTask.getTaskDescription();
00196 }
00197 }
00198
00199 if (jobDescription != null) {
00200 vector.add(jobDescription);
00201 }
00202 } else {
00203 vector.addAll(this.queue.values());
00204
00205 Collection c = this.deployedTasks.values();
00206 Iterator iterator = c.iterator();
00207
00208 while (iterator.hasNext()) {
00209 DeployedTask deployedTask = (DeployedTask) iterator.next();
00210 vector.add(deployedTask.getTaskDescription());
00211 }
00212 }
00213 logger.debug("job status evoqued");
00214 return vector;
00215 }
00216
00224 abstract public boolean isToBeServed(GenericJob job1, GenericJob job2);
00225
00230 public String nextTask() {
00231
00232 String jobId = null;
00233 GenericJob job = null;
00234
00235
00236 String tmpId;
00237 GenericJob tmpTask;
00238
00239 Set set = queue.keySet();
00240 Iterator iterator = set.iterator();
00241
00242
00243 if (iterator.hasNext()) {
00244 jobId = (String) iterator.next();
00245 job = (GenericJob) queue.get(jobId);
00246 }
00247
00248
00249 while (iterator.hasNext()) {
00250 tmpId = (String) iterator.next();
00251
00252 tmpTask = (GenericJob) queue.get(tmpId);
00253
00254 if (isToBeServed(tmpTask, job)) {
00255 jobId = tmpId;
00256 job = tmpTask;
00257 }
00258 }
00259 logger.debug("next task evoked");
00260 return jobId;
00261 }
00262
00267 public HashMap getDeployedTasks() {
00268 return this.deployedTasks;
00269 }
00270
00276 public void execute() {
00277 int ressourceNb;
00278 int minRessourceNb;
00279
00280 if (!queue.isEmpty()) {
00281 String jobId = nextTask();
00282 GenericJob job = queue.get(jobId);
00283 ressourceNb = job.getRessourceNb();
00284 minRessourceNb = job.getMinNbOfNodes();
00285
00286 if (!ressourceManager.isAvailable(ressourceNb).booleanValue()) {
00287 if (!ressourceManager.isAvailable(minRessourceNb).booleanValue()) {
00288 return;
00289 } else {
00290 ressourceNb = minRessourceNb;
00291 }
00292 }
00293
00294
00295
00296 queue.remove(jobId);
00297
00298 try {
00299 Node node = ressourceManager.reserveNodes(jobId, ressourceNb);
00300
00301 Agent agent = null;
00302 String jobName = job.getClassName();
00303
00304
00305
00306
00307
00308
00309
00310 if (jobName != null) {
00311
00312 String xmlPath = job.getXMLFullPath();
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322 HashMap systemProperties = new HashMap();
00323 systemProperties.put(XML_DESC, xmlPath);
00324 systemProperties.put(JOB_ID, jobId);
00325
00326 agent = (Agent) ProActive.newActive(Agent.class.getName(),
00327 new Object[] { systemProperties }, node);
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341 ProActive.newMain(jobName, job.getMainParameters(), node);
00342
00343 System.out.println("Starting '" + jobName + "' job ...");
00344 }
00345
00346 System.out.println("Allocation of " + ressourceNb +
00347 ((ressourceNb > 1) ? " ressources ..." : " ressource ..."));
00348 job.setJobStatus("deployed");
00349 this.deployedTasks.put(jobId, new DeployedTask(job, agent));
00350 logger.debug("job deployed successfully");
00351 } catch (Exception e) {
00352 logger.error("error executing job");
00353 }
00354 }
00355 }
00356
00361 public void end() {
00362 }
00363
00368 public Queue getQueue() {
00369 return this.queue;
00370 }
00371 }