Tutoriel sur une introduction à l'API Java 8 Concurrency, application à ExecutorService

Cet article a pour but la présentation de l’utilitaire ExecutorService défini dans l'API Java Concurrency de la bibliothèque java.util.concurrent. Cet utilitaire propose un mécanisme très simple pour adresser les problématiques de multi-threading dans un programme. Il a été introduit dans le langage Java depuis sa version 5 et a vu sa syntaxe modifiée avec le temps au gré des évolutions. Son principal apport est qu’il nous dispense des traitements « bas niveaux » et nous permet plutôt de mettre l’accent sur la définition et la conception du modèle de tâches à exécuter. Dans « bas niveaux », nous entendons, par exemple : le démarrage/arrêt des tâches, la gestion propre des tâches périodiques/sporadiques, des offsets, de l’ordonnancement, la gestion de l'interaction entre tâches…

Une plus-value de cet article est que nous présenterons cet utilitaire dans sa version Java 8 avec l’introduction des expressions lambda. Par des exemples comparatifs, nous remarquerons à quel point il est plus simple et moins verbeux de faire du traitement multi-threads en utilisant cette API native de Java.

Cet article est à destination de personnes ayant déjà de bonnes notions en développement Java et désireux de connaître les concepts avancés du langage.


Réagissez à cet article en faisant vos remarques ici : 4 commentaires Donner une note  l'article (5)

Article lu   fois.

L'auteur

Site personnel

Liens sociaux

Viadeo Twitter Facebook Share on Google+   

I. Définitions et contexte

La notion de thread ou tâche (en français) constitue le creuset de tout le contenu de cet article. Il est donc important de la définir et d’en situer le contexte d’utilisation. Dans mes articles POSIX ou encore OSEK-VDX, j’ai déjà eu à définir ce terme et expliquer son rôle dans une application dite multitâche. Nous rappelons cette définition ci-dessous.

Une tâche ou thread en anglais, est une file d’exécution dans un programme. C’est-à-dire, une suite d’instructions compilées qui s’exécute de façon « indépendante » avec pour but de répondre à un besoin demandé. C’est ce caractère indépendant qui fait toute l’originalité d’un thread, en ce sens que pendant qu’il s’exécute, il n’est censé théoriquement dépendre d’aucun autre thread. Dans la majeure partie des cas, même si un thread dispose de sa propre pile d’exécution, il peut arriver que des threads partagent des ressources (exemple : espace mémoire, etc.) communes.

D’un point de vue système d’exploitation, un thread peut être vue comme une sorte de sous-programme reconnu par ce dernier et qu’il ordonne au processeur de votre ordinateur de l’exécuter.

De façon générale, il existe deux sortes de thread. Les threads dits légers et les threads dits lourds (encore appelés Processus). Il faut retenir que tout programme Java (et même plus largement tout programme informatique) dispose d’un thread lourd (processus) représenté par la méthode :

processus main()
Sélectionnez
public static void main(String[] args)

qui représente sa file d'exécution principale. Ce processus main() est le point d'entrée de votre programme et a pour charge d’effectuer tout le besoin attendu. Si ce besoin est un travail énorme à traiter et donc coûteux en temps, une bonne conception est de faire en sorte que le thread en charge donne naissance à un ou plusieurs autres threads (dits légers) qui vont donc se partager le travail et s’exécuter en parallèle. C’est là le sens même de la notion de multi-threading : découper le travail à faire en lots et les confier à des threads qui vont s’exécuter en parallèle afin de nous faire gagner en temps. La figure ci-dessous illustre une exécution parallèle de threads :

Illustration de l'exécution parallèle de threads

En réalité, une parallélisation concrète de plusieurs threads sur un ordinateur, exige que ce dernier dispose d’un processeur à plusieurs cœurs ou tout simplement de plusieurs processeurs. Ceci afin de permettre que chaque processeur (ou cœur) puisse exécuter à un instant t son propre thread. Mais faire du multi-threading sur un ordinateur ayant un processeur à un seul cœur est possible ; plusieurs algorithmes ayant été développés pour implémenter un simulacre de parallélisme (exemple : l’algorithme du round-robin ou tourniquet). Mais nous ne détaillerons pas plus cette notion ici.

Une dernière chose à savoir à propos des threads est qu'en Java, il existe deux sortes de threads légers : les threads système (system threads) et les threads définis (defined threads). Les threads système sont ceux lancés par votre JVM (Java Virtual Machine). Exemple, le garbage collector est un thread système. Les threads définis sont ceux créés explicitement dans un programme par le développeur.

Dans cet article nous nous intéressons aux threads légers de type définis. Nous apprendrons comment les déclarer et les manager via l’utilitaire ExecutorService en Java 8.

Pour finir cette section, nous vous présentons le schéma ci-dessous qui illustre une vue abstraite d’un programme multitâche Java sur un ordinateur :

Gestion de threads dans un programme

II. Préambule

Avant de commencer les enseignements techniques de ce cours, nous allons d'abord fixer les éléments de langage. Dans la section III, les mécanismes de création de threads que nous allons présenter seront qualifiés d'Ancienne version, car il s'agit du paradigme initial qui a été introduit dans le langage Java depuis sa création pour le traitement des threads. Dans la section IV, nous qualifierons de nouvelle version le mécanisme de traitement des threads utilisant l'API Java Concurrency via l'utilitaire ExecutorService. Mais il convient de noter que l'appellation ancienne version ne caractérise en rien l'aspect obsolète du dit mécanisme, car d'expérience, il reste encore très usité dans les applications de nos jours. De même l'appellation nouvelle version ne renvoie en rien à l'aspect récent de l'API Java concurrency, car il a été introduit depuis 2004 dans la version Java 5. Mais son inconvénient est qu'elle reste encore moins connue du grand public alors qu'elle représente un outil puissant et évolué pour le traitement des threads. D'où l'objet de ce tutoriel de contribuer à sa vulgarisation.

III. Les threads en Java : ancienne version

D'un point de vue ancienne version, le langage Java dispose d'une classe appelée Thread qui permet de créer des threads. Il existe globalement deux façons de créer un thread.

III-A. Utilisation ou extension de la classe mère Thread

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
public class MainClass {

	public static void main(String[] args) {
		
		Thread myThread = new Thread (){
			public void run(){
				//TODO corps du thread
				System.out.println("en train d'effectuer un travail...");
			}
		};
		
		myThread.start();
	}
}

La méthode run() représente le corps du travail à réaliser par le thread myThread et doit toujours être redéfinie. Le démarrage du thread est explicite et est effectué par l'instruction myThread.start(). Au cas contraire, le thread ne sera jamais exécuté.

III-B. Utilisation de l'interface Runnable

L'interface Runnable s'utilise avec un object de type Thread. C'est-à-dire, une instance de la classe Thread elle-même ou une instance d'une classe qui hérite de Thread.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
public class MainClass {

	public static void main(String[] args) {
		
		Runnable myRunnable = new Runnable (){
			public void run(){
				//TODO corps du thread
				System.out.println("en train d'effectuer un travail...");
			}
		};
		
		(new Thread(myRunnable)).start();
	}
}

III-C. Exemple de mise en œuvre d'un thread périodique

Nous entendons par thread périodique, un thread qui doit s'exécuter de façon périodique. C'est-à-dire, qui se déclenche après un intervalle de temps bien défini. Dans l'exemple ci-dessous, nous avons un thread qui doit exécuter son travail dans une boucle, mais la contrainte est qu'après une itération, le thread doit attendre 3000ms soit trois secondes avant d'effectuer la prochaine itération. D'où l'utilisation de la méthode sleep().

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
public class MainClass {

	public static void main(String[] args) {
		
		Runnable myRunnable = new Runnable () {
			public void run() {
				for (int i = 0; i < 5; i++) {
					//TODO corps du thread
					try {
						System.out.println("j'effectue un travail à l'instant : " + System.currentTimeMillis());
						Thread.sleep(3000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		};
		
		(new Thread(myRunnable)).start();
	}
}
Résultat
Sélectionnez
j'effectue un travail à l'instant : 1536011424606
j'effectue un travail à l'instant : 1536011427607
j'effectue un travail à l'instant : 1536011430607
j'effectue un travail à l'instant : 1536011433607
j'effectue un travail à l'instant : 1536011436608

Si vous faites la différence entre les différents instants qui s'affichent, vous constaterez que l'on se rapproche des 3000ms de périodicité souhaitée, mais pas toujours 3000 avec exactitude.

III-D. Remarques

Nous avons présenté rapidement dans les sous-sections ci-dessus, les techniques de création et un exemple d'utilisation des threads à la façon ancienne version. Vous pouvez d'ores et déjà remarquer ce que nous disions dans le synopsis de ce cours : il vous incombe de gérer vous même le démarrage de votre thread sinon, il ne se lancera pas seul. Il vous incombe de gérer par vous même les problèmes de traitement périodique d'un thread, etc.

De plus, si nous revenons à l'exemple de la section III-C, je vous disais que la différence des instants de traitements périodiques n'était pas exactement de 3000ms. Exemple entre les instants 1536011427607 et 1536011424606 vous avez 3001ms et pas 3000ms comme souhaité au départ. Ce problème est connu et s'appelle le problème de la dérive d'horloge. Nous ne le développerons pas ici. Mais, s'il nous avait été demandé d'implémenter une périodicité stricte de notre thread dans l'exemple précédent, nous aurions été très embêté, car il nous reviendrait de gérer la dérive d'horloge à la main par nous-même, ce qui est très technique, pas évident et en réalité s'écarte du besoin métier à réaliser.

C'est au vu de ces types de problématiques illustrées non exhaustivement dans le paragraphe précédent que l'API Java Concurrency a été créée pour libérer le développeur des tâches bas niveaux en lui offrant des services préparés qui implémentent les solutions de la plupart des problèmes d'ordre technique de gestion des threads qu'il pourra rencontrer. Ce qui lui permet donc de plus consacrer son temps à la conception de ses threads et l'implémentation proprement dite du besoin à réaliser.

IV. Les threads en Java : nouvelle version

Le but de cette section est de vous introduire à l'API Java 8 Concurrency.

IV-A. Les interfaces fonctionnelles Runnable et Callable

L'API java.util.Concurrency (via son utilitaire ExecutorService) s'appuie essentiellement sur les interfaces Runnable et Callable. Avec l'avènement de Java 8, un nouveau terme a vu le jour : la notion d'interface fonctionnelle qui signifie tout simplement une interface Java comportant exactement une et une seule méthode publique.

IV-A-1. L'interface Runnable

Nous l'avons déjà présenté dans les sections ci-dessus. Du fait que cette interface comporte uniquement la méthode publique run(), en Java 8, elle est devenue une interface fonctionnelle. Elle se présente comme suit :

Interface fonctionnelle Runnable
Sélectionnez
1.
2.
3.
public interface Runnable {
	public void run();
}

L'implémentation en expression lambda de la méthode run() qu'il faudra retenir pour la suite de ce cours est :

Expression lambda implémentant la méthode run()
Sélectionnez
Runnable r = () -> { /*corps de la tâche*/ }

Cette écriture représente tout simplement une fonction qui ne prend aucun paramètre en entrée (symbolisé par ()) et qui effectue un travail ne retournant aucun résultat (symbolisé par { /*corps de la tâche*/ }).

IV-A-2. L'interface Callable

Cette interface peut être nouvelle pour vous. Mais sachez qu'il s'agit d'une interface similaire à Runnable à la seule différence que son unique méthode call() renvoie un résultat. Cette interface peut être paramétrée et se présente comme suit :

Interface Callable
Sélectionnez
1.
2.
3.
public interface Callable<T> {
	public T call();
}

L'implémentation en expression lambda de la méthode call() qu'il faudra retenir pour la suite de ce cours est :

Expression lambda implémentant la méthode call()
Sélectionnez
Callable<T> c = () -> { /*corps de la tâche*/ return t; }

Cette écriture représente tout simplement une fonction qui ne prend aucun paramètre en entrée (symbolisé par ()) et qui effectue un travail et retourne un objet t instance de la classe paramétrée T (symbolisé par { /*corps de la tâche*/ return t; }).

IV-A-3. Exemple d'application

Reprenons l'exemple de la section III-C sur l'implémentation d'un thread périodique. Sa mise en place en Java 8 à l'aide de l'interface fonctionnelle Runnable couplée à l'utilisation de l'expression lambda donne ceci :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
public class MainClass {

	public static void main(String[] args) {
		
		Runnable myRunnable = () -> {
				for (int i = 0; i < 5; i++) {
					//corps du thread
					try {
						System.out.println("j'effectue un travail à l'instant : "+ System.currentTimeMillis());
						Thread.sleep(3000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			};
		
		(new Thread(myRunnable)).start();

	}
}
Résultat
Sélectionnez
j'effectue un travail à l'instant : 1541535229652
j'effectue un travail à l'instant : 1541535232652
j'effectue un travail à l'instant : 1541535235652
j'effectue un travail à l'instant : 1541535238652
j'effectue un travail à l'instant : 1541535241652

Vous pouvez remarquer que ce nouveau code est moins verbeux que le précédent...
Nous allons maintenant vous présenter dans les paragraphes qui suivent, l'utilitaire ExecutorService.

IV-B. l'API ExecutorService

ExecutorService est une interface Java qui propose plusieurs services (méthodes). Elle est intégrée à la l'API java.util.concurrent. Une présentation exhaustive de cette interface se trouve iciAPI ExecutorService. Dans ce cours, nous nous focalisons sur six méthodes principales que nous récapitulons dans le tableau suivant :

Prototype de la méthode Description
void execute(Runnable command) Méthode qui exécute tout thread de type Runnable, ne retourne rien.
<T> Future<T> submit(Callable<T> task) Méthode paramétrée qui exécute tout thread de type Callable, retourne un objet instance du type paramétré T, lui-même encapsulé dans l'objet java.util.concurrent.Future.
Future<?> submit(Runnable task) Méthode qui exécute tout thread Runnable, retourne un objet qui peut être de tout type (cf. ?).
<T> T invokeAny(Collection<? extends Callable<T> > tasks) Méthode paramétrée qui exécute une collection de threads Callable, retourne un objet instance du type paramétré T, correspondant au résultat du premier thread de la collection ayant fini de s'exécuter. Tous les autres threads n'ayant pas fini sont stoppés.
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) Méthode paramétrée qui exécute une collection de threads Callable, retourne une liste de résultats Future<T> dont l'ordre respecte le même positionnement des threads dans la collection d'entrées. L'ensemble des résultats est obtenu et utilisable que lorsque tous les threads en entrée ont terminé leur exécution.
void shutdown() Demande l'arrêt explicite d'un thread s'il n'a plus de tâche à exécuter.


Et la classe Future c'est quoi ?

Il s'agit d'une classe définie dans java.util.concurrent dédiée à l'exploitation du résultat d'exécution d'un thread en tenant compte des problématiques d'accès en contexte multi-threading. Ces problématiques peuvent se résumer à la question : comment faire pour récupérer le résultat de l'exécution d'un thread sachant qu'on ne sait pas à priori s'il a effectivement terminé son traitement, s'il a levé une exception ou s'il a été suspendu ? Nous ne vous présenterons pas toutes les méthodes de cette classe que vous pourrez étudier dans vos approfondissements. Nous nous arrêtons sur deux méthodes des plus usitées :

Méthode Description
boolean isDone() L'appel de cette méthode via une instance de la classe Future, teste la terminaison du thread pour savoir si la donnée résultante de son exécution est disponible. Retourne true si ce thread s'est bien terminé ou s'il a malheureusement levé une exception pendant son exécution, ou enfin s'il a été suspendu.
T get() L'appel de cette méthode via une instance de la classe Future, retourne le résultat (instance de T) de la tâche exécutée par le thread. Si le thread n'a pas terminé son exécution, alors l'appel de cette méthode est bloqué en attente active jusqu'à ce qu'il termine.


Ensuite, comment utilise-t-on l'interface ExecutorService pour créer concrètement nos threads ?

Puisque ExecutorService est une interface, elle doit être implémentée pour être utilisée concrètement. Il existe une classe paramétrée en fonction de son constructeur qui implémente cette interface. Ci-dessous son constructeur générique :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
public ThreadPoolExecutor (
	int corePoolSize,    // nombre de threads constant dans le pool
	int maximumPoolSize, // nombre maximum de threads que peut contenir le pool
	long keepAliveTime,  // durée maximale d'attente pour terminer un thread s'il est oisif
	TimeUnit timeUnit,   // unité de temps du paramètre keepAliveTime
	BlockingQueue<Runnable> workQueue //file d'attente des nouveaux threads (de type Runnable) créés et non encore en exécution dans le pool
	);

La particularité de cette classe d'implémentation est qu'elle apporte des solutions préparées et prêtes à l'utilisation pour différentes problématiques de multi-threading. Dans java.util.concurrent, et comme nous le verrons dans les exemples à venir, il existe une classe nommée Executors, utilisant le concept de polymorphisme d'objets, qui encapsule cette classe d'implémentation et qui en fonction de ses méthodes appelées crée pour nous l'instance souhaitée. Nous résumons dans le tableau suivant, quelques méthodes (il en existe plus) permettant d'avoir accès à cette classe d'implémentation :

Classe d'implémentation Type de retour Description
Executors.newSingleThreadExecutor() ExecutorService Classe qui crée un thread unique qui sera chargé d'exécuter un besoin donné.
Executors.newFixedThreadPool(int nbreThreads) ExecutorService Classe qui crée un pool de nbreThreads threads qui s'exécuteront en parallèle de façon à traiter plusieurs besoins simultanément.
Executors.newCachedThreadPool() ExecutorService Classe qui prépare un pool réutilisable de threads. La création concrète d'un thread est faite à la demande.
Executors.newSingleThreadScheduledExecutor() ScheduledExecutorService Classe qui crée un thread unique paramétrable pour pouvoir s'exécuter périodiquement ou après un délai spécifié.
Executors.newScheduledThreadPool(int nbreThreads) ScheduledExecutorService Classe qui crée un pool de threads paramétrables pour s'exécuter périodiquement ou après un délai spécifié.

V. Exemples d'utilisation de l'API ExecutorService

Avant de commencer à vous déverser du code Java dans tous les sens dans cette section, nous précisons qu'un article sur l'API ExecutorService a déjà été écrit pour la version Java 7. De plus, il serait intéressant de vous faire imager un exemple de besoin métier qui peut être traité à base de multi-threading :

Imaginez que vous travaillez au siège d'un grand groupe commercial qui possède plus de 500 magasins repartis dans le monde et qui commercialisent des milliers de produits alimentaires. On vous demande de développer un programme Java qui sera capable chaque jour de rendre le service suivant :

  1. Calculer le chiffre d'affaires journalier de chaque magasin sur tous les produits qui ont été vendus. Chacun de ces magasins disposant d'une base de données qui recense tous les produits vendus et leurs prix de vente pendant une journée;
  2. Se servir des chiffres d'affaires précédents pour calculer le chiffre d'affaires global journalier du groupe commercial.

Suite à cette demande, si vous concevez un programme Java mono-thread, vous risquerez de livrer une solution inacceptable au groupe commercial, car votre programme pourrait retourner un résultat après de très nombreuses heures voire même de jours. Pour éviter un temps de réponse trop long de votre application, il serait judicieux de concevoir une solution à base de multi-threads. Les threads devant se morceler le travail global à faire pour les exécuter en parallèle, afin d'agréger les différents résultats intermédiaires pour compiler le résultat final qui est le chiffre d'affaires journalier du groupe commercial.

Cependant, l'exemple de besoin métier ci-dessus que nous venons de vous présenter succinctement et non exhaustivement (car il peut être encore complexifié) est un vrai problème et ce n'est pas ce que nous nous proposons de résoudre pour la suite de ce cours. Je voulais juste que cela impacte notre esprit et nous conduise à la prise de conscience de l'importance de s'intéresser aux solutions de multi-threading avec notamment l'API java.util.concurrent qui nous simplifie beaucoup de choses.

Pour la suite de ce cours donc, nous allons nous inspirer quelque peu de l'exemple ci-dessus en définissant de petits sujets simplifiés qui y en découlent et d'y illustrer des solutions de traitement multi-threads à base du framework ExecutorService.

V-A. Executors.newSingleThreadExecutor()

La classe d'implémentation obtenue via l'instruction Executors.newSingleThreadExecutor(), permet de créer un unique thread qui viendra s'exécuter en parallèle à son thread créateur. Elle est généralement utilisée lorsque l'on n'a pas besoin de créer plus d'un thread pour le dédier au traitement voulu. Dans cette section, nous focalisons son utilisabilité au travers des deux méthodes : execute() et submit(). Notons que les autres méthodes invokeAny() et invokeAll() sont tout à fait aussi utilisables, mais c'est moins intéressant dans ce contexte.

V-A-1. Méthode execute()

Lorsque l'objectif est de créer un thread unique qui n'a pas besoin de retourner de résultat, il est simple d'utiliser la méthode execute() qui prend en paramètre un objet Runnable chargé d'effectuer le besoin attendu.

 
Sélectionnez
void execute(Runnable command)

Pour illustrer son utilisation, supposons que dans un des magasins du groupe commercial, en plus d'un traitement long qu'on aimerait effectuer, on voudrait aussi lister tous les produits qui y sont vendus. On voudrait que le temps de réponse du programme conçu à cet effet soit optimal. Dans l'exemple suivant, en plus du thread main() qui effectue le travail principal, nous définissons un autre thread qui est chargé de lister les produits en magasin présents dans sa base de données. Pour des raisons de simplicité, nous simulons cette base de données par l'objet static listeProduits. Nous ferons pareil sur tous les autres exemples.

Illustration de la méthode execute(Runnable command)
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProductDisplayer {

	final static List<String> PRODUCTS_LIST = Arrays.asList("Miel", "Chocolat", "Pomme", "Chocolat");
	
	public static void main(String[] args) {
		
		ExecutorService displayerThreadService = Executors.newSingleThreadExecutor();
		try {
			displayerThreadService.execute(() -> {
				PRODUCTS_LIST.forEach(System.out::println);
			});
		}catch(Exception e) {
			System.out.println("L'exécution du thread ne s'est pas bien passée " + e.getMessage());
		}finally {
			if(displayerThreadService != null) displayerThreadService.shutdown();
		}

		//Traitement principal du thread main()
	}

}
Résultat
Sélectionnez
Miel
Chocolat
Pomme
Concombre

Lors de la création d'un thread ou pendant son exécution, une exception peut être levée. Il convient de le traiter, d'où la gestion de notre thread dans un bloc try-catch. Un thread créé via l'API ExecutorService a besoin d'être explicitement arrêté s'il a terminé son travail (pour libérer la mémoire de la JVM), d'où le bloc finally.

V-A-2. Méthode submit()

Lorsque l'objectif est de créer un thread unique qui doit retourner un résultat, il convient d'utiliser la méthode submit() qui prend en paramètre un objet Callable chargé d'effectuer le besoin attendu et qui via sa méthode call() retourne le résultat attendu.

 
Sélectionnez
<T> Future<T> submit(Callable<T> task)

Pour illustrer son utilisation, supposons que dans un des magasins du groupe commercial, en plus d'un traitement long qu'on aimerait effectuer, on voudrait aussi obtenir le nombre total des quatre produits (miel, chocolat, pomme et concombre) qui y sont vendus. On voudrait que le temps de réponse du programme conçu à cet effet soit optimal. Dans l'exemple suivant, en plus du thread main() qui effectue le travail principal, nous définissons un autre thread qui est chargé de calculer la somme des produits présents dans sa base de données. Pour des raisons de simplicité, nous simulons cette base de données par l'objet static mapProduits.

Illustration de la méthode submit(Callable task)
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class ProductCounter {

	final static Map<String, Integer> PRODUCTS_MAP = new HashMap<String, Integer>();
	static {
		PRODUCTS_MAP.put("Miel", 1200);
		PRODUCTS_MAP.put("Chocolat", 522);
		PRODUCTS_MAP.put("Pomme", 347);
		PRODUCTS_MAP.put("Concombre", 701);
	};

	public static void main(String[] args) throws InterruptedException, ExecutionException {

		ExecutorService counterThreadService = Executors.newSingleThreadExecutor();
		try {
			Future<Integer> sommeProduits = counterThreadService.submit(() -> {
				Stream<Integer> streamValues = PRODUCTS_MAP.values().stream();
				Optional<Integer> optResult =  streamValues.reduce(Integer::sum);
				return optResult.isPresent() ? optResult.get() : null;
			});
			
			//exploitation du résultat du thread counterThread.
			System.out.println("Nombre total de produits dans le magasin : " + sommeProduits.get(3, TimeUnit.SECONDS));
			
		} catch (Exception e) {
			System.out.println("L'exécution du thread ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (counterThreadService != null) counterThreadService.shutdown();
		}
				
		// Traitement principal du thread main()
	}

}
Résultat
Sélectionnez
Nombre total de produits dans le magasin : 2770

Nous avons choisi dans cet exemple d'utiliser un Stream pour user de sa fonction reduce qui calculera pour nous la somme des éléments qu'il contient. Puisque lorsqu'un thread est créé il s'exécute indépendamment de son créateur, l'instruction counterThread.awaitTermination(3, TimeUnit.SECONDS) permet au thread créateur main() de se synchroniser sur le thread counterThread et d'y attendre pendant trois secondes (vous mettez ce que vous voulez) qu'il termine. Si pendant ces trois secondes le thread counterThread se termine, alors vous pouvez exploiter son résultat...

V-B. Executors.newFixedThreadPool()

Pour traiter un problème donné, si nous avons besoin de créer un nombre exact n (n > 1) de threads, alors cette classe d'implémentation est bien indiquée. Il permet de créer un pool de n threads et de leur répartir le travail à effectuer. Avec cette classe d'implémentation, nous pouvons naturellement utiliser ses méthodes submit() ou execute(), mais ici nous la présenterons au travers des méthodes invokeAny() et invokeAll(). Par ailleurs, il convient de noter que le choix du nombre de threads avec cette classe doit être fait de façon à ne pas inverser l'objectif de rendre performant le temps de réponse de l'application. Car créer un programme avec 1000 threads par exemple peut plutôt avoir l'effet de ralentir les traitements... Généralement, le choix est fait en fonction du nombre de cœurs ou de processeurs de la machine qui exécutent le programme.

V-B-1. Méthode invokeAll()

La méthode invokeAll() permet de démarrer en parallèle plusieurs threads de type Callable et de retourner dans l'ordre leurs résultats d'exécution. Cette méthode synchronise les threads de façon à rendre la main uniquement lorsque tous les threads ont fini leurs exécutions.

 
Sélectionnez
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

Considérons cet exemple bidon d'école : supposons que le groupe commercial dispose déjà de la liste des chiffres d'affaires de ses 500 magasins. L'objectif est de calculer la somme totale de ces chiffres d'affaires individuels pour obtenir le chiffre d'affaires global journalier du groupe. On voudrait que l'implémentation proposée utilise deux threads qui se partageront chacun la somme des chiffres d'affaires de 250 magasins pour qu'à la fin on ne puisse qu'avoir à additionner les deux résultats.

Voici un exemple de programme implémentant une solution à ce problème :

Illustration de la méthode invokeAll(...)
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BinaryOperator;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ChiffreDaffaireGroupe {

	static LongSupplier CHIFFRE_AFFAIRE_INDIV = () -> {
		//un chiffre d'affaires sera compris entre 1000 et 2000 euros.
		return  1000 + Math.round(1000*Math.random());
	};
	//les 500 chiffres d'affaires des magasins du groupe
	final static LongStream LIST_CHIFFRES_AFF_INDIV = LongStream.generate(CHIFFRE_AFFAIRE_INDIV).limit(500);

	public static void main(String[] args) throws InterruptedException, ExecutionException {

		ExecutorService chiffreAffaireThreadsService = Executors.newFixedThreadPool(2);
		try {
			List<Long> listeChiffresDaffaires = LIST_CHIFFRES_AFF_INDIV.boxed().collect(Collectors.toList());
			Callable<Long> premierThread = () -> {
				return calculSommeIntermediaire(listeChiffresDaffaires.subList(0, 250), 1);
			};
			Callable<Long> secondThread = () -> {
				return calculSommeIntermediaire(listeChiffresDaffaires.subList(250, 500), 2);
			};
			List<Callable<Long>> listeThreadsAexecuter = Arrays.asList(premierThread, secondThread);
			
			List<Future<Long>> listChiffresDaffaireResultat = chiffreAffaireThreadsService.invokeAll(listeThreadsAexecuter);
			
			//exploitation du résultat des threads.
			Long chiffreDaffaireTotal = 0L;
			for (Future<Long> future : listChiffresDaffaireResultat) {
				chiffreDaffaireTotal += future.get();
			}
			System.out.println("Chiffre Affaires total du groupe : " + chiffreDaffaireTotal);
			
		} catch (Exception e) {
			System.out.println("L'exécution des threads ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (chiffreAffaireThreadsService != null) chiffreAffaireThreadsService.shutdown();
		}
				
		// Traitement principal du thread main()
	}
	
	private static Long calculSommeIntermediaire(List<Long> list, int i) {
		BinaryOperator<Long> operateurSomme = (elt1, elt2) -> elt1 + elt2;
		Long chiffreAffaireIntermediaire2 = list.stream().reduce(0L, operateurSomme);
		System.out.println("Somme Chiffre d'affaires intermédiaire " + i + " : " + chiffreAffaireIntermediaire2);
		return chiffreAffaireIntermediaire2;
	}

}
Résultat
Sélectionnez
Somme Chiffre d'affaires intermédiaire 1 : 374335
Somme Chiffre d'affaires intermédiaire 2 : 368592
Chiffre Affaires total du groupe : 742927

V-B-2. Méthode invokeAny()

La méthode invokeAny est très proche de invokeAll que nous avons présenté dans la section ci-dessus. La seule différence est qu'elle stoppe l'exécution des threads qui lui sont transmises en paramètre dès qu'il y a en un qui a fini son exécution. C'est le résultat de ce dernier qui est donc retourner.

 
Sélectionnez
<T> T invokeAny(Collection<? extends Callable<T> > tasks)

L'utilisation de cette méthode invokeAny est adaptée, par exemple aux problématiques de recherche/fouille d'un élément dans un très grand ensemble. On pourrait donc effectuer une dichotomie de ce grand ensemble et affecter chaque sous-ensemble à un thread. Dès qu'un thread trouve l'élément recherché, la méthode n'a plus besoin de perdre du temps à attendre la fin d'exécution des autres threads, elle retourne simplement le résultat voulu et termine les autres threads.

Reprenons notre exemple de la section précédente avec les 500 chiffres d'affaires des magasins du groupe commercial. Pour avoir un résultat rapide, nous voulons que deux threads effectuent la recherche d'un chiffre d'affaires égale à 1500 euros et informe si oui ou non il est contenu dans la liste. Chacun des threads doit effectuer sa fouille sur 250 magasins.

Voici un exemple d'implémentation :

Illustration de la méthode invokeAny(...)
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class RechercheChiffreDaffaire {
	static LongSupplier CHIFFRE_AFFAIRE_INDIV = () -> {
		//un chiffre d'affaires sera compris entre 1000 et 2000 euros.
		return  1000 + Math.round(1000*Math.random());
	};
	//les 500 chiffres d'affaires des magasins du groupe
	final static LongStream LIST_CHIFFRES_AFF_INDIV = LongStream.generate(CHIFFRE_AFFAIRE_INDIV).distinct().limit(500);

	public static void main(String[] args) throws InterruptedException, ExecutionException {

		ExecutorService chiffreAffaireThreadsService = Executors.newFixedThreadPool(2);
		try {
			List<Long> listeChiffresDaffaires = LIST_CHIFFRES_AFF_INDIV.boxed().collect(Collectors.toList());
			Callable<String> premierThread = () -> {
				return direSiChiffreAffaireExiste(listeChiffresDaffaires.subList(0, 250), 1500L, 1);
			};
			Callable<String> secondThread = () -> {
				return direSiChiffreAffaireExiste(listeChiffresDaffaires.subList(250, 500), 1500L, 2);
			};
			List<Callable<String>> listeThreadsAexecuter = Arrays.asList(premierThread, secondThread);
			
			String resultat = chiffreAffaireThreadsService.invokeAny(listeThreadsAexecuter);
			
			//exploitation du résultat des threads.
			System.out.println(resultat);
			
		} catch (Exception e) {
			System.out.println("L'exécution des threads ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (chiffreAffaireThreadsService != null) chiffreAffaireThreadsService.shutdown();
		}
		
		// Traitement principal du thread main()
	}
	
	private static String direSiChiffreAffaireExiste(List<Long> list, Long eltRecherche, int i) throws ExecutionException {
		if (list.contains(1500L)) {
			return "Le thread  " + i + " a trouvé le CA 1500 recherché";
		}
		throw new ExecutionException("Aucun thread n a trouvé le CA 1500 recherché", null);
	}

}

Trois possibilités disjointes de résultat en rejouant plusieurs fois l'application :

Résultat Possible
Sélectionnez
Le thread  2 a trouvé le CA 1500 recherché
Résultat Possible
Sélectionnez
Le thread  1 a trouvé le CA 1500 recherché
Résultat Possible
Sélectionnez
L'exécution des threads ne s'est pas bien passée java.util.concurrent.ExecutionException: Aucun thread n a trouvé le CA 1500 recherché

V-C. Executors.newCachedThreadPool()

La méthode newCachedThreadPool fonctionne exactement comme newFixedThreadPool (vue précédemment) à la différence que les threads sont créés à la demande dans le pool dédié. C'est-à-dire par exemple que si deux threads sont en cours d'exécution dans le pool et qu'une autre tâche arrive, alors un troisième thread sera créé et dédié à l'exécution de cette tâche. Nous illustrons cette classe d'implémentation avec ses méthodes submit et invokeAll en reprenant les exemples vus dans les sections précédentes :

submit
Sélectionnez
<T> Future<T> submit(Callable<T> task)
invokeAll
Sélectionnez
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

V-C-1. Méthode submit()

L'exemple que nous présentons ici est inspiré de celui de la section V-A-2 où nous rajoutons un thread qui recherche le magasin qui a le plus petit nombre de produits. Interprétation : initialement, la méthode newCachedThreadPool crée un thread pour effectuer la somme des produits. Entre-temps, la demande d'un autre thread est effectuée (via la deuxième méthode submit) pour réaliser la recherche du minimum produit. Ainsi, si lorsque la deuxième demande est effectuée, et si le premier thread n'avait pas terminé, le pool se retrouvera avec deux threads.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class ProductCounter2 {

	final static Map<String, Integer> PRODUCTS_MAP = new HashMap<String, Integer>();
	static {
		PRODUCTS_MAP.put("Miel", 1200);
		PRODUCTS_MAP.put("Chocolat", 522);
		PRODUCTS_MAP.put("Pomme", 347);
		PRODUCTS_MAP.put("Concombre", 701);
	};

	public static void main(String[] args) throws InterruptedException, ExecutionException {

		ExecutorService counterThreadService = Executors.newCachedThreadPool();
		try {
			//Execution du 1er thread
			Future<Integer> sommeProduits = counterThreadService.submit(() -> {
				Stream<Integer> streamValues = PRODUCTS_MAP.values().stream();
				Optional<Integer> optResult =  streamValues.reduce(Integer::sum);
				return optResult.isPresent() ? optResult.get() : null;
			});
			
			//Ajout d'un second thread pour execution
			Future<Integer> minimumProduits = counterThreadService.submit(() -> {
				Stream<Integer> streamValues = PRODUCTS_MAP.values().stream();
				Optional<Integer> optResult =  streamValues.min((Integer e1, Integer e2) -> e1.compareTo(e2) );
				return optResult.isPresent() ? optResult.get() : null;
			});
			
			//exploitation du résultat du thread counterThread.
			System.out.println("Nombre total de produits dans le magasin : " + sommeProduits.get());
			System.out.println("Nombre total de produits dans le magasin : " + minimumProduits.get());

			
		} catch (Exception e) {
			System.out.println("L'exécution du thread ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (counterThreadService != null) counterThreadService.shutdown();
		}
				
		// Traitement principal du thread main()
	}

}
 
Sélectionnez
Nombre total de produits dans le magasin : 2770
Nombre total de produits dans le magasin : 347

V-C-2. Méthode invokeAll()

L'exemple que nous présentons ici transcrit celui de la section V-B-2 en utilisant simplement la méthode newCachedThreadPool.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BinaryOperator;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ChiffreDaffaireGroupe {

	static LongSupplier CHIFFRE_AFFAIRE_INDIV = () -> {
		//un chiffre d'affaires sera compris entre 1000 et 2000 euros.
		return  1000 + Math.round(1000*Math.random());
	};
	//les 500 chiffres d'affaires des magasins du groupe
	final static LongStream LIST_CHIFFRES_AFF_INDIV = LongStream.generate(CHIFFRE_AFFAIRE_INDIV).limit(500);

	public static void main(String[] args) throws InterruptedException, ExecutionException {

		ExecutorService chiffreAffaireThreadsService = Executors.newCachedThreadPool();
		try {
			List<Long> listeChiffresDaffaires = LIST_CHIFFRES_AFF_INDIV.boxed().collect(Collectors.toList());
			Callable<Long> premierThread = () -> {
				return calculSommeIntermediaire(listeChiffresDaffaires.subList(0, 250), 1);
			};
			Callable<Long> secondThread = () -> {
				return calculSommeIntermediaire(listeChiffresDaffaires.subList(250, 500), 2);
			};
			List<Callable<Long>> listeThreadsAexecuter = Arrays.asList(premierThread, secondThread);
			
			List<Future<Long>> listChiffresDaffaireResultat = chiffreAffaireThreadsService.invokeAll(listeThreadsAexecuter);
			
			//exploitation du résultat des threads.
			Long chiffreDaffaireTotal = 0L;
			for (Future<Long> future : listChiffresDaffaireResultat) {
				chiffreDaffaireTotal += future.get();
			}
			System.out.println("Chiffre Affaires total du groupe : " + chiffreDaffaireTotal);
			
		} catch (Exception e) {
			System.out.println("L'exécution des threads ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (chiffreAffaireThreadsService != null) chiffreAffaireThreadsService.shutdown();
		}
				
		// Traitement principal du thread main()
	}
	
	private static Long calculSommeIntermediaire(List<Long> list, int i) {
		BinaryOperator<Long> operateurSomme = (elt1, elt2) -> elt1 + elt2;
		Long chiffreAffaireIntermediaire2 = list.stream().reduce(0L, operateurSomme);
		System.out.println("Somme Chiffre d'affaires intermédiaire " + i + " : " + chiffreAffaireIntermediaire2);
		return chiffreAffaireIntermediaire2;
	}

}
 
Sélectionnez
Somme Chiffre d'affaires intermédiaire 1 : 374569
Somme Chiffre d'affaires intermédiaire 2 : 376298
Chiffre Affaires total du groupe : 750867

V-D. Executors.newSingleThreadScheduledExecutor()

L'instance que renvoie la méthode newSingleThreadScheduledExecutor() provient d'une classe qui implémente l'interface ScheduledExecutorService elle-même étendant l'interface ExecutorService. Cette instance comporte en plus de ce que nous avons déjà présenté dans les sections précédentes, beaucoup d'autres caractéristiques liées à la gestion du temps de démarrage des threads créés. Plus concrètement, la méthode newSingleThreadScheduledExecutor() de la factory Executors nous permet de créer un unique thread qui pourra être démarré :

  1. Avec un delai donné, ce qu'on appelle techniquement offset;
  2. De façon sporadique; c'est-à-dire après une durée minimum de temps d;
  3. De façon périodique, c'est-à-dire exactement après chaque durée de temps d.

V-D-1. Méthode schedule()

La méthode schedule() est identique à la méthode submit(), sauf qu'elle ajoute la possibilité de démarrer le thread créé après un délai donné. Cette méthode gère les threads de type Callable et Runnable et voici leurs prototypes :

 
Sélectionnez
1.
2.
3.
4.
ScheduledFuture<?> schedule(Runnable command,  //tâche de type Runnable à exécuter
	long delay, // délai à attendre dès maintenant avant de démarrer effectivement la tâche
	TimeUnit unit // unité de temps liée au paramètre delay
	)
 
Sélectionnez
1.
2.
3.
4.
ScheduledFuture<V> schedule(Callable<V> command,  //tâche de type Runnable à exécuter
	long delay, // délai à attendre dès maintenant avant de démarrer effectivement la tâche
	TimeUnit unit // unité de temps liée au paramètre delay
	)

La classe ScheduledFuture est une classe fille de la classe Future que nous avons présentée dans les sections précédentes. Elle ajoute la possibilité de gérer l'aspect temps associé à la méthode schedule().

Dans l'exemple ci-dessous, nous illustrons l'utilisation de la méthode schedule() où nous retardons de 10 secondes l'exécution effective de la tâche qui affiche les produits.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ProductDisplayer {

	final static List<String> PRODUCTS_LIST = Arrays.asList("Miel", "Chocolat", "Pomme", "Chocolat");

	public static void main(String[] args) {

		ScheduledExecutorService displayerThreadService = Executors.newSingleThreadScheduledExecutor();
		try { 
			LocalDateTime dateDebut = LocalDateTime.now();
			System.out.println("Date de début : " + dateDebut);
			
			Runnable command = () -> {
				LocalDateTime dateDemarrage = LocalDateTime.now();
				System.out.println("Date de démarrage : " + dateDemarrage);
				System.out.println("Délai : " + ChronoUnit.SECONDS.between(dateDebut, dateDemarrage) + " Secondes");
				
				PRODUCTS_LIST.forEach(System.out::println);
			};
			
			displayerThreadService.schedule(command, 10, TimeUnit.SECONDS);
			
		} catch (Exception e) {
			System.out.println("L'exécution du thread ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (displayerThreadService != null)
				displayerThreadService.shutdown();
		}

		// Traitement principal du thread main()
	}

}
Resultat
Sélectionnez
Date de début : 2018-11-26T22:38:31.429
Date de démarrage : 2018-11-26T22:38:41.491
Délai : 10 Secondes
Miel
Chocolat
Pomme
Chocolat

V-D-2. Méthode scheduleWithFixedDelay()

Cette méthode permet de créer des threads de façon sporadique. C'est-à-dire, par exemple, que s'il crée un thread après chaque trois minutes et que le temps d'exécution pire cas d'un thread, prend une minute, alors le prochain thread sera démarré avec un minimum de 3 + 1 = 4 minutes, et ainsi de suite. De plus, elle ajoute la possibilité de démarrer le premier thread après un délai initial donné (comme pour le cas de la méthode schedule()). Voici le prototype de cette méthode :

 
Sélectionnez
1.
2.
3.
4.
5.
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, // tâche de type Runnable à exécuter
	long initialDelay, // délai initial avant le 1er démarrage de la tâche
	long delay,  	   // délai minimum de re-déclenchement de la tâche
	TimeUnit unit	   // unité de temps des paramètres initialDelay et delay
	)

L'exemple ci-dessous présente un thread qui une fois créé, démarre concrètement après 5 secondes, puis s'exécute toutes les 10 secondes au moins.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ProductDisplayer {

	final static List<String> PRODUCTS_LIST = Arrays.asList("Miel", "Chocolat", "Pomme", "Chocolat");
	static int INDEX = 0;

	public static void main(String[] args) {

		ScheduledExecutorService displayerThreadService = Executors.newSingleThreadScheduledExecutor();
		try { 
			System.out.println("Date de début : " + LocalDateTime.now());
			Runnable command = () -> {
				System.out.println("Article " + INDEX + " : " + PRODUCTS_LIST.get(INDEX) + " -> Affiché à la date : " + LocalDateTime.now());
				INDEX++;
				if(INDEX > 4) return;
			};
			
			displayerThreadService.scheduleWithFixedDelay(command, 5, 10, TimeUnit.SECONDS);
			
			//on fait le processus main() attendre pendant 55 secondes
			//pour voir afficher le résultat des threads
			displayerThreadService.awaitTermination(55, TimeUnit.SECONDS);
			
		} catch (Exception e) {
			System.out.println("L'exécution du thread ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (displayerThreadService != null) displayerThreadService.shutdown();
		}

		// Traitement principal du thread main()
	}

}
Résultat
Sélectionnez
Date de début : 2018-11-26T22:48:32.237
Article 0 : Miel -> Affiché à la date : 2018-11-26T22:48:37.291
Article 1 : Chocolat -> Affiché à la date : 2018-11-26T22:48:47.307
Article 2 : Pomme -> Affiché à la date : 2018-11-26T22:48:57.324
Article 3 : Chocolat -> Affiché à la date : 2018-11-26T22:49:07.340

En observant le résultat de la console de ce programme, on peut remarquer que l'affichage du premier produit est fait 5 secondes après la date de début du programme. Ensuite, les autres affichages se font tous les 10 secondes et quelques centièmes d'intervalle.

V-D-3. Méthode scheduleAtFixedRate()

Cette méthode fonctionne exactement comme la méthode scheduleWithFixedDelay() vue dans la section précédente. La seule différence réside dans le fait que la périodicité de création du nouveau thread qui doit exécuter la tâche est stricte. C'est-à-dire, que si l'on definit une période de trois minutes par exemple, que la durée d'exécution d'une tâche dure quatre minutes, alors les autres tâches seront retardées et attendrons que la tâche précédente se termine. Pas de notion de re-entrance donc.

 
Sélectionnez
1.
2.
3.
4.
5.
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, // tâche de type Runnable à exécuter
	long initialDelay, // délai initial avant le 1er démarrage de la tâche
	long period,  	   // délai exact de re-declenchement de la tâche
	TimeUnit unit	   // unité de temps des paramètres initialDelay et period
	)

Nous reprenons l'exemple ci-dessus en utilisant plutôt la méthode scheduleAtFixedRate().

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ProductDisplayer {

	final static List<String> PRODUCTS_LIST = Arrays.asList("Miel", "Chocolat", "Pomme", "Chocolat");
	static int INDEX = 0;

	public static void main(String[] args) {

		ScheduledExecutorService displayerThreadService = Executors.newSingleThreadScheduledExecutor();
		try { 
			System.out.println("Date de début : " + LocalDateTime.now());
			Runnable command = () -> {
					System.out.println("Article " + INDEX + " : " + PRODUCTS_LIST.get(INDEX) + " -> Affiché à la date : " + LocalDateTime.now());
					INDEX++;
					if(INDEX > 4) return;
			};
			
			displayerThreadService.scheduleAtFixedRate(command, 5, 10, TimeUnit.SECONDS);
			
			//on fait le processus main() attendre pendant 55 secondes
			//pour voir afficher le résultat des threads
			displayerThreadService.awaitTermination(55, TimeUnit.SECONDS);
			
		} catch (Exception e) {
			System.out.println("L'exécution du thread ne s'est pas bien passée " + e.getMessage());
		} finally {
			if (displayerThreadService != null) displayerThreadService.shutdown();
		}

		// Traitement principal du thread main()
	}

}
Résultat
Sélectionnez
Date de début : 2018-11-26T23:14:23.465
Article 0 : Miel -> Affiché à la date : 2018-11-26T23:14:28.519
Article 1 : Chocolat -> Affiché à la date : 2018-11-26T23:14:38.520
Article 2 : Pomme -> Affiché à la date : 2018-11-26T23:14:48.521
Article 3 : Chocolat -> Affiché à la date : 2018-11-26T23:14:58.521

L'on peut remarquer que la période d'exécution de chaque thread est plus stricte (quasi égale à 10 secondes) contrairement à celle de la méthode scheduleWithFixedDelay.

V-E. Executors.newScheduledThreadPool()

newScheduledThreadPool() est pour newSingleThreadScheduledExecutor() ce que newFixedThreadPool() est pour newSignleThreadExecutor(). En d'autres termes, la méthode newScheduledThreadPool() possède les mêmes caractéristiques que newSingleThreadScheduledExecutor(), mais propose un pool de n threads qui peuvent être démarrés en parallèle. Chacun exécutant sa tâche de façon sporadique ou périodique. De ce fait, cette méthode est plus adaptée lorsque l'on veut démarrer plusieurs threads à la fois.

 
Sélectionnez
ScheduledExecutorService Executors.newScheduledThreadPool(int corePoolSize)

VI. Accéder au code source de cet article

Les différents codes source présentés dans cet article sont consultables ici : https://gitlab.com/gkemayo/javaconcurrency

VII. Conclusion

Dans ce cours, nous avons présenté les deux paradigmes Java qui permettent de faire des traitements multi-threads : l'implémentation via la classe Thread et l'implémentation via l'API ExecutorService. Nous avons mis l'accent sur le paradigme basé sur l'API ExecutorService et sa dérivée ScheduledExecutorService qui offre des solutions préparées aux problématiques techniques rencontrées lors des traitements multi-threads. Ce qui donne donc au programmeur la possibilité de plus se concentrer sur les aspects métier de sa conception. Ce cours n'a fait qu'aborder quelques concepts de l'API ExecutorService en particulier et du monde du multi-threading Java en général; il ne se prétend donc pas exhaustif. Enfin, ce tutoriel met l'accent sur la présentation de ces différentes notions par des exemples de petits programmes basés sur les concepts Java 8.

VIII. Pour aller plus loin

Face à certains problèmes complexes, on peut être amené à faire face à des problématiques non abordées dans ce tutoriel :

  • - gestion de l'accès concurrent aux données ;
  • - gestion de l'exclusion mutuelle ;
  • - gestion de la condition de concurrence (race condition) ;
  • - gestion de processus concurrents ;
  • - etc.

La bibliothèque java.util.concurrent ne se limite pas à tout ce que nous avons présenté dans ce cours. Il est beaucoup plus vaste. Sans être exhaustif et sans aucun ordre, nous pouvons citer d'autres concepts que cette bibliothèque propose et qu'il serait intéressant de connaître : CyclicBarrier, ForkJoinPool, ForkJoinTask, RecursiveTask, Parallel Stream, type de données intégrant la gestion d'accès concurrent (AtomicBoolean, AtomicInteger, ConcurrentHashMap, CopyOnWriteArrayList, LinkedBlockingDeque, ...), etc.

IX. Remerciements

Je tiens à remercier Mickael Baron, responsable de la rubrique Java sur ce forum, pour sa relecture technique de cet article. Un grand merci également à Jacques THERY pour sa relecture orthographique.

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

  

Les sources présentées sur cette page sont libres de droits et vous pouvez les utiliser à votre convenance. Par contre, la page de présentation constitue une œuvre intellectuelle protégée par les droits d'auteur. Copyright © 2018 Georges KEMAYO. Aucune reproduction, même partielle, ne peut être faite de ce site ni de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts.