3. TD numéro 3 : Quelques exemples de programation concurrente

Objectif

Le but est de mieux maitriser les sockets, les threads, la synchronisation et la concurrence et de comprendre comment fonctionne les fonction wait(), notifyall() et * interrupt()*, isinterrupted. Un autre objectif est de montrer comment la programmation concurrente permet de simplifier la conception en reportant les problèmes de concurrence et de synchronisation sur le noyau de programation distribuée de la JVM.

3.1. Une Version Multi-Thread du serveur du Td1.

Exercice 1

On veut reprendre le serveur du TD1 afin de traiter plusieurs clients à la fois.

  1. Écrire une classe exécutable ServiceClient (inspirée du TD1) dont la partie exécutable (i.e la méthode run() sert un client. Notez que cette méthode fait presque exactement ce que le méthode analogue faisait dans la version séquentielle. On passera bien entendu à l’objet exécutable la Socket sur laquelle est connecté le client, plus eventuellement quelques informations annexes telles le numéro du port sur lequel le client est connecté.
  2. Écrire une classe principale Repartiteur qui attend sur un port et qui à chaque arrivée de client crée un thread ServiceClient qui le gère. La différence avec la version séquentielle du programme est la suivante : en sequentiel le répartiteur appelle la méthode ServiceClient et attend quelle se termine, en version parallèle le répartiteur crée une tâche qui sert le client et continue son calcul sans attendre que le service du client soit terminé.
  3. Écrire ou reprendre une classe Client (la plus simple possible) qui se connecte à un serveur et lui envoie les données saisies dans la console, ou contenues dans un fichier. Transformer Client pour rendre cet objet exécutable dans un thread.
  4. Pour terminer on veut simuler la connexion de nombreux clients. On va donc écrire une classe GenClients qui crée \(N\) Threads Clients (pex 5) qui se connectent tous au serveur et envoient toutes les deux secondes un message (quelconque tel que appel numéro X) au serveur et cela M fois (pex 10) .

3.2. La gestion des interruptions en Java

Notez que dans le question précédente il y a \(2N\), soit \(2\) threads par client:

  • le premier créé par le serveur (i.e par le répartiteur).
  • le second par le générateur de clients.

On voudrait maintenant pouvoir arrêter le serveur, pour cela il faut pouvoir lui indiquer qu’il doit cesser de s’exécuter et bien entendu il doit alors aussi indiquer au threads fils qu’ils doivent aussi se terminer.

On va donc avoir besoin d’utiliser des interruptions de thread. La méthode stop est déconseillée et est obsolète. À la place on utilise deux techniques. Ces techniques supposent que le concepteur à pris en compte la possibilité d’une demande d’arrêt du thread lors de la conception (ce qui est conseillé). C’est un choix de Java, on ne peut pas tuer un thread mais simplement lui demander de s’arreter.

3.2.1. L’interruption de Thread

Si on appelle la méthode interrupt() sur un objet thread un drapeau est positionné ce qui permet au thread de vérifier si il a été interompu ou non. Schématiquement on est dans la situation ci-dessous :

  1. Le programme maitre envoie une interruption au thread via la méthode monThread.interrupt().
  2. Le thread peut tester si une interruption a eu lieu via la méthode Thread.interrupted().
  3. La méthode interrupted() appartient à la classe Thread, elle est donc utilisable directemment sur votre Thread lorsqu’il est construit en étendant la classe Thread via la directive extends. Si le thread est construit à partir d’un objet exécutable qui implémente la méthode run() il faut récupérer le thread courant. On le fait en utilisant la méthode \(Thread.currentThread()\)

La méthode Thread.currentThread()

//exemple extrait du tutoriel d'oracle.
import java.lang.*;
public class ThreadDemo implements Runnable {

 ThreadDemo() {
 // main thread
 Thread currThread = Thread.currentThread();
 // thread created
 Thread t = new Thread(this, "Admin Thread");

 System.out.println("current thread = " + currThread);
 System.out.println("thread created = " + t);
 // this will call run() function
 t.start();
 }

 public void run() {
 System.out.println("This is run() method");
 }
 public static void main(String args[]) {
 new ThreadDemo();
 }

Exemple d interruption

  // Methode run du thread ..
  public  void run() {
   while (!Thread.interrupted() ) {}
   }
   // prog principal
  public static void main(String[] args) throws InterruptedException {
  Thread monThread = new Thread(c);
  monThread.start();
  Thread.sleep(5000);
  monThread.interrupt();
}

Nota Bene : Dans le cas d une implémentation utilisant extends Runnable, on écrira

while(!Thread.currentThread().interrupted())

Exercice 2

Utiliser des interruptions pour stopper les thread serveur quand on arrete le serveur. Pour cela

  1. le serveur stokera les references au threads créés dans une liste.
  2. Le serveur n’acceptera que 5 clients puis il attendra 10 secondes et interrompera les thread engendrés.
  3. les threads de service verifieront si oui ou non il ont été interrompus.

Optionnel:

Réfléchir à comment interrompre le serveur après X secondes.

3.2.2. L’utilisation d’un champ volatile

Un champs volatile est mis à jour immédiatement pour tous les threads utilisant l’objet. C’est la JVM qui assure cette propriété, un tel champs est ainsi intrinsèquement synchronisé et ceci permet de synchroniser des threads mais aussi de les faire communiquer. Dans l’exemple ci dessous le champs done est utilisé afin de transmettre l’ordre au thread de se terminer via la méthode seterminer. Notez bien qu’ici on communique via un champs de l’objet exécutable alors qu’avec la méthode utilisation des interruptions on utilise le thread lui même et les fonctionnalité de gestion de threads de la JVM.

 public final class ObjetExecutable  implements Runnable {
 private volatile boolean done = false;
 public void seterminer() {done = true;}

public synchronized void run()
{
  while (!done)
     {}
 }

public static void main(String[] args) throws InterruptedException {
  ObjetExecutable container = new ObjetExecutable();
  Thread myThread = new Thread(container);
  myThread.start();
  Thread.sleep(5000);
  container.seterminer();
}

Exercice 3 : Écrire une classe qui lance un thread qui compte de 0 à l’infini mais qui lui ordonne de se terminer après 10 secondes.

3.3. L’Attente d’évènement : wait() notify() et notifyAll()

La méthode wait() a pour objet de fournir un moyen d’attendre qu’une condition soit vérifiée (pex qu’une place de parking soit libre, qu’une ressource soit libérée, qu’un client arrive). Et ce sans saturer le système de tests en vérifiant la condition en permanence. C’est en effet le système qui va reveiller le thread quand la condition aura potentiellement changé.

Attention La méthode *wait()* s’applique à un objet et elle est invoquée par un thread, à priori toute référence à un objet peut être utilisée.

Le fonctionement est le suivant :

  1. lors de l’invocation, le thread qui invoque la méthode libère le verrou qu’il detient sur l’objet (verrou par exemple posé à l’entrée d’un bloc synchronisé). Ceci permet en général à d autres threads de progresser.
  2. Le thread s´endort.
  3. Quand un notify est effectué sur l’objet (par un autre thread), un des threads endormi est reveillé (on ne peut pas préjuger lequel).
  4. Le thread reveillé reprend alors la main sur l’objet (ie le verrou associé à la référence de cet objet).
  5. Enfin ce thread reprend son exécution.

L’intérêt principal de wait() est d’attendre qu’une condition soit debloquée en utilisant peu de ressources système (c.a.d sans faire de boucle vérifiant si la condition est vraie) et sans non plus bloquer les autres tâches.

Exemple d’utilisation de wait pour rentrer et sortir d’un parking

Ici on attend que le parking ne soit plus plein, autrement dit tant que le parking est plein le thread dort.

synchronized (Parking)
{
while (Parking.full())
{
System.out.format("Parking plein j'attend\n");
Parking.wait();
System.out.format("%s is awake now \n",  iD);
}
Parking.entrer(this)
}

Les méthodes notify() et notifyAll() sont en quelque sorte les symétriques de wait, elles aussi s’appliquent à un objet et elles sont invoquées par un thread, à priori toute référence à un objet peut être utilisée. Le fonctionement est le suivant :

  1. notifyAll() va reveiller tout les threads mis en attente par un wait sur l’objet (i.e le verrou) utilisé, dès que le verrou sera relaché (p.ex. quand on sortira du bloc synchronisé). Ensuite il est probable qu’un seul de ces threads continue son fil de calcul, ce sera celui à qui la JVM va attribuer le verrou.
  2. notify va reveiller un des thread mis en attente par un wait sur l’objet (i.e le verrou) utilisé dès que le verrou sera relaché.

Exemple Sortie du Parking

synchronized (Parking)
{
System.out.format("Je sors du Parking\n");
Parking.sortir()
Parking.notify()
}

Dans le cas ci-dessous des consommateurs et producteurs nous utilisons notifyAll(), mais dans ce cas précis notify fonctionnera aussi car de toute facon une fois le verrou relaché un unique et arbitraire thread sera autorisé à soit entrer dans la zone critique soit à reprendre le calcul dans la zone critique. Cependant de manière générale notifyAll() est préférable afin d’éviter des situations de famine ou d’interbloquage.

3.3.1. Exemple d’Application : le schéma producteur consommateur

Deux type de threads coexistent, les premier sont les producteurs qui remplissent une file, les seconds la vide. La file étant de taille limitée. nous avons la situation symétrique suivante :

  1. Un producteur doit attendre que la file ne soit plus pleine avant de produire.
  2. Un consommateur doit attendre que la file ne soit plus vide avant de consommer.
  3. Les opérations sur la file doivent être synchronisées (taille, ajout, suppresion).

Exercice 4

  • Écrire une classe exécutable producteur qui va creer sequentiellement des jobs 1,2,3,... l’intervale entre 2 productions sera aléatoire. Le producteur placera ses jobs dans la file d’attente. Un objet de la classe producteur répond aux spécifications suivantes :
    1. Il se construit à partir de la file d’attente (une liste appelée nosJobs) et d’un idenfiant.
    2. la classe implemente une méthode prod permettant d’ajouter un job à la file, cette méthode doit vérrouiller la nosJobs.
    3. si quand prod est invoquée la file nosJobs est pleine le thread doit attendre (wait) que la file ne soit plus pleine.
    4. Ensuite le thread peut ajouter un job à la file.
    5. Enfin quand le producteur réalise une production il doit informer les threads en attente que la file d’attente a changé (notify).
  • Écrire la classe consommateur, qui est analogue mais attend sur une file vide, dépile un job et effectue un notify. A nouveau le temps pour consommer sera aléatoire.
  • Lancer un programme qui utilise une file d’attente de taille 5, crée 7 consommateurs et 5 producteurs et vérifier que tout fonctionne.
  • Jouer avec les paramètres : taille de la file, nombre de producteurs, consommateur, temps moyen de production et consommation.

3.4. Le tri bitonique multithread (quicksort)

Le tri bitonique est une procédure récursive très simple pour trier un tableau :

  1. On coupe le tableau en 2, et on trie les deux parties.
  2. On fusionne les 2 parties en temps linéaire en parcourant les 2 tableaux de concert.

En Java cela ressemble à

public static void bitonicSort(int[] a) {
             if (a.length >= 2) {
                     // découpage
                     int[] left  = Arrays.copyOfRange(a, 0, a.length / 2);
                     int[] right = Arrays.copyOfRange(a, a.length / 2, a.length);

                     // tri des deux parties
                     bitonicSort(left);
                     bitonicSort(right);
                     // fusion des parties
                     fusionner(left, right, a);
             }

Exercice 5:

  1. Écrire une version récursive du tri bitonique (il suffit d’écrire la méthode de fusion).
  2. Ecrire une version multi-thread du tri bitonique, bitonicSort(int[] a, int nCore) qui utilise nCore coeurs pour trier, ou nCore sera une puissance de 2. Quand ncore sera plus grand que 1, L’appel récursif sera remplacé par la création de 2 thread, l’un chargé de trier left et l’autre right, et chacun utilisant ncore/2 coeurs. Sinon on utilisera le tri sequentiel. Attention : La procédure fusionner ne sera pas parrallélisée.
  3. Faire quelques test de performance.

3.5. Vers une application de chat centralisée

Exercice 6:

  1. Écrire un client de chat utilisant 2 threads ou le premier thread (receveur) lit les données envoyées par un serveur et les affiche surl’entrée standard tandis que le second (receveur) écrit et transmet les données saisies au serveur. Pourquoi est il plus simple d’utiliser deux sockets (une montante, une descendante) ?
  2. Coté serveur un client est aussi géré par un seul thread qui écoute le client et écrit tout ce qu’il dit sur les flux sortant de tous les clients. On vérifiera que les problèmes d’accès concurrent aux flux sortant sont bien gérés.
  3. Ajouter un peu de controle afin qu’un client puisse se deconnecter et que les messages soient signés par le client ou que le serveur mantienne la liste des clients connectés.
  4. Réfléchir à comment réaliser un client de chat pair à pair sans arbitre (serveur central). Afin de participer à la discussion un client se connectera à un pair et ce sont les pairs qui diffuseront les messages.

Optionnel

Ajouter d’autre fonctionnalités : chat privé, envoi de fichier, banissement, salon privé.