3. Examples - 1

One considers several examples of programming in Loft. First, some basic examples are given in Basic Examples. The complete programming of a little reflex game is considered in Reflex Game Example. Sieve algorithms are coded in Sieve Examples. Finally, data-flow programming is considered in Data-Flow Programming.

3.1 Basic Examples

One considers several basic examples which highlight some aspects of the language. Section Mutual Stops shows the benefit of a precise semantics. A notification-based communication mechanism is implemented in Wait-Notify. Barriers are considered in Synchronization Points. A reader/writer example is coded in Readers/Writers. Finally, a producer/consumer example is considered in section Producers/Consumers.

3.1.1 Mutual Stops

Let us consider a system made of two threads implementing two variants of a service, say a fast one and a slow one. Two events are used to start each of the variants. After a variant is chosen, the other one should become unavailable, that is, each variant should be able to stop the other one. The coding of such an example is straightforward. First, the two threads fast and slow, and the two events start_fast and start_slow are declared:

thread_t fast, slow;
event_t  start_fast, start_slow;
The thread fast awaits start_fast to start. Then, before serving, it stops the slow variant. It is an instance of the following module fast_variant:

module fast_variant ()
await (start_fast);
stop (slow);
< fast service >
end module
The thread slow is an instance of slow_variant defined similarly by:

module slow_variant ()
await (start_slow);
stop (fast);
< slow service >
end module
The program consists in creating the two threads and the two events:

  ....  
  fast = fast_variant_create ();
  slow = slow_variant_create ();  
  start_fast = event_create ();
  start_slow = event_create ();
  ....
The question is now: what happens if both start_fast and start_slow are simultaneously present (this should not happen, but the question remains of what happens if it does by accident)? According to the semantics, the answer is clear and precise: the two variants are executed during one instant only, and they both terminate at the next instant. Note that inserting a cooperate just after the stop in the two modules would prevent both threads from starting to serve.

Now, suppose that the same example is coded using standard pthreads, replacing events by condition variables and stop by pthread_cancel. The resulting program is deeply non-deterministic. One of the two threads could cancel the other and run to completion. But the situation where each thread cancels the other is also possible in a multiprocessor context (where each thread is run by a distinct processor); in this case, both variants execute simultaneously as long as they are not canceled, which produces unpredictable results.

Note that the possibility of mutual stopping exhibited in this example cannot be expressed in a synchronous language, because the program would be rejected at compile time as having a causality cycle.
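
The outcome for simultaneous start events can be traced with a small model in plain C. This is only a sketch, under the assumption (taken from the text) that a stop issued during an instant takes effect at the next instant; variant_t, variant_step and end_of_instant are hypothetical names and not part of Loft.

```c
#include <assert.h>

/* Hypothetical model of one variant; this is not Loft code. */
typedef struct {
   int started;       /* the start event has been received */
   int stop_pending;  /* a stop was requested during the current instant */
   int stopped;       /* the stop has taken effect */
   int served;        /* number of instants of service executed */
} variant_t;

/* Runs one instant of `self`. `start_present` tells whether its start event
   is present; `cooperate_after_stop` models a cooperate placed just after
   the stop instruction. */
void variant_step (variant_t *self, variant_t *other,
                   int start_present, int cooperate_after_stop)
{
   assert (self != other);
   if (self->stopped) return;
   if (!self->started) {
      if (!start_present) return;        /* still awaiting the start event */
      self->started = 1;
      other->stop_pending = 1;           /* stop (other) */
      if (cooperate_after_stop) return;  /* service postponed to next instant */
   }
   self->served++;                       /* one instant of service */
}

/* Assumption: pending stops take effect between two instants. */
void end_of_instant (variant_t *v)
{
   if (v->stop_pending) {
      v->stopped = 1;
      v->stop_pending = 0;
   }
}
```

When both start events are present in the first instant, each variant serves during exactly one instant and both are stopped afterwards. With cooperate_after_stop set, neither variant serves at all.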

3.1.2 Wait-Notify

One considers thread synchronization using conditions which basically correspond to (simplified) condition variables of POSIX. A thread can wait for a condition to be set by another thread. The waiting thread is said to be notified when the condition is set. In a very first naive implementation, conditions are simply boolean variables (initially false). To notify a condition means to set the variable, and to wait for the condition means to test it.

int condition = 0;

void notify (int *condition)
{
   (*condition) = 1;
}

module wait (int *cond)
while ((*local(cond)) == 0) do
   cooperate;
end
{(*local(cond)) = 0;}
end module
Note that no mutex is needed: because of the cooperative model, simple boolean shared variables are sufficient to implement the atomic test-and-set operations needed. There is however a major drawback: the module wait performs busy-waiting and thus wastes CPU time.

3.1.2.1 Use of Events

Of course, events are the means to avoid busy-waiting. One now considers conditions as made of a boolean variable with an associated event:

typedef struct condition_t
{
   int condition;
   event_t satisfied;
}
*condition_t;
The notification sets the condition variable and generates the associated event:

void notify (condition_t cond)
{
   cond->condition = 1;
   generate (cond->satisfied);
}
The wait module awaits the event while the condition is not set:

module wait (condition_t cond)
while (local(cond)->condition == 0) do
   await (local(cond)->satisfied);
   if (local(cond)->condition == 0) then cooperate; end
end
{local(cond)->condition = 0;}
end module
Note that once the event is received, the condition must be tested again because another thread waiting for the same condition could have been scheduled before. In this case, the wake-up is not productive and the thread just has to cooperate. Without the second test, an instantaneous loop would occur in this situation. The proposed solution does not allow single notifications, where only one thread is notified at a time. Actually, all threads waiting for the same condition are awakened simultaneously, but only one of them is able to proceed, the others returning to their previous state. This can be inefficient when several threads are often simultaneously waiting for the same condition.

3.1.2.2 Single Notification

To be able to notify threads one by one, an event is associated with each waiting thread. Thus, a condition now holds a list of events, managed in a FIFO way, one for each waiting thread. There are two notifications: notify_one which notifies a unique thread and notify_all which, as previously, notifies all the threads. The notify_one function gets the first event (get) and generates it:

void notify_one (condition_t cond)
{
   event_t event = get (cond);
   if (event == NULL) return;
   cond->condition = 1;
   generate (event);
}
The function get returns NULL if no thread is currently waiting on the condition. The notification is lost in this case (this conforms to the POSIX specification). The notify_all function notifies all the threads in one single loop:

void notify_all (condition_t cond)
{
   int i, k = cond->length;
   for (i = 0; i < k; i++) notify_one (cond);
}
The module wait now begins with the creation of a new event which is stored (put) in the condition:

module wait (condition_t cond)
local event_t event;
while (local(cond)->condition == 0) do
   {
      local(event) = event_create_in (current_scheduler ());
      put (local(cond),local(event));
   }
   await (local(event));
   if (local(cond)->condition == 0) then cooperate; end
end
{local(cond)->condition = 0;}
end module
The notification of a unique thread does not put the other threads waiting for the same condition in competition, because they are actually waiting on distinct events. Note that the choice of a FIFO strategy to manage events in conditions is quite arbitrary (though reasonable). This is of course a question inherent to the notify_one primitive: how is the notified thread chosen? Either the choice of the notified thread is left unspecified, which introduces some nondeterminism, or the way threads are managed is specified, which can be felt as an over-specification. This is actually an issue which can only be settled by application programmers according to their needs.
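
The functions get and put, and the field length, are used above without being defined. A minimal sketch of what they could look like is given here in plain C, assuming a singly linked FIFO list; the stand-in event type, waiter_t, condition_create and the int result of put are assumptions made for illustration only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Stand-in for the Loft event type, so that the sketch compiles on its own. */
typedef struct event *event_t;

/* One cell per waiting thread (hypothetical layout). */
typedef struct waiter_t {
   event_t event;
   struct waiter_t *next;
} waiter_t;

typedef struct condition_t {
   int condition;
   int length;          /* number of events currently stored */
   waiter_t *first;     /* oldest waiter, notified first */
   waiter_t *last;      /* most recent waiter */
} *condition_t;

/* Returns an empty, unset condition, or NULL if memory is exhausted. */
condition_t condition_create (void)
{
   condition_t cond = malloc (sizeof (*cond));
   if (cond == NULL) return NULL;
   cond->condition = 0;
   cond->length = 0;
   cond->first = cond->last = NULL;
   return cond;
}

/* Appends an event at the tail; returns 0 on success, -1 if out of memory. */
int put (condition_t cond, event_t event)
{
   waiter_t *w;
   assert (cond != NULL);
   w = malloc (sizeof (*w));
   if (w == NULL) return -1;
   w->event = event;
   w->next = NULL;
   if (cond->length == 0) cond->first = w;
   else cond->last->next = w;
   cond->last = w;
   cond->length++;
   return 0;
}

/* Removes and returns the oldest event, or NULL if nobody is waiting. */
event_t get (condition_t cond)
{
   waiter_t *w;
   event_t event;
   assert (cond != NULL);
   if (cond->length == 0) return NULL;
   w = cond->first;
   cond->first = w->next;
   if (cond->first == NULL) cond->last = NULL;
   event = w->event;
   free (w);
   cond->length--;
   return event;
}
```

Because get removes the event it returns, the loop of notify_all, which reads length once before iterating, serves exactly the threads that were waiting when it was called.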

3.1.3 Synchronization Points

A synchronization point is a given point in a program where different threads must wait until a certain number of threads have reached it. The type of synchronization points is defined by:

typedef struct sync_point_t
{
   int sync_count;
   int threshold;
   event_t go;
}
*sync_point_t;
If the threshold is reached, then the counter is reset to 0 and the event go is generated. Thus, all the waiting threads can immediately proceed. If the threshold is not reached, then the counter is incremented and the thread awaits go to continue.

module sync (sync_point_t cond)
if (local(cond)->threshold > local(cond)->sync_count) then
   {local(cond)->sync_count++;}
   await (local(cond)->go);
else
   {
      local(cond)->sync_count = 0;   
      generate (local(cond)->go);
   }
end
end module

3.1.4 Readers/Writers

One considers several threads that are reading and writing a shared resource. The writers have priority over readers. Several readers can simultaneously read the resource, while a writer must have exclusive access to it (no other writer or reader) while writing. One adopts the terminology of locks and denotes by rwlock_t the type of the control structures:

typedef struct rwlock_t
{
   int lock_count;
   int waiting_writers;
   event_t read_go;
   event_t write_go;
}
*rwlock_t;
The convention for lock_count is the following: 0 means that the lock is held by nobody; when held by the writer, lock_count has value -1; when positive, lock_count is the number of readers currently reading.

3.1.4.1 Writers

In order to write, a writer must first run the module write_lock:

module write_lock (rwlock_t rw)
{local(rw)->waiting_writers++;}
while (local(rw)->lock_count) do
   await (local(rw)->write_go);
   if (local(rw)->lock_count) then cooperate; end
end

{
   local(rw)->lock_count = -1;
   local(rw)->waiting_writers--;
}
finalize
{
   local(rw)->waiting_writers--;
}
end module
When writing is finished, the writer must call the write_unlock function:

void write_unlock (rwlock_t rw)
{
   rw->lock_count = 0;
   if (!rw->waiting_writers) generate (rw->read_go);
   else generate (rw->write_go);
}
write_unlock should also be called in the finalizer of the writer in order to release the lock when the writer is forced to terminate.

3.1.4.2 Readers

A reader can read if no writer is currently writing (that is, lock_count is not negative) and if there is no waiting writer:

module read_lock (rwlock_t rw)
while ((local(rw)->lock_count < 0) || (local(rw)->waiting_writers)) do
   await (local(rw)->read_go);
   if ((local(rw)->lock_count < 0) || (local(rw)->waiting_writers)) then cooperate; end
end
{local(rw)->lock_count++;}
end module
After reading or when stopped, a reader should call the function read_unlock:

void read_unlock (rwlock_t rw)
{
   rw->lock_count--;
   if (!rw->lock_count) generate (rw->write_go);
}

3.1.5 Producers/Consumers

3.1.5.1 Unique area

One implements the simplest form of a producers/consumers system, where several threads are processing values placed in a buffer. Each thread gets a value from the buffer, processes it, and then puts the result back in the buffer. To simplify, one considers that values are integers that are decremented each time they are processed. Moreover, the processing terminates when 0 is reached. First, a buffer implemented as a list of type channel and an event are defined:

channel buffer;
event_t new_input;
The processing module cyclically gets a value from the buffer, tests if it is zero, and if it is not, processes the value. When it is finished, the value decremented by one is put back in the buffer. The event new_input is used to avoid busy-waiting while the buffer is empty (one considers that the buffer is rarely empty; so, a unique event is preferred to single notifications of section Wait-Notify).

module process ()
local int v;
while (1) do
   while (buffer->length == 0) do
      await (new_input);
      if (buffer->length == 0) then cooperate; end
   end
   {local (v) = get (buffer);}
   if (local(v) == 0) then return; end
   < processing the value >
   {local(v)--;}
   put (buffer,local(v));
   generate (new_input);
end
end module
All accesses to the shared buffer are atomic under the condition that all instances of process are created in the same scheduler. This is of course the case when the implicit scheduler is the unique scheduler in use.

3.1.5.2 Two areas

One now considers a situation where there are two buffers in and out, and a pool of threads that take data from in, process them, and then put results in out. A distinct scheduler is associated to each buffer.

channel in, out;
scheduler_t in_sched, out_sched;
event_t new_input, new_output;
The module process gets values from in, avoiding busy-waiting by using the event new_input. A new thread instance of process_value is run for each value.

module process ()
while (1) do
    if (in->length > 0) then
      run process_value (get (in));
    else 
      await (new_input);
      if (in->length == 0) then cooperate; end
    end
end
end module
The process_value module processes its parameter and then links to out_sched to deliver the result in out. At delivery, the event new_output is generated to awaken threads possibly waiting for out to be filled.

module process_value (int v)
< processing the value >
link (out_sched);
put (out,local(v));
generate (new_output);
end module
This code shows a way to manage shared data using schedulers. The use of standard locks is actually replaced by linking operations. Of course, locks still exist in the implementation and are used when linking operations are performed, but they are totally hidden from the programmer, who can thus reason in a more abstract way, in terms of linking actions to schedulers, and not in terms of low-level lock take and release actions.

3.2 Reflex Game Example

In this section, one considers the example of a little game for measuring the reflexes of users. This example, borrowed from Esterel, shows how Loft can be used for basic reactive programming. Four keys are used: the c key means "put a coin", the r key means "I'm ready", the e key means "end the measure", and the q key means "quit the game". After putting a coin, the user signals to the game that he is ready; then, he waits for the GO! prompt; from that moment, the measure starts and it lasts until the user ends it. After a series of four measures, the game outputs the average score of the user.

The game is over when an error situation is encountered. There are two such situations: when the player takes too much time to press a key (the player has abandoned), or when e is pressed before GO! (this is considered as a cheating attempt). First, one codes a preemption mechanism which will be used for processing error situations.

3.2.1 Preemption

One considers the preemption of a thread by an event, which means that the thread must be stopped when the event becomes present. This can be obtained using the following module killer:

module killer (event_t event,thread_t thread)
await (local(event));
stop (local(thread));
end module
However, this definition is not completely satisfactory because, when the controlled thread terminates normally, that is without being preempted, the instance of killer should also be stopped. One thus encapsulates it in the following module until:

module until (thread_t thread,event_t event)
local thread_t kill;
{local(kill) = killer_create (local(event),local(thread));}
join (local(thread));
stop (local(kill));
end module
The instance of killer is thus stopped when the controlled thread terminates, because the join on that thread then returns. The preemption of a thread t by the event e thus takes the form:

run until (t,e);

3.2.2 The Reflex Game

Several variables, functions, and events are first introduced:
  • Several integer variables: pause_length, limit_time, total_time, and measure_number.
  • A function print_out to print messages.
  • Five events: coin_evt is generated when c is pressed; ready_evt is generated when r is pressed; end_evt is generated when e is pressed; tilt_evt is generated by the game when an error is detected; the values associated with display_evt are the measured times to be printed.

int pause_length, limit_time, total_time, measure_number;
void print_out (char*);
event_t coin_evt, ready_evt, end_evt, display_evt, tilt_evt;

Phase 1

The first phase consists in waiting for the notification that the user is ready. The waiting lasts at most limit_time instants; if the timeout is reached, an error is produced and tilt_evt is generated. During the waiting, mistakes are signaled by a beep: a mistake here means that the user presses the e key instead of the r key. Beeps are produced by the following module beep_on:

module beep_on (event_t event)
while (1) do
  await (local(event));
  print_out ("\07");
  cooperate;
end
end module
The first phase starts by creating an instance of beep_on which reacts on end_evt. Then, ready_evt is awaited during at most limit_time instants. If the timeout is reached then tilt_evt is generated and the execution blocks. In all cases, the previously created instance of beep_on is stopped.

module phase1 ()
local thread_t beeper;
print_out ("press r when ready\r\n");   
{local(beeper) = beep_on_create (end_evt);}
await (ready_evt,limit_time);
stop (local(beeper));
if (return_code () == ETIMEOUT) then
   generate (tilt_evt);
   halt;
end
end module

Phase 2

In phase 2, a prompt message is output after a random number of instants, and then a new measure starts. Cheating attempts are detected: the player is cheating if he presses e in advance, trying to anticipate the prompt. In this case, an error is detected and the game terminates. The module detector detects the presence of an event, then stops a thread and generates another event:

module detector (event_t trigger,event_t produced,thread_t thread)
await (local(trigger));
stop (local(thread));
generate (local(produced));
end module
The module phase2 begins by creating a cheating detector which generates tilt_evt as soon as e is pressed. Then, one waits for a random number of instants (returned by the function myrandom) before printing the prompt message. At that moment, the cheating detector is stopped.

module phase2 ()
local thread_t cheat_detector;
print_out ("wait...\r\n");
{local(cheat_detector) = detector_create (end_evt,tilt_evt,self ());} 
repeat (myrandom ()) do cooperate; end
print_out ("GO!\r\n");
stop (local(cheat_detector));
end module

Phase 3

Phase 3 consists in measuring the number of instants taken by the player to press e. An abandonment (e is not pressed within limit_time instants) terminates the game. Moreover, mistakes (here, pressing r instead of e) are detected and signaled. Finally, the measured value is associated with display_evt to be printed. An auxiliary module increm for counting instants is first defined. Once started, it increments at each instant an integer variable whose address is passed as parameter:

module increm (int *where)
while (1) do
   {(*local(where))++;}
   cooperate;
end
end module
The phase3 module is defined by:
   
module phase3 ()
local int count, thread_t counter, thread_t beeper;
{
   local(count) = 0;
   local(counter) = increm_create (&local(count));
   local(beeper)  = beep_on_create (ready_evt);
}
await (end_evt,limit_time);
stop (local(counter));
stop (local(beeper));
if (return_code () == ETIMEOUT) then
   generate (tilt_evt);
   halt;
else
   {total_time += local(count);}
   generate_value (display_evt, (void*)local(count));
end
end module
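
Values associated with events are passed as void*, which is why local(count) is cast in the call to generate_value above. The following plain C sketch makes the same conversion explicit by going through intptr_t. The helper names int_to_value and value_to_int are hypothetical, and the round trip relies on implementation-defined conversions between integers and pointers, which behave as expected on common platforms.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Packs an int into a pointer-sized value (hypothetical helper). */
void *int_to_value (int v)
{
   return (void *) (intptr_t) v;
}

/* Recovers an int stored by int_to_value. */
int value_to_int (void *p)
{
   intptr_t wide = (intptr_t) p;
   assert (wide >= INT_MIN && wide <= INT_MAX);   /* must come from an int */
   return (int) wide;
}
```

Reading the value back into a pointer-sized variable first, and narrowing it afterwards, avoids any assumption that int and void* have the same size.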

Reflex Game Module

The final_display module waits during pause_length instants before generating display_evt with a value which is the average of the measured times:

module final_display ()
repeat (pause_length) do cooperate; end
print_out ("**** final ");
generate_value (display_evt, (void*)(total_time / measure_number));
end module
A measure consists in running in sequence the three previous phases. The module list_of_measures executes measure_number measures and finally runs an instance of final_display:

module list_of_measures ()
repeat (measure_number) do
   run phase1 ();
   run phase2 ();
   run phase3 ();
end
run final_display ();
end module
The reflex_game module waits for c and then executes a list of measures. The measures are preempted by tilt_evt which is generated in case of errors. The preemption results from the use of the until module defined in Preemption:

module reflex_game ()
print_out ("A reflex game ... press c to start, q to stop.\r\n");
print_out ("You must press `e' as fast as possible after GO!.\r\n");
while (1) do
   {total_time = 0;}
   await (coin_evt);
   run until (list_of_measures_create (),tilt_evt);
   print_out ("game over. Press c to restart, q to stop.\r\n");
end
end module

3.2.3 Environment

One now considers the executing environment needed to use the reflex game. Two functions are first defined to put the terminal in a raw mode (to be able to directly get the key pressed by the player), and to restore the initial mode and terminate the game.

void the_beginning (void){
   system ("stty raw -echo");
}

void the_end (void){
   print_out ("It's more fun to compete ...\r\n");
   system ("stty -raw echo");
   exit (0);
}

Input

The input of the program is produced from the keys pressed. The keys are returned by the getchar function which is not cooperating. In order to use it, one thus defines the native module analyze_input which, after unlinking, cyclically calls getchar and generates the appropriate events:

module native analyze_input ()
unlink;
the_beginning ();
while (1) do
   {
      switch(getchar()){
         case 'c': generate (coin_evt); break;
         case 'r': generate (ready_evt); break;
         case 'e': generate (end_evt); break;
         case 'q': the_end ();
      }
   }
end
end module

Output

The results of measures are the values generated with display_evt. The way to get them is to use the get_value instruction. Note that only one value can be generated at a time; thus only the value of index 0 has to be considered. The module analyze_display is:

module analyze_display ()
local int res;
while (1) do
   await (display_evt);
   get_value (display_evt,0, (void**)&local(res));
   fprintf (stdout,"score: %d\r\n",local(res));
   cooperate;
end
end module
A message with a beep must be issued when tilt_evt is generated; this is done by the module analyze_tilt:

module analyze_tilt ()
while (1) do
   await (tilt_evt);
   print_out ("\07TILT!!\r\n");
   cooperate;
end
end module
Finally, the analysis of the output consists in creating two threads, one instance of analyze_display and one of analyze_tilt:

module analyze_output ()
analyze_display_create ();
analyze_tilt_create ();
end module

Main Module

The main module creates the five events and the three threads needed:
   
module main ()
{
   ready_evt      = event_create ();
   coin_evt       = event_create ();
   end_evt        = event_create ();
   display_evt    = event_create ();
   tilt_evt       = event_create ();
   analyze_input_create ();
   reflex_game_create ();
   analyze_output_create ();
}
end module
Two points should be noted. First, the order in which the threads are created corresponds to the intuitive sequence: input processing - reaction - output processing. Second, the values of pause_length and limit_time, and the definition of myrandom must be adapted to the executing platform, in order to get a playable system.

3.3 Sieve Examples

In this section, the focus is put on the dynamic creation of threads. One considers sieves, which are algorithms for producing numbers with a given characteristic. The best-known sieve is of course the sieve of Eratosthenes for prime numbers. Actually, there are two types of sieves: the number of elements of bounded sieves is statically fixed, while in unbounded sieves the number of elements is potentially infinite. Only unbounded sieves are considered here.

3.3.1 Standard Eratosthenes Sieve

The basic version of the unbounded sieve of Eratosthenes can be implemented by the following function:

int main ()
{
   list_t primes = init_list ();
   int current = 3, create = 1;
   while (1) {
      cell_t cell = primes->first;
      while (cell != NULL) {
         if (multiple_of (current,cell->val)) {
            create = 0; break;
         }
         cell = cell->next;
      }
      if (create) {
         add_list (primes,current);
         output (current);
      } else create = 1;
      current += 2;
   }
   return 0;
}
The type list_t is the type of lists made of elements of type cell_t which contains an integer field val and a pointer next to the next cell. The definition of these two types is standard and not given here.
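
For readers who want to compile the function, one possible definition of the two types and of the helpers it calls is sketched here. The field names val, next and first come from the code above; the field last, the error handling, and the bodies of init_list, add_list and multiple_of are assumptions. The function output is not shown and is assumed to print its argument.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef struct cell {
   int val;
   struct cell *next;
} *cell_t;

typedef struct list {
   cell_t first;
   cell_t last;    /* assumption: kept to append in constant time */
} *list_t;

/* Returns an empty list, or NULL if memory is exhausted. */
list_t init_list (void)
{
   list_t list = malloc (sizeof (*list));
   if (list != NULL) list->first = list->last = NULL;
   return list;
}

/* Appends val at the end, so that primes stay in increasing order. */
void add_list (list_t list, int val)
{
   cell_t cell;
   assert (list != NULL);
   cell = malloc (sizeof (*cell));
   if (cell == NULL) abort ();
   cell->val = val;
   cell->next = NULL;
   if (list->first == NULL) list->first = cell;
   else list->last->next = cell;
   list->last = cell;
}

/* Nonzero if n is a multiple of d (d must not be zero). */
int multiple_of (int n, int d)
{
   assert (d != 0);
   return n % d == 0;
}
```

Appending at the end keeps the smallest primes at the head of the list, so that the inner loop of the sieve meets the most frequent divisors first.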

3.3.2 Eratosthenes Sieve in Loft

Dynamic memory allocation is needed somewhere in unbounded sieves. Actually, in Loft, memory allocation can be masked by the creation of threads. The sieve of Eratosthenes can thus be implemented in Loft without the need for any user-defined data type. One first defines a module which generates the odd numbers, one per instant. The odd numbers are associated with an event given as parameter.

module producer (event_t start)
local int i;
{local(i) = 1;}
while (1) do
   {local(i) += 2;}
   generate_value (local(start), (void*)local(i));
   cooperate;
end
end module
Then, a macro is defined for awaiting an event evt and assigning its first value to a variable res:

#define get_first_val(evt,res) await(local(evt));get_value(local(evt),0, (void**)&local(res))
An instance of filter_prime is created for each prime number passed as the value associated to the event in parameter:

module filter_prime (event_t me)
local int prime, int res, event_t next;
   get_first_val(me,prime);
   {
      output (local(prime));
      filter_prime_create (local(next) = event_create ());
   }
   while (1) do
      cooperate;
      get_first_val(me,res);
      {
         if (local(res) % local(prime))
              generate_value (local(next), (void*)local(res));
      }
   end
end module 
The condition local(res) % local(prime) is true if local(res) is not a multiple of local(prime); in this case, local(res) is transmitted to the next filter; otherwise, nothing is done. The main module creates a producer and a first filter, linked by the same event:

module main ()
{
   event_t start = event_create ();
   producer_create (start);
   filter_prime_create (start);
}
end module
The beginning of the output is:
 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73
79 83 89 97 101 103 107 109 113 127 131 137 139 149 151 157 163 167
173 179 181 191 193 197 199 211 223 227 229 233 239 241 251 257 263
269 271 277 281 283 293 307 311 313 317 331 337 347 349 353 359 367
373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463
467 479 487 491 499 503 509 521 523 541 547 557 ...

3.3.3 Lucky Numbers

Lucky numbers [18] are generated by a sieve which is very close to the one of Eratosthenes. The production of lucky numbers is defined by the following algorithm: write all the odd numbers: 1,3,5,7,9,11,... The first number greater than 1 is 3, so delete every third number: 1,3,7,9,13,15,19,... The first number greater than 3 in the list is 7, so delete every seventh number: 1,3,7,9,13,15,21,25,31,... And so on, without end. The numbers that remain are called lucky numbers. The sieve to produce lucky numbers is based on a filtering function which filters numbers according to their indexes, and not according to their values as the Eratosthenes sieve does.

module filter_lucky (int i,event_t trigger,event_t out)
local int base, int res, int k, event_t next;
   get_first_val(trigger,base);
   {
      generate_value (local(out), (void*)local(base));
      filter_lucky_create (local(k)= local(i)+1,
                           local(next) = event_create (),
                           local(out));
   }
   while (1) do
      cooperate;
      get_first_val(trigger,res);
      {
         local(k)++;
         if (local(k) % local(base)) 
            generate_value (local(next), (void*)local(res));
      }
   end
end module
The produced numbers are generated as values of the event out given as parameter. Moreover, the index of the filtered values is given as the first parameter to filter_lucky. The sieve for lucky numbers is defined by:

module main ()
{
   event_t go = event_create ();
   event_t result = event_create ();

   out1_create (result);
   producer_create (go);
   filter_lucky_create (1,go,result);
}
end module
The module out1 prints all the values associated with its parameter, which is an event. The list of lucky numbers starts with:
 3 7 9 13 15 21 25 31 33 37 43 49 51 63 67 69 73 75 79 87
93 99 105 111 115 127 129 133 135 141 151 159 163 169 171 189 193 195
201 205 211 219 223 231 235 237 241 259 261 267 273 283 285 289 297
303 307 319 321 327 331 339 349 357 361 367 385 391 393 399 409 415
421 427 429 433 451 463 475 477 483 487 489 495 511 517 519 529 535
537 541 553 559 577 579 583 591 601 613 615 619 ...
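
For comparison, the algorithm stated at the beginning of this section can also be written as a bounded sieve in plain C, which gives a simple way to cross-check the list. This is only a sketch working on an array rather than on threads; the function name lucky_upto and the bound put on limit are assumptions made for illustration.

```c
#include <assert.h>
#include <stdlib.h>

/* Stores in out the lucky numbers not greater than limit and returns how
   many there are, or -1 on allocation failure. out must have room for
   (limit + 1) / 2 integers. */
int lucky_upto (int limit, int *out)
{
   int n = 0, i, v;
   int *a;
   /* the upper bound is arbitrary and only keeps the sketch small */
   assert (limit >= 1 && limit <= 1000000 && out != NULL);
   a = malloc (((size_t) limit + 1) / 2 * sizeof (int));
   if (a == NULL) return -1;
   for (v = 1; v <= limit; v += 2) a[n++] = v;     /* all the odd numbers */
   /* a[i] is the next surviving number; stop when it exceeds the length */
   for (i = 1; i < n && a[i] <= n; i++) {
      int step = a[i], kept = 0, pos;
      for (pos = 0; pos < n; pos++) {
         /* positions are counted from 1: delete every step-th number */
         if ((pos + 1) % step != 0) a[kept++] = a[pos];
      }
      n = kept;
   }
   for (i = 0; i < n; i++) out[i] = a[i];
   free (a);
   return n;
}
```

For a limit of 50, the function yields 1 3 7 9 13 15 21 25 31 33 37 43 49, which agrees with the beginning of the list above (the Loft program starts at 3 because producer generates the odd numbers from 3).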

3.3.4 Lucky Prime Numbers

Let us now consider the numbers that are simultaneously prime and lucky. The direct production of these lucky prime numbers is not so clear using a unique sieve (what would be the filtering function?). However, a possible solution is to reuse the two previous sieves and to build a system made of two sub-systems, one producing prime numbers and the other producing lucky numbers. The values produced by the two sub-systems are generated with the same event and are output by the module out2, which outputs a value when two generations of it are received during the same instant, meaning that the value has passed all the prime filters and also all the lucky filters. The definition of out2 is:

module out2 (event_t event)
local int k, int res, int all;
while (1) do
   await (local(event));
   {local(k) = 0; local(all) = 0;}
   while (!local(all)) do
      get_value (local(event),local(k), (void**)&local(res)); 
      {
         if (return_code () == ENEXT) {
            local(all) = 1;
            if (local(k) == 2) output (local(res));
         }
         local(k)++;
      }
   end 
end
end module
The module filter_prime is changed to let it produce its results as values of an event. The system is the following:

module main ()
{
   event_t go = event_create ();
   event_t result = event_create ();

   out2_create (result);
   producer_create (go);
   filter_lucky_create (1,go,result);
   filter_prime_create (go,result);
}
end module
The list of lucky prime numbers is:
3 7 13 31 37 43 67 73 79 127 151 163 193 211 223 241 283
307 331 349 367 409 421 433 463 487 541 577 601 613 619 631 643 673
727 739 769 787 823 883 937 991 997 1009 1021 1039 1087 1093 1117 1123
1201 1231 1249 1291 1303 1459 1471 1543 1567 1579 1597 1663 1693 1723
1777 1801 1831 1879 1933 1987 2053 2083 2113 2221 2239 2251 2281 2311
2467 2473 2557 2593 2647 2671 2689 2797 2851 2887 2953 2971 3037 3049
3109 3121 3163 3187 3229 3259 3301 3307 3313 ...
This example shows how code reuse can be made easier by the existence of common instants shared by all the threads.

3.4 Data-Flow Programming

In this section, one considers data-flow programming and shows how to implement it in Loft.

3.4.1 Introduction

One considers programs made of processes connected through channels. Channels are basically unbounded FIFO queues. Processes can put values in channels (this is always possible as channels are unbounded) and get values from them. Each channel has at most one producer (a process that fills it) and at most one consumer (a process that empties it). Thus, there is no concurrency on channel accesses, except between producers and consumers. These programs are called data-flow programs because processes are basically creating and transforming flows of values. They were first introduced in [23], and they are actually the initial computing model at the basis of dataflow synchronous languages.

There are two variants. In Nets of Sequential Processes, the consumer of an empty channel remains stuck until some value is (possibly) produced by the producer of the channel. In Nets of Reactive Processes, all processes share the same instants, and a process can thus detect during one instant that a channel is empty (and can thus avoid getting stuck on it). In what follows, one does not distinguish between the two variants.

3.4.2 Channels

Channels are basically lists of cells containing values, with access to the first cell and to the last cell. Moreover, an event is associated with each channel, to signal that a new value has been put in it. Cells hold integers of type value_t.

typedef struct channel_t {
   cell_t   first;
   cell_t   last;
   int      length;
   event_t  new_value;
} *channel_t;
The function to put a value into a channel is:

void put (channel_t chan,value_t v)
{
   if (chan->length == 0) {
      chan->last = cell_create (v);
      chan->first = chan->last;
      generate (chan->new_value);
   } else {
      chan->last->next = cell_create (v);
      chan->last = chan->last->next;
   }
   chan->length++;
}
Note the generation of the event held by the channel, which signals to the consumer that the channel is no longer empty. Extraction of a value from a channel is performed by the function extract:

value_t extract (channel_t chan)
{
   cell_t head;
   value_t res;
   if (chan->length == 0) return 0;   /* empty channel; value_t is an integer, so 0 rather than NULL */
   head = chan->first;
   chan->first = head->next;
   res = head->val;
   free (head);
   chan->length--;
   return res;
}
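
The cell type and the creation functions are not shown here. The following plain C sketch gives one possible reading of them, so that the FIFO behaviour of put and extract can be tried outside Loft. It rests on assumptions: the definitions of value_t, cell_t, cell_create and channel_create are guesses made for illustration, and the event new_value is left out because it needs the Loft runtime.

```c
#include <stdlib.h>

typedef int value_t;                  /* assumption: cells hold plain integers */

typedef struct cell_t {               /* assumed layout of a cell */
    value_t        val;
    struct cell_t *next;
} *cell_t;

typedef struct channel_t {            /* as in the text, without the event */
    cell_t first;
    cell_t last;
    int    length;
} *channel_t;

cell_t cell_create(value_t v)
{
    cell_t c = malloc(sizeof *c);
    if (c == NULL) abort();           /* keep the sketch simple on failure */
    c->val = v;
    c->next = NULL;
    return c;
}

channel_t channel_create(void)
{
    channel_t chan = malloc(sizeof *chan);
    if (chan == NULL) abort();
    chan->first = NULL;
    chan->last = NULL;
    chan->length = 0;
    return chan;
}

/* Appends v at the tail of the channel. */
void put(channel_t chan, value_t v)
{
    if (chan->length == 0) {
        chan->last = cell_create(v);
        chan->first = chan->last;     /* the event would be generated here */
    } else {
        chan->last->next = cell_create(v);
        chan->last = chan->last->next;
    }
    chan->length++;
}

/* Removes and returns the head value, or 0 if the channel is empty. */
value_t extract(channel_t chan)
{
    cell_t head;
    value_t res;
    if (chan->length == 0) return 0;
    head = chan->first;
    chan->first = head->next;
    res = head->val;
    free(head);
    chan->length--;
    return res;
}
```

Values come out in the order in which they were put, and a channel that has been emptied can be filled again, since put resets both first and last when length is 0.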
The module get blocks execution until a first value can be extracted out of a channel:

module get (channel_t in,value_t *res)
while (local(in)->length == 0) do
   await (local(in)->new_value);
   if (local(in)->length == 0) then cooperate; end
end
{(*local(res)) = extract (local(in));}
end module
If the channel is not empty, the value is immediately extracted from it and assigned to the result variable. Otherwise, the event associated with the channel is awaited. Note that a process can extract several values from a channel during the same instant. Thus, the event associated with a channel may be present while the channel is empty, because the channel has already been emptied during the same instant. There are two consequences:
  • One must always test if the channel is empty, after receiving the event.
  • In case the channel is empty (which means that all the values in it have been previously extracted), the module should cooperate, to avoid cycling.
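The two consequences above can be summarized as a decision taken at each turn of the loop of get. The following C sketch is not Loft code and does not involve any scheduler: the type get_action_t and the function get_action are hypothetical names, and the presence of the event in the current instant is modelled by a simple flag.

```c
/* What the loop of get does, given the state of the channel in the current instant. */
typedef enum {
    GET_EXTRACT,     /* channel not empty: leave the loop and extract */
    GET_AWAIT,       /* channel empty, event absent: block on the event */
    GET_COOPERATE    /* channel empty, event present: the values signalled by
                        the event are already gone, wait for the next instant */
} get_action_t;

get_action_t get_action(int length, int event_present)
{
    if (length > 0) return GET_EXTRACT;
    if (!event_present) return GET_AWAIT;
    return GET_COOPERATE;
}
```

The third case is the one that would cycle forever within the instant without the call to cooperate: await returns immediately because the event is present, yet there is nothing to extract.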
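A macro is defined to replace runs of the get module. It only waits until the channel is non-empty; the value is then obtained by an explicit call to extract: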

#define GET(chan) \
   while (chan->length == 0) do\
      await (chan->new_value);\
      if (chan->length == 0) then cooperate; end\
   end

3.4.3 Processes

Processes are modules. One just presents here two basic modules: the first one produces integer values on its output channel, and the second one prints the values received on its input channel. The producer is (see note 3):

module producer_process (value_t from,value_t incr,channel_t out)
local value_t val;
{local(val) = local(from);}
while (1) do
   if (local(out)->length < 10) then
   {
      put (local(out),local(val));
      local(val) += local(incr);
   }
   end
   cooperate;
end
end module
The out_process process is:

module out_process (channel_t in)
while (1) do
   GET (local(in))
   printf ("%d ",extract (local(in)));
   fflush(stdout);
end
end module
Programs are built by connecting processes with channels. Each channel should have at most one producer and at most one consumer. For example, the following program is made of a channel (created with the function channel_create) connecting an instance of the process producer_process to an instance of the process out_process:

module main()
{ 
   channel_t c = channel_create ();
   producer_process_create (0,1,c);
   out_process_create (c);
}
end module
The program prints the successive integers 0, 1, 2, and so on.
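
The interplay between the producer and the printing process can be imitated in plain C by a loop over instants. The sketch below is a simulation under stated assumptions, not the Loft runtime: the function simulate and the parameter consumer_delay are hypothetical, the producer is assumed to run before the consumer at each instant, and the channel is replaced by a ring buffer of 10 entries since the producer never lets more than 10 values wait. The parameter consumer_delay models a consumer that only starts reading after some instants, which makes the limit of 10 visible.

```c
#define CAPACITY 10   /* the producer puts a value only while length < 10 */

/* Simulates the given number of instants. The printed values are stored in
   out (capacity >= instants) instead of being sent to stdout; the largest
   channel length observed is stored in *max_length. Returns the number of
   values printed. */
int simulate(int from, int incr, int instants, int consumer_delay,
             int *out, int *max_length)
{
    int buf[CAPACITY];
    int head = 0, length = 0;
    int val = from;
    int printed = 0;
    *max_length = 0;
    for (int t = 0; t < instants; t++) {
        /* producer step: at most one value per instant */
        if (length < CAPACITY) {
            buf[(head + length) % CAPACITY] = val;
            length++;
            val += incr;
        }
        if (length > *max_length) *max_length = length;
        /* consumer step: once started, it empties the channel */
        if (t >= consumer_delay) {
            while (length > 0) {
                out[printed++] = buf[head];
                head = (head + 1) % CAPACITY;
                length--;
            }
        }
    }
    return printed;
}
```

With no delay, one value is printed per instant and the channel never holds more than one value. With a delay of 20 instants, the producer stops after 10 values and resumes only once the consumer has emptied the channel; the values are still printed in increasing order.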

3.4.4 Prime Numbers

One programs the Eratosthenes sieve in the dataflow style to produce prime numbers. A list of filters, corresponding to the prime numbers already produced, is put in sequence and connected through channels. A number that passes all the filters is prime. For each new prime number, a new filter is introduced that rejects its multiples. The filter module is:

module filter_process (int index,int base,channel_t in,channel_t out,filter_t filter)
while (1) do
   GET (local(in))
   {
      int v = extract (local(in));
      local(index)++;
      if (local(filter) (local(index),local(base),v)) 
            put(local(out),v);
   }
end
end module
The list of filters is constructed by the module sift_process:

module sift_process (channel_t in,channel_t out,filter_t filter)
local int k;
{local(k) = 1;}
while (1) do
   GET (local(in))
   {
      channel_t intern = channel_create_in (current_scheduler ());
      int v = extract (local(in));
      put(local(out),v);
      local(k)++;
      filter_process_create (local(k),v,local(in),intern,local(filter));
      local(in) = intern;
   }

end
end module
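Finally, the prime numbers are produced by the following program, where the filtering function primes (of type filter_t) rejects the multiples of a base value: it returns a non-zero value exactly when value is not a multiple of base. As the producer emits the odd numbers from 3 onwards, the prime 2 is not printed: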

int primes (int index,int base,int value)
{
   return value%base;
}

module main()
{ 
   channel_t c1 = channel_create ();
   channel_t c2 = channel_create ();
   producer_process_create   (3,2,c1);
   sift_process_create       (c1,c2,primes);
   out_process_create        (c2);
}
end module
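
The effect of the chain of filters can be reproduced sequentially in plain C. The sketch below is not the Loft program: there are no threads and no channels, the chain of filter_process instances is replaced by a loop over the bases found so far, and the function sift is a hypothetical name. The type filter_t is assumed to be a pointer to a function taking the index, the base and the value. The index argument is not modelled (0 is always passed), which is harmless for primes but would not do for a filter that depends on it.

```c
/* Assumed definition of the filter type used by the modules above. */
typedef int (*filter_t)(int index, int base, int value);

/* Non-zero exactly when value is not a multiple of base. */
int primes(int index, int base, int value)
{
    (void)index;                      /* unused by this filter */
    return value % base;
}

/* Feeds the candidates from, from+incr, ... through the filters and stores
   in out the first count values that pass all of them. Each stored value
   becomes the base of a new filter, as in sift_process. */
void sift(int from, int incr, filter_t filter, int count, int *out)
{
    int found = 0;
    for (int v = from; found < count; v += incr) {
        int pass = 1;
        /* the value goes through the filters in their order of creation */
        for (int i = 0; i < found && pass; i++)
            pass = filter(0, out[i], v) != 0;
        if (pass)
            out[found++] = v;         /* v passed every filter: output it */
    }
}
```

Calling sift with the arguments 3 and 2, as in the main module above, collects the odd primes in increasing order.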



1: This example is considered in the book General Programming Concepts: Writing and Debugging Programs belonging to the AIX system documentation (2nd edition, 1999).

2: Thanks to Fabrice Peix.

3: One supposes that the number of unused values is limited to 10, that is, the producer puts a new value only while fewer than 10 values are waiting in the channel.

This page has been generated by Scribe.
Last update Wed Oct 22 18:41:04 2003