00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.scheduler;
00032
00033 import java.util.HashMap;
00034 import java.util.Vector;
00035
00036 import org.apache.log4j.Logger;
00037 import org.objectweb.proactive.ProActive;
00038 import org.objectweb.proactive.core.node.Node;
00039 import org.objectweb.proactive.core.node.NodeFactory;
00040 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00041 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00042 import org.objectweb.proactive.core.util.log.Loggers;
00043 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00044 import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
00045 import org.objectweb.proactive.core.util.wrapper.StringMutableWrapper;
00046 import org.objectweb.proactive.scheduler.policy.*;
00047
00048
00054 public class Scheduler implements java.io.Serializable, SchedulerConstants {
00055 private static Logger logger = ProActiveLogger.getLogger(Loggers.SCHEDULER);
00056 static private Scheduler scheduler;
00057 private AbstractPolicy policy;
00058 private RessourceManager ressourceManager;
00059 private HashMap tmpJobs;
00060
00061 public Scheduler() {
00062 }
00063
00069 public Scheduler(String policyClass) {
00070 try {
00071 String nodeURL = "//localhost:" +
00072 System.getProperty("proactive.rmi.port") + "/" +
00073 SCHEDULER_NODE_NAME;
00074 this.tmpJobs = new HashMap();
00075 this.ressourceManager = (RessourceManager) ProActive.newActive(RessourceManager.class.getName(),
00076 new Object[] { new BooleanWrapper(true) }, nodeURL);
00077
00078 Object[] constructorParameters = new Object[1];
00079 constructorParameters[0] = this.ressourceManager;
00080 this.policy = (AbstractPolicy) ProActive.newActive(policyClass,
00081 constructorParameters, nodeURL);
00082
00083 Scheduler.scheduler = this;
00084 logger.debug("Scheduler created ...");
00085 } catch (Exception e) {
00086
00087 e.printStackTrace();
00088 }
00089 }
00090
00091 public void init() {
00092 }
00093
00099 public BooleanWrapper sub(GenericJob job) {
00100 logger.debug("sub method evoked ....");
00101
00102 return this.policy.sub(job);
00103 }
00104
00110 public BooleanWrapper del(String jobId) {
00111 logger.debug("del method evoked ....");
00112
00113 return this.policy.del(jobId);
00114 }
00115
00122 public Vector stat(String jobId) {
00123 logger.debug("stat method evoked ....");
00124 return this.policy.stat(jobId);
00125 }
00126
00132 public Vector nodes(String nodeURL) {
00133 logger.debug("nodes method evoked ....");
00134 return this.ressourceManager.nodes(nodeURL);
00135 }
00136
00144 static public void createScheduler(String policyName) {
00145
00146 String nodeURL = System.getProperty(SCHEDULER_URL) + "/" +
00147 SCHEDULER_NODE_NAME;
00148 Scheduler.scheduler = Scheduler.connectTo(nodeURL);
00149
00150 if (Scheduler.scheduler == null) {
00151
00152
00153
00154
00155
00156
00157
00158
00159 ProActiveRuntime paRuntime;
00160
00161 try {
00162 paRuntime = RuntimeFactory.getProtocolSpecificRuntime(System.getProperty(
00163 "proactive.communication.protocol") + ":");
00164 nodeURL = paRuntime.createLocalNode(SCHEDULER_NODE_NAME, false,
00165 null, paRuntime.getVMInformation().getName(),
00166 paRuntime.getJobID());
00167
00168 Object[] constructorParameters = { policyName };
00169 scheduler = (Scheduler) ProActive.newActive(Scheduler.class.getName(),
00170 constructorParameters, nodeURL);
00171
00172 logger.debug("created object scheduler");
00173
00174 } catch (Exception e) {
00175
00176 logger.error("error creating object scheduler");
00177 }
00178 }
00179 }
00180
00185 public static void start(String policyName) {
00186 try {
00187 Scheduler.createScheduler(policyName);
00188 logger.debug("service started successfully");
00189 } catch (Exception e) {
00190
00191 logger.error("error starting service");
00192 }
00193 }
00194
00200 public void end() {
00201 logger.debug("shutting down scheduler service");
00202 policy.end();
00203 }
00204
00209 public StringMutableWrapper info() {
00210 return new StringMutableWrapper("Scheduler beta");
00211 }
00212
00216 protected void finalize() {
00217 this.end();
00218 Scheduler.scheduler = null;
00219 }
00220
00226 public StringMutableWrapper fetchJobDescription(String xmlDescriptorUrl) {
00227 try {
00228
00229
00230 GenericJob tmp = new GenericJob();
00231 String xmlFileName = xmlDescriptorUrl.substring(xmlDescriptorUrl.lastIndexOf(
00232 '/') + 1);
00233 String jobId = tmp.getJobID();
00234 tmp.setXMLDescriptorName(xmlFileName);
00235 tmp.setXMLFullPath(xmlDescriptorUrl);
00236 this.tmpJobs.put(jobId, tmp);
00237
00238 ProActiveJobHandler h = new ProActiveJobHandler(this, jobId,
00239 xmlDescriptorUrl);
00240 String uri = xmlDescriptorUrl;
00241 org.objectweb.proactive.core.xml.io.StreamReader sr = new org.objectweb.proactive.core.xml.io.StreamReader(new org.xml.sax.InputSource(
00242 uri), h);
00243 sr.read();
00244
00245 logger.debug("starting the parsing of the newly added XML file");
00246
00247 return new StringMutableWrapper(jobId);
00248 } catch (Exception e) {
00249
00250 logger.error("error parsing the file");
00251 return null;
00252 }
00253 }
00254
00261 public static Scheduler connectTo(String schedulerURL) {
00262 try {
00263 Node node = NodeFactory.getNode(schedulerURL);
00264
00265 Object[] ao = node.getActiveObjects(Scheduler.class.getName());
00266
00267 if (ao.length == 1) {
00268 logger.debug("scheduler object fetched");
00269 return ((Scheduler) ao[0]);
00270 } else {
00271 logger.debug("no scheduler object created");
00272 return null;
00273 }
00274 } catch (Exception e) {
00275 logger.error("error connecting the scheduler service:" + e);
00276 return null;
00277 }
00278 }
00279
00280 public static void main(String[] args) throws Exception {
00281 String policyName = "org.objectweb.proactive.scheduler.policy.FIFOPolicy";
00282 Scheduler.start(policyName);
00283 Thread.sleep(3000);
00284 }
00285
00290 public Vector getNodes(int ressourceNb, int estimatedTime) {
00291 Object[] constructorParameters = new Object[3];
00292 constructorParameters[0] = this.ressourceManager;
00293 constructorParameters[1] = new Integer(ressourceNb);
00294 constructorParameters[2] = new Integer(estimatedTime);
00295 try {
00296 JobNoDescriptor job = (JobNoDescriptor) ProActive.newActive(JobNoDescriptor.class.getName(),
00297 constructorParameters);
00298 GenericJob jobDescription = job.getJobDescription();
00299 this.sub(jobDescription);
00300 logger.debug("new object created in the queue");
00301 return job.getNodes();
00302 } catch (Exception e) {
00303
00304 logger.error("error submitting the command to the scheduler");
00305
00306 return null;
00307 }
00308 }
00309
00310
00311
00312
00316 public Node[] getReservedNodes(String jobID, int askedNodes) {
00317 logger.debug("returning reserved nodes");
00318 return this.ressourceManager.getNodes(jobID, askedNodes);
00319 }
00320
00325 public void commit(String jobID) {
00326 if (this.tmpJobs.containsKey(jobID)) {
00327 System.out.println("committing the job's description");
00328
00329 GenericJob job = (GenericJob) this.tmpJobs.remove(jobID);
00330 this.sub(job);
00331 logger.debug("commiting the job's description");
00332 } else {
00333 logger.debug("no object to commit");
00334 }
00335 }
00336
00343 public GenericJob getTmpJob(String jobID) {
00344 if (this.tmpJobs.containsKey(jobID)) {
00345 return (GenericJob) this.tmpJobs.get(jobID);
00346 } else {
00347 return null;
00348 }
00349 }
00350 }