One considers several examples of programming in Loft. First, some
basic examples are given in Basic Examples. The complete programming
of a little reflex game is considered in Reflex Game Example. Sieve
algorithms are coded in Sieve Examples. Finally, data-flow programming
is considered in Data-Flow Programming.
3.1 Basic Examples
One considers several basic examples which highlight some aspects of
the language. Section Mutual Stops shows
the benefit of a precise semantics. A notification-based communication
mechanism is implemented in Wait-Notify. Barriers
are considered in Synchronization Points. A
reader/writer example is coded in Readers/Writers.
Finally, a producer/consumer example is considered in section Producers/Consumers.
3.1.1 Mutual Stops
Let us consider a system made of two threads implementing two variants
of a service, say a fast one and a slow one. Two events are used to
start each of the variants. After a variant is chosen, the other one
should become unavailable, that is each variant should be able to stop
the other variant. The coding of such an example is straightforward.
First, the two threads fast and slow, and the two
events start_fast and start_slow, are declared:
thread_t fast, slow;
event_t start_fast, start_slow;
The thread fast awaits start_fast to start. Then,
before serving, it stops the slow variant. It is an instance of the
following module fast_variant:
module fast_variant ()
await (start_fast);
stop (slow);
< fast service >
end module
The thread slow is an instance of slow_variant
defined similarly by:
module slow_variant ()
await (start_slow);
stop (fast);
< slow service >
end module
The program consists in creating the two threads and the two events:
....
fast = fast_variant_create ();
slow = slow_variant_create ();
start_fast = event_create ();
start_slow = event_create ();
....
The question is now: what happens if both start_fast and
start_slow are simultaneously present (this should not
happen, but the question remains of what happens if, by accident, it
does)? The answer is clear and precise, according to the
semantics: the two variants are executed during only one instant, and
then they both terminate at the next instant. Note that inserting a
cooperate just after the stop in the two modules
would prevent both threads from starting to serve.
Now, suppose that the same example is coded using standard pthreads,
replacing events by condition variables and stop by pthread_cancel.
The resulting program is deeply
non-deterministic. Actually, one of the two threads could cancel the
other and run to completion. But the situation where both threads
cancel each other is also possible in a multiprocessor context
(where each thread is run by a distinct processor); in
this case, both variants execute simultaneously until they are
actually canceled, which produces unpredictable results.
Note that the possibility of mutual stopping exhibited in this example
cannot be expressed in a synchronous language, because it would be
rejected at compile time as having a causality cycle.
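The outcome described above can be mimicked in plain C. The following is only a minimal sketch, not Loft code: it assumes that a stop order issued during an instant takes effect at the beginning of the next instant, which is what the description above implies. The names variant_t and run_instant are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of one service variant (not a Loft type). */
typedef struct variant {
    bool started;        /* the start event has been seen */
    bool stop_pending;   /* a stop order was received during this instant */
    bool stopped;        /* the stop order has taken effect */
    int  instants_run;   /* number of instants spent serving */
    struct variant *other;
} variant_t;

/* Executes one instant for both variants; start_a and start_b tell
   whether each start event is present during this instant. */
void run_instant(variant_t *a, variant_t *b, bool start_a, bool start_b)
{
    assert(a->other == b && b->other == a);
    variant_t *v[2] = { a, b };
    bool start[2] = { start_a, start_b };

    /* Beginning of the instant: pending stop orders take effect. */
    for (int i = 0; i < 2; i++) {
        if (v[i]->stop_pending) {
            v[i]->stopped = true;
            v[i]->stop_pending = false;
        }
    }
    for (int i = 0; i < 2; i++) {
        if (v[i]->stopped) continue;
        if (!v[i]->started && start[i]) {
            v[i]->started = true;
            v[i]->other->stop_pending = true;    /* stop (other) */
        }
        if (v[i]->started) v[i]->instants_run++; /* < service > */
    }
}
```

When both start events are present in the same instant, each variant serves during exactly one instant and both are stopped at the next one, whatever the order in which the two variants are scheduled.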
3.1.2 Wait-Notify
One considers thread synchronization using conditions which
basically correspond to (simplified) condition variables of
POSIX. A thread can wait for a condition to be set by another
thread. The waiting thread is said to be notified when the
condition is set. In a very first naive implementation, conditions
are simply boolean variables (initially false). To notify a condition
means to set the variable, and to wait for the condition means to test
it.
int condition = 0;
void notify (int *condition)
{
(*condition) = 1;
}
module wait (int *cond)
while ((*local(cond)) == 0) do
cooperate;
end
{(*local(cond)) = 0;}
end module
Note that no mutex is needed: because of the cooperative model, simple
boolean shared variables are sufficient to implement the atomic
test-and-set operations that are needed. There is however a major
drawback: the module wait performs busy-waiting and thus wastes the
CPU resource.
3.1.2.1 Use of Events
Of course, events are the means to avoid busy-waiting. One now
considers conditions as made of a boolean variable with an associated
event:
typedef struct condition_t
{
int condition;
event_t satisfied;
}
*condition_t;
The notification sets the condition variable and generates
the associated event:
void notify (condition_t cond)
{
cond->condition = 1;
generate (cond->satisfied);
}
The wait module awaits the event while the condition
is not set:
module wait (condition_t cond)
while (local(cond)->condition == 0) do
await (local(cond)->satisfied);
if (local(cond)->condition == 0) then cooperate; end
end
{local(cond)->condition = 0;}
end module
Note that once the event is received, the condition must be tested
again because another thread waiting for the same condition could have
been scheduled before. In this case, the wake-up is not productive
and the thread just has to cooperate. Without the second test, an
instantaneous loop would occur in this situation.
The solution proposed does not allow single notifications,
where only one thread is notified at a time. Actually, all threads
waiting for the same condition are simultaneously awakened, but only
one of them is able to proceed, the other ones returning to their
previous state. This can be inefficient when several threads are often
simultaneously waiting for the same condition.
3.1.2.2 Single Notification
To be able to notify threads one by one, an event is associated with
each waiting thread. Thus, a condition now holds a list of events,
managed in a FIFO way, which represent the waiting threads. There are
two notifications: notify_one, which notifies a unique thread, and
notify_all which, as previously, notifies all the threads.
The notify_one function gets the first event (get)
and generates it:
void notify_one (condition_t cond)
{
event_t event = get (cond);
if (event == NULL) return;
cond->condition = 1;
generate (event);
}
The function get returns NULL if no thread is currently
waiting on the condition. The notification is lost in this case (this
conforms to the POSIX specification). The notify_all function
notifies all the threads in one single loop:
void notify_all (condition_t cond)
{
int i, k = cond->length;
for (i = 0; i < k; i++) notify_one (cond);
}
The module wait now begins with the creation of a new
event which is stored (put) in the condition:
module wait (condition_t cond)
local event_t event;
while (local(cond)->condition == 0) do
{
local(event) = event_create_in (current_scheduler ());
put (local(cond),local(event));
}
await (local(event));
if (local(cond)->condition == 0) then cooperate; end
end
{local(cond)->condition = 0;}
end module
The notification of a unique thread does not put all the other
threads waiting for the same condition in competition, because they
are actually waiting on distinct events. Note that the choice of a FIFO
strategy to manage events in conditions is quite arbitrary (though
reasonable). This is of course a question which is inherent to the
notify_one primitive: how is the notified thread chosen?
Either the choice of the notified thread is left unspecified, which
introduces some nondeterminism, or the way threads are managed is
specified, which can be felt as an over-specification. This is
actually an issue which can only be solved by application programmers
according to their needs.
3.1.3 Synchronization Points
A synchronization point is a given point in a program where
different threads must wait until a certain number of threads have
reached it. The type of synchronization points is defined by:
typedef struct sync_point_t
{
int sync_count;
int threshold;
event_t go;
}
*sync_point_t;
If the threshold is reached, then the counter is reset to 0 and the
event go is generated. Thus, all the waiting threads can
immediately proceed. If the threshold is not reached, then the
counter is incremented and the thread awaits go to continue.
module sync (sync_point_t cond)
if (local(cond)->threshold > local(cond)->sync_count) then
{local(cond)->sync_count++;}
await (local(cond)->go);
else
{
local(cond)->sync_count = 0;
generate (local(cond)->go);
}
end
end module
3.1.4 Readers/Writers
One considers several threads that are reading and writing a shared
resource (see note 1). The
writers have priority over readers. Several readers can simultaneously
read the resource while a writer must have exclusive access to it (no
other writer nor reader) while writing. One adopts the terminology of
locks and denotes by rwlock_t the type of control structures:
typedef struct rwlock_t
{
int lock_count;
int waiting_writers;
event_t read_go;
event_t write_go;
}
*rwlock_t;
The convention for lock_count is the following: 0 means that
the lock is held by nobody; when held by the writer, lock_count has value -1; when positive, lock_count is
the number of readers currently reading.
3.1.4.1 Writers
In order to write, a writer must first run the module
write_lock :
module write_lock (rwlock_t rw)
{local(rw)->waiting_writers++;}
while (local(rw)->lock_count) do
await (local(rw)->write_go);
if (local(rw)->lock_count) then cooperate; end
end
{
local(rw)->lock_count = -1;
local(rw)->waiting_writers--;
}
finalize
{
local(rw)->waiting_writers--;
}
end module
When writing is finished, the writer must call the write_unlock
function:
void write_unlock (rwlock_t rw)
{
rw->lock_count = 0;
if (!rw->waiting_writers) generate (rw->read_go);
else generate (rw->write_go);
}
write_unlock should also be called in the finalizer of the writer
in order to release the lock when the writer is forced to terminate.
3.1.4.2 Readers
A reader can read if no writer is currently writing (lock_count >= 0) and if there is no waiting writer:
module read_lock (rwlock_t rw)
while ((local(rw)->lock_count < 0) || (local(rw)->waiting_writers)) do
await (local(rw)->read_go);
if ((local(rw)->lock_count < 0) || (local(rw)->waiting_writers)) then cooperate; end
end
{local(rw)->lock_count++;}
end module
After reading or when stopped, a reader should call the function read_unlock:
void read_unlock (rwlock_t rw)
{
rw->lock_count--;
if (!rw->lock_count) generate (rw->write_go);
}
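The convention on lock_count and the priority given to writers can be summarized by a small state machine. The following C sketch is a non-blocking analogue of the modules above, without events: a failed attempt corresponds to the case where the Loft module would wait. All names (rw_state_t, try_read_lock, and so on) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct {
    int lock_count;       /* 0: free, -1: one writer, >0: number of readers */
    int waiting_writers;  /* writers that have announced themselves */
} rw_state_t;

/* A reader enters only if nobody writes and no writer is waiting. */
bool try_read_lock(rw_state_t *rw)
{
    if (rw->lock_count < 0 || rw->waiting_writers > 0) return false;
    rw->lock_count++;
    return true;
}

/* A writer enters only if the lock is completely free. */
bool try_write_lock(rw_state_t *rw)
{
    if (rw->lock_count != 0) return false;
    rw->lock_count = -1;
    return true;
}

void read_unlock_state(rw_state_t *rw)
{
    assert(rw->lock_count > 0);
    rw->lock_count--;
}

void write_unlock_state(rw_state_t *rw)
{
    assert(rw->lock_count == -1);
    rw->lock_count = 0;
}
```

The test on waiting_writers in try_read_lock is what gives priority to writers: as soon as a writer is waiting, new readers are refused even though other readers may still hold the lock.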
3.1.5 Producers/Consumers
3.1.5.1 Unique area
One implements the simplest form of a producers/consumers system where
several threads are processing values placed in a buffer. Each thread
gets a value from the buffer, processes it, and then puts the result
back in the buffer. To simplify, one considers that values are integers
that are decremented each time they are processed. Moreover, the
processing terminates when 0 is reached. First, a buffer implemented
as a list of type channel, and an event, are defined:
channel buffer;
event_t new_input;
The processing module cyclically gets a value from the buffer, tests
if it is zero, and if it is not, processes the value. When it is
finished, the value decremented by one is put back in the buffer. The
event new_input is used to avoid busy-waiting while the
buffer is empty (one considers that the buffer is rarely empty;
so, a unique event is preferred to single notifications of section
Wait-Notify).
module process ()
local int v;
while (1) do
while (buffer->length == 0) do
await (new_input);
if (buffer->length == 0) then cooperate; end
end
{local (v) = get (buffer);}
if (local(v) == 0) then return; end
< processing the value >
{local(v)--;}
put (buffer,local(v));
generate (new_input);
end
end module
All accesses to the shared buffer are atomic under the condition that
all instances of process are created in the same
scheduler. This is of course the case when the implicit scheduler is
the unique scheduler in use.
3.1.5.2 Two areas
One now considers a situation where there are two buffers, in and
out, and a pool of threads that take data from in,
process them, and then put results in out. A distinct scheduler
is associated with each buffer.
channel in, out;
scheduler_t in_sched, out_sched;
event_t new_input, new_output;
The module process gets values from in, avoiding
busy-waiting by using the event new_input. A new thread,
instance of process_value, is run for each value.
module process ()
while (1) do
if (in->length > 0) then
run process_value (get (in));
else
await (new_input);
if (in->length == 0) then cooperate; end
end
end
end module
The process_value module processes its parameter and then
links to out_sched to deliver the result in out. At delivery, the
event new_output is generated to
awake threads possibly waiting for out to be filled.
module process_value (int v)
< processing the value >
link (out_sched);
put (out,local(v));
generate (new_output);
end module
This code shows a way to manage shared data using schedulers. The use
of standard locks is actually replaced by linking operations. Of
course, locks still exist in the implementation and are used when
linking operations are performed, but they are totally hidden from the
programmer who can thus reason in a more abstract way, in terms of
linking actions to schedulers, and not in terms of low-level lock
acquire and release actions.
3.2 Reflex Game Example
In this section, one considers the example of a little game for
measuring the reactivity of users. This example, borrowed from
Esterel, shows how Loft can be used for basic reactive
programming.
The purpose of the game is to measure the reflexes of the user. Four
keys are used. The c key means "put a coin", the r key means
"I'm ready", the e key means "end the measure" and the q key means
"quit the game". After putting
a coin, the user signals the game that he is ready; then, he waits
for the GO! prompt; from that moment, the measure starts
and it lasts until the user ends the measure. After a series of four
measures, the game outputs the average score of the user. The game is
over when an error situation is encountered. There are actually two
such situations: when the player takes too much time to press a button
(the player has abandoned); or when e is pressed before
GO! (this is considered as a cheating attempt).
First, one codes a preemption mechanism which will be used for
processing error situations.
3.2.1 Preemption
One considers the preemption of a thread by an event, which means that
the thread must be stopped when the event becomes present. This can be
obtained using the following module killer :
module killer (event_t event,thread_t thread)
await (local(event));
stop (local(thread));
end module
However, this definition is not completely satisfactory because,
when the controlled thread terminates normally, that is without being
preempted, the instance of killer should also be
stopped. One thus encapsulates it in the following module until:
module until (thread_t thread,event_t event)
local thread_t kill;
{local(kill) = killer_create (local(event),local(thread));}
join (local(thread));
stop (local(kill));
end module
Then, the instance of killer is stopped when the controlled
thread terminates, because the controlled thread is joined. The
preemption of a thread t by the event e thus takes the
form:
run until (t,e);
3.2.2 The Reflex Game
Several variables, functions, and events are first introduced:
- Several integer variables: pause_length, limit_time, total_time,
and measure_number.
- A function print_out to print messages.
- Five events:
coin_evt is generated when c is pressed;
ready_evt is generated when r is pressed;
end_evt is generated when e is pressed;
tilt_evt is generated by the game when an error is detected;
the value of display_evt holds the measured times to be printed.
int pause_length, limit_time, total_time, measure_number;
void print_out (char*);
event_t coin_evt, ready_evt, end_evt, display_evt, tilt_evt;
Phase 1
The first phase consists in waiting for the notification that the user
is ready. The waiting lasts at most limit_time instants, and if
the timeout is reached, an error is produced and tilt_evt is
generated. During the waiting, mistakes are signaled by a beep sound:
a mistake here means that the user presses the e key instead
of the r key. Beeps are produced by the following module
beep_on:
module beep_on (event_t event)
while (1) do
await (local(event));
print_out ("\07");
cooperate;
end
end module
The first phase starts by creating an instance of beep_on
which reacts on end_evt. Then, ready_evt is awaited
during at most limit_time instants. If the timeout is
reached then tilt_evt is generated and the execution
blocks. In all cases, the previously created instance of beep_on is stopped.
module phase1 ()
local thread_t beeper;
print_out ("press r when ready\r\n");
{local(beeper) = beep_on_create (end_evt);}
await (ready_evt,limit_time);
stop (local(beeper));
if (return_code () == ETIMEOUT) then
generate (tilt_evt);
halt;
end
end module
Phase 2
In phase 2, a prompt message is output after a random number of
instants and then a new measure starts. Cheating attempts are detected:
the player is cheating if he tries to press e in advance,
trying to anticipate the prompt. In this case, an error is detected
and the game terminates. The module detector detects the
presence of an event and then generates another event and stops a
thread:
module detector (event_t trigger,event_t produced,thread_t thread)
await (local(trigger));
stop (local(thread));
generate (local(produced));
end module
The module phase2 begins by creating a cheating detector
which generates tilt_evt as soon as e is
pressed. Then, one waits for a random number of instants (returned by
the function myrandom) before printing the prompt
message. At that moment, the cheating detector is stopped.
module phase2 ()
local thread_t cheat_detector;
print_out ("wait...\r\n");
{local(cheat_detector) = detector_create (end_evt,tilt_evt,self ());}
repeat (myrandom ()) do cooperate; end
print_out ("GO!\r\n");
stop (local(cheat_detector));
end module
Phase 3
Phase 3 consists in measuring the number of instants taken by the
player to press e. An abandonment (e is not pressed
during limit_time instants) terminates the game. Moreover,
mistakes (here, pressing r instead of e) are
detected and signaled. Finally, the value measured is associated with
display_evt to be printed.
An auxiliary module increm for counting instants is first
defined. Once started, it increments at each instant a global variable
whose reference is passed as parameter:
module increm (int *where)
while (1) do
{(*local(where))++;}
cooperate;
end
end module
The phase3 module is defined by:
module phase3 ()
local int count, thread_t counter, thread_t beeper;
{
local(count) = 0;
local(counter) = increm_create (&local(count));
local(beeper) = beep_on_create (ready_evt);
}
await (end_evt,limit_time);
stop (local(counter));
stop (local(beeper));
if (return_code () == ETIMEOUT) then
generate (tilt_evt);
halt;
else
{total_time += local(count);}
generate_value (display_evt, (void*)local(count));
end
end module
Reflex Game Module
The final_display module waits during pause_length
instants before generating display_evt with a value which is the
average of the measured times:
module final_display ()
repeat (pause_length) do cooperate; end
print_out ("**** final ");
generate_value (display_evt, (void*)(total_time / measure_number));
end module
A measure consists in running in sequence the three previous phases.
The module list_of_measures executes measure_number
measures and finally runs an instance of final_display:
module list_of_measures ()
repeat (measure_number) do
run phase1 ();
run phase2 ();
run phase3 ();
end
run final_display ();
end module
The reflex_game module waits for c and then
executes a list of measures. The measures are preempted by tilt_evt,
which is generated in case of errors. The preemption results
from the use of the until module defined in Preemption:
module reflex_game ()
print_out ("A reflex game ... press c to start, q to stop.\r\n");
print_out ("You must press `e' as fast as possible after GO!.\r\n");
while (1) do
{total_time = 0;}
await (coin_evt);
run until (list_of_measures_create (),tilt_evt);
print_out ("game over. Press c to restart, q to stop.\r\n");
end
end module
3.2.3 Environment
One now considers the executing environment needed to use the reflex game.
Two functions are first defined to put the terminal in a raw mode (to
be able to directly get the key pressed by the player), and to restore
the initial mode and terminate the game.
void the_beginning (void){
system ("stty raw -echo");
}
void the_end (void){
print_out ("It's more fun to compete ...\r\n");
system ("stty -raw echo");
exit (0);
}
Input
The input of the program is produced from the keys pressed. The keys
are returned by the getchar function, which is not
cooperating. In order to use it, one thus defines the native module
analyze_input which, after unlinking, cyclically calls getchar and
generates the appropriate events:
module native analyze_input ()
unlink;
the_beginning ();
while (1) do
{
switch(getchar()){
case 'c': generate (coin_evt); break;
case 'r': generate (ready_evt); break;
case 'e': generate (end_evt); break;
case 'q': the_end ();
}
}
end
end module
Output
The results of measures are the values generated with display_evt.
The way to get them is to use the get_value
instruction. Note that only one value can be generated at a time;
thus only the value of index 0 has to be considered. The module
analyze_display is:
module analyze_display ()
local int res;
while (1) do
await (display_evt);
get_value (display_evt,0, (void**)&local(res));
fprintf (stdout,"score: %d\r\n",local(res));
cooperate;
end
end module
A message with a beep must be issued when tilt_evt is
generated; this is done by the module analyze_tilt:
module analyze_tilt ()
while (1) do
await (tilt_evt);
print_out ("\07TILT!!\r\n");
cooperate;
end
end module
Finally, the analysis of the output consists in creating two threads,
one instance of analyze_display and one of analyze_tilt:
module analyze_output ()
analyze_display_create ();
analyze_tilt_create ();
end module
Main Module
The main module creates the five events and the three threads needed:
module main ()
{
ready_evt = event_create ();
coin_evt = event_create ();
end_evt = event_create ();
display_evt = event_create ();
tilt_evt = event_create ();
analyze_input_create ();
reflex_game_create ();
analyze_output_create ();
}
end module
Two points are to be noticed. First, the order in which the threads
are created corresponds to the intuitive sequence: input processing -
reaction - output processing. Second, the values of pause_length and
limit_time, and the definition of myrandom, must be adapted to the
executing platform in order to get a playable system.
3.3 Sieve Examples
In this section, the focus is put on the dynamic creation of threads.
One considers sieves, which are algorithms for producing
numbers with a given characteristic. The most well-known sieve is
of course the sieve of Eratosthenes for prime numbers.
Actually, there are two types of sieves: the number of elements of
bounded sieves is statically fixed, while in unbounded sieves the
number of elements is potentially infinite. Only unbounded sieves are
considered here.
3.3.1 Standard Eratosthenes Sieve
The basic version of the unbounded sieve of Eratosthenes can be implemented
by the following function (see note 2):
int main ()
{
list_t primes = init_list ();
int current = 3, create = 1;
while (1) {
cell_t cell = primes->first;
while (cell != NULL) {
if (multiple_of (current,cell->val)) {
create = 0; break;
}
cell = cell->next;
}
if (create) {
add_list (primes,current);
output (current);
} else create = 1;
current += 2;
}
return 0;
}
The type list_t is the type of lists made of elements of
type cell_t which contains an integer field val
and a pointer next to the next cell. The definition of these
two types is standard and not given here.
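The function above never terminates and relies on helpers that are not listed here. As a minimal sketch, assuming that one only wants the first few odd primes stored in an array, the same trial-division loop can be written in self-contained C. The function odd_primes is a hypothetical name and is not part of the original program.

```c
#include <assert.h>
#include <stddef.h>

/* Stores the first `wanted` odd primes in primes[] and returns how
   many were stored. As in the function above, a candidate is kept
   when it is a multiple of none of the primes found so far. */
size_t odd_primes(size_t wanted, int *primes)
{
    assert(primes != NULL || wanted == 0);
    size_t found = 0;
    for (int current = 3; found < wanted; current += 2) {
        int create = 1;
        for (size_t i = 0; i < found; i++) {
            if (current % primes[i] == 0) {   /* multiple_of */
                create = 0;
                break;
            }
        }
        if (create) primes[found++] = current;
    }
    return found;
}
```

The array plays the role of the list primes, and the bound on the number of results replaces the infinite loop.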
3.3.2 Eratosthenes Sieve in Loft
Dynamic memory allocation is needed somewhere in unbounded
sieves. Actually, in Loft, memory allocation can be masked by the
creation of threads. The sieve of Eratosthenes can thus be
implemented in Loft without the need for any user-defined data type.
One first defines a module which generates the odd numbers, one
per instant. The odd numbers are associated with an event given as
parameter.
module producer (event_t start)
local int i;
{local(i) = 1;}
while (1) do
{local(i) += 2;}
generate_value (local(start), (void**)local(i));
cooperate;
end
end module
Next, a macro is defined for awaiting an event evt and
assigning its first value to a variable res:
#define get_first_val(evt,res) await(local(evt));get_value(local(evt),0, (void**)&local(res))
An instance of filter_prime is created for each prime
number, which is passed as the value associated with the event given
as parameter:
module filter_prime (event_t me)
local int prime, int res, event_t next;
get_first_val(me,prime);
{
output (local(prime));
filter_prime_create (local(next) = event_create ());
}
while (1) do
cooperate;
get_first_val(me,res);
{
if (local(res) % local(prime))
generate_value (local(next), (void**)local(res));
}
end
end module
The condition local(res) % local(prime) is true if local(res) is not
a multiple of local(prime); in this
case, local(res) is transmitted to the next filter;
otherwise, nothing is done.
The main module creates a producer and a first filter, linked
by the same event:
module main ()
{
event_t start = event_create ();
producer_create (start);
filter_prime_create (start);
}
end module
The beginning of the output is:
3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73
79 83 89 97 101 103 107 109 113 127 131 137 139 149 151 157 163 167
173 179 181 191 193 197 199 211 223 227 229 233 239 241 251 257 263
269 271 277 281 283 293 307 311 313 317 331 337 347 349 353 359 367
373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463
467 479 487 491 499 503 509 521 523 541 547 557 ...
3.3.3 Lucky Numbers
Lucky numbers [18] are generated by a
sieve which is very close to the one of Eratosthenes. The production
of lucky numbers is defined by the following algorithm: write all the
odd numbers: 1,3,5,7,9,11,... The first number greater than 1 is 3,
so delete every third number: 1,3,7,9,13,15,19,... The first
number greater than 3 in the list is 7, so delete every seventh
number: 1,3,7,9,13,15,21,25,27,31,... And so on, without ending. Numbers
that remain are called lucky numbers.
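The algorithm can be checked on a bounded range with a few lines of plain C. This is only an array-based sketch of the definition, not the thread-based sieve; the function lucky_numbers is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>

/* Fills out[] with the lucky numbers not exceeding limit and returns
   how many were found. out must have room for (limit + 1) / 2 ints. */
size_t lucky_numbers(int limit, int *out)
{
    assert(out != NULL);
    size_t n = 0;
    for (int v = 1; v <= limit; v += 2) out[n++] = v;   /* odd numbers */

    /* out[i] is the next survivor: delete every out[i]-th number. */
    for (size_t i = 1; i < n; i++) {
        size_t step = (size_t)out[i];
        if (step > n) break;            /* no position left to delete */
        size_t kept = 0;
        for (size_t pos = 0; pos < n; pos++) {
            if ((pos + 1) % step != 0) out[kept++] = out[pos];
        }
        n = kept;
    }
    return n;
}
```

For a limit of 33, the function keeps 1, 3, 7, 9, 13, 15, 21, 25, 31 and 33. Note that deletions are decided by the position of a number in the current list, not by its value.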
The sieve to produce lucky numbers is based on a filtering function
which filters numbers according to their indexes and not according to
their values, as the Eratosthenes sieve does.
module filter_lucky (int i,event_t trigger,event_t out)
local int base, int res, int k, event_t next;
get_first_val(trigger,base);
{
generate_value (local(out), (void**)local(base));
filter_lucky_create (local(k)= local(i)+1,
local(next) = event_create (),
local(out));
}
while (1) do
cooperate;
get_first_val(trigger,res);
{
local(k)++;
if (local(k) % local(base))
generate_value (local(next), (void**)local(res));
}
end
end module
The produced numbers are generated as values of the event out given
as parameter. Moreover, the index of the filtered values is
given as first parameter to filter_lucky. The sieve for
lucky numbers is defined by:
module main ()
{
event_t go = event_create ();
event_t result = event_create ();
out1_create (result);
producer_create (go);
filter_lucky_create (1,go,result);
}
end module
The module out1 prints all the values associated with its
parameter, which is an event. The list of lucky numbers starts with:
3 7 9 13 15 21 25 31 33 37 43 49 51 63 67 69 73 75 79 87
93 99 105 111 115 127 129 133 135 141 151 159 163 169 171 189 193 195
201 205 211 219 223 231 235 237 241 259 261 267 273 283 285 289 297
303 307 319 321 327 331 339 349 357 361 367 385 391 393 399 409 415
421 427 429 433 451 463 475 477 483 487 489 495 511 517 519 529 535
537 541 553 559 577 579 583 591 601 613 615 619 ...
3.3.4 Lucky Prime Numbers
Let us now consider the numbers that are simultaneously prime and
lucky. The direct production of these lucky prime numbers is not so
clear using a unique sieve (what would be the filtering
function?). However, a possible solution is to reuse the two previous
sieves and to build a system made of two sub-systems, one producing
prime numbers and the other producing lucky numbers. The values
produced by the two sub-systems are generated on the same event and
are output by the module out2, which outputs a value when two
generations of it are received during the same instant, meaning
that the value has passed all the prime filters and also all the lucky
filters. The definition of out2 is:
module out2 (event_t event)
local int k, int res, int all;
while (1) do
await (local(event));
{local(k) = 0; local(all) = 0;}
while (!local(all)) do
get_value (local(event),local(k), (void**)&local(res));
{
if (return_code () == ENEXT) {
local(all) = 1;
if (local(k) == 2) output (local(res));
}
local(k)++;
}
end
end
end module
The module filter_prime is changed to produce its results as values
of an event given as second parameter. The system is the following:
module main ()
{
event_t go = event_create ();
event_t result = event_create ();
out2_create (result);
producer_create (go);
filter_lucky_create (1,go,result);
filter_prime_create (go,result);
}
end module
The list of lucky prime numbers is:
3 7 13 31 37 43 67 73 79 127 151 163 193 211 223 241 283
307 331 349 367 409 421 433 463 487 541 577 601 613 619 631 643 673
727 739 769 787 823 883 937 991 997 1009 1021 1039 1087 1093 1117 1123
1201 1231 1249 1291 1303 1459 1471 1543 1567 1579 1597 1663 1693 1723
1777 1801 1831 1879 1933 1987 2053 2083 2113 2221 2239 2251 2281 2311
2467 2473 2557 2593 2647 2671 2689 2797 2851 2887 2953 2971 3037 3049
3109 3121 3163 3187 3229 3259 3301 3307 3313 ...
This example shows how code reuse can be made easier by the existence
of common instants shared by all the threads.
3.4 Data-Flow Programming
In this section, one considers data-flow programming and shows how to
implement it in Loft.
3.4.1 Introduction
One considers programs made of processes connected through
channels. Channels are basically unbounded FIFO
queues. Processes can put values in channels (this is always possible
as channels are unbounded) and get values from them. Each channel has
at most one producer (a process that fills it) and at most one
consumer (a process that empties it). Thus, there is no
concurrency on channel accesses, except between producers and
consumers. These programs are called data-flow programs because
processes are basically creating and transforming flows of
values. They were first introduced in [23], and
they are actually the initial computing model at the basis
of dataflow synchronous languages.
There are two variants: in Nets of Sequential Processes, the
consumer of an empty channel remains stuck until some value is
(possibly) produced by the producer of the channel. In Nets of
Reactive Processes, all processes share the same instants, and a
process can thus detect during one instant that a channel is empty
(and thus avoid getting stuck on it). In what follows, one does not
distinguish between the two variants.
3.4.2 Channels
Channels are basically lists of cells containing values, with an
access to the first cell and to the last cell. Moreover, an event is
associated with each channel for signaling that a new value is put in
it. Cells hold integers of type value_t.
typedef struct channel_t {
cell_t first;
cell_t last;
int length;
event_t new_value;
} *channel_t;
The function to put a value into a channel is:
void put (channel_t chan,value_t v)
{
if (chan->length == 0) {
chan->last = cell_create (v);
chan->first = chan->last;
generate (chan->new_value);
} else {
chan->last->next = cell_create (v);
chan->last = chan->last->next;
}
chan->length++;
}
Note the generation of the event held by the channel, to signal the
consumer that the channel is no longer empty. Extraction of a value
from a channel is performed by the function extract:
value_t extract (channel_t chan)
{
cell_t head;
value_t res;
if (chan->length == 0) return NULL;
head = chan->first;
chan->first = head->next;
res = head->val;
free (head);
chan->length--;
return res;
}
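The two functions above depend on the types cell_t and value_t and on cell_create, which are not listed. As a minimal self-contained sketch, assuming that values are plain ints and leaving the event aside, a channel can be written in C as follows. The names chan_t, chan_put and chan_extract are hypothetical. In this variant, the emptiness of the channel is reported by a boolean result rather than by a special returned value, so that 0 remains usable as an ordinary value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef int value_t;              /* assumption: values are plain ints */

typedef struct cell {
    value_t val;
    struct cell *next;
} cell_t;

typedef struct {
    cell_t *first, *last;
    int length;
} chan_t;

/* Appends v at the end; returns false if memory is exhausted. */
bool chan_put(chan_t *c, value_t v)
{
    cell_t *cell = malloc(sizeof *cell);
    if (cell == NULL) return false;
    cell->val = v;
    cell->next = NULL;
    if (c->length == 0) c->first = cell;  /* where put generates new_value */
    else c->last->next = cell;
    c->last = cell;
    c->length++;
    return true;
}

/* Moves the oldest value into *res; returns false on an empty channel. */
bool chan_extract(chan_t *c, value_t *res)
{
    assert(res != NULL);
    if (c->length == 0) return false;
    cell_t *head = c->first;
    c->first = head->next;
    *res = head->val;
    free(head);
    c->length--;
    return true;
}
```

Values come out in the order in which they were put in, and the length field alone decides whether the channel is empty.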
The module get blocks execution until a first value can be
extracted out of a channel:
module get (channel_t in,value_t *res)
while (local(in)->length == 0) do
await (local(in)->new_value);
if (local(in)->length == 0) then cooperate; end
end
{(*local(res)) = extract (local(in));}
end module
If the channel is not empty, the value is immediately extracted from
it and assigned to the result variable. Otherwise, the event
associated to the channel is awaited. Note that a process can extract
several values from a channel during the same instant. Thus, the event
associated to a channel may be present while the channel is empty,
because the channel has been already emptied during the same
instant. There are two consequences:
- One must always test if the channel is empty, after
receiving the event.
- In case the channel is empty (which means that all the values
in it have been previously extracted), the module should cooperate, to
avoid cycling.
A macro is defined to replace runs of the get module:
#define GET(chan) \
while (chan->length == 0) do\
await (chan->new_value);\
if (chan->length == 0) then cooperate; end\
end
3.4.3 Processes
Processes are modules. One just presents here two basic modules: the
first one produces integer values on its output channel, and the
second one prints the values received on its input channel. The
producer is (see note 3):
module producer_process (value_t from,value_t incr,channel_t out)
local value_t val;
{local(val) = local(from);}
while (1) do
if (local(out)->length < 10) then
{
put (local(out),local(val));
local(val) += local(incr);
}
end
cooperate;
end
end module
The out_process process is:
module out_process (channel_t in)
while (1) do
GET (local(in))
printf ("%d ",extract (local(in)));
fflush(stdout);
end
end module
Programs are built by connecting processes with channels. Each
channel should have at most one producer and at most one consumer. For
example, the following program is made of a channel (created with the
function channel_create) connecting an instance of the process
producer_process to an instance of the process out_process:
module main()
{
channel_t c = channel_create ();
producer_process_create (0,1,c);
out_process_create (c);
}
end module
The program prints the successive integers, starting from 0.
3.4.4 Prime Numbers
One programs the Eratosthenes sieve in the dataflow style to produce
prime numbers. A list of filters corresponding to the already produced
prime numbers is put in sequence and connected through channels. A
number that is able to pass all the filters is prime. For each new
prime number, a new filter is introduced that rejects its multiples.
The filter module is:
module filter_process (int index,int base,channel_t in,channel_t out,filter_t filter)
while (1) do
GET (local(in))
{
int v = extract (local(in));
local(index)++;
if (local(filter) (local(index),local(base),v))
put(local(out),v);
}
end
end module
The list of filters is constructed by the module sift_process :
module sift_process (channel_t in,channel_t out,filter_t filter)
local int k;
{local(k) = 1;}
while (1) do
GET (local(in))
{
channel_t intern = channel_create_in (current_scheduler ());
int v = extract (local(in));
put(local(out),v);
local(k)++;
filter_process_create (local(k),v,local(in),intern,local(filter));
local(in) = intern;
}
end
end module
Finally, the prime numbers are produced by the following program,
where the filtering function primes (of type filter_t) filters the
multiples of a base value:
int primes (int index,int base,int value)
{
return value%base;
}
module main()
{
channel_t c1 = channel_create ();
channel_t c2 = channel_create ();
producer_process_create (3,2,c1);
sift_process_create (c1,c2,primes);
out_process_create (c2);
}
end module
1: This example is considered in the book General
Programming Concepts: Writing and Debugging Programs, belonging to
the AIX system documentation (2nd edition, 1999).
2: Thanks to Fabrice Peix.
3: One supposes that the number of
unused values is limited to 10.