Version of pool.py but with the actual simulation

Merged: Arnault requested to merge chris.arnault/PiscineJI:master into master
import csv
import numpy as np
import random
import sys, os
import pickle
from scipy import stats
from pyspark import SparkConf, SparkContext
from pyspark.serializers import BatchedSerializer, PickleSerializer
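
# Pack the (z0, galType, magnitude) bin indices into a single integer key
# (61 z bins x 3 galaxy types x magnitude bins).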
def hash(z0, galType, magnitude):
    return z0 + 61 * galType + 3 * 61 * magnitude
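
# One Generator per populated (z0, galType, magnitude) bin: it turns the
# tabulated probabilities dz into a discrete random variable used to draw
# a simulated value around the bin's zspec.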
class Generator:
    def __init__(self, dz):
        # self.dz = v
        N = len(dz)
        xk = np.arange(N)
        pk = np.array(dz)
        pk = pk / pk.sum()
        self.my_generator = stats.rv_discrete(name='zdist', values=(xk, pk))
        self.zspec = np.linspace(0.05, 3, 60)
        self.mag = np.linspace(-24, -12, 121)
        self.galType = {0: "Ell", 1: "S", 2: "SB"}
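
    # Decode the integer key back into its (z0, galType, magnitude) indices
    # and draw one simulated value around the corresponding zspec.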
    def generate(self, u):
        z0 = u % 61
        u2 = (u - z0) / 61
        t = u2 % 3
        m = (u2 - t) / 3
        zspec = self.zspec[z0]
        magnitude = self.mag[m]
        z_simul = self.my_generator.rvs(size=1)
        tries = (z_simul * 9.0 / 901.0) - 4.5 + zspec
        return "%s;%s;%s" % (tries[0], t, m)
def main(sc, N=100):
    print "start main..."

    # Each non-comment line of pzdist.txt holds the three bin indices
    # (z0, galType, magnitude) followed by the tabulated probabilities.
    d1 = sc.textFile('pzdist.txt')
    d2 = d1.filter(lambda u: u[0] != '#')
    d3 = d2.map(lambda u: u.split())
    d4 = d3.map(lambda u: (hash(int(u[0]), int(u[1]), int(u[2])), [float(z) for z in u[3:]]))
    data = d4.filter(lambda u: sum(u[1]) > 0)
    # print "data", data.take(3)

    # Collect the non-empty bins on the driver and build one Generator per
    # bin, indexed by the integer key.
    dv = data.collect()
    da = [None for i in range(61 * 3 * 122)]
    total = 0
    for k, v in dv:
        o = Generator(v)
        da[k] = o
        total += 1
    print "N=%d total=%d" % (N, total)

    # Draw N random bin keys in parallel; the broadcast handle is not kept,
    # so da is shipped to the executors through the lambda closures below.
    r1 = sc.parallelize(xrange(N))
    sc.broadcast(da)
    r2 = r1.map(lambda u: hash(random.randint(0, 60), random.randint(0, 2), random.randint(0, 121)))

    def empty(da, u):
        return da[u] is None

    # Keep only the keys that correspond to a populated bin, then run the simulation.
    r3 = r2.filter(lambda u: not empty(da, u))
    print "filtered samples = %s" % (r3.count())
    r4 = r3.map(lambda u: da[u].generate(u))
    print r3.count()
    # r3.saveAsTextFile("/user/christian.arnault/pool.csv")
    print "end main..."
if __name__ == "__main__":
    # Configure Spark
    print sys.argv
    N = 1000
    if len(sys.argv) >= 2:
        N = int(sys.argv[1])
    conf = SparkConf().setAppName("POOL")  # .set("spark.kryoserializer.buffer.max", "1g") # .set("spark.executor.cores", 12).set("spark.executor.memory", "16g")
    sc = SparkContext(conf=conf,
                      serializer=PickleSerializer(),
                      batchSize=-1)
    main(sc, N=N)
    print 'end...............'