WOW !! MUCH LOVE ! SO WORLD PEACE !
Fond bitcoin pour l'amélioration du site: 1memzGeKS7CB3ECNkzSn2qHwxU6NZoJ8o
  Dogecoin (tips/pourboires): DCLoo9Dd4qECqpMLurdgGnaoqbftj16Nvp


Home | Publier un mémoire | Une page au hasard

 > 

Stratégies d'optimisation de requêtes SQL dans un écosystème Hadoop

( Télécharger le fichier original )
par Sébastien Frackowiak
Université de Technologie de COmpiègne - Master 2 2017
  

Disponible en mode multipage

Bitcoin is a swarm of cyber hornets serving the goddess of wisdom, feeding on the fire of truth, exponentially growing ever smarter, faster, and stronger behind a wall of encrypted energy

 
 

Stratégies d'optimisation de requêtes SQL dans un écosystème Hadoop

Sébastien FRACKOWIAK

15/01/2018

SOMMAIRE

SOMMAIRE 1

Remerciements 3

Résumé 4

Liste des figures et des tableaux 5

1 Introduction 7

1.1 Entrepôt et Bases de Données Relationnelles 7

1.2 Entrepôt et Bases de Données Big Data 7

1.3 Problématique 7

2 Hadoop 9

2.1 Histoire 9

2.2 Le composant de stockage (HDFS) 9

2.3 Le composant de traitement (MapReduce) 10

2.3.1 Architecture Hadoop v1 10

2.3.2 Architecture Hadoop v2 11

2.3.3 Le paradigme MapReduce 12

2.4 Discussion 16

2.4.1 Du point de vue HDFS 16

2.4.2 Du point de vue YARN 17

3 SQL sur Hadoop 19

3.1 Hive 19

3.1.1 Histoire 19

3.1.2 Architecture 19

3.2 La commande « EXPLAIN » 20

3.2.1 Explication d'une projection simple 21

3.2.2 Explication d'une projection avec une restriction 23

3.2.3 Explication d'une projection avec une restriction et une agrégation 26

3.2.4 Explication d'une jointure entre deux tables 30

3.3 Discussion 32

3.3.1 Requête avec une restriction 32

3.3.2 Requête avec une agrégation 32

3.3.3 Requête avec une jointure et une agrégation 32

4 Optimisation du SQL sur Hadoop 35

4.1 Optimisation par le réglage ou « tuning » 35

4.1.1 Utiliser Tez 35

4.1.2 Contrôler la taille des fichiers manipulés 37

4.1.3 Agréger en amont 39

4.1.4 Réaliser un « benchmark » significatif 39

4.2 Optimisation par la conception ou « design » 40

4.2.1 Utiliser les tables partitionnées 41

4.2.2 Optimiser les jointures 42

4.3 Discussion 46

5 Conclusion 47

6 Bibliographie 48

7 Annexes 49

7.1 Hadoop 49

7.1.1 Partition & Sort détaillé 49

7.1.2 Définition de la distance entre deux noeuds 49

7.2 SQL sur Hadoop 50

7.2.1 Gérer manuellement le partitionnement dans une requête 50

7.2.2 Comprendre la sérialisation sous Hadoop 50

7.3 Optimisation du SQL sur Hadoop 50

7.3.1 Exemple de WordCount avec Tez 50

7.3.2 Grouper les splits avec Tez 50

REMERCIEMENTS

J'aimerais exprimer toute ma gratitude à la Direction Solution Exploitation de la DSI des Réseaux d'Orange, en particulier Jean-Claude Marcovici, Nadine Poinson, Jean-Marc Pageot et Serge Schembri pour m'avoir donné l'opportunité de suivre ce cursus et les moyens de réaliser ce mémoire.

Je voudrais remercier mon tuteur Stéphane Crozat, pour ses conseils durant la rédaction de ce mémoire et son enseignement en Base de Données et en Big Data, ainsi que Dominique Lenne et Thomas Deshais pour leur soutien et leurs encouragements à l'égard de notre promotion tout au long de ce Master.

Toute ma reconnaissance va également versmes collègues développeurs de l'équipe Big Data Technique : Alioune, Bastien, Christophe, Patrick et particulièrement Régis et Thomas pour leur disponibilité.

Enfin, je remerciemon épouse pour ses précieux conseils, ses encouragements et son soutien ainsi que mon fils, mes beaux-parents et ma mère pour nous avoir grandementfacilités cette année un peu particulière.

Sébastien Frackowiak

1 RÉSUMÉ

Le Big Data n'est plus un « buzzword », il est devenu au fil de ces dernières années une réalité.Ainsi, il n'est plus rare que les grandes entreprises disposent d'un « puits de données » contenant une énorme quantité d'informations. C'est le cas d'Orange qui utilise Hadoop.

Dans ce cadre, le moyen le plus courant de programmer des traitements Big Data est d'utiliser Hive qui émule le langage SQL. Mais cette émulation peut être trompeuse car ses mécanismes sous-jacents sont radicalement différents de ceux que nous connaissons bien, ils cachent l'utilisation du paradigme MapReduce.Popularisé par Google, ce paradigme réussit à traiter de gigantesques volumes de données de manière distribuée. C'est son fonctionnement et son application au SQL qui seront détaillés dans ce mémoire afin de pouvoir proposer des stratégies d'optimisation des requêtes.

Il en ressortira qu'une bonne optimisation ne peut se faire sans connaissance fine des données manipulées et étude statistique préalable. Mais surtout, que la multitude d'options et de techniques d'optimisation proposées au développeur nécessiteraient de sa part une compétence similaire à celle d'un administrateur de base de données. C'est l'évolution de cet aspect qui conditionnera l'avenir de Hadoop et de Hive.

LISTE DES FIGURES ET DES TABLEAUX

Figure 1 : architecture Hadoop 1.0

Source (reproduction) : https://fr.hortonworks.com/blog/apache-hadoop-2-is-ga/

Figure 2 : architecture Hadoop 2.0

Source (reproduction) : https://fr.hortonworks.com/blog/apache-hadoop-2-is-ga/

Figure 3 : répartition de trois fichiers dans un cluster HDFS

Figure 4 : phases de création des containers YARN pour l'exécution de l'application « WordCount »

Figure 5 : phases d'exécution de l'application « WordCount »

Figure 6 : architecture de Hive

Figure 7 : diagramme de traitement du parcours d'une table avec projection et restriction

Figure 8 : processus de traitement MapReduce d'une requête avec projection et restriction

Figure 9 : processus de traitement MapReduce d'une requête SQL avec agrégation

Figure 10 : graphe des dépendances d'une requête avec jointure et agrégation

Figure 11 : processus de traitement MapReduce d'une requête SQL avec jointure

Figure 12 : phase « Map » d'une jointure sur un Mapper

Figure 13 : phase « Reduce » d'une jointure sur deux Reducers

Figure 14 : mouvement de données de type « One-To-One »

Source (reproduction) : https://fr.hortonworks.com/blog/expressing-data-processing-in-apache-tez/

Figure 15 : mouvement de données de type « Broadcast »

Source (reproduction) : https://fr.hortonworks.com/blog/expressing-data-processing-in-apache-tez/

Figure 16 : mouvement de données de type « Scatter-Gather »

Source (reproduction) : https://fr.hortonworks.com/blog/expressing-data-processing-in-apache-tez/

Figure 17 : comparaison MapReduce / Tez

Source (reproduction) : https://fr.hortonworks.com/blog/expressing-data-processing-in-apache-tez/

Figure 18 : du Système d'Information au Use Case Big Data

Figure 19 : graphe des dépendances d'une requête SQL avec jointure « Map-Only » (1/2)

Figure 20 : graphe des dépendances d'une requête SQL avec jointure « Map-Only » (2/2)

Figure 21 : Reducer devant traiter une donnée « biaisée » ou « skewed »

Figure 22 : traitement MapReduce détaillant le « Partition & Sort »

Source :Tom White (2015). Hadoop: The definitive guide, page 197

Tableau 1 : temps d'accès cumulé sur 1GB de données

Tableau 2 : centiles du nombre de lignes par date de chargement

Tableau 3 : centiles du nombre de lignes par date fonctionnelle

2 INTRODUCTION

Créé en 2011, Hadoop s'est imposé depuis plusieurs années comme la principale plateforme Big Data. Il représente une réelle rupture dans le traitement des donnéeset il a modifié en profondeur la manière de concevoir et d'utiliser les entrepôts de données.

2.1 Entrepôt et Bases de Données Relationnelles

Un entrepôt de données traditionnel permet de stocker les données provenant de plusieurs applications d'un Système d'Information afin d'en tirer des enseignements et ainsi aider l'entreprise dans ses décisions.

La création d'un tel entrepôt de données impliquela mise en oeuvre d'une Base de Données Relationnelle dont lepréalable fondamental est sa modélisation. Modélisationqui tacitement met en exergue des clés (primaires et étrangères) et des index, ce qui favoriseune manière de requêter les données plutôt qu'une autre.En effet, sans entrer dans les détails du fonctionnement d'un Système de Gestion de Base de Données Relationnelle (SGBDR), il est important de signaler que le requêtage SQL de deux tables jointes l'une à l'autre s'effectue le plus souvent via leurs clés respectives. Ces clés traduisent des contraintes qui garantissent la cohérence des résultats et permettent l'utilisation des fonctions d'indexation qui accélèrent leur obtention.

Lorsque les volumes de données manipulées sont ceux du Big Data, nous pouvons nous permettre de nous passer de ces contraintes trop rigides.

2.2 Entrepôt et Bases de Données Big Data

Les technologies du Big Data permettent dorénavant de stocker sur des systèmes de fichiers distribués de grandes quantités de données, issues principalement de flux continus et de Bases de Données Relationnelles provenant de SI et d'organisations distinctes.

Des solutions exploitant le paradigme distribué MapReduce(qui fait notamment l'objet de ce mémoire) permettent l'interrogation de ces grandes quantités de données, de la même manière que sur un SGBDR classique, en utilisant le langage SQL. Dans ce cas de figure, les données sont structurées dans des tables formant un unique "puits de données", ce qui rend ainsi possible, au travers de jointures, des croisements de données issues d'applications et d'organisations différentes (au sein même d'Orange mais aussi en dehors d'Orange, provenant de l'Open Data par exemple). Avec un SGBDR classique, ce type de traitement aurait pu prendre des jours contre seulement quelques minutes aujourd'hui avec le Big Data.

Si dans une Base de Données Relationnelle, les jointures sont anticipées dès la phase de conception (par identification des clés et des index), ce n'est pas le cas pour un puits de données qui ne couvre aucun cas d'usage de prime abord. De ce fait, les champs utilisés pour joindre deux tables n'expriment aucune contrainte (il s'agit d'une fonction propre aux clés primaires et étrangères des SGBDR) et ne sont pas indexés (il serait hasardeux d'anticiper un plan d'indexation particulier sachant que la conception du puits de données ne préjuge en rien de la manière dont il sera requêté). C'est donc une nouvelle manière d'optimiser les requêtes qui doit être pensée.

2.3 Problématique

L'implémentation "distribuée" du SQL conduit à adopter de nouveaux réflexes, très éloignés de ceux pris durant des années avec les SGBDR. Il devient alors nécessaire de s'interroger sur les leviers permettant d'optimiser les temps d'exécution des requêtes SQL dans un environnement Big Data.

L'objectif de ce mémoire est double : il s'agit premièrement de se construire une connaissance fine de l'environnement Big Data le plus répandu, c'est-à-dire Hadoop, qui est aussi celui sur lequel tout le Big Data d'Orange repose ; deuxièmement, c'est une technologie en plein essor et en rupture par rapport aux autres technologies plus conventionnelles, et les pratiques qu'elle implique nécessitent d'être étudiées spécifiquement afin d'optimiser son fonctionnement.

Par ailleurs, la documentation existante traduit du développement rapide de l'écosystème Hadoop. Nous pouvons ainsi trouver de très nombreux articles témoignant des différentes phases de son évolution, mais peu s'attardentsur son fonctionnement réel. Il paraît donc intéressant et non superflu de faire un état de l'art structuré et à jour de cet écosystème.

Dans ce mémoire, nous reviendrons tout d'abord sur ce qu'est Hadoop, ce qui le compose et la manière dont il fonctionne.

Nous détaillerons ensuite comment le langage SQL est implémenté sous Hadoop par une application nommée Hive. Ce qui nous permettra de mettre en lumière les principales faiblesses du paradigme distribué.

Puis, nous proposerons un ensemble de bonnes pratiques, aussi bien dans la configuration préalable à l'exécution d'une requête SQL que dans la conception des tables et le développement des requêtes.

Finalement, nous achèverons ce mémoire en prenant du recul sur la maturité de Hadoop et son évolution constante.Nous nous interrogerons aussi sur la pertinence des optimisations que nous proposons devant la multitude des possibilités offertes. Enfin, se posera la question de l'avenir de Hadoop dans sa formule actuelle et de son évolution prévisible dans les années à venir.

3 HADOOP

3.1 Histoire

L'histoire de Hadoop commence avec le projet Nutch, initié par Doug Cutting en 2002. Son but était de proposer un moteur de recherche open-source de type « crawler », c'est-à-dire capable de parcourir et d'indexer automatiquement l'ensemble du web, quand les moteurs de recherche de cette époque pratiquaient un référencement manuel des sites web.

Les quantitéscroissantes voire inattenduesde données à stocker et à traiter devinrent une difficulté, l'infrastructure supportant le projet Nutch ne reposaient en effet que sur quelques machines.

En 2003, Google publia un article de recherche concernant un système de fichiers, le GoogleFS (Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung, 2003). Ilse compose d'un cluster de noeuds, le rendant scalable et distribué :

- la taille du FS n'est pas limitée et peut être étendue par ajout d'un nouveau noeud

- la taille de chacun des fichiers qu'il contient n'est pas limitée non plus etces fichiers sont répartis, par blocs, sur plusieurs noeuds

Le GoogleFS est ainsi parfaitement adapté aux traitements distribués.

En 2004, Google publia un nouvel article de recherche concernant un framework de traitement distribué : le MapReduce (Jeffrey Dean and Sanjay Ghemawat, 2004).

Dès 2004, les travaux de Google permirent au projet Nutchde résoudre ses difficultés de stockage et de traitement en concevant une implémentation open-source de GoogleFS et MapReduce :c'est Hadoop. Ce dernier estainsi composéde son propre composant de stockage (HadoopFS ou HDFS) et de son propre composant de traitement (MapReduce).

D'abord utilisé par Yahoo en 2006, puis passé sous licence Apache en 2009, la première version stable de Hadoop fut proposée en 2011.

3.2 Le composant de stockage (HDFS)

Tout comme le GoogleFS, le HDFS repose sur un cluster scalable et distribué (Tom White, 2015). Il est composé d'un noeud principal exécutant le service « NameNode » (maître) et de plusieurs noeuds exécutant un service « DataNode » (esclave).

Les DataNodes contiennent les fichiers découpés en blocs. Par défaut :

- un bloc a une taille de 128MB (contre 4KB pour un FS « classique »),

- un même bloc sera copié sur 3 DataNodes distincts.

Le NameNodemaintient l'arborescence du système de fichier (répertoires, fichiers et blocs composant chaque fichier) et connaît l'emplacement (unDataNode) de chaque bloc constituant chaque fichier.

Il s'ensuit que toute demande de lecture ou d'écriture d'un fichier passera d'abord par le NameNode, soit pour obtenir la liste des blocs et leurs emplacements, soit pour allouer/désallouer de nouveaux blocs.

Ces caractéristiques impliquent que le HDFS est optimisé pour traiter spécifiquement des fichiers de grande taille, le plus rapidement possible. En effet, si la taille des blocs est aussi élevée, c'est pour être en mesure de lire les fichiers à la vitesse permise par le disque. Si ce dernier à une vitesse de lecture de 100MB/s et un temps d'accès de 10ms, un fichier de 128MB sera lu en 1,29s (1,28s+0,01s) et ne sera pas pénalisé pas le temps d'accès. A contrario, la lecture de 128 fichiers de 1MB nécessitera 2,56s (1,28s+0,01s×128) et la nécessité de solliciter autant de DataNodes (si chaque fichier est réparti sur un DataNode différent).

En outre, le HDFS fonctionne en mode « write-once, read-many-times », c'est-à-dire qu'une fois créé, un fichier ne peut être modifié, sauf pour une opération de concaténation ou de suppression. Cela permet de garantir la cohérence des données lorsqu'elles sont sollicitées simultanément. De plus, la modification d'un fichier de très grande taille serait bloquante et donc incompatible avec l'objectif de traiter des fichiers le plus rapidement possible.

Enfin, le HDFS est tolérant aux pannes du point de vue de ses DataNodes, puisque chaque bloc sera recopié 3 fois. En revanche, le NameNode doit être considéré comme un « single point of failure », dans la mesure où il stocke en mémoire l'arborescence du système de fichier et l'emplacement de chaque bloc. Comme un objet nécessite 150B de mémoire (quelle que soit sa taille), une quantité élevée d'objets peut conduire à l'arrêt du NameNode et donc à la panne du HDFS. A titre indicatif, un milliard de fichiers nécessitent 300GB de mémoire vive : pour chaque fichier, il faut considérer 150B pour le fichier et 150B pour le bloc sur lequel il repose.

Le morcellement de HDFS en blocs de données de grande taille est étudié pour être utilisé par une application distribuée : MapReduce.

3.3 Le composant de traitement (MapReduce)

3.3.1 Architecture Hadoop v1

Dansla première version de Hadoop (2011),MapReduce est le composant de traitement et repose sur un cluster scalable et distribué. Il est composé d'un noeud principal exécutant le service « jobtracker » (maître) et de plusieurs noeuds exécutant un service « tasktracker » (esclave).Chaque tasktracker contient un nombre fini de « slots », ressources disponibleset spécialisées pour exécuter des tâches soit « map » (lecture et prétraitement des données) soit « reduce » (agrégation des résultats du « map »).

Le jobtracker :

- reçoit les demandes de jobs à exécuter,

- localiseles DataNodes où se trouvent les données à traiter en interrogeant le NameNode,

- localise les tasktrackers avec des slots disponibles et les plus proches des données ( annexe),

- soumet les tâches« map » ou « reduce » aux tasktrackers,

- coordonne l'exécution de chaque tâche.

Les tasktrackers :

- exécutent les tâches« map » ou « reduce » (dans le slot attribué),

- communiquent périodiquement la progression des tâches qu'ils exécutent au jobtracker. En cas d'erreur, le jobtracker est ainsi capable de resoumettre une tâche sur un tasktracker différent.

A ce stade du chapitre, il est à noter que les composants HDFS et MapReducereposent sur le même cluster de noeuds physiques. Ainsi, leNameNode et le tasktracker (tous deux gérant leur cluster logique respectif) s'exécutent généralement sur des noeuds physiques qui leur sont dédiés (soit un pour deux, soit un chacun) et unDataNodepartage généralement un noeud physique avec un tasktracker.

Figure 1 : architecture Hadoop 1.0

Dans cette première version, la spécialisation des slots « map » ou « reduce » rendait problématique l'allocation des ressources. Ainsi, si tous les slots « map » étaient attribués, il était impossible d'utiliser un slot « reduce » pour effectuer une tâche « map », et le traitement était donc mis en attente. C'est notamment ce point que la version 2 a corrigé.

3.3.2 Architecture Hadoop v2

La seconde version de Hadoop (2012)dissocie la fonction de gestionnaire de ressources de MapReduce et propose un cadre généraliste au-dessus (API) permettant le développement d'applications de traitement distribué.

YARN (YetAnother Resource Negotiator) est le gestionnaire de ressource du cluster et est composé d'un noeud principal exécutant le service « ResourceManager » (maître) et de plusieurs noeuds exécutant un service « NodeManager » (esclave). Chaque NodeManagerpeut contenirun nombre indéfini de « containers », c'est-à-dire, des ressources disponibles pour exécuter tout type de tâches.

MapReduce devientainsi une application YARN, tout comme d'autres applications de traitement distribué telles que Tez et Spark. Ces applications sont des frameworks, ce qui veut dire qu'elles ne spécifient que la logique de traitement distribué et non le traitement en lui-même.Dorénavant, lorsque nous parlerons d'applications MapReduce, Tezou Spark, nous parlerons tacitement d'applications implémentant ces frameworks et décrivant un traitement précis.

Quelle que soit l'application YARN, le fonctionnement sera le même :

Le ResourceManager :

- reçoit les demandes d'applications à exécuter,

- crée un « ApplicationMaster »sur un NodeManager par application,

- attribue à l'ApplicationMaster, les NodeManagers disposant d'assez de ressources disponibles pour créer les containers demandés,

- priorise les demandes de ressources (file d'attente en mode FIFO par exemple).

Les ApplicationMasters :

- estiment la quantité de ressources requiseau regard de laconnaissance de l'application qu'ils exécutent,

- effectuent les demandes de containers au ResourceManager en précisant leur localisation idéale (par exemple, au plus proche des données),

- démarrent les containers sur les NodeManagers attribués par le ResourceManager

Les NodeManagers :

- contiennent les containers démarrés par l'ApplicationMaster,

- exécutent les tâches(au sens générique du terme) demandées par leur ApplicationMaster (dans le container attribué),

- communiquent périodiquement la progression des jobs qu'ils exécutent à l'ApplicationMaster. En cas d'erreur, ce dernierpeut ainsi demander d'autres containers au ResourceManager et leur affecter le job.

L'avantage par rapport à Hadoop v1 est la capacité à proposer plusieurs frameworks de traitement distribué (ne répondant pas tous aux mêmes besoins), sur un même gestionnaire de ressource. En outre, l'introduction de l'ApplicationMaster permet de décentraliser la coordination de l'exécution de chaque tâche et ainsi de ne pas surcharger le ResourceManager. Enfin, la gestion par container permet d'être plus flexible dans l'attribution des ressources, du fait de leur non spécificité.

Figure 2 : architecture Hadoop 2.0

3.3.3 Le paradigme MapReduce

Dans ce mémoire, nous considérerons principalement les frameworks MapReduce et Tez (Hadoop v2). Le premier implémenteexactement le paradigme MapReduce que Google a décrit dans ces travaux de 2004 alors que le second se veut plus généraliste et plus souple (nous y reviendrons). Ainsi, l'écriture d'une application MapReduce consiste à décrire :

- une classe « Map » qui génère un ensemble de couplesclé/valeur

- une classe « Reduce » qui agrège les valeurs intermédiaires selon leur clé

Pour illustrer le principe du paradigme MapReduce, prenons l'exemple du « Word Count », qui permet de compter les occurrences de chaque mot d'un jeu de données.

Considérons un répertoire du HDFScontenant 3 fichiers qui feront l'objet d'un comptage d'occurrences.

Figure 3 : répartition de trois fichiers dans un cluster HDFS

Explication :

Chaque fichier « fichierCouleurs » occupe1 bloc de données dupliqué 3 trois fois sur un cluster de 4 DataNodes (DN) gérés par le NameNode (NN).

Leprogramme « WordCount » suivantimplémenteMapReduce :

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value, Context context)

throwsIOException, InterruptedException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values, Context context)

throwsIOException, InterruptedException {

int sum = 0;

for (IntWritableval : values) {

sum += val.get();

}

context.write(key, new IntWritable(sum));

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setNumReduceTasks(2);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}

}

Ce programme s'exécutera dans l'architecture Hadoop et sollicitera ses différents composants :

Figure 4 : phases de création des containers YARN pour l'exécution de l'application « WordCount »

Explication :

· Etape 1

L'application « WordCount » sollicite le ResourceManager afin de déclarer une nouvelle application YARN.

· Etape 2

L'application « Wordcount » sollicite le NameNode afin d'obtenir la liste des blocs composant le ou les fichiers à traiter puis dépose les références vers ces blocs (split) dans un répertoire partagé, où sera également copié le JAR à exécuter.

· Etape 3

Le ResourceManager crée un ApplicationMaster.

· Etape 4

L'ApplicationMaster lit les « input splits » afin de déterminer l'emplacement des données.

· Etape 5

L'ApplicationMaster crée des containers qui pourront exécuter le JAR, sur les NodeManagers les plus proches des DataNodes contenant les données (voir annexe).

L'exécution du JAR contenant le programme « WordCount » et qui implémente MapReduce impliquera les phases suivantes :

Figure 5 : phases d'exécution de l'application « WordCount »

Explication :

· Input Splits

L'ApplicationMaster associe un split à un container. Ce dernier exécutera un et un seul Mapper sur les données pointées par le split.

· Map

Chaque Mapper exécutesur ses données, la méthode « Map » qui lira chaque mot séparément.Pour chaque mot lu, cette méthode générera un couple clé/valeur où la clé, correspondant au mot parcouru, sera associée à la valeur 1. Le fait de rencontrer 2 fois le mot « vert » implique que le couple {vert, 1} sera généré donc deux fois.

· Partition & Sort

A l'issue de la méthode « Map », chaque Mapper déversera les couples clé/valeurgénérés dans un fichier (sur le disque local du Mapper, annexe) où ils seront groupésdans des partitions suivant leur clé.Une partition contientainsi un ensemble de couples clé/valeur et, par construction,chaque clé ne peut figurer que dans une unique partition(par Mapper).

Le nombre de partitions est déterminé dès le lancement de l'application en précisant le nombre de Reducers souhaité, par exemple :

job.setNumReduceTasks(2)

Si le nombre de Reducers n'est pas précisé, il sera fixé par défaut à 1.

Par défaut, le framework MapReduce implémente le calcul de la partition pour chaque clé comme suit :

(key.hashCode() &Integer.MAX_VALUE) % numReduceTasks

Ce procédé permet de répartir les clés de manière équilibréelorsque leur nombre est important.

· Shuffle & Merge & Sort

Chaque Reducer reçoit les partitions identiques de chaque Mapper (par transfert HTTP) pour ensuiteles fusionner entre elles et leur appliquerde nouveau un tri par clé. A l'issue de cette étape, autant de fichiers que de partitions fusionnées auront été créés (sur le disque local du Reducer).

Il est important de noter qu'un Reducer est ainsi garanti de traiter l'intégralité des couples clé/valeur pour une clé donnée et est donc à même d'effectuer seul le traitement de cette clé.

· Reduce

Chaque fichier local est traité par son Reducer qui exécutera la méthode « Reduce » pour chaque clé. Chaque valeur d'une même clé (d'une même couleur dans notre exemple),sera ainsi comptée pour obtenir le nombre d'occurrences.

· Output

Chaque Reducer génère un fichier comportant des couples clé/valeur et affichera sur la sortie standard son contenu. Le résultat, vu de l'utilisateur, est la concaténation de l'ensemble du contenu de ces fichiers.

3.4 Discussion

Cette première partie nous a permis de mieux comprendre le Paradigme MapReduce et au travers d'un premier exemple, d'entrevoir les premiers enjeux et problématiques.

3.4.1 Du point de vue HDFS

Le NameNode répond seul aux sollicitations de toutes les applications, pour leurs besoins de lecture et d'écriture. Nous avons considéré dans notre exemple « WordCount », 3 fichiers de quelques octets. La conséquence immédiate est le besoin de solliciter 3 fois le NameNode au lieu d'une seulesi les données avaient été regroupées dans un seul fichier.

Dans ce contexte de « petits fichiers » :

Ø Le NameNode peut porter atteinte aux performances du cluster. D'une part, parce que la table d'allocation qu'il stocke en mémoire peut se saturer rapidement (au regard des 150B nécessaires pour référencer un fichier, un répertoire, un bloc). D'autre part, parce qu'il devra gérer seul un grand nombre d'opérations sur ces métadonnées (plus il y a de fichiers, plus il y a d'opérations).

Ø Les performances pourront aussi être affectées par la nécessité d'accéder simultanément à beaucoup de fichiers stockés sur les DataNodes. Prenons le cas théorique, d'un ensemble de données faisant 1GB. Selon la répartition de ces données, pouvant aller d'un seul fichier jusqu'à 10 000 fichiers, lestemps d'accès cumulés (à mettre en relation avec la taille totale à traiter) peuvent provoquer le ralentissement du cluster dans son ensemble.

nombre de fichiers

taille en bytes

nombre de blocs

par fichier

temps

d'accès total (sec)

1

1000000000

8,0

0,08

1000

1000000

1,0

10

10000

100000

1,0

100

100000

10000

1,0

1000

Tableau 1 : temps d'accès cumulés sur 1GB de données suivant le nombre de fichiers

3.4.2 Du point de vue YARN

Dans ce même contexte de « petits fichiers » :

Ø L'ApplicationMaster réalise les demandes de création de containersauprès du ResourceManager, pour exécuter des tâches Map ou Reduce sur les NodeManagers. Chaquebloc étant associé à une tâche Map (exécuté par un container), il faudra donc impliquer un plus grand nombre de containers lors de traitement sur des petits fichiers.De plus, les ressources sont négociées au travers d'une file d'attente et elles ne seront pas systématiquement disponibles en même temps.

Ø L'ApplicationMaster réalise également la gestion des tâches entre elles (notamment, leur synchronisation), et ce coût est fixe quel que soit le volume de données manipulées.Ainsi, les performances de traitement d'un grand nombre de tâches ne manipulant que peu de données seront fortement impactées.

A la lumière de ces observations, nous constatons qu'il faudra être vigilant et bien maîtriser la quantité de petits fichiers afin d'éviter de ralentir les performances du cluster et donc du traitement.

4 SQL SUR HADOOP

4.1 Hive

4.1.1 Histoire

Hive est un projet initié en 2007 par Facebook(AshishThusoo, 2009). Le réseau social disposait alors d'une infrastructure de traitement de données classique, basée sur un entrepôt de données construit surun SGBDR du marché.Année après année, la quantité de données à traiter devenait de plus en plus importante et l'architecture en place devint rapidement obsolète. Hadoop s'est ainsi rapidement imposé comme la solution à ces difficultés de montée en charge : c'est un projet Open Source et surtout, il propose une infrastructure scalable déjà éprouvée.

Si la capacité de traitement de l'infrastructure Hadoop était aisément extensible et si les traitements s'exécutaient dorénavant en quelques heures au lieu d'un jour ou plus, il fallait en revanche les développer en utilisant le paradigme MapReduce. Les utilisateurs des entrepôts de données, plus habitués au SQL, n'étaient pas forcément à l'aise dans l'écriture de programmes de ce type.

L'équipe de Jeff Hammerbacher créa alors Hive, un entrepôt de données au-dessus de Hadoop, dont le langage d'interrogation des données, le HQL (Hive Query Langage), est le portage « distribué »du SQL des SGBDR classiques.

Hive est rapidement passé sous licence Apache, sa première version stable date d'octobre 2010, et continue d'être améliorée tout en bénéficiant des évolutions continues de Hadoop.

4.1.2 Architecture

Hive est composé des éléments suivants (LeftyLeverenz, 2015) :

· le metastore

Le metastore contient l'ensemble des données décrivant les tables (schéma de données, localisation des fichiers, et les statistiques concernant les données telles que le nombre de lignes, la taille moyenne d'une ligne, etc.).

· un compilateur (compiler) et un optimiseur (optimizer)

Le compilateur réalise l'analyse syntaxique (générant un arbre syntaxique abstrait ou AST) puis l'analyse sémantique (génération d'un arbre d'opérations ou operatortree) définissant l'ensemble des étapes nécessaires à l'accomplissement de la requête. Au cours de cette analyse, le metastoresera sollicité pour validerque l'ensemble des objets (fichiers et champs) décrits dans la requête sont valides.

Il est à noter qu'il ne figure aucune dépendance à MapReduce, ni à d'autres frameworks distribués, à ce stade de la compilation.

Ensuite, selon le framework distribué utilisé par Hive, un compilateur de tâche sera exécuté (MapRedCompiler par exemple) et génèrera une tâche exécutable (MapRedTask, c'est-à-dire du code Java implémentant le framework MapReduce dans ce cas de figure) décrit par un arbre d'opérations qui aura d'abord été optimisé puis transformé en enchaînement de travaux (MapRedWork).

Les dernières versions de Hive permettent d'utiliser d'autres frameworks distribués comme Tez et Spark, sans remettre en question cette architecture.

· un exécuteur (executor)

L'exécuteur est en charge de soumettre lestâches auNodeManager(côté Hadoop). Il supervisera ainsi leur exécution jusqu'à leur achèvement.

· une interface utilisateur (UI)

L'interface utilisateur permet à l'utilisateur d'interagir avec Hive (envoyer des requêtes / obtenir les résultats) au travers d'un client connecté au serveur Hive via un connecteur ODBC ou JDBC dans la plupart des cas.

· le pilote (driver)

Le pilote contrôle les échanges entre l'interface utilisateur, le compilateur et l'exécuteur. Il reçoit ainsi la demande d'exécution d'une requête par l'utilisateur qu'il soumettra au compilateur, puis à l'exécuteur après compilation. Il est également en charge de restituer le résultat de la requête, reçu par l'exécuteur, à l'utilisateur.

Figure 6 : architecture de Hive

4.2 La commande « EXPLAIN »

Tout comme dans les SGBDR classiques, Hive propose la commande EXPLAIN qui permet d'obtenir le plan d'exécution de la requête à exécuter. Dans notre environnement distribué, le plan d'exécution liste l'ensemble des étapesqui seront exécutées sur le cluster Hadoop.

Nous allons tirer parti de cette commande pour constituer notre connaissance du fonctionnement de Hive. Ce qui nous permettra plus tard, d'optimiser nos requêtes. Nous jugeons fondamental de proposer une méthode d'interprétation des résultats d'un EXPLAIN. En effet, c'est un outil que tout utilisateur de Hive doit pouvoir s'approprier afin d'en faire de même. Nous espérons que nos explications détaillées pourront ainsi servir de modèle. En effet, il nous a été nécessaire d'analyser le code source de l'EXPLAIN ( https://github.com/apache/hive) pour le comprendre pleinement, n'ayant trouvé aucun document réellement explicatif à son propos.

Commençons donc à construire notre compréhension du fonctionnement de Hive en utilisant la commande EXPLAIN EXTENDED sur une requête que nous complexifierons au fur et à mesure.

Le rajout du mot clé « EXTENDED » permet d'obtenir notamment l'arbre syntaxique abstraitde la requête ainsi que des informations issues du metastore, relatives auxtables manipulées.

En outre, le résultat d'un « EXPLAIN » étant relativement verbeux, nous le limiterons dans nos exemples aux informations que nous jugerons pertinentes.

Enfin, pour des raisons de confidentialité d'entreprise, le nom des bases, des tables et des champs ont été anonymisés.

4.2.1 Explication d'une projection simple

La commande suivante permet d'expliquer une requête simpleeffectuant une projection :

EXPLAIN EXTENDED

SELECT field1FROM z_database1.table1;

Le résultat de l' « EXPLAIN EXTENDED » est :

ABSTRACT SYNTAX TREE:

TOK_QUERY

TOK_FROM

TOK_TABREF

TOK_TABNAME

z_database1

table1

TOK_INSERT

TOK_DESTINATION

TOK_DIR

TOK_TMP_FILE

TOK_SELECT

TOK_SELEXPR

TOK_TABLE_OR_COL

field1

STAGE DEPENDENCIES:

Stage-0 is a root stage

STAGE PLANS:

Stage: Stage-0

Fetch Operator

limit: -1

Processor Tree:

TableScan

alias: table1

Statistics:

Num rows: 38877279

Data size: 95714574567

Basic stats: COMPLETE

Column stats: NONE

GatherStats: false

Select Operator

expressions: field1 (type: varchar(10))

outputColumnNames: _col0

Statistics:

Num rows: 38877279

Data size: 95714574567

Basic stats: COMPLETE

Column stats: NONE

ListSink

Explications :

Le résultat d'un « EXPLAIN EXTENDED » contient 3 parties.

· (1) L'arbre syntaxique abstrait

Il décrit un ensemble de blocs « TOK_* » symbolisant les tokens (des mots clé) reconnus par le parser.

Dans notre exemple, nous remarquons notamment les tokens suivant :

- TOK_QUERY : la racine de l'arbre

o TOK_FROM : la base et la table interrogées

o TOK_INSERT : la destination du résultat dans un fichier temporaire

o TOK_SELECT : les champs sélectionnés

· (2) Les dépendances entre étapes

Cette partie décrit le plan de dépendances entre chaque étape où une étape peut :

- êtreracine (« is a root stage »)

- dépendre d'une autre étape (« depends on stage... »)

- consister à déclencher une autre étape (« consists of stage... »)

Dans notre exemple, il ne figure qu'une seule étape :

- Stage-0 : l'étape racine

· (3) Le plan d'étapes

Cette partie décrit la séquence des étapes qui seront réalisées où chaque étape peut être de type :

- Map Reduce

- Fetch Operator (lire les données d'une table)

- Move Operator (déplacer un fichier du HDFS)

Une étape peut contenir un ou plusieurs arbres de traitements, dont chacun peut contenir un ou plusieurs traitements.

Dans notre exemple :

- Stage-0

o est de type « Fetch Operator »

o contient un arbre de traitements « Processor Tree » décrivantla séquence de traitements suivants :

§ TableScanSelect OperatorListSink

Où :

- TableScan : « balaye » la table ligne par ligne

- Select Operator : projette le champ sélectionné vers un fichier temporaire

- ListSink : transmetles données du fichier temporaire à l' « executor »

Une simple projection n'implique ni opération « Map », ni opération « Reduce » carelle ne requiert aucune manipulation des données. Les informations fournies par le metastore(l'emplacement des fichiers contenant la table et la définition de chacun de ses champs) suffiront pour demander directement au cluster HDFS (NameNode + Datanodes) de transmettre à l' « executor », les données projetées de la table, sans solliciter le cluster YARN (ResourceManager + NodeManagers).

4.2.2 Explication d'une projection avec une restriction

Complexifions légèrement notre requête en lui appliquant une restriction puis expliquons-la :

EXPLAIN EXTENDED

SELECT field1 FROM z_database1.table1

WHERE field1="123";

Le résultat de cet « EXPLAIN EXTENDED » est :

ABSTRACT SYNTAX TREE:

TOK_QUERY

TOK_FROM

TOK_TABREF

TOK_TABNAME

z_database1

table1

TOK_INSERT

TOK_DESTINATION

TOK_DIR

TOK_TMP_FILE

TOK_SELECT

TOK_SELEXPR

TOK_TABLE_OR_COL

field1

TOK_WHERE

=

TOK_TABLE_OR_COL

field1

"123"

STAGE DEPENDENCIES:

Stage-1 is a root stage

Stage-0 depends on stages: Stage-1

STAGE PLANS:

Stage: Stage-1

Map Reduce

Map Operator Tree:

TableScan

alias: table1

filterExpr: (field1 = '123') (type: boolean)

Statistics:

...

GatherStats: false

Filter Operator

isSamplingPred: false

predicate: (field1 = '123') (type: boolean)

Statistics:

...

Select Operator

expressions: field1 (type: varchar(10))

outputColumnNames: _col0

Statistics:

...

File Output Operator

compressed: true

GlobalTableId: 0

directory:

hdfs://NAMENODE/tmp/hive/u_xxxx_yyy/.../-ext-10001

NumFilesPerFileSink: 1

Statistics:

...

Stats Publishing Key Prefix:

hdfs://NAMENODE/tmp/hive/u_xxxx_yyy/.../-ext-10001/

table:

input format:

org.apache.hadoop.mapred.TextInputFormat

output format:

org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

...

TotalFiles: 1

GatherStats: false

MultiFileSpray: false

Path -> Alias:

hdfs://NAMENODE/.../z_database1.db/table1 [table1]

Path -> Partition:

hdfs://NAMENODE/.../z_database1.db/table1

Partition

base file name: table1

input format:

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat

output format:

org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat

properties:

...

Stage: Stage-0

Fetch Operator

limit: -1

Processor Tree:

ListSink

Explications :

Le résultat de cet « EXPLAIN EXTENDED » contient les 3 mêmes parties.

· (1) L'arbre syntaxique abstrait

Nous remarquons qu'il contient un nouveau token « TOK_WHERE » définissant notre restriction, nous avons en effet à présent :

- TOK_QUERY : la racine de l'arbre

o TOK_FROM : la base et la table interrogées

o TOK_INSERT : la destination du résultat dans un fichier temporaire

o TOK_SELECT : les champs sélectionnés

o TOK_WHERE : la restriction opérée sur un champ

· (2) Les dépendances entre étapes

Dans notre exemple, le plan de dépendance est devenu plus complexe puisqu'il figure à présent deux étapes :

- Stage-1: l'étaperacine

- Stage-0 : une étape dépendant de l'étape racine

· (3) Le plan d'étapes

Dans notre exemple :

- Stage-0 :

o est de type « Map Reduce »

o contient un arbre de traitements « Map Operator Tree » décrivant une séquence de traitements :

§ TableScanFilter OperatorSelect OperatorFile Output Operator

Où :

Ø TableScan : « balaye » la table ligne par ligne

Ø Filter Operator : filtre la ligne parcourue

Ø Select Operator : projette le champ sélectionné

Ø File Output Operator : définit la table temporaire qui contiendra le résultat et notamment son emplacement physique sur HDFS, ainsi que ses statistiques

Voici un schéma décrivant cette étape :

Figure 7 : diagramme de traitement du parcours d'une table avec projection et restriction

- Stage-1 :

o est de type « Fetch Operator »

o contient un arbre de traitements « Processor Tree » réalisant un unique traitement :

§ ListSink

Où :

Ø ListSink : transmet les données du fichier temporaire à l' « executor »

Résumons :

- Le Stage-0 est chargé de construire une table temporaire, réalisant la projection et la restriction d'une table (phase Map).

- Le Stage-1 est chargé de transférer les données de la table temporaire à l' « executor » de Hive qui se chargera de les acheminer au client qui aura effectué la requête.Dans cette étape, l'absence de « TableScan » provient du fait qu'il n'y plus aucune opération à réaliser (ni projection, ni restriction), mis à part le transfert.

Figure 8 : processus de traitement MapReduce d'une requête avec projection et restriction

Un tel fonctionnement est appelé « Map-Only Job ». Sa particularité est de ne faire appel qu'à une phase Map, épargnant ainsi notamment les étapes « Partition & Sort » et « Shuffle & Merge & Sort » préalables à la phase Reduce. C'est un procédé qui permet dans tous les cas de figure de réaliser un traitement avec le maximum de distribution sans avoir à agréger les données sur un ensemble de Reducers.

Notons enfin, les sections « Path Alias » et « Path Partition » qui s'ajoutent et qui donnent un ensemble d'informations relatives à la table utilisée dans notre requête :

· Path Alias : l'emplacement physique de la table dans HDFS

· Path Partition : l'emplacement physique de la partition dans HDFS(le rôle d'une partition sera expliqué dans le prochain chapitre)

o input / output format : le format de la table

o properties

§ columns : la liste des champs de la table

§ columns.types : le type de chaque champ de la table

o numFiles : la quantité de fichiers utilisés par la table

o numRows : le nombre de lignes contenues par la table

o rawDataSize : la taille brute de la table (non compressée)

o totalSize : la taille réelle de la table (compressée)

4.2.3 Explication d'une projection avec une restriction et une agrégation

Complexifions à nouveau notre requête en lui ajoutant une agrégation puis expliquons-la :

EXPLAIN EXTENDED

SELECT field1 FROM z_database1.table1

WHERE field1="123"

GROUP BY field1

Rappelons qu'un « GROUP BY colonne1, ... colonneN » regroupera sur une même ligne chaque partition d'une liste de colonnes. Il est donc nécessaire que cette liste de colonnes apparaisse immédiatement suite au « SELECT ».L'usage habituel est d'ajouter ensuite des fonctions d'agrégation comme « COUNT », qui s'appliqueront à chacune des partitions listées. Notre exemple n'utilise pas de telles fonctions, pour ne pas ajouter de complexité inutile.

Le résultat de cet « EXPLAIN EXTENDED » est :

ABSTRACT SYNTAX TREE:

...

STAGE DEPENDENCIES:

Stage-1 is a root stage

Stage-0 depends on stages: Stage-1

STAGE PLANS:

Stage: Stage-1

Map Reduce

Map Operator Tree:

TableScan

alias: table1

filterExpr: (field1 = '123') (type: boolean)

Statistics:

...

GatherStats: false

Filter Operator

isSamplingPred: false

predicate: (field1 = '123') (type: boolean)

Statistics:

...

Reduce Output Operator

key expressions: field1 (type: varchar(10))

sort order: +

Map-reduce partition columns: field1 (type: varchar(10))

Statistics:

...

tag: -1

auto parallelism: false

Path -> Alias:

hdfs://NAMENODE/.../z_database1/table1 [table1]

Path -> Partition:

hdfs://NAMENODE/.../z_database1/table1

Partition

...

Reduce Operator Tree:

Group By Operator

keys: KEY._col0 (type: varchar(10))

mode: complete

outputColumnNames: _col0

Statistics:

...

File Output Operator

...

Stage: Stage-0

Fetch Operator

limit: -1

Processor Tree:

ListSink

Nous ne reviendrons ni sur l'arbre syntaxique abstrait (qui ajoute à l'exemple précédent un nouveau token « TOK_GROUPBY ») ni sur les dépendances entre étapes (qui sont strictement identiques à l'exemple précédent : Stage-1Stage-0).

Détaillons en revanche le plan d'étapes :

- Stage-0

o est de type « Map Reduce »

o contient un arbre de traitements « Map Operator Tree » décrivant une séquence de traitements :

§ TableScanFilter OperatorReduce Output Operator

o contient un arbre de traitements « Reduce Operator Tree » décrivant une séquence de traitements :

§ GroupBy OperatorFile Output Operator

- Stage-1

o est identique à l'exemple précédent, il consiste à transférer les données issues du « File Output Operator » (ListSink) à l' « executor » de Hive

Résumons :

Figure 9 : processus de traitement MapReduce d'une requête SQL avec agrégation

Nous identifions que le Stage-0implique l'exécution d'une phase Map (A) puis d'une phase Reduce (B).

A. Phase Map

Dans « Map OperatorTree », le « File Output Operator » de l'exemple précédent est remplacé par un « Reduce Output Operator ». Cela veut dire que la sortie ne s'effectuera pas sur un fichier du HDFS mais au travers du réseau vers les noeuds réalisant la phase Reducer. Le « Reduce Output Operator » définit plusieurs éléments fondamentaux pour la suite.

· key expressions

Une expression (Ki) peut être une simple colonne ou une combinaison de plusieurs colonnes dans une formule (concaténation de deux colonnes par exemple).

Par exemple, nous pourrions avoir :

K1 = field1

ou bien

K1 = concat(field2, field3)

Une « key expressions » est l'ensemble des expressions composant la clé qui sera utilisée durant toutes les phases du Map Reduce. Chaque clé peut être ainsi considérée comme un n-uplet (K1, ..., Kn).

Nous aurions ainsi des couples clé (K) / valeur (V) de la forme suivante :

((K1, ..., Kn), (V1, ..., Vm))

Notons que le framework MapReduce définit une clé comme un seul élément (il en va de même pour la valeur), cela implique donc l'utilisation de mécaniques de sérialisation par Hive afin d'assurer la compatibilité entre ces deux environnements ( annexe).

· sort order

Il s'agit du sens du tri (« + » pour ascendant et « - » pour descendant) qui sera appliqué à la clé durant les phases de « Partition & Sort » et « Shuffle & Merge & Sort ».

Il est à noter qu'il y aura autant de « + » ou de « - » que d'expressions définissant la clé (le premier ordre s'applique à K1, le n-ième ordre à Kn).

· map-reduce partition columns

Il s'agit de l'ensemble des colonnes qui définiront le partitionnement qui sera utilisé durant les phases « Partition & Sort & Shuffle » et « Merge & Sort ».

Le partitionnement permet d'affecter chaque ligne parcourue à un Reducer. Il garantit que toutes les lignes identiques sur cet ensemble de colonnes seront traitées au sein d'un même Reducer.

Dans notre exemple, la définition de la clé est identique au partitionnement. Il s'agit du cas d'application classique du paradigme MapReduce. Il est cependant possible de distinguer la clé et le partitionnement en utilisant notamment les instructions suivantes en fin de requête :

DISTRIBUTE BY <ci, ..., cn> SORT BY <cj, ..., cm>

ou bien

CLUSTER BY <ci, ..., cn>

« DISTRIBUTE BY » indique l'ensemble de colonnes qui définira le partitionnement ( annexe).

« SORT BY » indique le sens du tri sur chaque colonne ou expression définissant la clé.

« CLUSTER BY » consiste en un « DISTRIBUTE BY » et « SORT BY ».

B. Phase Reduce :

Dans « Reduce Operator Tree », il figure un bloc « Group By Operator » qui définit les éléments suivant :

· keys

Il s'agit de l'ensemble des Ki composant la clé défini durant le « Map Operator Tree » et qui sera utilisée durant la phase Reduce qui traitera donc séparément chaque n-uplet.

· mode

Dans le contexte du « GROUP BY », il indique si le mode est :

- complete

o la phase Reduce a réalisé toute la phase d'agrégation

- mergepartial

o la phase Reduce n'a réalisé qu'une agrégation partielle, ce qui indique qu'une agrégation préliminaire a déjà été réalisée durant la phase Map

Dans notre exemple, l'agrégation est « complete ».

· outputColumnNames

Il s'agit des noms des colonnes qui seront retournées dans la table temporaire.

4.2.4 Explication d'une jointure entre deux tables

Modifions une dernière fois notre requête, cette fois-ci en faisant apparaître une jointure et expliquons-la.

EXPLAIN

SELECT t1.field1 FROM z_database1.table1 t1

INNER JOIN z_database1.table2 t2 ON t1.field1=t2.field1

WHERE t1.field1='123'

GROUP BY field1

Le principe de l'EXPLAIN ayant déjà été expliqué, une représentation en schéma suffira pour comprendre les différentes étapes.

· Les dépendances entre étapes :

Figure 10 : graphe des dépendances d'une requête avec jointure et agrégation

Tout d'abord, nous remarquons la présence d'une étape « racine » de type « Conditional Operator » (Stage-6).

Cette étape est générée du fait de la présence d'une jointure. Elle modifiera la manière dont sera réalisée cette jointure (Stage-1) si certaines conditions sont remplies (nous le verrons dans le prochain chapitre consacré à l'optimisation).

· Le plan d'étapes :

Figure 11 : processus de traitement MapReduce d'une requête SQL avec jointure

- Stage-1

Par défaut, la jointure s'exécutera de manière commune(Common-Join), il y aura une phase « Map » durant laquelle les Mappers seront chargées de balayer intégralement les deux tables à joindre pour constituer un ensemble de clé/valeur. La clé choisie sera celle de la jointure indiquée par l'utilisateur.

Ensuite, dans la phase « Partition & Sort », chaque Mapper aura à répartir l'ensemble des couples clé / valeurdes deux tables dans les partitions qui leur auront été attribuées, puis à appliquer un tri par clé.

Figure 12 : phase « Map » d'une jointure sur un Mapper

Dans la phase « Shuffle & Merge & Sort », les partitions identiques de chaque Mapper seront fusionnées entre-elles par les Reducers, puis, un nouveau tri par clé sera appliqué à chaque partition fusionnée.

En fin de traitement, chaque Reducer aura écrit indépendamment sur HDFS, un des fichiers composant la table temporaire qui alimentera la prochaine étape.

Figure 13 : phase « Reduce » d'une jointure sur deux Reducers

- Stage-2

Cette étape est identique à l'exemple précédent, en revanche notons que les données qu'elle considérera sont celles en sortie du Stage-1.

4.3 Discussion

Notre approche « par l'EXPLAIN », nous a permis de découvrir comment Hive transforme une requête SQL en application MapReduce. Elle doit permettre de mieux appréhender toute requête demandant un travail d'optimisation.

4.3.1 Requête avec une restriction

Elle montre qu'un« Map-Only Job » permet de parcourir et traiter très efficacement l'intégralité d'une tablesans déclencher de phase « Reduce » qui impliqueraitles phases intermédiaires« Partition & Sort » et « Shuffle & Merge & Sort ».

ð Il serait donc intéressant d'utiliser ce mode de traitement le plus souvent possible.

4.3.2 Requête avec une agrégation

Ellemontre un « Map Reduce » et permet de mieux appréhender la puissance de ce paradigme. En effet, il ne semble plus aussi problématique que sur un SGBDR de réaliser un « full scan » d'une table, puisque plusieurs Mappers se répartissent le travail.

En revanche, le « Shuffle & Merge & Sort » est coûteux du fait de la nécessité de transmettre des données au travers du réseau, de fusionner puis d'appliquer un tri sur les données, ce qui est particulièrement vrai lorsque le volume de données à traiter est volumineux.

ð Il serait intéressant de transférer uniquement les données qui seront utiles pour la suite de la requête.

4.3.3 Requête avec une jointure et une agrégation

Elle montre un enchaînement de deux « Map Reduce » qui implique de lire et d'écrire plusieurs fois sur HDFS, sollicitant donc le cluster davantage. Ainsi, les données issues du premier « Map Reduce » seront écrites sur HDFS, et le second « Map Reduce » commencera par les charger. Intuitivement, ilaurait été plus immédiat que le premier « Reduce » (de jointure) enchaîne immédiatement sur le second « Reduce » (d'agrégation), puisque les données auraient déjà toutes été chargées.

ð Il faudrait pouvoir éviter d'une part des lectures/écritures superflues sur HDFS et d'autre part, une phase « Map » redondante qui ne ferait que lire une deuxième fois les mêmes données.

En outre, nous remarquons que la problématique des « petits fichiers » s'applique également dans le contexte de Hive. La figure 13 montre que la table temporaire générée sera composée de plusieurs fichiers et potentiellement, de petits fichiers. Dans ce cas de figure, la génération de petits fichiersserait liéeaux faibles occurrences des clés dans chaque partition.

ð Fusionner en fin de traitement les petits fichiers entre eux, permettrait d'alléger la pression exercée sur le NameNode et ses DataNodes. De même, cela optimiserait les traitementsfuturs sur ces données.

A l'inverse, un déséquilibre pourrait être provoqué par une partition qui contiendrait beaucoup plus de données que la moyenne des autres partitions. Cela pourrait provenir d'une clé représentée bien plus fréquemment par rapport aux autres. La charge imputée au Reducer responsable de cette partition serait alors bien supérieure à celle des autres.

ð Il serait intéressant d'être en mesure d'influencer Hive à l'exécution en lui indiquant comment sont réparties les données,si elles sont déséquilibrées.

A la lumière de toutes ces observations, nous disposons maintenant de nombreuses pistes à explorer afin d'optimiser les traitements de nos requêtes SQL. C'est ce que nous allons tenter de faire dans notre prochaine partie.

5 OPTIMISATION DU SQL SUR HADOOP

Les deux premières parties nous ont permis d'expliquer le fonctionnement de Hadoop, le paradigme « MapReduce » et comment il s'applique au SQL au travers de Hive.

Les prérequis ayant été exposés, nous allons pouvoir nous concentrer sur l'objectif de ce mémoire qui est de proposer des stratégies d'optimisation des requêtes SQL.

Nous aborderons cette partie sous plusieurs angles, d'une part en proposant des réglages techniques et systématiques pour Hive (tuning) et d'autre part en proposant des bonnes pratiques de développement des requêtes qui tirent parti de notre maîtrise des différentes phases de MapReduce.

5.1 Optimisation par le réglage ou « tuning »

5.1.1 Utiliser Tez

La première optimisation que nous allons aborder est l'usage du framework Tez au lieu du framework classique MapReduce.

Comme nous l'avons vu précédemment, MapReduce est un framework très puissant mais qui montre ses limites dès qu'il est question d'enchaîner plusieurs traitements impliquant l'écriture de résultats intermédiaires sur HDFS, ce qui pénalise les performances.

L'application Tez repose sur YARN (Arun Murthy, 2013), tout comme l'application MapReduce depuis Hadoop 2.0. Alors que MapReduce exécute une séquence immuable HDFS Map Reduce HDFS, Tez exécute un Diagramme Acyclique Orienté(plus communément appelé DAG pour Directed Acyclic Graph) qui a l'avantage d'être flexible.

5.1.1.1 Principe d'un Diagramme Acyclique Orienté

Un DAG est composé de sommets (vertices) et d'arêtes (edges). Un sommet définit une étape (pouvant être un Map ou un Reduce par exemple) et une arête définit le mouvement des données émises (Bikas Saha, 2013).

Les arêtes peuvent être de plusieurs types :

- One-To-Onela tâche i d'une étape A productrice transmet toutes ses données à la tâche i d'une étape B consommatrice

Figure 14 : mouvement de données de type « One-To-One »

- Broadcastchaque tâche d'une étape A productrice transmet toutes ses données à toutes les tâches d'une étape B consommatrice

Figure 15 : mouvement de données de type « Broadcast »

- Scatter-Gatherchaque tâche d'une étape A productrice transmet une partie de ses données à chaque tâche d'une étape B consommatrice

Figure 16 : mouvement de données de type « Scatter-Gather »

De plus il sera possible de préciser, pour chaque sommet (correspondant à une étape) :

- le mode de déclenchement des tâches :

o Sequential la tâche consommatrice est exécutée lorsque ses tâches productrices sont terminées

o Concurrent la tâche consommatrice est exécutée en parallèle des tâches productrices

- la durée de vie des données :

o Persistedles données en sortie d'une tâche sont disponibles quelques temps après sa fin

o Persisted-Reliable les données en sortie d'une tâche sont garanties d'être disponibles indéfiniment après sa fin

o Ephemeral les données en sortie d'une tâche ne sont disponibles que durant son exécution

Ces capacités permettent de transformer le SQL de Hive en DAG, permettant d'économiser des phases de lecture/écriture superflues sur HDFS (les données peuvent être conservées en local du noeud sur lequel la tâche s'exécute) et donc d'épargner le cluster (NameNode et DataNodes). De plus, elles permettent également d'être en mesure de paralléliser l'exécution de certaines tâches (Bikas Saha, 2016).

5.1.1.2 Comparaison MapReduce / Tez

Comparons l'exécution de notre requête la plus complexe, comportant une jointure et une agrégation, sous MapReduce et sous Tez :

Figure 17 : comparaison MapReduce / Tez

Dans le cas MapReduce, les Reducers du premier « Map Reduce » (jointure) devront écrire leur sortie sur HDFS. Les Mappers du second « Map Reduce » (agrégation) pourront ainsi lireces données sur HDFS, les traiter et les transmettre au dernier Reducer pour réaliser l'agrégation.

Dans le cas Tez, les Reducers du premier « Map Reduce » (jointure) pourront retourner directement leur sortie vers le Reducer réalisant l'agrégation finale. Cela est rendu possible grâce à la flexibilité de Tez qui n'implémente pas « en dur » le paradigme MapReduce mais en fait une généralisation.

Pour activer Tez, il suffit de placer cette ligne au-dessus de la requête à exécuter.

set hive.execution.engine=tez

Un exemple d'une implémentation de « WordCount » pour Tez est fourni en annexe.

5.1.2 Contrôler la taille des fichiers manipulés

Comme nous l'avons vu dans le chapitre précédent, la phase « Reduce » est génératrice de fichiers et selon la qualité du partitionnement (homogénéité de la représentation d'une clé), leur quantité pourra varier. Lorsque ces fichiers sont de petite taille, toutes les couches composant Hadoop sont impactées.

Montrons comment il est possible d'optimiser la taille des fichiers manipulés, soit en phase de lecture (Map), soit en phase d'écriture (Reduce).

5.1.2.1 durant la phase de lecture (Map)

Une étape « Map » ayant à traiter de nombreux petits fichiers, engendrera beaucoup de Mappers (et donc de containers YARN)qui devront solliciter massivement le HDFS au travers du NameNode et de ses DataNodes pourlire leurs données.

Tez permet de consommer moins de Mappers en créant des groupes de splits (Bikas Saha, 2016 + annexe).

Pour rappel, un split est un pointeur sur un blocHDFS et indique à un Mapper l'emplacement des données à traiter.

Les options suivantes permettent de définir la taille d'un groupe :

settez.grouping.min-size=16777216; -- 16 MB min splitset tez.grouping.max-size=1073741824; -- 1 GB max split

Il est également possible de procéder de la façon suivante, en définissant le nombre de splits que contiendra un groupe (ce paramétrage est prioritaire par rapport au précédent) :

set tez.grouping.split-count=50;

Cette approche est très pratique mais n'empêche pas la sollicitationde HDFS, car il faudra bien que le NameNode soit interrogé pour indiquer l'emplacement des blocs de données. Il est donc nécessaire d'agir en amont (c'est-à-dire durant le processus de création des fichiers) afin d'éviter cela. Or, les données traitées par les Mappers pourront être issues de phases Reduce de traitements antérieurs. Il faudra donc agir au niveau de la phase Reduce.

5.1.2.2 durant la phase d'écriture (Reduce)

Le problème provient des étapes« Reduce » utilisant de nombreuses partitions et qui devront également solliciter massivement le HDFS.En effet, elles devront générer un fichier par partition.

Il est possible d'indiquer à Hive de réaliser une étape « Map » supplémentaire consistant à fusionner ces fichiers et ainsi permettre pour un prochain traitement d'alléger la pression exercée sur le HDFS (Hao Zhu, 2014). Nous aurons ainsi obtenu un traitement du type Map Reduce HDFS Map.

Pour ce faire, les directives à utiliser sont les suivantes :

Pour Tez :

set hive.merge.tezfiles=true;set hive.merge.smallfiles.avgsize=16000000;set hive.merge.size.per.task=128000000;

Pour MapReduce :

set hive.merge.mapredfiles=true;set hive.merge.mapfiles=true;set hive.merge.smallfiles.avgsize=16000000;set hive.merge.size.per.task=128000000;

- Les directives en rouge activent le mécanisme de fusion

- Les deux autres directives définissent respectivement

o la taille moyenne d'un petit fichier (16MB).

o la taille des fichiers générés (128MB).

Il est important de désactiver la compression des données, sinon la fusion des petits fichiers ne fonctionnera pas(Hao Zhu, 2015) une fusion de deux fichiers compressés n'étant pas réalisable.

set hive.exec.compress.output=false;

Il d'autant plus important de le savoir qu'aucune erreur ne sera signalée en cas d'échec de la fusion durant la phase de Map réalisant l'opération.

5.1.3 Agréger en amont

Nous avons vu que la phase de « Shuffle & Merge & Sort » est coûteuse, au regard de la nécessité de transmettre les données au travers du réseau mais aussi,parce que cette phase implique des fusions et des tris à répétition.

Afin d'alléger considérablement cette charge, les Mappers peuvent agréger partiellement les données ce qui réduira la quantité de données à transmettre. Les Reducers n'auront plus qu'à effectuer un « Group By Operator » dont le mode sera « mergepartial » au lieu de « complete », ce qui sera moins coûteux.

La directive suivante devra être utilisée pour activer l'agrégation au niveau « Map » :

set hive.map.aggr=true;

5.1.4 Réaliser un « benchmark » significatif

Terminons cette partie en abordant la problématique de la réalisation d'un « benchmark » d'une requête SQL sous Hive.

Il n'y a pas de déterminisme dans le temps qui sera nécessaire pour obtenir du NameNode et des DataNodes les informations permettant de lire les données et surtout pour obtenir des containers YARN.

Ainsi, une même requête pourra avoir des temps d'exécution drastiquement différents, suivant la disponibilité du cluster. Il conviendra donc de s'affranchir au maximum de ce temps préalable à l'exécution réelle de la requête, afin de mieux évaluer son coût d'exécution.

Les directives suivantes permettent de réserver à l'avance une quantité de containers.

set hive.prewarm.enabled=trueset hive.prewarm.numcontainers=10

Suite à ses directives, nous constatons sur l'interface graphique d'administration YARN la présence de 11 containers (1 ApplicationMaster et 10 containers), chacun reposant sur une JVM de 2GB.

Bien sûr, cette préemption ne dure que le temps de la session Tez qui est défini par la directive suivante (par défaut réglé à 30 secondes) :

set tez.client.timeout-ms=30000

Il devient ainsi possible de « jouer » plusieurs fois une requête et d'estimer son temps d'exécution sans qu'il soit parasité par la phase de réservation des ressources.

Enfin, il sera intéressant d'étudier les techniques du site http://www.tpc.org/tpcds/ qui propose plusieurs méthodes de benchmarking. Celles-ci n'ont pu être étudiées dans le cadre de ce mémoire du fait de la nécessité de disposer de droits « administrateur » sur l'infrastructure Hadoop.

Mais c'est un sujet important qui devra faire l'objet d'un approfondissement durant la réalisation des projets à venir.

5.2 Optimisation par la conception ou « design »

Si le tuning permet d'améliorer les temps d'exécutiondes requêtesde manière mécanique, l'importance de leur conception ne doit pas être occultée, en particulier lorsqu'il est question de produire un « Use Case Big Data ».

Un « Use Case Big Data » consiste à produire de nouvelles données par croisement périodiquede données sources (issues du SI mais aussi de données externes), qui pourront être consommées par des tiers, afin de répondre à un besoin métier spécifique.

Cette production de données consiste ainsi à réaliser des requêtes sur le puits de données, à éventuellement créer des tables intermédiaires, puis à stocker le résultat final pour qu'il puisse être utilisé.

Figure 18 : du Système d'Information au Use Case Big Data

Nous appellerons « plan de requêtes », l'ensemble des requêtes permettant d'aboutir au croisement des données du Use Case.

Un plan de requêtes qui ne tiendrait pas compte de la spécificité des données à traiter (par exemple, pour chaque table, le nombre de lignes et la répartition des clés de jointures) ne pourra pas être optimisé.

En effet, en fonction de la typologie des données à traiter, il sera possible d'utiliser différentes techniques de requêtage.

Ainsi, cette partie se propose d'étudier ces différentes techniques avec le souci d'expliquer les situations qui leur seront particulièrement adaptées.

5.2.1 Utiliser les tables partitionnées

Les tables du puits de données atteignent régulièrement plusieurs TB. Lorsqu'une requête interroge l'une d'elle, il est rarement utile d'avoir à parcourir l'intégralité des données qu'elle contient.

Ces tables de grandes tailles sont ainsi partitionnées. Le partitionnement consiste à disposer les fichiers qui les composent de manière hiérarchisée dans le HDFS. Cette hiérarchie est définie à la création de la table dont voici la syntaxe :

CREATE TABLE z_database1.table1 (field1                  string, ...)PARTITIONED BY (year                    string,month                   string, daystring);

Les champs définis dans la section « PARTITIONED BY » définissent la hiérarchie de stockage dans le HDFS.

Considérons la table « table1 » que nous venons de définir, dont les fichiers sont sous cette arborescence :

2.6TB  /apps/hive/warehouse/z_database1.db/table1

La requête suivante précise une partition à considérer :

SELECT * FROM z_database1.table1WHERE year="2018" AND month="01" AND day="01"

Les fichiers réellement considérés seront donc sous cette arborescence :

5.9GB  /apps/hive/warehouse/z_database1.db/table1/year=2018/month=01/day=05

Seuls 5.9GB des données seront ainsi manipulées au lieu des 2.6TB de départ, ce qui épargnera le cluster HDFS (le NameNode sera sollicité sur une plus faible quantité de fichiers) et le cluster YARN (il ne sera pas nécessaire d'allouer une quantité importante de containers).

Cette méthode n'est pas réservée aux tables du puits de données, il est également intéressant d'utiliser le partitionnement pour les tables qui seront produites dans le cadre d'un « Use Case ». Dans ce cas, le partitionnement devra être réalisé en considérant des colonnes de faible cardinalité. En effet, des cardinalités trop importantes impliqueraient la création de nombreux répertoires et fichiers, ce qui, nous le savons à présent, est contre-productif.

Les partitions des tables du puits sont couramment hiérarchisées par la date de chargement des données.

Le tableau suivant montre la répartition du nombre de lignes insérées par jourde chargement dans la table « table1 » (sur 6 mois de données).

 

0,25

0,5

0,75

0,9

0,95

0,99

centile

46 114 093

47 831 809

48 907 836

50 198 535

51 131 198

90 529 083

Tableau 2 :centiles du nombre de lignes chargées par jour

Avec MIN=201 789 et MAX=108 132 735

Le tableau suivant montre la même répartition mais à l'aide des dates fonctionnelles, par exemple la date à laquelle a été passée une commande dans le cas d'une table enregistrant des commandes de clients.

 

0,25

0,5

0,75

0,9

0,95

0,99

centile

16 130 510

38 035 331

42 779 757

45 833 922

47 710 421

49 173 539

Tableau 3 : centiles du nombre de lignes par date fonctionnelle

Avec MIN=2 609 997 et MAX=49 361 016

Cette répartition est moins intéressante que la première caril y a de plus grands écarts entre chaque centile. La répartition des données est donc moins régulière.

Une étude préalable de la répartition des données est donc souhaitable avant de déterminer la manière de partitionner une table.

Cette méthodologie pourra être appliquée dès lors où il est nécessaire de partitionner une table volumineuse et que plusieurs colonnes sont en concurrence pour en établir la hiérarchie.

5.2.2 Optimiser les jointures

Les jointures constituent un challenge pour Hadoop. En effet, comme nous l'avons vu, dans le contexte d'un Common-Join, la phase « Shuffle & Merge & Sort » sollicite énormément le cluster du fait des mouvements et des traitements de données nécessaires pour réunir les données de plusieurs tables.

La connaissance des données manipulées et la compréhension des principaux mécanismes de jointure implémentés par Hive sont deux atouts pour optimiser leur fonctionnement.

5.2.2.1 Map-Join

Un Map-Join consiste à réaliser la jointure de plusieurs tables en une seule étape « Map »(Map-Only Job) au lieu d'une étape « Map Reduce » (LeftyLeverenz, 2017). Son intérêt est de traiter avantageusement le cas de plusieurs tables de tailles très différentes.

Ø Considérons un premier cas de figure avec le« 2-way join » (jointure entre deux tables) impliquant une « grande » table (table1) et une « petite » table (table2).

SELECT *FROM table1INNER JOIN table2 ON table1.field1=table2.field1;

La directive suivante permet d'identifier automatiquement le rôle de chaque table (grande ou petite) et convertira automatiquement un Common-Join en Map-Join si la « petite »table est suffisamment petite.

set hive.auto.convert.join=true

La directive suivante permet de fixer la taille maximum de la petite table (en MB). Si la « petite » table est suffisamment petite, celle-ci sera chargée en mémoire de tous les Mappers assignés au traitement de la « grande » table.

sethive.mapjoin.smalltable.filesize=25000000

Les dépendances entre étapes fournies par la commande EXPLAIN deviennent alors :

Figure 19 : graphe des dépendances d'une requête SQL avec jointure « Map-Only » (1/2)

Explication :

Le Stage-6 réalise un travail local consistant à parcourir la petite table et à l'envoyer directement aux Mappers du Stage-3. Ces derniers la chargeront en mémoire sous forme de table de hachage et pourront alors parcourir la grande table et réaliser la jointure à la volée.

Notons que le Stage-6 a un « plan de secours ». En effet la détection de la petite table est réalisée sur la base de statistiques et dans le cas où sa taille dépasse effectivement la limite fixée, il sera possible de basculer sur le Stage-1 qui réalisera un « Common-Join ».

Ø Considérons à présent un deuxième cas de figure avec le « n-wayjoin » (jointures d'une grande table avec plusieurs tables de tailles inférieures).

SELECT *FROM table1INNER JOIN table2 ON table1.field1=table2.field1;INNER JOIN table3 ON table1.field2=table3.field2;

Les directives suivantes permettent d'activer la conversion automatique d'un Common-Join en Map-Join en considérant cette fois-ci les N-1 plus petites tables de la requête.

set hive.auto.convert.join.noconditionaltask=trueset hive.auto.convert.join.nonconditionaltask.size=10000000

Ainsi, dans ce cas de figure, si la taille cumulée des tables table2 et table3 est inférieure à 10MB, elles seront chargées en mémoire de tous les Mappers assignés au traitement de la « grande » table (Hao Zhu, 2016).

Les dépendances entre étapes fournies par la commande EXPLAIN deviennent alors :

Figure 20 : graphe des dépendances d'une requête SQL avec jointure « Map-Only » (2/2)

Notons le retrait de d'étape préliminaire « Conditional Operator », et donc l'absence de « plan de secours ». Il faut donc être assuré de la bonne tenue des statistiques des tables afin d'estimer correctement leur taille. Dans le cas contraire, la jointure peut échouer et impliquer une erreur à l'exécution.

5.2.2.2 Skew-Join

Une jointure de deux grandes tables est habituellement réalisée par un traitement MapReduce qui commence par trier les tables par la clé de jointure. Chaque Mapper communiquera ensuite les lignes d'une clé particulière à un même Reducer (Nadeem Moidu, 2015).

Le Skew-Join permet de différencier le traitement d'une clé massivement représentée par rapport aux autres. Son avantage est donc d'éviter que le Reducer en charge de cette clé devienne un goulot d'étranglement pour le traitement Map Reduce.

Considérons une table table1 que nous souhaitons joindre à la table table2 par la clé de jointure champ1.

SELECT * FROM table1 INNER JOIN table2 ON table1.champ1=table2.champ1

Une étude sur table1 montre que le nombre d'occurrences des valeurs possibles de champ1 est relativement constant (entre 2457 et 2470), à l'exception de la valeur « v1 », qui apparaît3000 fois plus.

champ1

nombre d'occurrences

v1

7487662

v2

2470

v3

2469

...

2459

v5000

2457

Tableau 4 : exemple de données « biaisées » ou « skewed »

Figure 21 : Reducer devant traiter une donnée « biaisée » ou « skewed »

Dans ce schéma, nous remarquons que le Reducer R1 est surchargé par rapport aux autres.

Ce cas de figure nécessite donc une approche particulière.

Il serait envisageable, connaissant spécifiquement la donnée « biaisée », de la traiterséparément.

Considérons une table « table1 » dont le champ « champ1 » est biaisé sur la valeur « v1 » et une table « table2 », ayant une répartition homogène.

Nous procéderions ainsi :

SELECT * FROM table1INNER JOIN table2 ON table1.champ1=table2.champ1WHEREtable1.champ1<>"v1"

et dans un deuxième temps :

SELECT * FROM table1INNER JOIN table2 ON table1.champ1=table2.champ1WHEREtable1.champ1="v1"

La première requête génèrera un Common-Join équilibré (la valeur « v1 » est évitée), alors que la seconde sera facilement transformable en « Map-Join ».

Cette approche conduit à réaliser deux traitements, où il s'agira de lire plusieurs fois les mêmes données ce qui n'est pas économique.Il est possible de créer une table biaisée ou « Skewed Table » très simplement :

CREATE TABLE table1 (champ1 string,champ2 string,...)SKEWED BY (champ1) ON ("v1")

Cette déclaration de table, permet d'indiquer explicitement quel champ pourrait être biaisé (notions qu'il est possible d'indiquer plusieurs champs potentiellement biaisés).

La directive suivante active le « Skew-Join » :

set hive.optimize.skewjoin=true

Et celle-ci définit le seuil d'occurrences déclencheur :

set hive.skewjoin.key=7500000

Lorsqu'un Skew-Join est effectivement détecté durant l'exécution, un traitement Map Reduce sera opéré sur l'ensemble des données, excepté pour la valeur biaisée qui sera traitée séparément, évitant ainsi l'effet « goulot d'étranglement ».

5.3 Discussion

Au travers des stratégies d'optimisation que nous venons de voir, il apparaît que la connaissance fine des données à traiterest un prérequis indispensable à l'optimisation du requêtage.Ceci est vrai aussi bien dans le cas d'optimisations par le réglage que d'optimisations par la conception.

Dans le cadre de la conception d'un Use Case Big Data, la géométrie des données devra donc impérativement être prise en compte, dès lors où sont créées de nouvelles données (intermédiaires ou définitives).

Nous proposons les étapes suivantes comme modèle méthodologique à respecter avant le développement d'un Use Case Big Data :

- Analyser statistiquement les données : déterminer la répartition de chaque donnée « clé » impliquée dans la requête, estimer la taille des tables à interroger, ainsi que la taille des tables à construire.

- Déterminer la meilleure stratégie d'optimisation en fonction des éléments recueillis : est-ce que des réglages suffiront ou bien faudra-t-il altérer la conception des tables et/ou des requêtes ?

- Maintenir une veille technologique : à chaque changement de version de Hadoop, il faudra prendre le temps de se documenter sur les nouvelles capacités implémentées. Ce coût sera vite amorti dans la pratique.

Il ne s'agit là que d'une première ébauche méthodologique qui demanderait à être approfondie et complétée. De plus, celle-ci pourrait évidemment être remise en question selon les évolutions futures de Hadoop.

6 CONCLUSION

Ce mémoire nous a permis de découvrir en détail le fonctionnement de l'écosystème de Hadoop. Ce dernier était relativement rigide à ses débuts, l'allocation des ressources et le framework de traitement formaient une seule entité. De plus, MapReduce ne pouvait être utilisé autrement que par une succession immuable de phases Map puis Reduce.Au fil des années, de nombreux projets se sont greffés à lui et ont contribué à lui apporter plus de souplesse (YARN) permettant l'essor d'autres frameworks de traitement distribué comme Tez (qui généralise MapReduce).

Nous avons pu dresser un état de l'art à jour de Hadoop, tout en établissant du sens entre son infrastructure (NameNode+DataNodes, RessourceManager+NodeManager+ApplicationMaster) et la réalité d'un traitement (via MapReduce ou Tez).Il en a été de même vis-à-vis de Hive, application transformant du SQL en traitement distribué. Nous avons ainsi pu comprendre la logique MapReduce cachée derrière l'usage de requêtes SQL classiques.En effet, il est fondamental de bien comprendre le fonctionnement de Hive, car son usage ne peut se faire en mode « boîte noire » dès lors où il est question d'optimiser ses performances.

Notre approche de l'optimisation nous a ensuite permis d'aborder deux aspects. D'une part, le réglage, dans cette partie nous avons passé en revue de nombreuses options disponibles pour régler au mieux la façon dont Hive transformera le SQL en traitement distribué ; nous avons abordé les plus fondamentales, apportant de réels gains, tout en sachant qu'il en existe une myriade.D'autre part, la conceptionqui nous a permis de souligner l'importance de la connaissance fine des données à traiter, et ce, durant tout leur cycle de vie ; nous avons établi que les parades techniques abordées dans cette partien'ont de sens que dans un contexte d'étude préliminaire et statistique des données.

Cependant, nous n'avons fait qu'effleurer les quantités de solutions existantes. Si nous sommes capables de les appréhender plus facilement à présent, certaines mériteraient de s'y attarder davantage.Néanmoins, l'écosystème Hadoop est en mouvement permanent. Ce qui nous semble vrai aujourd'hui ne le sera peut-être plus demain, aussi sera-t-il plus que nécessaire de réaliser une veille technologique permanente pour continuer à maîtriser son usage.

Hadoop, depuis ses débuts, ne cesse de gagner en maturité. La communauté Open Source qui l'anime est très active et s'organise professionnellement au côté d'entreprises qui participent à son développement.A l'heure actuelle, des travaux sont déjà en cours pour résorber les principaux goulots d'étranglement comme le NameNode du HDFS ou les latences liées à l'allocation des ressources par YARN. Mais c'est surtout la capacité de Hadoopà ne pas se borner au seul paradigme MapReduce, à continuer de généraliser les concepts qu'il véhicule, à s'ouvrir aux autres technologiesqui sera déterminante pour son futur.

Hive se révèlequant à lui, un outil de haut niveau formidable pour entrer rapidement dans le Big Data de Hadoop.Sa dernière architecture lui permet d'utiliser indifféremment plusieurs frameworks de traitement distribués. Et si Tez est un véritable progrès par rapport à MapReduce, il en existe un autre, plus récent : Spark. Ce dernier est réputé pour être encore plus généraliste et entièrement « in-memory », mais il reste à déterminer pour quels volumes de données il excelle réellement.

Enfin, Hive nécessiteencore des améliorations, notamment au regard de l' « art » du paramétrage qu'il véhicule et qui impose audéveloppeur d'être à un niveau d'expertise comparable à celui d'un administrateur de base de données. Ce qui n'est pas acceptable de nos jours. Un système mature se caractérise notamment par la confiance que l'utilisateur peut accorder aux choix d'optimisation qu'il opère sans intervention humaine. Pour que Hive puisse continuer d'exister, il devra ainsi se charger de ses propres réglages et ne pas les laisser à la charge du développeur.Une piste envisageable serait donc d'ajouter davantage d'intelligence artificielle à Hive. Cela afin de l'aider à la prise de décisions efficaces quant aux stratégies d'optimisation qu'il devra mettre en oeuvre.

7 BIBLIOGRAPHIE

Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung (2003). The Google File System

https://research.google.com/archive/gfs.html

Jeffrey Dean and Sanjay Ghemawat (2004).MapReduce: Simplified Data Processing on Large Clusters

https://research.google.com/archive/mapreduce.html

Tom White (2015). Hadoop: The definitive guide

Ashish Thusoo (2009). Hive - A Petabyte Scale Data Warehouse using Hadoop

https://fr-fr.facebook.com/notes/facebook-engineering/hive-a-petabyte-scale-data-warehouse-using-hadoop/89508453919/

Lefty Leverenz (2015). Hive Design

https://cwiki.apache.org/confluence/display/Hive/Design#Design-HiveArchitecture

Arun Murthy (2013). Tez: Accelerating processing of data stored in HDFS

https://fr.hortonworks.com/blog/introducing-tez-faster-hadoop-processing/

BikasSaha (2013). Accelerating Hadoop Query Processing

https://www.slideshare.net/hortonworks/apache-tez-accelerating-hadoop-query-processing

BikasSaha (2016). How initial task parallelism works

https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works

Hao Zhu (2014).How to control the file numbers of hive table after inserting data on MapR-FS

http://www.openkb.info/2014/12/how-to-control-file-numbers-of-hive.html

Hao Zhu (2015).Hive did not start small file merge if the source table has .deflate files

https://issues.apache.org/jira/browse/HIVE-9398

Lefty Leverenz (2017).LanguageManualJoinOptimization https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization

Hao Zhu (2016). Difference between hive.mapjoin.smalltable.filesize and hive.auto.convert.join.noconditionaltask.size

http://www.openkb.info/2016/01/difference-between-hivemapjoinsmalltabl.html

Nadeem Moidu (2015).Skewed Join Optimization

https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization

8 ANNEXES

8.1 Hadoop

8.1.1 Partition & Sort détaillé

Le fichier local au Mapper qui contient les données partitionnées et triées par clé, est conçu en détail par le processus suivant (Tom White, 2015) :

- le Mapper envoie ses données dans un tampon circulaire, en mémoire (100MB par défaut)

- à chaque fois que ce tampon est chargé à plus de 80% :

o ses données sont divisées en partitions

o chaque partition est triée par clé

o chaque partition triée par clé est déversée dans un fichier du disque local du Mapper

o le Mapper continue d'envoyer ses données dans le tampon circulaire

- à la fin du traitement Map du Mapper, tous les fichiers générés sont fusionnés par partition puis retriés par clé

Figure 22 : traitement MapReduce détaillant le « Partition & Sort »

8.1.2 Définition de la distance entre deux noeuds

Hadoop assimile son infrastructure à un arbre (Tom White, 2015).

Les niveaux d'un arbre sont définis par le datacenter, le rack et enfin, le noeud.

La distance entre deux noeuds est la somme de leur distance au plus proche ancêtre commun.

Ainsi, le noeud le plus proche sera ainsi prioritairement :

- le même noeud

- un autre noeud sur le même rack

- un autre noeud sur le même datacenter

8.2 SQL sur Hadoop

8.2.1 Gérer manuellement le partitionnement dans une requête

Cet exemple est intéressant pour comprendre le rôle de « DISTRIBUTE BY » et « SORT BY ».

Créons une table contenant deux champs « x1 » et « y1 » et chargeons quelques données :

CREATE TABLE z_database1.test AS SELECT 'x1' AS field1,'y1' AS field2;INSERT INTO TABLE z_database1.test VALUES('x2','y2');INSERT INTO TABLE z_database1.test VALUES('x3','y3');INSERT INTO TABLE z_database1.test VALUES('x1','y3');INSERT INTO TABLE z_database1.test VALUES('x0','y3');

Forçons le nombre de Reducer à 3 :

hive> set mapreduce.job.reduces=3;hive> SELECT * FROM z_database1.test DISTRIBUTE BY field2 SORT BY field1;OKx2      y2x0      y3x1      y3x3      y3x1      y1

Nous remarquons bien la distribution par « field2 » et à l'intérieur, le tri par « field1 ».

8.2.2 Comprendre la sérialisation sous Hadoop

L'article suivant explique le principe de la sérialisation sous Hadoop :

http://www.dummies.com/programming/big-data/hadoop/defining-table-record-formats-in-hive/

8.3 Optimisation du SQL sur Hadoop

8.3.1 Exemple de WordCount avec Tez

Voici le lien vers le code source d'un exemple « WordCount » pour Tez :

https://github.com/apache/tez/blob/master/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java

8.3.2 Grouper les splits avec Tez

Voici une illustration d'une table contenant beaucoup de petits fichiers.

hdfsdfs -ls /apps/hive/warehouse/z_database1.db/table1 | grep wc -l13102

L'usage de Tez, permettra de constituer un ensemble de splits pointant chacun vers un ensemble de bloc.

Dans cet exemple, deux containers ont pu être créés au lieu des 13102.

--------------------------------------------------------------------------------

VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED

--------------------------------------------------------------------------------

Map 1 RUNNING 2 0 2 0 0 0

Reducer 2 INITED 1 0 0 1 0 0

Reducer 3 INITED 1 0 0 1 0 0

--------------------------------------------------------------------------------

VERTICES: 00/03 [>>--------------------------] 0% ELAPSED TIME: xx.xx s

--------------------------------------------------------------------------------






Bitcoin is a swarm of cyber hornets serving the goddess of wisdom, feeding on the fire of truth, exponentially growing ever smarter, faster, and stronger behind a wall of encrypted energy








"Les esprits médiocres condamnent d'ordinaire tout ce qui passe leur portée"   François de la Rochefoucauld