org/objectweb/proactive/scheduler/policy/AbstractPolicy.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.scheduler.policy;
00032 
00033 import java.io.Serializable;
00034 import java.util.Collection;
00035 import java.util.HashMap;
00036 import java.util.Iterator;
00037 import java.util.Set;
00038 import java.util.Vector;
00039 
00040 import org.apache.log4j.Logger;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.RunActive;
00044 import org.objectweb.proactive.Service;
00045 import org.objectweb.proactive.core.node.Node;
00046 import org.objectweb.proactive.core.util.log.Loggers;
00047 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00048 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00049 import org.objectweb.proactive.scheduler.Agent;
00050 import org.objectweb.proactive.scheduler.DeployedTask;
00051 import org.objectweb.proactive.scheduler.GenericJob;
00052 import org.objectweb.proactive.scheduler.JobConstants;
00053 import org.objectweb.proactive.scheduler.Queue;
00054 import org.objectweb.proactive.scheduler.QueueFullException;
00055 import org.objectweb.proactive.scheduler.RessourceManager;
00056 import org.objectweb.proactive.scheduler.SchedulerConstants;
00057 
00058 
00067 public abstract class AbstractPolicy implements Serializable, RunActive,
00068     JobConstants, SchedulerConstants {
00069     private static Logger logger = ProActiveLogger.getLogger(Loggers.JOB_MANAGER);
00070     protected RessourceManager ressourceManager;
00071     protected Queue queue;
00072     protected HashMap deployedTasks;
00073 
00074     public AbstractPolicy() {
00075     }
00076 
00082     public AbstractPolicy(RessourceManager ressourceManager) {
00083         if (ressourceManager != null) {
00084             deployedTasks = new HashMap();
00085             queue = new Queue();
00086             this.ressourceManager = ressourceManager;
00087             logger.debug("job manager created");
00088         }
00089     }
00090 
00095     public void runActivity(Body body) {
00096         Service service = new Service(body);
00097 
00098         while (true) {
00099             this.execute();
00100 
00101             if (service.hasRequestToServe()) {
00102                 service.blockingServeOldest();
00103             }
00104 
00105             this.checkRunningTasks();
00106         }
00107     }
00108 
00114     public void checkRunningTasks() {
00115         if (!this.deployedTasks.isEmpty()) {
00116             Set runningProcessIds = this.deployedTasks.keySet();
00117             Iterator iterator = runningProcessIds.iterator();
00118 
00119             while (iterator.hasNext()) {
00120                 String processId = (String) iterator.next();
00121                 DeployedTask deployedTask = (DeployedTask) this.deployedTasks.get(processId);
00122 
00123                 if (!(deployedTask.isAlive()).booleanValue()) {
00124                     this.deployedTasks.remove(processId);
00125 
00126                     GenericJob job = deployedTask.getTaskDescription();
00127                     int ressourceNb = job.getRessourceNb() - 1;
00128                     System.out.println("Task '" + job.getClassName() +
00129                         "' finished ...");
00130                     System.out.println("Freeing " + ressourceNb +
00131                         ((ressourceNb > 1) ? " ressources ..." : " ressource ..."));
00132 
00133                     ressourceManager.freeNodes(job.getJobID(), true);
00134                     job.setJobStatus("finished");
00135                     logger.debug("job finished " + job.getJobID());
00136                     // if we want we can always keep track of the finished jobs ...
00137                     break;
00138                 }
00139             }
00140         }
00141     }
00142 
00148     public BooleanWrapper sub(GenericJob job) {
00149         try {
00150             job.setJobStatus("queued");
00151             queue.put(job);
00152 
00153             logger.debug("job added to the queue");
00154 
00155             return new BooleanWrapper(true);
00156         } catch (QueueFullException e) {
00157             logger.error("Couldn't add job to the queue. Queue full ...");
00158             return new BooleanWrapper(false);
00159         }
00160     }
00161 
00167     public BooleanWrapper del(String jobId) {
00168         try {
00169             if (queue.remove(jobId) == null) {
00170                 this.deployedTasks.remove(jobId);
00171                 this.ressourceManager.freeNodes(jobId, false);
00172             }
00173             logger.debug("job deleted");
00174             return new BooleanWrapper(true);
00175         } catch (Exception e) {
00176             logger.error("couldn't delete job");
00177             return new BooleanWrapper(false);
00178         }
00179     }
00180 
00187     public Vector stat(String jobId) {
00188         Vector vector = new Vector();
00189         GenericJob jobDescription;
00190         if (jobId != null) {
00191             jobDescription = queue.get(jobId);
00192             if (jobDescription == null) {
00193                 DeployedTask deployedTask = (DeployedTask) this.deployedTasks.get(jobId);
00194                 if (deployedTask != null) {
00195                     jobDescription = deployedTask.getTaskDescription();
00196                 }
00197             }
00198 
00199             if (jobDescription != null) {
00200                 vector.add(jobDescription);
00201             }
00202         } else {
00203             vector.addAll(this.queue.values());
00204 
00205             Collection c = this.deployedTasks.values();
00206             Iterator iterator = c.iterator();
00207 
00208             while (iterator.hasNext()) {
00209                 DeployedTask deployedTask = (DeployedTask) iterator.next();
00210                 vector.add(deployedTask.getTaskDescription());
00211             }
00212         }
00213         logger.debug("job status evoqued");
00214         return vector;
00215     }
00216 
00224     abstract public boolean isToBeServed(GenericJob job1, GenericJob job2);
00225 
00230     public String nextTask() {
00231         // TODO Auto-generated method stub
00232         String jobId = null;
00233         GenericJob job = null;
00234 
00235         // TODO Auto-generated method stub
00236         String tmpId;
00237         GenericJob tmpTask;
00238 
00239         Set set = queue.keySet();
00240         Iterator iterator = set.iterator();
00241 
00242         // initialize jobId and job wwith the first job in the queue
00243         if (iterator.hasNext()) {
00244             jobId = (String) iterator.next();
00245             job = (GenericJob) queue.get(jobId);
00246         }
00247 
00248         // try to find the appropriate job to be served
00249         while (iterator.hasNext()) {
00250             tmpId = (String) iterator.next();
00251 
00252             tmpTask = (GenericJob) queue.get(tmpId);
00253 
00254             if (isToBeServed(tmpTask, job)) {
00255                 jobId = tmpId;
00256                 job = tmpTask;
00257             }
00258         }
00259         logger.debug("next task evoked");
00260         return jobId;
00261     }
00262 
00267     public HashMap getDeployedTasks() {
00268         return this.deployedTasks;
00269     }
00270 
00276     public void execute() {
00277         int ressourceNb;
00278         int minRessourceNb;
00279 
00280         if (!queue.isEmpty()) {
00281             String jobId = nextTask();
00282             GenericJob job = queue.get(jobId);
00283             ressourceNb = job.getRessourceNb();
00284             minRessourceNb = job.getMinNbOfNodes();
00285 
00286             if (!ressourceManager.isAvailable(ressourceNb).booleanValue()) {
00287                 if (!ressourceManager.isAvailable(minRessourceNb).booleanValue()) {
00288                     return;
00289                 } else {
00290                     ressourceNb = minRessourceNb;
00291                 }
00292             }
00293 
00294             // execute job ....
00295             // the job has to signal the ressources available after finishing it's processing ...
00296             queue.remove(jobId);
00297 
00298             try {
00299                 Node node = ressourceManager.reserveNodes(jobId, ressourceNb);
00300 
00301                 Agent agent = null;
00302                 String jobName = job.getClassName();
00303 
00304                 // Here we may have one of 2 cases:
00305                 //  1- the job we're running is already running meaning
00306                 //     that it's a job without an XML deployment descriptor
00307                 //     and is asking in real time programming for nodes
00308                 //     from the scheduler service
00309                 //  2- the job isn't created yet and we need to deploy it
00310                 if (jobName != null) {
00311                     // here we shall have to test that the job is non local:
00312                     String xmlPath = job.getXMLFullPath();
00313 
00314                     /*                        if (jobName != null) {
00315                                                     xmlPath = "/user/cjarjouh/home/";
00316                                                     File localSource = new File(job.getXMLFullPath());
00317                                                     File remoteDest = new File(xmlPath);
00318                                                     FileVector filePushed =FileTransfer.pushFile(node,localSource, remoteDest);
00319                                                     filePushed.waitForAll();  //wait for push to finish
00320                                             }
00321                      */
00322                     HashMap systemProperties = new HashMap();
00323                     systemProperties.put(XML_DESC, xmlPath);
00324                     systemProperties.put(JOB_ID, jobId);
00325 
00326                     agent = (Agent) ProActive.newActive(Agent.class.getName(),
00327                             new Object[] { systemProperties }, node);
00328 
00329                     // we need here to check if the node is local or remote to
00330                     // find out if we have to do some file transfering
00331                     // and we also have to set the system property file ...
00332                     // we also need to put a try catch here to find if the job is in
00333                     // the class path and maybe we shall need some file transfert:
00334                     /*
00335                      *                                         // we shall have to get the node of the scheduler first then do a transfert
00336                                         File remoteSource = new File("/remote/source/path/file");
00337                                             File localDest = new File("/local/destination/path/file");
00338                                             FileVector filePulled = FileTransfer.pullFile(examplenode[0], remoteSource, localDest);
00339                                             File  file = filePulled.getFile(0); //wait for pull to finish
00340                     */
00341                     ProActive.newMain(jobName, job.getMainParameters(), node);
00342 
00343                     System.out.println("Starting '" + jobName + "' job ...");
00344                 }
00345 
00346                 System.out.println("Allocation of " + ressourceNb +
00347                     ((ressourceNb > 1) ? " ressources ..." : " ressource ..."));
00348                 job.setJobStatus("deployed");
00349                 this.deployedTasks.put(jobId, new DeployedTask(job, agent));
00350                 logger.debug("job deployed successfully");
00351             } catch (Exception e) {
00352                 logger.error("error executing job");
00353             }
00354         }
00355     }
00356 
00361     public void end() {
00362     }
00363 
00368     public Queue getQueue() {
00369         return this.queue;
00370     }
00371 }

Generated on Mon Jan 22 15:16:12 2007 for ProActive by  doxygen 1.5.1