Commit f92f4781 authored by Christian Arnault

pool.scala

parent d627f9d6
1 merge request: !3 cleanup pool.scala
**/*~
**/target/*
**/project/*
Spark/pool/run 100644 → 100755
File mode changed from 100644 to 100755
@@ -4,25 +4,10 @@ import org.apache.spark.SparkContext
 import org.apache.spark.SparkConf
 import org.apache.log4j.Logger
 import org.apache.log4j.Level
-import org.apache.spark.HashPartitioner
-import org.apache.spark.sql.SparkSession
-import scala.collection.mutable.ListBuffer
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.types._
-import List._
-import Array._
-import Seq._
-import Map._
-import Math._
 object PoolApp {
-  case class Key(z: Int, galType: Int, magnitude: Int) { def hash: Int = z + galType*3 + magnitude*121 }
+  case class Key(z: Int, galType: Int, magnitude: Int) { def hash: Int = z + galType*60 + magnitude*60*3 }
   case class PDF(pdf: Array[Float])
   def main(args: Array[String]) {
@@ -35,27 +20,28 @@ object PoolApp {
     val logFile = "./pzdist.txt"
-    val d1 = sc.textFile(logFile)
-    val d2 = d1.filter(line => ! line.contains('#'))
-    val d3 = d2.map(line => line.split(" "))
-    val d4 = d3.map(line => ( Key(line(0).toInt, line(1).toInt, line(2).toInt).hash,
-                              PDF(line.takeRight(line.length - 3).map(_.toFloat)),
-                              line.takeRight(line.length - 3).forall(_ == "0") ) )
-    val d5 = d4.filter(_._3 == false)
-    val d6 = d5.map(line => (line._1, line._2))
-    val data = d6.map(identity).collect()
+    val data = sc.textFile(logFile)
+      .filter(line => ! line.contains('#'))    // ignore comments
+      .map(line => line.split(" "))            // split into words
+      .map(line => ( Key(line(0).toInt, line(1).toInt, line(2).toInt).hash,    // format as (K, V, boolean) to catch empty PDFs
+                     PDF(line.takeRight(line.length - 3).map(_.toFloat)),
+                     line.takeRight(line.length - 3).forall(_ == "0") ) )
+      .filter(_._3 == false)                   // ignore empty PDFs
+      .map(line => (line._1, line._2))         // reformat as (K, V) = (Key hash, PDF)
+      .map(identity).collect()                 // collect data to the driver
-    val simulationArray = data.asInstanceOf[Array[(Int, PDF)]]
+    val simulationArray = data.asInstanceOf[Array[(Int, PDF)]]    // cast to a known Scala type
-    val sims : Map[Int, PDF] = Map(simulationArray : _*)
+    val sims : Map[Int, PDF] = Map(simulationArray : _*)          // turn into a Map[Key hash, PDF]
-    val r1 = sc.parallelize(0 to 1000*1000*1000).map(x => ( Key( (Math.random()*60).toInt, (Math.random()*3).toInt, (Math.random()*121).toInt ).hash ) )
-    val r2 = r1.filter(x => sims.contains(x))
+    val sample = sc.parallelize(0 to 1000*1000*1000)              // draw ~10^9 trials
+      .map(x => ( Key( (Math.random()*60).toInt, (Math.random()*3).toInt, (Math.random()*121).toInt ).hash ) )    // generate random keys
+      .filter(x => sims.contains(x))                              // keep only keys with a non-empty PDF
-    val n = r2.count()
+    val samples = sample.count()
-    println("samplings: " + n)
+    println("samplings: " + samples)
   }
 }
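
The substantive change in this diff is the Key.hash formula. With z drawn in [0, 60), galType in [0, 3) and magnitude in [0, 121) (the ranges used by the random key generation above), the old encoding z + galType*3 + magnitude*121 collides (for instance Key(3, 0, m) and Key(0, 1, m) map to the same hash), whereas the new z + galType*60 + magnitude*60*3 is a mixed-radix encoding that is injective over those ranges. A minimal standalone check of that claim, assuming exactly those ranges (the object and method names here are illustrative, not from the repo):

object KeyHashCheck {
  case class Key(z: Int, galType: Int, magnitude: Int) {
    def oldHash: Int = z + galType*3 + magnitude*121     // pre-cleanup formula
    def newHash: Int = z + galType*60 + magnitude*60*3   // mixed-radix: z < 60, galType*60 < 180
  }
  def main(args: Array[String]): Unit = {
    val keys = for (z <- 0 until 60; g <- 0 until 3; m <- 0 until 121) yield Key(z, g, m)
    println(s"keys: ${keys.size}")                                         // 60*3*121 = 21780
    println(s"distinct old hashes: ${keys.map(_.oldHash).distinct.size}")  // 7986: heavy collisions
    println(s"distinct new hashes: ${keys.map(_.newHash).distinct.size}")  // 21780: injective
  }
}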
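
For context on the parsing pipeline: each pzdist.txt line is expected to carry three integer key fields (z, galType, magnitude) followed by the PDF bins, which is what the split/takeRight logic above assumes. A driver-side sketch of the same transformation on plain Scala collections (the sample lines are invented; the real file layout is only inferred from the code):

object ParsePdfSketch {
  case class Key(z: Int, galType: Int, magnitude: Int) { def hash: Int = z + galType*60 + magnitude*60*3 }
  case class PDF(pdf: Array[Float])
  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "# z galType magnitude p0 p1 p2",   // comment line, filtered out
      "1 0 5 0.1 0.7 0.2",                // non-empty PDF, kept
      "2 1 7 0 0 0"                       // all-zero PDF, dropped
    )
    val sims: Map[Int, PDF] = lines
      .filter(line => !line.contains('#'))
      .map(_.split(" "))
      .map(w => ( Key(w(0).toInt, w(1).toInt, w(2).toInt).hash,
                  PDF(w.takeRight(w.length - 3).map(_.toFloat)),   // same logic as the RDD version
                  w.takeRight(w.length - 3).forall(_ == "0") ))
      .filter(_._3 == false)
      .map(t => (t._1, t._2))
      .toMap
    println(sims.size)   // 1: only the non-empty, non-comment line survives
  }
}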