Commit 1fca4c44 authored by CHAMONT David's avatar CHAMONT David
Browse files

move to NatationSynchronisee

parent b0615bf4
Petit aperçu de l'utilisation de Spark
============================================================
1) Introduction
============================================================
Cet petit exercice présente un aperçu très simpliste de l'utilisation de Spark. Pour ce faire nous avons préparé une machine virtuelle Ubuntu packagée pour être activable avec VirtualBox que vous devez avoir installé au préalable.
L'image de la VM contient déjà
- une installation de Spark 2.0
- les librairies scientifiques *numpy*, *scipy*, *matplotlib*
- un fichier de données nécessaire à l'exercice
- un script python fournissant la solution de l'exercice
Un support comparable mais sous forme de conteneur Docker est aussi fourni
> docker run -it piscineri3/spark bash
#> cd $HOME
#> source setup_spark.sh
#> pyspark
>>> sc.textFile('pzdist.txt').count()
============================================================
2) Sujet de l'exercice
============================================================
On se place dans un environnement d'astrophysique. Nous considérons une petite simulation de galaxies (très simpliste !!).
Un modèle physique fournit la **fonction de probabilité** du décalage vers le rouge pour deux caractéristiques des galaxies:
- la **magnitude** (M) qui correspond au logarithme de la luminosité apparente de la galaxie *relative* à un objet céleste de référence (très brillant), et est un nombre essentiellement négatif (entre -24 et -12 pour notre exercice)
- et le **type** de la galaxie (T). Nous considérons des galaxies pouvant être de trois types: **elliptiques** (E), **spirales** (S) ou **spirales barrées** (SB)
- Enfin le modèle physique considéré donne la **fonction de distribution de probabilité** (PDF) du décalage vers le rouge pour un échantillonnage de valeurs (discrètes) pour M, T et Z0 où ZO est un offset du décalage vers le rouge entre 0.5 et 3.
Ces PDF sont fournies sous forme tabulée à partir de chaque valeur de Z0 (avec des valeurs sans dimension dont l'intégrale vaut 1 puisque ce sont des distribution de probabilité)
Pour cet exercice extrêmement simplifié, nous allons simplement manipuler ces données en exploitant les mécanismes fonctionnels de Spark.
============================================================
3) Implémentation en Spark (avec son API Python)
============================================================
La structure de données que Spark manipule bien est naturellement organisée comme une liste de **(clés, valeurs)**. Dans notre cas, la première étape va être de lire les données du modèle de physique tout en les formattant selon la logique clés valeurs.
Le fichier de données `pzdist.txt` est un fichier texte organisé en lignes :
<Z0> <T> <M> <PDF>
- Z0 est un offset pour le décalage vers le rouge dont est une valeur échantillonnée selon **linspace(0.05, 3, 60)**
- T est le type 0, 1, 2 pour les types "Ellipse", "Spirale", "Spirale barrée"
- M est la magnitude échantillonnée selon **linspace(-24, -12, 121)**
- PDF est la fonction de distribution de probabilité du décalage vers le rouge à ajouter à Z0. Les valeurs sont échantillonnées selon **linspace(-4.5, 4.5, 901)**
(ici nous utilisons la fonction `linspace(min, max, bins)` de **Numpy** qui distribue linéairement **bins** valeurs entre **min** et **max**)
Nous choisissons:
- comme **clé** le triplet (Z0, T, M) converti en un *hash* entier
- et comme **valeur** le tableau des 901 valeurs de la PDF.
============================================================
4) Quelques mots sur Spark
============================================================
Le opérations de Spark sont des chaînes de processus individuels organisés en un graphe acyclique.
Chaque noeud de ce graphe va:
- soit transformer une donnée (opération de type `map`)
- soit collaborer avec un autre noeud pour assembler deux données (opération de type `reduce`)
Pour qu'un ensemble de données soit compréhensible par Spark il faut que sa structure permette de
découper cet ensemble en données élémentaires indiscernables vis-à-vis de leur structure.
On appelle l'ensemble de données un **Resilient Distributed Dataset** (RDD)
Le travail de Spark va consister à
- découper l'ensemble de données en sous-ensembles de données individuelles (*partitionnement*)
- de distribuer les partitions sur les opérateurs (les *machines*, ou *workers*)
- de distribuer les noeuds du graphe de processus sur les workers
- de décider automatiquement la répartition des données et des opérations
Chaque opération Spark (*donc chaque noeud du graphe*) reçoit un bloc de données (un sous-ensemble d'un RDD)
et le transforme en un nouveau bloc faisant partie d'un nouveau RDD.
Ici nous utilisons l'API Python de SPark, donc le processus est codé en Python. Pour l'exercice, nous utilisons la console interactive **pyspark** qui fournit un contexte opérationnel (avec la variable python **sc**) pour opérer les opérations Spark.
## Utiliser Spark
Commencez par définir quelques variables d'environnement:
source setup_spark.sh
Nous allons ici utiliser l'outil **pyspark** qui est un interpréteur interactif python qui est configuré pour connaître le contexte Spark
pyspark
============================================================
5) Première étape: lecture des données.
============================================================
Nous allons décomposer le processus étape par étape
## Lecture simple ligne par ligne:
Les données produites par cette opération sont un ensemble de lignes de texte (chaque ligne est un tableau de caractères)
d1 = sc.textFile('pzdist.txt')
`Attention:`
Dans la suite, chaque opération s'applique individuellement sur une *donnée* (ici la structure des données de simulation est définie par ligne) mais, après chaque opération le format interne d'une donnée va changer.
L'opération élémentaire qui transforme une donnée est l'opération **map** qui peut à la fois changer la structure interne d'une donnée mais aussi ses valeurs
Notez que l'on peut utiliser l'opération **take(n)** pour visualiser les n premières données d'un RDD, exemple:
print data.take(1)
## Filtrage
Pour suprimer les lignes de commentaire. Ici on utilise l'opération "filter" qui élimine les données qui ne vérifient pas une condition:
d2 = d1.filter(lambda u: u[0] != '#')
## Découpage de chaque ligne en un tableau de mots
d3 = d2.map(lambda u: u.split())
## formattage en (clé, valeur)
Avec comme choix de formattage:
- clé = hash(Z0, T, M)
- et valeur = [PDF]
d4 = d3.map(lambda u : (hash(int(u[0]), int(u[1]), int(u[2])), [float(z) for z in u[3:]]) )
Pour produire une clé (*hash*) nous définissons une fonction python toute simple qui tient compte de la cardinalité des valeurs pour Z0, T, M:
def hash(z, galType, magnitude):
return z + galType * 60 + magnitude * 3 * 60
En fait, le modèle de physique ne fournit pas de PDF pour toutes les valeurs possible de (Z0, T, M) on ne va donc garder que les valeurs utiles:
data = d4.filter(lambda u : sum(u[0]) > 0)
==> Ici nous avons obtenu le RDD **data** l'ensemble des ((Z0, T, M), [PDF]) qui va nous servir à générer nos galaxies.
Vous pouvez visualisez ces données avec:
print data.take(3)
(pour visualiser les 3 premières données)
============================================================
6) Deuxième étape: Distribution des données de simulation pour la génération
============================================================
Ici, les données de simulation que nous venons de préparer ne devront pas être "distribuées" au sens Spark puisqu'une distribution
basique signifierait un "partitionnement" alors que nous souhaitons ici partager l'intégralité des PDF dans chacun des workers qui vont produire les galaxies.
Pour cela nous allons utiliser une opération particulière **broadcast** qui enverra une copie de la structure de données en parallèle vers tous les workers.
## Récupération des données de simulation dans le monde Python:
dv = data.collect()
Mais pour accéder facilement à une PDF donnée, associée à une clé (Z0, T, V) on préfère utiliser un **dictionnaire** python. Donc on va convertir les données liste (K, V) en un dictionnaire {K: V}:
simulations = {k: v for k, v in dv}
## Envoi du dictionnaire vers tous les workers
sc.broadcast(simulations)
============================================================
6) Troisième étape: Génération des galaxies
============================================================
## on va choisir le nombre de générations:
N = 100
## Déclaration de N noeuds de calcul en parallèle:
r1 = sc.parallelize(xrange(N))
## Tirage aléatoire d'une clé hash(Z0, T, M)
Dans chacun des noeuds de calcul en utilisant l'échantillonage des données de simulation:
r2 = r1.map(lambda u: hash(random.randint(0, 60), random.randint(0, 2), random.randint(0, 121)))
## Sélection des tirages utiles
on ne va considérer que les tirages réellement associés à une PDF
r3 = r2.filter(lambda u : u in d)
## Visualisation du nombre de galaxies produites:
print r3.count()
==> ensuite on pourrait réellement appliquer la fonction de probabilité pour chaque clé
galaxies = r3.map(lambda u: generate(simulations[u]) )
... en écrivant la fonction **generate** mais ceci n'est pas l'objectif de cet exercice.
Prparation d'un environnement Spark partir d'une machine VirtualBox
0) Installation de VirtualBox
https://www.virtualbox.org/wiki/Downloads
1) cration de la machine virtuelle
1.1) Rcuprer deux images iso
- ubuntu-16.04-desktop-amd64.iso
+ http://www.ubuntu.com/download/desktop
- VBoxGuestAdditions_5.1.2.iso
+ https://www.virtualbox.org/wiki/Downloads
+ http://download.virtualbox.org/virtualbox/5.1.2/VBoxGuestAdditions_5.1.2.iso
1.2) dans VirtualBox:
- crer un nouvelle machine
+ ubuntu
+ RAM=4 Go
+ Disk allocation dynamique 40 Go
+ Storage:
- Primary master: ubuntu-16.04-desktop-amd64.iso
- secondary master: VBoxGuestAdditions_5.1.2.iso
2) Dmarrer la VM et configurer le systme Ubuntu
- slectionner le langage
- installer ubuntu
- tlcherger les update
- effacer le disque
- configurer le clavier
- crer l'utilisateur principal: spark/spark
lancer l'installation puis redmarer
3) Prparation des additions
- ouvrir un terminal
> sudo apt-get install dkms build-essential linux-headers-generic
> cd /media/spark/VBOX*
> sudo sh VBoxAdditions.run
redmarrer la machine.
4) Installation de Spark
sudo apt-get -y update
sudo apt-get -y install openjdk-8-jdk
export SPARK_VERSION="2.0.0"
export HADOOP_VERSION="2.7"
export APACHE_MIRROR="http://apache.crihan.fr/dist/spark"
export SPARK=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
rm -rf /tmp/spark-*
curl ${APACHE_MIRROR}/spark-${SPARK_VERSION}/${SPARK}.tgz -o /tmp/${SPARK}.tgz
tar xvz -C $HOME -f /tmp/${SPARK}.tgz
ln -s ${HOME}/${SPARK}/ ${HOME}/spark
sed -i 's/# - SPARK_MASTER_OPTS.*/SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.executor.memory=2G"/' spark/conf/spark-env.sh
export PATH=$HOME/spark/bin:$PATH
sudo apt-get install python-numpy python-scipy python-matplotlib
vi setup_spark.sh
export SPARK_VERSION="2.0.0"
export HADOOP_VERSION="2.7"
export PATH=$HOME/spark/bin:$PATH
spark/bin/pyspark
5) En partant de la machine machine prconfigure
- Lancer un terminal
> source spark-setup.sh
> pyspark
sc.textFile('pzdist.txt').count()
name := "Pool"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.0"
time spark-submit --class PoolApp target/scala-2.11/pool_2.11-1.0.jar --executor-cores 18 --executor-memory 30g
import csv
import numpy as np
import random
import sys
import pickle
from scipy import stats
from pyspark import SparkConf, SparkContext
def hash(z0, galType, magnitude):
return z0 + 60 * galType + 3 * 60 * magnitude
def main(sc, N=100):
print "start main..."
d1 = sc.textFile('./pzdist.txt')
d2 = d1.filter(lambda u: u[0] != '#')
d3 = d2.map(lambda u: u.split())
d4 = d3.map(lambda u : (hash(int(u[0]), int(u[1]), int(u[2])), [float(z) for z in u[3:]]) )
data = d4.filter(lambda u : sum(u[1]) > 0)
# print "data", data.take(3)
dv = data.collect()
d = {k: v for k, v in dv}
# print d.keys()
r1 = sc.parallelize(xrange(N))
r2 = r1.map(lambda u: hash(random.randint(0, 60), random.randint(0, 2), random.randint(0, 121)))
print r2.take(10)
r3 = r2.filter(lambda u : u in d)
print "r3 count=", r3.count()
print "end main..."
if __name__ == "__main__":
# Configure Spark
print sys.argv
N = 1000
if len(sys.argv) >= 2:
N = int(sys.argv[1])
conf = SparkConf().setAppName("POOL").set("spark.kryoserializer.buffer.max", "100m") # .set("spark.executor.cores", 12).set("spark.executor.memory", "16g")
sc = SparkContext(conf=conf)
main(sc, N=N)
print 'end...............'
import csv
import numpy as np
import random
import sys, os
import pickle
from scipy import stats
from pyspark import SparkConf, SparkContext
from pyspark.serializers import BatchedSerializer, PickleSerializer
def hash(z0, galType, magnitude):
return z0 + 61 * galType + 3 * 61 * magnitude
class Generator:
def __init__(self, dz):
# self.dz = v
N = len(dz)
xk = np.arange(N)
pk = np.array(dz)
pk = pk / pk.sum()
self.my_generator = stats.rv_discrete(name='zdist', values=(xk, pk))
self.zspec = np.linspace(0.05, 3, 60)
self.mag = np.linspace(-24, -12, 121)
self.galType = {0: "Ell", 1: "S", 2: "SB"}
def generate(self, u):
z0 = u % 61
u2 = (u - z0) / 61
t = u2 % 3
m = (u2 - t) / 3
zspec = self.zspec[z0]
magnitude = self.mag[m]
z_simul = self.my_generator.rvs(size=1)
tries = (z_simul * 9.0 / 901.0) - 4.5 + zspec
return "%s;%s;%s" % ( tries[0], t, m )
def main(sc, N=100):
print "start main..."
d1 = sc.textFile('pzdist.txt')
d2 = d1.filter(lambda u: u[0] != '#')
d3 = d2.map(lambda u: u.split())
d4 = d3.map(lambda u : (hash(int(u[0]), int(u[1]), int(u[2])), [float(z) for z in u[3:]]) )
data = d4.filter(lambda u : sum(u[1]) > 0)
# print "data", data.take(3)
dv = data.collect()
da = [None for i in range(61*3*122)]
total = 0
for k, v in dv:
o = Generator(v)
da[k] = o
total += 1
print "N=%d total=%d" % (N, total)
r1 = sc.parallelize(xrange(N))
sc.broadcast(da)
r2 = r1.map(lambda u: hash(random.randint(0, 60), random.randint(0, 2), random.randint(0, 121)))
def empty(da, u):
return da[u] is None
r3 = r2.filter(lambda u : not empty(da, u))
print "filtered samples = %s" % (r3.count())
r4 = r3.map(lambda u : da[u].generate(u))
print r3.count()
# r3.saveAsTextFile("/user/christian.arnault/pool.csv")
print "end main..."
if __name__ == "__main__":
# Configure Spark
print sys.argv
N = 1000
if len(sys.argv) >= 2:
N = int(sys.argv[1])
conf = SparkConf().setAppName("POOL") #.set("spark.kryoserializer.buffer.max", "1g") # .set("spark.executor.cores", 12).set("spark.executor.memory", "16g")
sc = SparkContext(conf=conf,
serializer=PickleSerializer(),
batchSize=-1
)
main(sc, N=N)
print 'end...............'
/* Pool.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
object PoolApp {
case class Key(z: Int, galType: Int, magnitude: Int) { def hash: Int = z + galType*60 + magnitude*60*3 }
case class PDF(pdf: Array[Float])
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf()
val sc = new SparkContext(conf)
val logFile = "./pzdist.txt"
val data = sc.textFile(logFile)
.filter(line => ! line.contains('#')) // ignore comments
.map(line => line.split(" ")) // split in words
.map(line => ( Key(line(0).toInt, line(1).toInt, line(2).toInt).hash, // format as (K,V, boolean) to catch empty PDFs
PDF(line.takeRight(line.length - 3).map(_.toFloat)),
line.takeRight(line.length - 3).forall(_ == "0") ) )
.filter(_._3 == false) // ignore empty PDFs
.map(line => (line._1, line._2)) // reformat as (K, V) = (Key hash, PDF)
.map(identity).collect() // collect data
val simulationArray = data.asInstanceOf[Array[(Int, PDF)]] // cast to Scala know type
val sims : Map[Int, PDF] = Map(simulationArray : _*) // change to Map[Key hash, PDF]
val N: Long = 100L*1000L*1000L*1000L
val limit: Long = Int.MaxValue / 2
var samples: Long = 0L
var each: Int = 0
var times: Long = 1
if (N > limit)
{
times = N / limit
each = limit.toInt
}
else
{
times = 1
each = N.toInt
}
println("N:" + N)
println("limit:" + limit)
println("times:" + times)
for (small <- 1 to times.toInt)
{
val sample = sc.parallelize(0 to limit.toInt) // build sample size
.map(x => ( Key( (Math.random()*60).toInt, (Math.random()*3).toInt, (Math.random()*121).toInt ).hash ) ) // generate random keys
.filter(x => sims.contains(x)) // select only non empty PDFs
samples += sample.count()
}
println("samplings: " + samples)
}
}
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment