org/objectweb/proactive/scheduler/Scheduler.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.scheduler;
00032 
00033 import java.util.HashMap;
00034 import java.util.Vector;
00035 
00036 import org.apache.log4j.Logger;
00037 import org.objectweb.proactive.ProActive;
00038 import org.objectweb.proactive.core.node.Node;
00039 import org.objectweb.proactive.core.node.NodeFactory;
00040 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00041 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00042 import org.objectweb.proactive.core.util.log.Loggers;
00043 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00044 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00045 import org.objectweb.proactive.core.util.wrapper.StringMutableWrapper;
00046 import org.objectweb.proactive.scheduler.policy.*;
00047 
00048 
00054 public class Scheduler implements java.io.Serializable, SchedulerConstants {
00055     private static Logger logger = ProActiveLogger.getLogger(Loggers.SCHEDULER);
00056     static private Scheduler scheduler; // singleton
00057     private AbstractPolicy policy;
00058     private RessourceManager ressourceManager;
00059     private HashMap tmpJobs;
00060 
00061     public Scheduler() {
00062     }
00063 
00069     public Scheduler(String policyClass) {
00070         try {
00071             String nodeURL = "//localhost:" +
00072                 System.getProperty("proactive.rmi.port") + "/" +
00073                 SCHEDULER_NODE_NAME;
00074             this.tmpJobs = new HashMap();
00075             this.ressourceManager = (RessourceManager) ProActive.newActive(RessourceManager.class.getName(),
00076                     new Object[] { new BooleanWrapper(true) }, nodeURL);
00077 
00078             Object[] constructorParameters = new Object[1];
00079             constructorParameters[0] = this.ressourceManager;
00080             this.policy = (AbstractPolicy) ProActive.newActive(policyClass,
00081                     constructorParameters, nodeURL);
00082 
00083             Scheduler.scheduler = this;
00084             logger.debug("Scheduler created ...");
00085         } catch (Exception e) {
00086             // TODO Auto-generated catch block
00087             e.printStackTrace();
00088         }
00089     }
00090 
00091     public void init() {
00092     }
00093 
00099     public BooleanWrapper sub(GenericJob job) {
00100         logger.debug("sub method evoked ....");
00101 
00102         return this.policy.sub(job);
00103     }
00104 
00110     public BooleanWrapper del(String jobId) {
00111         logger.debug("del method evoked ....");
00112 
00113         return this.policy.del(jobId);
00114     }
00115 
00122     public Vector stat(String jobId) {
00123         logger.debug("stat method evoked ....");
00124         return this.policy.stat(jobId);
00125     }
00126 
00132     public Vector nodes(String nodeURL) {
00133         logger.debug("nodes method evoked ....");
00134         return this.ressourceManager.nodes(nodeURL);
00135     }
00136 
00144     static public void createScheduler(String policyName) {
00145         // tests if the service already exists
00146         String nodeURL = System.getProperty(SCHEDULER_URL) + "/" +
00147             SCHEDULER_NODE_NAME;
00148         Scheduler.scheduler = Scheduler.connectTo(nodeURL);
00149 
00150         if (Scheduler.scheduler == null) {
00151             //                  JVMProcessImpl rsh = new JVMProcessImpl(new StandardOutputMessageLogger());
00152             //          rsh.setClassname("org.objectweb.proactive.core.node.StartNode");
00153             //          rsh.setParameters(nodeURL);
00154             //          rsh.startProcess();                             
00155 
00156             //          Thread.sleep(3000);
00157 
00158             //String nodeURL = "//localhost/" + SCHEDULER_NODE_NAME;
00159             ProActiveRuntime paRuntime;
00160 
00161             try {
00162                 paRuntime = RuntimeFactory.getProtocolSpecificRuntime(System.getProperty(
00163                             "proactive.communication.protocol") + ":");
00164                 nodeURL = paRuntime.createLocalNode(SCHEDULER_NODE_NAME, false,
00165                         null, paRuntime.getVMInformation().getName(),
00166                         paRuntime.getJobID());
00167 
00168                 Object[] constructorParameters = { policyName };
00169                 scheduler = (Scheduler) ProActive.newActive(Scheduler.class.getName(),
00170                         constructorParameters, nodeURL);
00171 
00172                 logger.debug("created object scheduler");
00173                 //scheduler.init();
00174             } catch (Exception e) {
00175                 // TODO Auto-generated catch block
00176                 logger.error("error creating object scheduler");
00177             }
00178         }
00179     }
00180 
00185     public static void start(String policyName) {
00186         try {
00187             Scheduler.createScheduler(policyName);
00188             logger.debug("service started successfully");
00189         } catch (Exception e) {
00190             // TODO Auto-generated catch block
00191             logger.error("error starting service");
00192         }
00193     }
00194 
00200     public void end() {
00201         logger.debug("shutting down scheduler service");
00202         policy.end();
00203     }
00204 
00209     public StringMutableWrapper info() {
00210         return new StringMutableWrapper("Scheduler beta");
00211     }
00212 
00216     protected void finalize() {
00217         this.end();
00218         Scheduler.scheduler = null;
00219     }
00220 
00226     public StringMutableWrapper fetchJobDescription(String xmlDescriptorUrl) {
00227         try {
00228             // it is here that we shall enter the modifications if the submission of the 
00229             // xml file is remote...
00230             GenericJob tmp = new GenericJob();
00231             String xmlFileName = xmlDescriptorUrl.substring(xmlDescriptorUrl.lastIndexOf(
00232                         '/') + 1);
00233             String jobId = tmp.getJobID();
00234             tmp.setXMLDescriptorName(xmlFileName);
00235             tmp.setXMLFullPath(xmlDescriptorUrl);
00236             this.tmpJobs.put(jobId, tmp);
00237 
00238             ProActiveJobHandler h = new ProActiveJobHandler(this, jobId,
00239                     xmlDescriptorUrl);
00240             String uri = xmlDescriptorUrl;
00241             org.objectweb.proactive.core.xml.io.StreamReader sr = new org.objectweb.proactive.core.xml.io.StreamReader(new org.xml.sax.InputSource(
00242                         uri), h);
00243             sr.read();
00244 
00245             logger.debug("starting the parsing of the newly added XML file");
00246 
00247             return new StringMutableWrapper(jobId);
00248         } catch (Exception e) {
00249             // log it for futur reference ...
00250             logger.error("error parsing the file");
00251             return null;
00252         }
00253     }
00254 
00261     public static Scheduler connectTo(String schedulerURL) {
00262         try {
00263             Node node = NodeFactory.getNode(schedulerURL);
00264 
00265             Object[] ao = node.getActiveObjects(Scheduler.class.getName());
00266 
00267             if (ao.length == 1) {
00268                 logger.debug("scheduler object fetched");
00269                 return ((Scheduler) ao[0]);
00270             } else {
00271                 logger.debug("no scheduler object created");
00272                 return null;
00273             }
00274         } catch (Exception e) {
00275             logger.error("error connecting the scheduler service:" + e);
00276             return null;
00277         }
00278     }
00279 
00280     public static void main(String[] args) throws Exception {
00281         String policyName = "org.objectweb.proactive.scheduler.policy.FIFOPolicy";
00282         Scheduler.start(policyName);
00283         Thread.sleep(3000);
00284     }
00285 
00290     public Vector getNodes(int ressourceNb, int estimatedTime) {
00291         Object[] constructorParameters = new Object[3];
00292         constructorParameters[0] = this.ressourceManager;
00293         constructorParameters[1] = new Integer(ressourceNb);
00294         constructorParameters[2] = new Integer(estimatedTime);
00295         try {
00296             JobNoDescriptor job = (JobNoDescriptor) ProActive.newActive(JobNoDescriptor.class.getName(),
00297                     constructorParameters);
00298             GenericJob jobDescription = job.getJobDescription();
00299             this.sub(jobDescription);
00300             logger.debug("new object created in the queue");
00301             return job.getNodes();
00302         } catch (Exception e) {
00303             // TODO Auto-generated catch block
00304             logger.error("error submitting the command to the scheduler");
00305 
00306             return null;
00307         }
00308     }
00309 
00310     // ------------------
00311     // methods used for parsing .....
00312 
00316     public Node[] getReservedNodes(String jobID, int askedNodes) {
00317         logger.debug("returning reserved nodes");
00318         return this.ressourceManager.getNodes(jobID, askedNodes);
00319     }
00320 
00325     public void commit(String jobID) {
00326         if (this.tmpJobs.containsKey(jobID)) {
00327             System.out.println("committing the job's description");
00328 
00329             GenericJob job = (GenericJob) this.tmpJobs.remove(jobID);
00330             this.sub(job);
00331             logger.debug("commiting the job's description");
00332         } else {
00333             logger.debug("no object to commit");
00334         }
00335     }
00336 
00343     public GenericJob getTmpJob(String jobID) {
00344         if (this.tmpJobs.containsKey(jobID)) {
00345             return (GenericJob) this.tmpJobs.get(jobID);
00346         } else {
00347             return null;
00348         }
00349     }
00350 }

Generated on Mon Jan 22 15:16:12 2007 for ProActive by  doxygen 1.5.1