basix_doc 0.1
/Users/mourrain/Devel/mmx/basix/src/threads.cpp
Go to the documentation of this file.
00001 
00002 /******************************************************************************
00003 * MODULE     : threads.cpp
00004 * DESCRIPTION: Multi-threading
00005 * COPYRIGHT  : (C) 2007  Joris van der Hoeven
00006 *******************************************************************************
00007 * This software falls under the GNU general public license and comes WITHOUT
00008 * ANY WARRANTY WHATSOEVER. See the file $TEXMACS_PATH/LICENSE for more details.
00009 * If you don't have this file, write to the Free Software Foundation, Inc.,
00010 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00011 ******************************************************************************/
00012 
00013 #include <basix/threads.hpp>
00014 #include <basix/port.hpp>
00015 #ifdef __linux__
00016 #include <unistd.h>
00017 #else
00018 #  ifdef __APPLE__
00019 #  include <sys/types.h>
00020 #  include <sys/sysctl.h>
00021 #  endif
00022 #endif
00023 
00024 namespace mmx {
00025 
00026 mutex memory_lock;
00027 
00028 #ifdef BASIX_ENABLE_THREADS
00029 
00030 /******************************************************************************
00031 * Global data
00032 ******************************************************************************/
00033 
00034 struct thread_data {
00035   pthread_t th;
00036   nat       id;
00037   task_rep* t;
00038 };
00039 
00040 bool            threads_busy     = false;
00041 bool            threads_active   = false;
00042 nat             threads_number   = 0;
00043 thread_data*    threads_data     = NULL;
00044 
00045 task*           threads_task     = NULL;
00046 nat             threads_nr_tasks = 0;
00047 nat             threads_index    = 0;
00048 nat             threads_todo     = 0;
00049 
00050 pthread_mutex_t threads_task_lock;
00051 pthread_mutex_t threads_idle_lock;
00052 pthread_cond_t  threads_idle_cond;
00053 pthread_mutex_t threads_busy_lock;
00054 pthread_cond_t  threads_busy_cond;
00055 
00056 /******************************************************************************
00057 * Workers
00058 ******************************************************************************/
00059 
00060 void*
00061 threads_worker (void* ptr) {
00062   thread_data* data= (thread_data*) ptr;
00063   //pthread_mutex_lock (&threads_busy_lock);
00064   //mmout << "Started thread #" << data->id << "\n";
00065   //pthread_mutex_unlock (&threads_busy_lock);
00066 
00067   while (threads_busy) {
00068     // Wait for new input
00069     pthread_mutex_lock (&threads_idle_lock);
00070     //mmout << "Thread #" << data->id << " is waiting\n";
00071     pthread_cond_wait (&threads_idle_cond, &threads_idle_lock);
00072     pthread_mutex_unlock (&threads_idle_lock);
00073     if (!threads_busy) break;
00074 
00075     while (true) {
00076       // Retrieve next task
00077       pthread_mutex_lock (&threads_task_lock);
00078       if (threads_index == threads_nr_tasks) {
00079         pthread_mutex_unlock (&threads_task_lock);
00080         break;
00081       }
00082       //mmout << "Thread #" << data->id
00083       //      << " processes task " << threads_index << "\n";
00084       data->t= inside (threads_task[threads_index]);
00085       threads_index++;
00086       pthread_mutex_unlock (&threads_task_lock);
00087 
00088       // Execute next task
00089       if (data->t != NULL) data->t->execute ();
00090 
00091       // Remove task
00092       data->t= NULL;
00093       pthread_mutex_lock (&threads_busy_lock);
00094       threads_todo--;
00095       //mmout << "Thread #" << data->id << " completed; "
00096       //      << threads_todo << " tasks waiting\n";
00097       if (threads_todo == 0)
00098         pthread_cond_signal (&threads_busy_cond);
00099       pthread_mutex_unlock (&threads_busy_lock);
00100     }
00101   }
00102 
00103   pthread_mutex_lock (&threads_busy_lock);
00104   threads_todo--;
00105   //mmout << "Thread #" << data->id << " dying; "
00106   //      << threads_todo << " tasks waiting to die\n";
00107   if (threads_todo == 0)
00108     pthread_cond_signal (&threads_busy_cond);
00109   pthread_mutex_unlock (&threads_busy_lock);
00110   pthread_exit (NULL);
00111 }
00112 
00113 /******************************************************************************
00114 * Master
00115 ******************************************************************************/
00116 
00117 void
00118 threads_execute (task* ts, nat n) {
00119   threads_task    = ts;
00120   threads_nr_tasks= n;
00121   threads_index   = 0;
00122   threads_todo    = n;
00123 
00124   threads_active  = true;
00125   pthread_mutex_lock (&threads_busy_lock);
00126   pthread_mutex_lock (&threads_idle_lock);
00127   pthread_cond_broadcast (&threads_idle_cond);
00128   pthread_mutex_unlock (&threads_idle_lock);
00129 
00130   pthread_cond_wait (&threads_busy_cond, &threads_busy_lock);
00131   pthread_mutex_unlock (&threads_busy_lock);
00132   threads_active  = false;
00133 }
00134 
00135 /******************************************************************************
00136 * Initialization and termination
00137 ******************************************************************************/
00138 
00139 void
00140 threads_initialize (nat n) {
00141   pthread_mutex_init (&threads_task_lock, NULL);
00142   pthread_mutex_init (&threads_idle_lock, NULL);
00143   pthread_cond_init  (&threads_idle_cond, NULL);
00144   pthread_mutex_init (&threads_busy_lock, NULL);
00145   pthread_cond_init  (&threads_busy_cond, NULL);
00146 
00147   threads_busy   = true;
00148   threads_number = n;
00149   threads_data   = new thread_data[threads_number];
00150   for (nat id=0; id<threads_number; id++) {
00151     //pthread_attr_t attr;
00152     //pthread_attr_init(&attr);
00153     //pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM);
00154     threads_data[id].id= id;
00155     threads_data[id].t = NULL;
00156     if (pthread_create(&(threads_data[id].th), NULL /* &attr */,
00157                        threads_worker, (void *) (threads_data + id)))
00158       ERROR ("failed to create thread");
00159   }
00160 }
00161 
00162 void
00163 threads_terminate () {
00164   threads_busy    = false;
00165   threads_todo    = threads_number;
00166 
00167   pthread_mutex_lock (&threads_busy_lock);
00168   pthread_mutex_lock (&threads_idle_lock);
00169   pthread_cond_broadcast (&threads_idle_cond);
00170   pthread_mutex_unlock (&threads_idle_lock);
00171 
00172   pthread_cond_wait (&threads_busy_cond, &threads_busy_lock);
00173   pthread_mutex_unlock (&threads_busy_lock);
00174 
00175   void* exit_status;
00176   for (nat id=0; id<threads_number; id++)
00177     pthread_join (threads_data[id].th, &exit_status);
00178   
00179   delete[] threads_data;
00180   threads_data   = NULL;
00181   threads_number = 0;
00182 
00183   pthread_mutex_destroy (&threads_task_lock);
00184   pthread_mutex_destroy (&threads_idle_lock);
00185   pthread_cond_destroy  (&threads_idle_cond);
00186   pthread_mutex_destroy (&threads_busy_lock);
00187   pthread_cond_destroy  (&threads_busy_cond);
00188 }
00189 
00190 struct threads_instance {
00191   threads_instance (nat n) {
00192     threads_initialize (n); }
00193   ~threads_instance () {
00194     threads_terminate (); }
00195 };
00196 
00197 threads_instance threads_inst (BASIX_ENABLE_THREADS);
00198 
00199 int
00200 threads_get_number () {
00201   return threads_number;
00202 }
00203 
00204 void
00205 threads_set_number (const int& n) {
00206   threads_terminate ();
00207   threads_initialize (n);
00208 }
00209 
00210 /******************************************************************************
00211 * Multi-threading disabled
00212 ******************************************************************************/
00213 
00214 #else // ifndef BASIX_ENABLE_THREADS
00215 
00216 nat  threads_number= 1;
00217 void threads_initialize (nat n) { (void) n; }
00218 void threads_terminate () {}
00219 int threads_get_number () { return 1; }
00220 void threads_set_number (const int& n) {
00221   ASSERT (n == 1, "multithreading disabled"); }
00222 void init_threads () {}
00223 void term_threads () {}
00224 void threads_execute (task* ts, nat n) {
00225   for (nat i=0; i<n; i++) inside (ts[i]) -> execute (); }
00226 
00227 #endif // BASIX_ENABLE_THREADS
00228 
00229 /******************************************************************************
00230 * Platform independent determination of available number of cores
00231 ******************************************************************************/
00232 
00233 int
00234 get_number_cores () {
00235 #ifdef __linux__ // also to be used for Solaris, AIX, ...
00236   int nr1= sysconf (_SC_NPROCESSORS_ONLN);
00237   if (nr1 >= 1) return nr1;
00238 #endif
00239 
00240 #ifdef __APPLE__ // also to be used for BSD system
00241   int mib[2] = { CTL_HW, HW_AVAILCPU };
00242   //int mib[2] = { CTL_HW, HW_NCPU };
00243   int nr2;
00244   size_t len;
00245   len = sizeof (nr2);
00246   sysctl (mib, 2, &nr2, &len, NULL, 0);
00247   if (nr2 >= 1) return nr2;
00248 #endif
00249 
00250 #ifdef __WINDOW__upcoming
00251   SYSTEM_INFO sysinfo;
00252   GetSystemInfo( &sysinfo );
00253   int nr3= sysinfo.dwNumberOfProcessors;
00254   if (nr3 >= 1) return nr3;
00255 #endif
00256 
00257 #ifdef __HPUX__upcoming
00258   int nr4= mpctl (MPC_GETNUMSPUS, NULL, NULL);
00259   if (nr4 >= 1) return nr4;
00260 #endif
00261 
00262 #ifdef __IRIX__upcoming
00263   int nr5= sysconf (_SC_NPROC_ONLN);
00264   if (nr5 >= 1) return nr5;
00265 #endif
00266 
00267   return 1;
00268 }
00269 
00270 /******************************************************************************
00271 * Running a stupid loop for testing purposes
00272 ******************************************************************************/
00273 
00274 struct loop_task_rep: public task_rep {
00275   nat iterations;
00276 public:
00277   inline loop_task_rep (nat its): iterations (its) {}
00278   void execute () { for (nat i=0; i<iterations; i++) noop (); }
00279 };
00280 
00281 void
00282 threads_simple_loop (nat nr_instances, nat nr_iterations) {
00283   task ts[nr_instances];
00284   for (nat id=0; id<nr_instances; id++)
00285     ts[id]= new loop_task_rep (nr_iterations);
00286   threads_execute (ts, nr_instances);
00287 }
00288 
00289 } // namespace mmx
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines