org/objectweb/proactive/branchnbound/core/Manager.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.branchnbound.core;
00032 
00033 import java.io.FileNotFoundException;
00034 import java.io.FileOutputStream;
00035 import java.io.InputStream;
00036 import java.io.Serializable;
00037 import java.util.Collection;
00038 import java.util.Vector;
00039 
00040 import org.apache.log4j.Logger;
00041 import org.objectweb.proactive.ActiveObjectCreationException;
00042 import org.objectweb.proactive.Body;
00043 import org.objectweb.proactive.InitActive;
00044 import org.objectweb.proactive.ProActive;
00045 import org.objectweb.proactive.branchnbound.core.queue.TaskQueue;
00046 import org.objectweb.proactive.core.ProActiveRuntimeException;
00047 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00048 import org.objectweb.proactive.core.exceptions.proxy.FailedGroupRendezVousException;
00049 import org.objectweb.proactive.core.group.Group;
00050 import org.objectweb.proactive.core.group.ProActiveGroup;
00051 import org.objectweb.proactive.core.mop.ClassNotReifiableException;
00052 import org.objectweb.proactive.core.node.Node;
00053 import org.objectweb.proactive.core.node.NodeException;
00054 import org.objectweb.proactive.core.util.log.Loggers;
00055 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00056 
00057 
00058 
00066 public class Manager implements Serializable, InitActive {
00067     private static Logger logger = ProActiveLogger.getLogger(Loggers.P2P_SKELETONS_MANAGER);
00068     private static final boolean enableRealloc = false; // TODO turn it
00069     private static final boolean enableBackup = false; // TODO turn it
00070     private static final int backupTask = 10; // TODO turn it configurable
00071     private static final String backupResultFile = System.getProperty(
00072             "user.home") + System.getProperty("file.separator") +
00073         "framework.results.backup"; // TODO turn it configurable
00074     public static final String backupTaskFile = System.getProperty("user.home") +
00075         System.getProperty("file.separator") + "framework.tasks.backup"; // TODO turn it configurable 
00076     private Task rootTask = null;
00077     private TaskQueue taskProviderQueue = null;
00078 
00079     // Worker nodes
00080     private Node[] nodes = null;
00081     private Node[][] arrayOfNodes = null;
00082     private VirtualNode[] arrayOfVns = null;
00083 
00084     // Worker group
00085     private Worker workerGroup = null;
00086 
00087     // managing task
00088     private Vector<Result> futureTaskList = new Vector<Result>();
00089     private Vector<Task> pendingTaskList = new Vector<Task>();
00090     private Vector<Worker> workingWorkerList = new Vector<Worker>();
00091     private Vector<Worker> freeWorkerList = new Vector<Worker>();
00092     private Vector<Task> toReallocTaskList = new Vector<Task>();
00093 
00097     public Manager() {
00098         // nothing to do
00099     }
00100 
00107     private Manager(Task root, Node myNode, String queueType) {
00108         // Activate the root task
00109         try {
00110             this.rootTask = (Task) ProActive.turnActive(root, myNode);
00111         } catch (ActiveObjectCreationException e) {
00112             logger.fatal("Problem with the turn active of the root task", e);
00113             throw new RuntimeException(e);
00114         } catch (NodeException e) {
00115             logger.fatal("Problem with the node of the root task", e);
00116             throw new RuntimeException(e);
00117         }
00118 
00119         // Activate the task queue
00120         try {
00121             if (logger.isInfoEnabled()) {
00122                 logger.info("Activing the task queue: " + queueType);
00123             }
00124             this.taskProviderQueue = (TaskQueue) ProActive.newActive(queueType,
00125                     null, myNode);
00126         } catch (ActiveObjectCreationException e1) {
00127             logger.fatal("Couldn't create the Task Provider", e1);
00128             throw new ProActiveRuntimeException(e1);
00129         } catch (NodeException e1) {
00130             logger.fatal("Couldn't create the Task Provider", e1);
00131             throw new ProActiveRuntimeException(e1);
00132         }
00133     }
00134 
00142     public Manager(Task root, Node myNode, Node[] nodes, String queueType) {
00143         this(root, myNode, queueType);
00144         this.nodes = nodes;
00145     }
00146 
00155     public Manager(Task root, Node myNode, Node[][] nodes, String queueType) {
00156         this(root, myNode, queueType);
00157         this.arrayOfNodes = nodes;
00158     }
00159 
00169     public Manager(Task root, Node myNode, VirtualNode[] virtualNodes,
00170         String queueType) {
00171         this(root, myNode, queueType);
00172         this.arrayOfVns = virtualNodes;
00173     }
00174 
00180     public void initActivity(Body body) {
00181         // All asynchronous call on the root Task
00182         logger.info("Compute the lower bound for the root task");
00183         this.rootTask.initLowerBound();
00184         logger.info("Compute the upper bound for the root task");
00185         this.rootTask.initUpperBound();
00186         logger.info("Calling for the first time split on the root task");
00187         Vector subTaskList = this.rootTask.split();
00188         logger.info("The ROOT task sends " + subTaskList.size());
00189         this.taskProviderQueue.addAll(subTaskList);
00190 
00191         // Group of Worker
00192         try {
00193             Object[] args = new Object[] { this.taskProviderQueue };
00194 
00195             // TODO Factoring
00196             if (this.nodes != null) {
00197                 logger.info("Manager is deploying a group of workers");
00198                 // Node[]
00199                 long singleStartTime = System.currentTimeMillis();
00200                 this.workerGroup = (Worker) ProActiveGroup.newGroupInParallel(Worker.class.getName(),
00201                         args, this.nodes);
00202                 ProActive.addNFEListenerOnGroup(this.workerGroup,
00203                     FailedGroupRendezVousException.AUTO_GROUP_PURGE);
00204                 this.freeWorkerList.addAll(ProActiveGroup.getGroup(
00205                         this.workerGroup));
00206                 long singleEndTime = System.currentTimeMillis();
00207                 if (logger.isInfoEnabled()) {
00208                     logger.info("The  Group was created in " +
00209                         (singleEndTime - singleStartTime) + " ms");
00210                 }
00211             } else if ((this.arrayOfNodes != null) &&
00212                     (this.arrayOfNodes.length > 0)) {
00213                 logger.info("Manager is deploying " + this.arrayOfNodes.length +
00214                     " groups of workers");
00215                 // Node[][]
00216                 this.workerGroup = (Worker) ProActiveGroup.newGroup(Worker.class.getName());
00217                 Group<Worker> mainGroup = ProActiveGroup.getGroup(this.workerGroup);
00218                 for (int i = 0; i < this.arrayOfNodes.length; i++) {
00219                     GroupThread gt = new GroupThread(this.arrayOfNodes[i],
00220                             args, mainGroup);
00221                     new Thread(gt).start();
00222                 }
00223             } else if ((this.arrayOfVns != null) &&
00224                     (this.arrayOfVns.length > 0)) {
00225                 logger.info("Manager is deploying " + this.arrayOfVns.length +
00226                     " groups of workers");
00227                 // VN []
00228                 this.workerGroup = (Worker) ProActiveGroup.newGroup(Worker.class.getName());
00229                 Group<Worker> vnGroup = ProActiveGroup.getGroup(this.workerGroup);
00230                 for (int i = 0; i < this.arrayOfVns.length; i++) {
00231                     VnThread vt = new VnThread(this.arrayOfVns[i], args, vnGroup);
00232                     new Thread(vt).start();
00233                 }
00234             } else {
00235                 logger.fatal("No nodes for distributing the computation");
00236                 throw new ProActiveRuntimeException(
00237                     "No nodes for distributing the computation");
00238             }
00239         } catch (ClassNotReifiableException e) {
00240             logger.fatal("The Worker is not reifiable", e);
00241             throw new ProActiveRuntimeException(e);
00242         } catch (ActiveObjectCreationException e) {
00243             logger.fatal("Problem with active objects creation", e);
00244             throw new ProActiveRuntimeException(e);
00245         } catch (NodeException e) {
00246             logger.fatal("Problem with a node", e);
00247         } catch (ClassNotFoundException e) {
00248             logger.fatal("The class for worker was not found", e);
00249             throw new ProActiveRuntimeException(e);
00250         }
00251 
00252         Group<Worker> groupOfWorkers = ProActiveGroup.getGroup(this.workerGroup);
00253         this.workerGroup.setWorkerGroup(this.workerGroup);
00254 
00255         if (logger.isInfoEnabled()) {
00256             logger.info("Manager successfuly activate with " +
00257                 this.freeWorkerList.size() + " workers");
00258         }
00259 
00260         this.nodes = null;
00261         this.arrayOfNodes = null;
00262     }
00263 
00268     public Result start() {
00269         logger.info("Starting computation");
00270         // Nothing to do if the manager is not actived
00271         if (!ProActive.getBodyOnThis().isActive()) {
00272             logger.fatal("The manager is not active");
00273             throw new ProActiveRuntimeException("The manager is not active");
00274         }
00275 
00276         int backupCounter = 0;
00277         int reallocCounter = 0;
00278 
00279         // Serving requests and waiting for results
00280         boolean hasNext;
00281         while ((hasNext = this.taskProviderQueue.hasNext().booleanValue()) ||
00282                 (this.pendingTaskList.size() != 0) ||
00283                 (!this.toReallocTaskList.isEmpty())) {
00284             boolean hasAddedTask = false;
00285             if (!this.toReallocTaskList.isEmpty() &&
00286                     !this.freeWorkerList.isEmpty()) {
00287                 Task tReallocated = this.toReallocTaskList.remove(0);
00288                 try {
00289                     this.assignTaskToWorker(this.freeWorkerList.remove(
00290                             0), tReallocated);
00291                     logger.info("A task just reallocated");
00292                 } catch (Exception e) {
00293                     logger.info("A worker is down");
00294                     this.toReallocTaskList.add(tReallocated);
00295                 }
00296             }
00297             try {
00298                 if (hasNext && (this.freeWorkerList.size() > 0)) {
00299                     if ((this.freeWorkerList.size() > 0)) {
00300                         Task t = this.taskProviderQueue.next();
00301                         try {
00302                             this.assignTaskToWorker(this.freeWorkerList.remove(
00303                                     0), t);
00304                             hasAddedTask = true;
00305                         } catch (Exception e) {
00306                             logger.info("A worker is down");
00307                             this.taskProviderQueue.addTask(t);
00308                         }
00309                     } else if (this.futureTaskList.size() == 0) {
00310                         // Waiting workers
00311                         try {
00312                             Thread.sleep(100);
00313                         } catch (InterruptedException e) {
00314                         }
00315                         continue;
00316                     }
00317                     if (hasAddedTask && logger.isInfoEnabled()) {
00318                         logger.info("Pending tasks: " +
00319                             this.pendingTaskList.size() +
00320                             " - Achivied tasks: " +
00321                             this.taskProviderQueue.howManyResults() +
00322                             " - Not calculated tasks: " +
00323                             this.taskProviderQueue.size());
00324                         continue;
00325                     }
00326                 }
00327 
00328                 try {
00329                     int index = ProActive.waitForAny(this.futureTaskList, 1000);
00330                     backupCounter++;
00331                     Result currentResult = this.futureTaskList.remove(index);
00332                     this.taskProviderQueue.addResult(currentResult);
00333                     this.pendingTaskList.remove(index);
00334                     Worker freeWorker = this.workingWorkerList.remove(index);
00335                     if (this.taskProviderQueue.hasNext().booleanValue()) {
00336                         Task t1 = this.taskProviderQueue.next();
00337                         try {
00338                             this.assignTaskToWorker(freeWorker, t1);
00339                         } catch (Exception e) {
00340                             logger.info("A worker is down");
00341                             this.taskProviderQueue.addTask(t1);
00342                         }
00343                     } else {
00344                         this.freeWorkerList.add(freeWorker);
00345                     }
00346                     if (logger.isDebugEnabled()) {
00347                         logger.debug(currentResult);
00348                     }
00349                     if (logger.isInfoEnabled()) {
00350                         logger.info("Pending tasks: " +
00351                             this.pendingTaskList.size() +
00352                             " - Achivied tasks: " +
00353                             this.taskProviderQueue.howManyResults() +
00354                             " - Not calculated tasks: " +
00355                             this.taskProviderQueue.size());
00356                     }
00357                     if (enableBackup && ((backupCounter % backupTask) == 0)) {
00358                         this.backupAll(this.rootTask);
00359                     }
00360                 } catch (Exception e) {
00361                     if (logger.isDebugEnabled()) {
00362                         logger.debug("Manager is waiting for result: " + e);
00363                     }
00364                     reallocCounter++;
00365                     // Reallocating tasks
00366                     if (enableRealloc && !this.freeWorkerList.isEmpty() &&
00367                             (reallocCounter == 60)) {
00368                         reallocCounter = 0;
00369                         for (int i = 0; i < this.workingWorkerList.size();
00370                                 i++) {
00371                             Worker current = this.workingWorkerList.get(i);
00372                             try {
00373                                 current.alive();
00374                             } catch (Exception down) {
00375                                 this.futureTaskList.remove(i);
00376                                 this.workingWorkerList.remove(i);
00377                                 this.toReallocTaskList.add(this.pendingTaskList.remove(
00378                                         i));
00379                             }
00380                         }
00381                     }
00382                     continue;
00383                 }
00384             } catch (Exception e) {
00385                 continue;
00386             }
00387         }
00388         logger.info("Total of results = " +
00389             this.taskProviderQueue.howManyResults());
00390         logger.info("Total of tasks = " + this.taskProviderQueue.size());
00391         // Set the final result
00392         Collection resultsFuture = this.taskProviderQueue.getAllResults();
00393         ProActive.waitFor(resultsFuture);
00394         Result[] results = (Result[]) resultsFuture.toArray(new Result[this.taskProviderQueue.howManyResults()
00395                                                                                              .intValue()]);
00396         return this.rootTask.gather(results);
00397     }
00398 
00404     public Result start(Task rootTask) {
00405         this.taskProviderQueue.flushAll();
00406 
00407         this.workerGroup.reset();
00408 
00409         try {
00410             this.rootTask = (Task) ProActive.turnActive(rootTask,
00411                     ProActive.getBodyOnThis().getNodeURL());
00412         } catch (ActiveObjectCreationException e) {
00413             logger.fatal("Problem with the turn active of the root task", e);
00414             throw new RuntimeException(e);
00415         } catch (NodeException e) {
00416             logger.fatal("Problem with the node of the root task", e);
00417             throw new RuntimeException(e);
00418         }
00419 
00420         // Spliting
00421         logger.info("Compute the lower bound for the root task");
00422         this.rootTask.initLowerBound();
00423         logger.info("Compute the upper bound for the root task");
00424         this.rootTask.initUpperBound();
00425         logger.info("Calling for the first time split on the root task");
00426         Vector subTaskList = this.rootTask.split();
00427         logger.info("The ROOT task sends " + subTaskList.size());
00428         this.taskProviderQueue.addAll(subTaskList);
00429 
00430         return ((Manager) ProActive.getStubOnThis()).start();
00431     }
00432 
00439     public Result start(InputStream task, InputStream result) {
00440         this.loadTasks(task);
00441         this.loadResults(result);
00442         return ((Manager) ProActive.getStubOnThis()).start();
00443     }
00444 
00450     public void setHungryLevel(int level) {
00451         assert this.taskProviderQueue != null : "Manager is not active";
00452         this.taskProviderQueue.setHungryLevel(level);
00453     }
00454 
00455     // -------------------------------------------------------------------------
00456     // Private methods
00457     // -------------------------------------------------------------------------
00458 
00467     private void assignTaskToWorker(Worker worker, Task task)
00468         throws Exception {
00469         this.futureTaskList.add(worker.execute(task));
00470         this.pendingTaskList.add(task);
00471         this.workingWorkerList.add(worker);
00472     }
00473 
00478     private void backupAll(Task rootTask) {
00479         logger.info("Backuping");
00480         try {
00481             this.taskProviderQueue.backupResults(new FileOutputStream(
00482                     backupResultFile));
00483             this.taskProviderQueue.backupTasks(rootTask, this.pendingTaskList,
00484                 new FileOutputStream(backupTaskFile));
00485         } catch (FileNotFoundException e) {
00486             logger.fatal("Problem with backup", e);
00487             throw new ProActiveRuntimeException(e);
00488         }
00489     }
00490 
00495     private void loadTasks(InputStream taskFile) {
00496         if (!ProActive.getBodyOnThis().isActive()) {
00497             logger.fatal("The manager is not active");
00498             throw new ProActiveRuntimeException("The manager is not active");
00499         }
00500         this.taskProviderQueue.loadTasks(taskFile);
00501         this.taskProviderQueue.getRootTaskFromBackup();
00502         try {
00503             this.rootTask = (Task) ProActive.turnActive(this.taskProviderQueue.getRootTaskFromBackup(),
00504                     ProActive.getBodyOnThis().getNodeURL());
00505         } catch (ActiveObjectCreationException e) {
00506             logger.fatal("Problem with the turn active of the root task", e);
00507             throw new RuntimeException(e);
00508         } catch (NodeException e) {
00509             logger.fatal("Problem with the node of the root task", e);
00510             throw new RuntimeException(e);
00511         }
00512     }
00513 
00518     private void loadResults(InputStream resultFile) {
00519         this.taskProviderQueue.loadResults(resultFile);
00520     }
00521 
00522     // -------------------------------------------------------------------------
00523     // Inner Threads for groups creation and activing deploying
00524     // -------------------------------------------------------------------------
00525 
00533     private class GroupThread implements Runnable {
00534         private Node[] nodes;
00535         private Object[] args;
00536         private Group<Worker> group;
00537 
00538         public GroupThread(Node[] nodes, Object[] args, Group<Worker> group) {
00539             this.nodes = nodes;
00540             this.args = args;
00541             this.group = group;
00542         }
00543 
00544         public void run() {
00545             Worker tmpWorkers = null;
00546             if (this.nodes.length > 0) {
00547                 long startTime = System.currentTimeMillis();
00548                 try {
00549                     tmpWorkers = (Worker) ProActiveGroup.newGroupInParallel(Worker.class.getName(),
00550                             args, this.nodes);
00551                     freeWorkerList.addAll(ProActiveGroup.getGroup(tmpWorkers));
00552                     Worker activedTmpWorkers = (Worker) ProActiveGroup.turnActiveGroup(tmpWorkers,
00553                             this.nodes[0]);
00554                     this.group.add(activedTmpWorkers);
00555                 } catch (Exception e) {
00556                     logger.fatal("Problem with group creation", e);
00557                     return;
00558                 }
00559                 long endTime = System.currentTimeMillis();
00560                 if (logger.isInfoEnabled()) {
00561                     logger.info("The remote Group " +
00562                         this.nodes[0].getNodeInformation().getHostName() +
00563                         " was created in " + (endTime - startTime) +
00564                         " ms with " +
00565                         ProActiveGroup.getGroup(tmpWorkers).size() +
00566                         " members");
00567                 }
00568             } else {
00569                 if (logger.isInfoEnabled()) {
00570                     logger.info(
00571                         "A remote Group was not created because no deployed nodes");
00572                 }
00573             }
00574         }
00575     }
00576 
00583     private class VnThread implements Runnable {
00584         private VirtualNode vn;
00585         private Object[] args;
00586         private Group<Worker> group;
00587         long startTime;
00588         long endTime;
00589 
00590         public VnThread(VirtualNode virtualNode, Object[] args, Group<Worker> vnGroup) {
00591             this.vn = virtualNode;
00592             this.args = args;
00593             this.group = vnGroup;
00594         }
00595 
00596         public void run() {
00597             Node[] nodes = null;
00598             try {
00599                 startTime = System.currentTimeMillis();
00600                 this.vn.activate();
00601                 nodes = this.vn.getNodes();
00602                 endTime = System.currentTimeMillis();
00603                 if (logger.isInfoEnabled()) {
00604                     logger.info("The VN " + this.vn.getName() +
00605                         " was deployed in " + (endTime - startTime) +
00606                         " ms with " + nodes.length + " nodes");
00607                 }
00608             } catch (NodeException e) {
00609                 if (logger.isInfoEnabled()) {
00610                     logger.info("No nodes returned for " + this.vn.getName());
00611                 }
00612                 return;
00613             }
00614             Worker tmpWorkers = null;
00615             if (nodes.length > 0) {
00616                 startTime = System.currentTimeMillis();
00617                 try {
00618                     tmpWorkers = (Worker) ProActiveGroup.newGroupInParallel(Worker.class.getName(),
00619                             args, nodes);
00620                     freeWorkerList.addAll(ProActiveGroup.getGroup(tmpWorkers));
00621                     Worker activedTmpWorkers = (Worker) ProActiveGroup.turnActiveGroup(tmpWorkers,
00622                             nodes[0]);
00623                     this.group.add(activedTmpWorkers);
00624                 } catch (Exception e) {
00625                     logger.fatal("Problem with group creation", e);
00626                     return;
00627                 }
00628                 workerGroup.setWorkerGroup(workerGroup);
00629                 endTime = System.currentTimeMillis();
00630                 if (logger.isInfoEnabled()) {
00631                     logger.info("The remote Group " + this.vn.getName() +
00632                         " was created in " + (endTime - startTime) +
00633                         " ms with " +
00634                         ProActiveGroup.getGroup(tmpWorkers).size() +
00635                         " members");
00636                 }
00637             } else {
00638                 if (logger.isInfoEnabled()) {
00639                     logger.info(
00640                         "A remote Group was not created because no deployed nodes");
00641                 }
00642             }
00643         }
00644     }
00645 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1