00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.branchnbound.core;
00032
00033 import java.io.FileNotFoundException;
00034 import java.io.FileOutputStream;
00035 import java.io.InputStream;
00036 import java.io.Serializable;
00037 import java.util.Collection;
00038 import java.util.Vector;
00039
00040 import org.apache.log4j.Logger;
00041 import org.objectweb.proactive.ActiveObjectCreationException;
00042 import org.objectweb.proactive.Body;
00043 import org.objectweb.proactive.InitActive;
00044 import org.objectweb.proactive.ProActive;
00045 import org.objectweb.proactive.branchnbound.core.queue.TaskQueue;
00046 import org.objectweb.proactive.core.ProActiveRuntimeException;
00047 import org.objectweb.proactive.core.descriptor.data.VirtualNode;
00048 import org.objectweb.proactive.core.exceptions.proxy.FailedGroupRendezVousException;
00049 import org.objectweb.proactive.core.group.Group;
00050 import org.objectweb.proactive.core.group.ProActiveGroup;
00051 import org.objectweb.proactive.core.mop.ClassNotReifiableException;
00052 import org.objectweb.proactive.core.node.Node;
00053 import org.objectweb.proactive.core.node.NodeException;
00054 import org.objectweb.proactive.core.util.log.Loggers;
00055 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00056
00057
00058
00066 public class Manager implements Serializable, InitActive {
00067 private static Logger logger = ProActiveLogger.getLogger(Loggers.P2P_SKELETONS_MANAGER);
00068 private static final boolean enableRealloc = false;
00069 private static final boolean enableBackup = false;
00070 private static final int backupTask = 10;
00071 private static final String backupResultFile = System.getProperty(
00072 "user.home") + System.getProperty("file.separator") +
00073 "framework.results.backup";
00074 public static final String backupTaskFile = System.getProperty("user.home") +
00075 System.getProperty("file.separator") + "framework.tasks.backup";
00076 private Task rootTask = null;
00077 private TaskQueue taskProviderQueue = null;
00078
00079
00080 private Node[] nodes = null;
00081 private Node[][] arrayOfNodes = null;
00082 private VirtualNode[] arrayOfVns = null;
00083
00084
00085 private Worker workerGroup = null;
00086
00087
00088 private Vector<Result> futureTaskList = new Vector<Result>();
00089 private Vector<Task> pendingTaskList = new Vector<Task>();
00090 private Vector<Worker> workingWorkerList = new Vector<Worker>();
00091 private Vector<Worker> freeWorkerList = new Vector<Worker>();
00092 private Vector<Task> toReallocTaskList = new Vector<Task>();
00093
00097 public Manager() {
00098
00099 }
00100
00107 private Manager(Task root, Node myNode, String queueType) {
00108
00109 try {
00110 this.rootTask = (Task) ProActive.turnActive(root, myNode);
00111 } catch (ActiveObjectCreationException e) {
00112 logger.fatal("Problem with the turn active of the root task", e);
00113 throw new RuntimeException(e);
00114 } catch (NodeException e) {
00115 logger.fatal("Problem with the node of the root task", e);
00116 throw new RuntimeException(e);
00117 }
00118
00119
00120 try {
00121 if (logger.isInfoEnabled()) {
00122 logger.info("Activing the task queue: " + queueType);
00123 }
00124 this.taskProviderQueue = (TaskQueue) ProActive.newActive(queueType,
00125 null, myNode);
00126 } catch (ActiveObjectCreationException e1) {
00127 logger.fatal("Couldn't create the Task Provider", e1);
00128 throw new ProActiveRuntimeException(e1);
00129 } catch (NodeException e1) {
00130 logger.fatal("Couldn't create the Task Provider", e1);
00131 throw new ProActiveRuntimeException(e1);
00132 }
00133 }
00134
00142 public Manager(Task root, Node myNode, Node[] nodes, String queueType) {
00143 this(root, myNode, queueType);
00144 this.nodes = nodes;
00145 }
00146
00155 public Manager(Task root, Node myNode, Node[][] nodes, String queueType) {
00156 this(root, myNode, queueType);
00157 this.arrayOfNodes = nodes;
00158 }
00159
00169 public Manager(Task root, Node myNode, VirtualNode[] virtualNodes,
00170 String queueType) {
00171 this(root, myNode, queueType);
00172 this.arrayOfVns = virtualNodes;
00173 }
00174
00180 public void initActivity(Body body) {
00181
00182 logger.info("Compute the lower bound for the root task");
00183 this.rootTask.initLowerBound();
00184 logger.info("Compute the upper bound for the root task");
00185 this.rootTask.initUpperBound();
00186 logger.info("Calling for the first time split on the root task");
00187 Vector subTaskList = this.rootTask.split();
00188 logger.info("The ROOT task sends " + subTaskList.size());
00189 this.taskProviderQueue.addAll(subTaskList);
00190
00191
00192 try {
00193 Object[] args = new Object[] { this.taskProviderQueue };
00194
00195
00196 if (this.nodes != null) {
00197 logger.info("Manager is deploying a group of workers");
00198
00199 long singleStartTime = System.currentTimeMillis();
00200 this.workerGroup = (Worker) ProActiveGroup.newGroupInParallel(Worker.class.getName(),
00201 args, this.nodes);
00202 ProActive.addNFEListenerOnGroup(this.workerGroup,
00203 FailedGroupRendezVousException.AUTO_GROUP_PURGE);
00204 this.freeWorkerList.addAll(ProActiveGroup.getGroup(
00205 this.workerGroup));
00206 long singleEndTime = System.currentTimeMillis();
00207 if (logger.isInfoEnabled()) {
00208 logger.info("The Group was created in " +
00209 (singleEndTime - singleStartTime) + " ms");
00210 }
00211 } else if ((this.arrayOfNodes != null) &&
00212 (this.arrayOfNodes.length > 0)) {
00213 logger.info("Manager is deploying " + this.arrayOfNodes.length +
00214 " groups of workers");
00215
00216 this.workerGroup = (Worker) ProActiveGroup.newGroup(Worker.class.getName());
00217 Group<Worker> mainGroup = ProActiveGroup.getGroup(this.workerGroup);
00218 for (int i = 0; i < this.arrayOfNodes.length; i++) {
00219 GroupThread gt = new GroupThread(this.arrayOfNodes[i],
00220 args, mainGroup);
00221 new Thread(gt).start();
00222 }
00223 } else if ((this.arrayOfVns != null) &&
00224 (this.arrayOfVns.length > 0)) {
00225 logger.info("Manager is deploying " + this.arrayOfVns.length +
00226 " groups of workers");
00227
00228 this.workerGroup = (Worker) ProActiveGroup.newGroup(Worker.class.getName());
00229 Group<Worker> vnGroup = ProActiveGroup.getGroup(this.workerGroup);
00230 for (int i = 0; i < this.arrayOfVns.length; i++) {
00231 VnThread vt = new VnThread(this.arrayOfVns[i], args, vnGroup);
00232 new Thread(vt).start();
00233 }
00234 } else {
00235 logger.fatal("No nodes for distributing the computation");
00236 throw new ProActiveRuntimeException(
00237 "No nodes for distributing the computation");
00238 }
00239 } catch (ClassNotReifiableException e) {
00240 logger.fatal("The Worker is not reifiable", e);
00241 throw new ProActiveRuntimeException(e);
00242 } catch (ActiveObjectCreationException e) {
00243 logger.fatal("Problem with active objects creation", e);
00244 throw new ProActiveRuntimeException(e);
00245 } catch (NodeException e) {
00246 logger.fatal("Problem with a node", e);
00247 } catch (ClassNotFoundException e) {
00248 logger.fatal("The class for worker was not found", e);
00249 throw new ProActiveRuntimeException(e);
00250 }
00251
00252 Group<Worker> groupOfWorkers = ProActiveGroup.getGroup(this.workerGroup);
00253 this.workerGroup.setWorkerGroup(this.workerGroup);
00254
00255 if (logger.isInfoEnabled()) {
00256 logger.info("Manager successfuly activate with " +
00257 this.freeWorkerList.size() + " workers");
00258 }
00259
00260 this.nodes = null;
00261 this.arrayOfNodes = null;
00262 }
00263
00268 public Result start() {
00269 logger.info("Starting computation");
00270
00271 if (!ProActive.getBodyOnThis().isActive()) {
00272 logger.fatal("The manager is not active");
00273 throw new ProActiveRuntimeException("The manager is not active");
00274 }
00275
00276 int backupCounter = 0;
00277 int reallocCounter = 0;
00278
00279
00280 boolean hasNext;
00281 while ((hasNext = this.taskProviderQueue.hasNext().booleanValue()) ||
00282 (this.pendingTaskList.size() != 0) ||
00283 (!this.toReallocTaskList.isEmpty())) {
00284 boolean hasAddedTask = false;
00285 if (!this.toReallocTaskList.isEmpty() &&
00286 !this.freeWorkerList.isEmpty()) {
00287 Task tReallocated = this.toReallocTaskList.remove(0);
00288 try {
00289 this.assignTaskToWorker(this.freeWorkerList.remove(
00290 0), tReallocated);
00291 logger.info("A task just reallocated");
00292 } catch (Exception e) {
00293 logger.info("A worker is down");
00294 this.toReallocTaskList.add(tReallocated);
00295 }
00296 }
00297 try {
00298 if (hasNext && (this.freeWorkerList.size() > 0)) {
00299 if ((this.freeWorkerList.size() > 0)) {
00300 Task t = this.taskProviderQueue.next();
00301 try {
00302 this.assignTaskToWorker(this.freeWorkerList.remove(
00303 0), t);
00304 hasAddedTask = true;
00305 } catch (Exception e) {
00306 logger.info("A worker is down");
00307 this.taskProviderQueue.addTask(t);
00308 }
00309 } else if (this.futureTaskList.size() == 0) {
00310
00311 try {
00312 Thread.sleep(100);
00313 } catch (InterruptedException e) {
00314 }
00315 continue;
00316 }
00317 if (hasAddedTask && logger.isInfoEnabled()) {
00318 logger.info("Pending tasks: " +
00319 this.pendingTaskList.size() +
00320 " - Achivied tasks: " +
00321 this.taskProviderQueue.howManyResults() +
00322 " - Not calculated tasks: " +
00323 this.taskProviderQueue.size());
00324 continue;
00325 }
00326 }
00327
00328 try {
00329 int index = ProActive.waitForAny(this.futureTaskList, 1000);
00330 backupCounter++;
00331 Result currentResult = this.futureTaskList.remove(index);
00332 this.taskProviderQueue.addResult(currentResult);
00333 this.pendingTaskList.remove(index);
00334 Worker freeWorker = this.workingWorkerList.remove(index);
00335 if (this.taskProviderQueue.hasNext().booleanValue()) {
00336 Task t1 = this.taskProviderQueue.next();
00337 try {
00338 this.assignTaskToWorker(freeWorker, t1);
00339 } catch (Exception e) {
00340 logger.info("A worker is down");
00341 this.taskProviderQueue.addTask(t1);
00342 }
00343 } else {
00344 this.freeWorkerList.add(freeWorker);
00345 }
00346 if (logger.isDebugEnabled()) {
00347 logger.debug(currentResult);
00348 }
00349 if (logger.isInfoEnabled()) {
00350 logger.info("Pending tasks: " +
00351 this.pendingTaskList.size() +
00352 " - Achivied tasks: " +
00353 this.taskProviderQueue.howManyResults() +
00354 " - Not calculated tasks: " +
00355 this.taskProviderQueue.size());
00356 }
00357 if (enableBackup && ((backupCounter % backupTask) == 0)) {
00358 this.backupAll(this.rootTask);
00359 }
00360 } catch (Exception e) {
00361 if (logger.isDebugEnabled()) {
00362 logger.debug("Manager is waiting for result: " + e);
00363 }
00364 reallocCounter++;
00365
00366 if (enableRealloc && !this.freeWorkerList.isEmpty() &&
00367 (reallocCounter == 60)) {
00368 reallocCounter = 0;
00369 for (int i = 0; i < this.workingWorkerList.size();
00370 i++) {
00371 Worker current = this.workingWorkerList.get(i);
00372 try {
00373 current.alive();
00374 } catch (Exception down) {
00375 this.futureTaskList.remove(i);
00376 this.workingWorkerList.remove(i);
00377 this.toReallocTaskList.add(this.pendingTaskList.remove(
00378 i));
00379 }
00380 }
00381 }
00382 continue;
00383 }
00384 } catch (Exception e) {
00385 continue;
00386 }
00387 }
00388 logger.info("Total of results = " +
00389 this.taskProviderQueue.howManyResults());
00390 logger.info("Total of tasks = " + this.taskProviderQueue.size());
00391
00392 Collection resultsFuture = this.taskProviderQueue.getAllResults();
00393 ProActive.waitFor(resultsFuture);
00394 Result[] results = (Result[]) resultsFuture.toArray(new Result[this.taskProviderQueue.howManyResults()
00395 .intValue()]);
00396 return this.rootTask.gather(results);
00397 }
00398
00404 public Result start(Task rootTask) {
00405 this.taskProviderQueue.flushAll();
00406
00407 this.workerGroup.reset();
00408
00409 try {
00410 this.rootTask = (Task) ProActive.turnActive(rootTask,
00411 ProActive.getBodyOnThis().getNodeURL());
00412 } catch (ActiveObjectCreationException e) {
00413 logger.fatal("Problem with the turn active of the root task", e);
00414 throw new RuntimeException(e);
00415 } catch (NodeException e) {
00416 logger.fatal("Problem with the node of the root task", e);
00417 throw new RuntimeException(e);
00418 }
00419
00420
00421 logger.info("Compute the lower bound for the root task");
00422 this.rootTask.initLowerBound();
00423 logger.info("Compute the upper bound for the root task");
00424 this.rootTask.initUpperBound();
00425 logger.info("Calling for the first time split on the root task");
00426 Vector subTaskList = this.rootTask.split();
00427 logger.info("The ROOT task sends " + subTaskList.size());
00428 this.taskProviderQueue.addAll(subTaskList);
00429
00430 return ((Manager) ProActive.getStubOnThis()).start();
00431 }
00432
00439 public Result start(InputStream task, InputStream result) {
00440 this.loadTasks(task);
00441 this.loadResults(result);
00442 return ((Manager) ProActive.getStubOnThis()).start();
00443 }
00444
00450 public void setHungryLevel(int level) {
00451 assert this.taskProviderQueue != null : "Manager is not active";
00452 this.taskProviderQueue.setHungryLevel(level);
00453 }
00454
00455
00456
00457
00458
00467 private void assignTaskToWorker(Worker worker, Task task)
00468 throws Exception {
00469 this.futureTaskList.add(worker.execute(task));
00470 this.pendingTaskList.add(task);
00471 this.workingWorkerList.add(worker);
00472 }
00473
00478 private void backupAll(Task rootTask) {
00479 logger.info("Backuping");
00480 try {
00481 this.taskProviderQueue.backupResults(new FileOutputStream(
00482 backupResultFile));
00483 this.taskProviderQueue.backupTasks(rootTask, this.pendingTaskList,
00484 new FileOutputStream(backupTaskFile));
00485 } catch (FileNotFoundException e) {
00486 logger.fatal("Problem with backup", e);
00487 throw new ProActiveRuntimeException(e);
00488 }
00489 }
00490
00495 private void loadTasks(InputStream taskFile) {
00496 if (!ProActive.getBodyOnThis().isActive()) {
00497 logger.fatal("The manager is not active");
00498 throw new ProActiveRuntimeException("The manager is not active");
00499 }
00500 this.taskProviderQueue.loadTasks(taskFile);
00501 this.taskProviderQueue.getRootTaskFromBackup();
00502 try {
00503 this.rootTask = (Task) ProActive.turnActive(this.taskProviderQueue.getRootTaskFromBackup(),
00504 ProActive.getBodyOnThis().getNodeURL());
00505 } catch (ActiveObjectCreationException e) {
00506 logger.fatal("Problem with the turn active of the root task", e);
00507 throw new RuntimeException(e);
00508 } catch (NodeException e) {
00509 logger.fatal("Problem with the node of the root task", e);
00510 throw new RuntimeException(e);
00511 }
00512 }
00513
00518 private void loadResults(InputStream resultFile) {
00519 this.taskProviderQueue.loadResults(resultFile);
00520 }
00521
00522
00523
00524
00525
00533 private class GroupThread implements Runnable {
00534 private Node[] nodes;
00535 private Object[] args;
00536 private Group<Worker> group;
00537
00538 public GroupThread(Node[] nodes, Object[] args, Group<Worker> group) {
00539 this.nodes = nodes;
00540 this.args = args;
00541 this.group = group;
00542 }
00543
00544 public void run() {
00545 Worker tmpWorkers = null;
00546 if (this.nodes.length > 0) {
00547 long startTime = System.currentTimeMillis();
00548 try {
00549 tmpWorkers = (Worker) ProActiveGroup.newGroupInParallel(Worker.class.getName(),
00550 args, this.nodes);
00551 freeWorkerList.addAll(ProActiveGroup.getGroup(tmpWorkers));
00552 Worker activedTmpWorkers = (Worker) ProActiveGroup.turnActiveGroup(tmpWorkers,
00553 this.nodes[0]);
00554 this.group.add(activedTmpWorkers);
00555 } catch (Exception e) {
00556 logger.fatal("Problem with group creation", e);
00557 return;
00558 }
00559 long endTime = System.currentTimeMillis();
00560 if (logger.isInfoEnabled()) {
00561 logger.info("The remote Group " +
00562 this.nodes[0].getNodeInformation().getHostName() +
00563 " was created in " + (endTime - startTime) +
00564 " ms with " +
00565 ProActiveGroup.getGroup(tmpWorkers).size() +
00566 " members");
00567 }
00568 } else {
00569 if (logger.isInfoEnabled()) {
00570 logger.info(
00571 "A remote Group was not created because no deployed nodes");
00572 }
00573 }
00574 }
00575 }
00576
00583 private class VnThread implements Runnable {
00584 private VirtualNode vn;
00585 private Object[] args;
00586 private Group<Worker> group;
00587 long startTime;
00588 long endTime;
00589
00590 public VnThread(VirtualNode virtualNode, Object[] args, Group<Worker> vnGroup) {
00591 this.vn = virtualNode;
00592 this.args = args;
00593 this.group = vnGroup;
00594 }
00595
00596 public void run() {
00597 Node[] nodes = null;
00598 try {
00599 startTime = System.currentTimeMillis();
00600 this.vn.activate();
00601 nodes = this.vn.getNodes();
00602 endTime = System.currentTimeMillis();
00603 if (logger.isInfoEnabled()) {
00604 logger.info("The VN " + this.vn.getName() +
00605 " was deployed in " + (endTime - startTime) +
00606 " ms with " + nodes.length + " nodes");
00607 }
00608 } catch (NodeException e) {
00609 if (logger.isInfoEnabled()) {
00610 logger.info("No nodes returned for " + this.vn.getName());
00611 }
00612 return;
00613 }
00614 Worker tmpWorkers = null;
00615 if (nodes.length > 0) {
00616 startTime = System.currentTimeMillis();
00617 try {
00618 tmpWorkers = (Worker) ProActiveGroup.newGroupInParallel(Worker.class.getName(),
00619 args, nodes);
00620 freeWorkerList.addAll(ProActiveGroup.getGroup(tmpWorkers));
00621 Worker activedTmpWorkers = (Worker) ProActiveGroup.turnActiveGroup(tmpWorkers,
00622 nodes[0]);
00623 this.group.add(activedTmpWorkers);
00624 } catch (Exception e) {
00625 logger.fatal("Problem with group creation", e);
00626 return;
00627 }
00628 workerGroup.setWorkerGroup(workerGroup);
00629 endTime = System.currentTimeMillis();
00630 if (logger.isInfoEnabled()) {
00631 logger.info("The remote Group " + this.vn.getName() +
00632 " was created in " + (endTime - startTime) +
00633 " ms with " +
00634 ProActiveGroup.getGroup(tmpWorkers).size() +
00635 " members");
00636 }
00637 } else {
00638 if (logger.isInfoEnabled()) {
00639 logger.info(
00640 "A remote Group was not created because no deployed nodes");
00641 }
00642 }
00643 }
00644 }
00645 }