Cet petit exercice présente un aperçu très simpliste de l'utilisation de Spark. Pour ce faire nous avons préparé une machine virtuelle Ubuntu packagée pour être activable avec VirtualBox que vous devez avoir installé au préalable.
Cet petit exercice présente un aperçu très simpliste de l'utilisation de Spark. Pour ce faire nous avons préparé une machine virtuelle Ubuntu packagée pour être activable avec VirtualBox que vous devez avoir installé au préalable.
L'image de la VM contient déjà
L'image de la VM contient déjà
- une installation de Spark 1.6
- les librairies scientifiques *numpy*, *scipy*, *matplotlib*
- un fichier de données nécessaire à l'exercice
- un fichier de données nécessaire à l'exercice
- un script python fournissant la solution de l'exercice
On se place dans un environnement d'astrophysique. Nous considérons une petite simulation de galaxies (très simpliste !!).
On se place dans un environnement d'astrophysique. Nous considérons une petite simulation de galaxies (très simpliste !!).
Un modèle physique fournit la **fonction de probabilité** du décalage vers le rouge pour deux caractéristiques des galaxies:
Un modèle physique fournit la **fonction de probabilité** du décalage vers le rouge pour deux caractéristiques des galaxies:
- la **magnitude** (M) qui correspond au logarithme de la luminosité apparente de la galaxie *relative*à un objet céleste de référence (très brillant), et est un nombre essentiellement négatif (entre -24 et -12 pour notre exercice)
- la **magnitude** (M) qui correspond au logarithme de la luminosité apparente de la galaxie *relative* à un objet céleste de référence (très brillant), et est un nombre essentiellement négatif (entre -24 et -12 pour notre exercice)
- et le **type** de la galaxie (T). Nous considérons des galaxies pouvant être de trois types: **elliptiques** (E), **spirales** (S) ou **spirales barrées** (SB)
- et le **type** de la galaxie (T). Nous considérons des galaxies pouvant être de trois types: **elliptiques** (E), **spirales** (S) ou **spirales barrées** (SB)
- Enfin le modèle physique considéré donne la **fonction de distribution de probabilité** (PDF) du décalage vers le rouge pour un échantillonnage de valeurs (discrètes) pour M, T et Z0 où ZO est un offset du décalage vers le rouge entre 0.5 et 3.
- Enfin le modèle physique considéré donne la **fonction de distribution de probabilité** (PDF) du décalage vers le rouge pour un échantillonnage de valeurs (discrètes) pour M, T et Z0 où ZO est un offset du décalage vers le rouge entre 0.5 et 3.
Ces PDF sont fournies sous forme tabulée à partir de chaque valeur de Z0 (avec des valeurs sans dimension dont l'intégrale vaut 1 puisque ce sont des distribution de probabilité)
Ces PDF sont fournies sous forme tabulée à partir de chaque valeur de Z0 (avec des valeurs sans dimension dont l'intégrale vaut 1 puisque ce sont des distribution de probabilité)
Pour cet exercice extrêmement simplifié, nous allons simplement manipuler ces données en exploitant les mécanismes fonctionnels de Spark.
Pour cet exercice extrêmement simplifié, nous allons simplement manipuler ces données en exploitant les mécanismes fonctionnels de Spark.
La structure de données que Spark manipule bien est naturellement organisée comme une liste de **(clés, valeurs)**. Dans notre cas, la première étape va être de lire les données du modèle de physique tout en les formattant selon la logique clés valeurs.
La structure de données que Spark manipule bien est naturellement organisée comme une liste de **(clés, valeurs)**. Dans notre cas, la première étape va être de lire les données du modèle de physique tout en les formattant selon la logique clés valeurs.
Le fichier de données `pzdist.txt` est un fichier texte organisé en lignes :
Le fichier de données `pzdist.txt` est un fichier texte organisé en lignes :
<Z0> <T> <M> <PDF>
- Z0 est un offset pour le décalage vers le rouge dont est une valeur échantillonnée selon **linspace(0.05, 3, 60)**
- T est le type 0, 1, 2 pour les types "Ellipse", "Spirale", "Spirale barrée"
- M est la magnitude échantillonnée selon **linspace(-24, -12, 121)**
- PDF est la fonction de distribution de probabilité du décalage vers le rouge à ajouter à Z0. Les valeurs sont échantillonnées selon **linspace(-4.5, 4.5, 901)**
- Z0 est un offset pour le décalage vers le rouge dont est une valeur échantillonnée selon **linspace(0.05, 3, 60)**
- T est le type 0, 1, 2 pour les types "Ellipse", "Spirale", "Spirale barrée"
- M est la magnitude échantillonnée selon **linspace(-24, -12, 121)**
- PDF est la fonction de distribution de probabilité du décalage vers le rouge à ajouter à Z0. Les valeurs sont échantillonnées selon **linspace(-4.5, 4.5, 901)**
(ici nous utilisons la fonction `linspace(min, max, bins)` de **Numpy** qui distribue linéairement **bins** valeurs entre **min** et **max**)
(ici nous utilisons la fonction `linspace(min, max, bins)` de **Numpy** qui distribue linéairement **bins** valeurs entre **min** et **max**)
Nous choisissons:
- comme **clé** le triplet (Z0, T, M)
- comme **clé** le triplet (Z0, T, M) converti en un *hash* entier
- et comme **valeur** le tableau des 901 valeurs de la PDF.
Le opérations de Spark sont des chaînes de processus individuels organisés en un graphe acyclique.
Le opérations de Spark sont des chaînes de processus individuels organisés en un graphe acyclique.
Chaque noeud de ce graphe va:
- soit transformer une donnée (opération de type `map`)
- soit collaborer avec un autre noeud pour assembler deux données (opération de type `reduce`)
- soit transformer une donnée (opération de type `map`)
- soit collaborer avec un autre noeud pour assembler deux données (opération de type `reduce`)
Pour qu'un ensemble de données soit compréhensible par Spark il faut que sa structure permette de
découper cet ensemble en données élémentaires indiscernables vis-à-vis de leur structure.
Pour qu'un ensemble de données soit compréhensible par Spark il faut que sa structure permette de
découper cet ensemble en données élémentaires indiscernables vis-à-vis de leur structure.
On appelle l'ensemble de données un **Resilient Distributed Dataset** (RDD)
On appelle l'ensemble de données un **Resilient Distributed Dataset** (RDD)
Le travail de Spark va consister à
Le travail de Spark va consister à
- découper l'ensemble de données en sous-ensembles de données individuelles (*partitionnement*)
- de distribuer les partitions sur les opérateurs (les *machines*, ou *workers*)
- découper l'ensemble de données en sous-ensembles de données individuelles (*partitionnement*)
- de distribuer les partitions sur les opérateurs (les *machines*, ou *workers*)
- de distribuer les noeuds du graphe de processus sur les workers
- de décider automatiquement la répartition des données et des opérations
- de décider automatiquement la répartition des données et des opérations
Chaque opération Spark (*donc chaque noeud du graphe*) reçoit un bloc de données (un sous-ensemble d'un RDD)
Chaque opération Spark (*donc chaque noeud du graphe*) reçoit un bloc de données (un sous-ensemble d'un RDD)
et le transforme en un nouveau bloc faisant partie d'un nouveau RDD.
Ici nous utilisons l'API Python de SPark, donc le processus est codé en Python. Pour l'exercice, nous utilisons la console interactive **pyspark** qui fournit un contexte opérationnel (avec la variable python **sc**) pour opérer les opérations Spark.
Ici nous utilisons l'API Python de SPark, donc le processus est codé en Python. Pour l'exercice, nous utilisons la console interactive **pyspark** qui fournit un contexte opérationnel (avec la variable python **sc**) pour opérer les opérations Spark.
Nous allons décomposer le processus étape par étape
Nous allons décomposer le processus étape par étape
## Lecture simple ligne par ligne:
Les données produites par cette opération sont un ensemble de lignes de texte (chaque ligne est un tableau de caractères)
Les données produites par cette opération sont un ensemble de lignes de texte (chaque ligne est un tableau de caractères)
d1 = sc.textFile('pzdist.txt')
`Attention:`
Dans la suite, chaque opération s'applique individuellement sur une *donnée* (ici la structure des données de simulation est définie par ligne) mais, après chaque opération le format interne d'une donnée va changer.
Dans la suite, chaque opération s'applique individuellement sur une *donnée* (ici la structure des données de simulation est définie par ligne) mais, après chaque opération le format interne d'une donnée va changer.
L'opération élémentaire qui transforme une donnée est l'opération **map** qui peut à la fois changer la structure interne d'une donnée mais aussi ses valeurs
L'opération élémentaire qui transforme une donnée est l'opération **map** qui peut à la fois changer la structure interne d'une donnée mais aussi ses valeurs
Notez que l'on peut utiliser l'opération **take(n)** pour visualiser les n premières données d'un RDD, exemple:
Notez que l'on peut utiliser l'opération **take(n)** pour visualiser les n premières données d'un RDD, exemple:
print data.take(1)
## Filtrage
Pour suprimer les lignes de commentaire. Ici on utilise l'opération "filter" qui élimine les données qui ne vérifient pas une condition:
Pour suprimer les lignes de commentaire. Ici on utilise l'opération "filter" qui élimine les données qui ne vérifient pas une condition:
d2 = d1.filter(lambda u: u[0] != '#')
## Découpage de chaque ligne en un tableau de mots
## Découpage de chaque ligne en un tableau de mots
d3 = d2.map(lambda u: u.split())
## formattage en (clé, valeur)
## formattage en (clé, valeur)
Avec comme choix de formattage:
- clé=(Z0, T, M)
- et valeur=[PDF]
- clé = hash(Z0, T, M)
- et valeur = [PDF]
d4 = d3.map(lambda u : ((int(u[0]), int(u[1]), int(u[2])), [float(z) for z in u[3:]]) )
d4 = d3.map(lambda u : (hash(int(u[0]), int(u[1]), int(u[2])), [float(z) for z in u[3:]]) )
Pour produire une clé (*hash*) nous définissons une fonction python toute simple qui tient compte de la cardinalité des valeurs pour Z0, T, M:
En fait, le modèle de physique ne fournit pas de PDF pour toutes les valeurs possible de (Z0, T, M) on ne va donc garder que les valeurs utiles:
def hash(z, galType, magnitude):
return z + galType * 60 + magnitude * 3 * 60
En fait, le modèle de physique ne fournit pas de PDF pour toutes les valeurs possible de (Z0, T, M) on ne va donc garder que les valeurs utiles:
data = d4.filter(lambda u : sum(u[0]) > 0)
==> Ici nous avons obtenu le RDD **data** l'ensemble des ((Z0, T, M), [PDF]) qui va nous servir à générer nos galaxies.
==> Ici nous avons obtenu le RDD **data** l'ensemble des ((Z0, T, M), [PDF]) qui va nous servir à générer nos galaxies.
Ici, les données de simulation que nous venons de préparer ne devront pas être "distribuées" au sens Spark puisqu'une distribution
basique signifierait un "partitionnement" alors que nous souhaitons ici partager l'intégralité des PDF dans chacun des workers qui vont produire les galaxies.
Ici, les données de simulation que nous venons de préparer ne devront pas être "distribuées" au sens Spark puisqu'une distribution
basique signifierait un "partitionnement" alors que nous souhaitons ici partager l'intégralité des PDF dans chacun des workers qui vont produire les galaxies.
Pour cela nous allons utiliser une opération particulière **broadcast** qui enverra une copie de la structure de données en parallèle vers tous les workers.
Pour cela nous allons utiliser une opération particulière**broadcast** qui enverra une copie de la structure de données en parallèle vers tous les workers.
## Récupération des données de simulation dans le monde Python:
## Récupération des données de simulation dans le monde Python:
dv = data.collect()
Mais pour accéder facilement à une PDF donnée, associée à une clé (Z0, T, V) on préfère utiliser un **dictionnaire** python. Donc on va convertir les données liste (K, V) en un dictionnaire {K: V}:
Mais pour accéder facilement à une PDF donnée, associée à une clé (Z0, T, V) on préfère utiliser un **dictionnaire** python. Donc on va convertir les données liste (K, V) en un dictionnaire {K: V}: