00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.group.threadpool;
00032
00033 import java.util.ArrayList;
00034
00035 import org.objectweb.proactive.core.group.AbstractProcessForGroup;
00036
00037
00042 public class ThreadPool {
00043
00045 private Thread[] threads = null;
00046
00048 private ArrayList<AbstractProcessForGroup> pendingJobs = null;
00049
00051 protected EndControler controler = new EndControler();
00052
00054 private int memberToThreadRatio = 4;
00055
00057 private int additionalThreads = 3;
00058
00062 public ThreadPool() {
00063 this(1);
00064 }
00065
00069 public ThreadPool(int size) {
00070 this.threads = new ThreadInThePool[size];
00071 this.pendingJobs = new ArrayList<AbstractProcessForGroup>(size);
00072 for (int i = 0; i < this.threads.length; i++) {
00073 this.threads[i] = new ThreadInThePool(this);
00074 this.threads[i].start();
00075 }
00076 }
00077
00082 protected void createThreads(int number) {
00083 this.threads = new ThreadInThePool[number];
00084 for (int i = 0; i < this.threads.length; i++) {
00085 this.threads[i] = new ThreadInThePool(this);
00086 this.threads[i].start();
00087 }
00088 }
00089
00095 public void checkNumberOfThreads(int members) {
00096 int i;
00097 int f;
00098
00099
00100 if (this.memberToThreadRatio != 0) {
00101 f = ((int) Math.ceil(((float) members) / ((float) this.memberToThreadRatio))) +
00102 this.additionalThreads;
00103 } else {
00104 f = this.additionalThreads;
00105 }
00106
00107
00108 if (this.threads.length < f) {
00109 Thread[] tmp = new Thread[f];
00110 for (i = 0; i < this.threads.length; i++) {
00111 tmp[i] = this.threads[i];
00112 }
00113 for (; i < f; i++) {
00114 tmp[i] = new ThreadInThePool(this);
00115 tmp[i].start();
00116 }
00117 this.threads = tmp;
00118 } else if (this.threads.length > f) {
00119 Thread[] tmp = new Thread[f];
00120 for (i = 0; i < f; i++) {
00121 tmp[i] = this.threads[i];
00122 }
00123 for (; i < this.threads.length; i++) {
00124 this.threads[i].interrupt();
00125 this.threads[i] = null;
00126 }
00127 this.threads = tmp;
00128 }
00129 }
00130
00135 public void ratio(int i) {
00136 this.memberToThreadRatio = i;
00137 }
00138
00143 public void thread(int i) {
00144 this.additionalThreads = i;
00145 }
00146
00148 public synchronized void addAJob(AbstractProcessForGroup r) {
00149 this.controler.jobStart();
00150 this.pendingJobs.add(r);
00151 this.notify();
00152 }
00153
00157 public synchronized Runnable getJobForThePendingQueue() {
00158 try {
00159
00160 while (!this.pendingJobs.iterator().hasNext()) {
00161 this.wait();
00162
00163 }
00164
00165
00166 Runnable r = (Runnable) this.pendingJobs.iterator().next();
00167 this.pendingJobs.remove(r);
00168 return r;
00169 } catch (InterruptedException e) {
00170 this.controler.jobFinish();
00171 return null;
00172 }
00173 }
00174
00176 public void complete() {
00177
00178 this.controler.waitDone();
00179 }
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00193 public void clean() {
00194 for (int i = 0; i < threads.length; i++) {
00195 this.threads[i].interrupt();
00196 }
00197 }
00198 }