00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.core.group.threadpool;
00032 
00033 import java.util.ArrayList;
00034 
00035 import org.objectweb.proactive.core.group.AbstractProcessForGroup;
00036 
00037 
00042 public class ThreadPool {
00043 
00045     private Thread[] threads = null;
00046 
00048     private ArrayList<AbstractProcessForGroup> pendingJobs = null;
00049 
00051     protected EndControler controler = new EndControler();
00052 
00054     private int memberToThreadRatio = 4;
00055 
00057     private int additionalThreads = 3;
00058 
00062     public ThreadPool() {
00063         this(1); 
00064     }
00065 
00069     public ThreadPool(int size) {
00070         this.threads = new ThreadInThePool[size];
00071         this.pendingJobs = new ArrayList<AbstractProcessForGroup>(size);
00072         for (int i = 0; i < this.threads.length; i++) {
00073             this.threads[i] = new ThreadInThePool(this);
00074             this.threads[i].start();
00075         }
00076     }
00077 
00082     protected void createThreads(int number) {
00083         this.threads = new ThreadInThePool[number];
00084         for (int i = 0; i < this.threads.length; i++) {
00085             this.threads[i] = new ThreadInThePool(this);
00086             this.threads[i].start();
00087         }
00088     }
00089 
00095     public void checkNumberOfThreads(int members) {
00096         int i;
00097         int f;
00098 
00099         
00100         if (this.memberToThreadRatio != 0) {
00101             f = ((int) Math.ceil(((float) members) / ((float) this.memberToThreadRatio))) +
00102                 this.additionalThreads;
00103         } else {
00104             f = this.additionalThreads;
00105         }
00106 
00107         
00108         if (this.threads.length < f) {
00109             Thread[] tmp = new Thread[f];
00110             for (i = 0; i < this.threads.length; i++) {
00111                 tmp[i] = this.threads[i];
00112             }
00113             for (; i < f; i++) {
00114                 tmp[i] = new ThreadInThePool(this);
00115                 tmp[i].start();
00116             }
00117             this.threads = tmp;
00118         } else if (this.threads.length > f) {
00119             Thread[] tmp = new Thread[f];
00120             for (i = 0; i < f; i++) {
00121                 tmp[i] = this.threads[i];
00122             }
00123             for (; i < this.threads.length; i++) {
00124                 this.threads[i].interrupt();
00125                 this.threads[i] = null;
00126             }
00127             this.threads = tmp;
00128         }
00129     }
00130 
00135     public void ratio(int i) {
00136         this.memberToThreadRatio = i;
00137     }
00138 
00143     public void thread(int i) {
00144         this.additionalThreads = i;
00145     }
00146 
00148     public synchronized void addAJob(AbstractProcessForGroup r) {
00149         this.controler.jobStart();
00150         this.pendingJobs.add(r);
00151         this.notify();
00152     }
00153 
00157     public synchronized Runnable getJobForThePendingQueue() {
00158         try {
00159             
00160             while (!this.pendingJobs.iterator().hasNext()) {
00161                 this.wait();
00162                 
00163             }
00164 
00165             
00166             Runnable r = (Runnable) this.pendingJobs.iterator().next();
00167             this.pendingJobs.remove(r);
00168             return r;
00169         } catch (InterruptedException e) {
00170             this.controler.jobFinish();
00171             return null;
00172         }
00173     }
00174 
00176     public void complete() {
00177         
00178         this.controler.waitDone();
00179     }
00180 
00181     
00182     
00183     
00184     
00185     
00186     
00187     
00188     
00189     
00190     
00191 
00193     public void clean() {
00194         for (int i = 0; i < threads.length; i++) {
00195             this.threads[i].interrupt();
00196         }
00197     }
00198 }