DPark: MapReduce in Python
Davies (davies@douban.com), Velocity China 2011, 2011/12/07
Douban
  5500 ...
  1000G ...
  60+ ...
  200+ ...
Storage and analysis at Douban
  MooseFS: 260 T, 44 nodes
  InfoBright ICE for weblogs: 60+ B rows
  Analysis in Python, R, and MPI (C++)
Hadoop at Douban
  In use since '08/'09
  HDFS, MapReduce, ZooKeeper, Hive
  Dumbo: MapReduce jobs written in Python
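The slides do not show Dumbo code; for reference, the canonical Dumbo word count, following the upstream Dumbo example rather than material from this talk:

  def mapper(key, value):
      # value is one line of input text
      for word in value.split():
          yield word, 1

  def reducer(key, values):
      yield key, sum(values)

  if __name__ == "__main__":
      import dumbo
      dumbo.run(mapper, reducer, combiner=reducer)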
Hadoop, MPI
  Hadoop: ...
  MPI: ...
Apache Mesos*
  A platform for sharing a cluster between frameworks: MPI, Hadoop, DRun and DPark run side by side on the same nodes, isolated by Linux Containers
  [Diagram: Mesos scheduling MPI / Hadoop / DRun / DPark across Nodes]
* http://www.mesosproject.org/
DRun
  Runs ad-hoc commands on the cluster through Mesos, in the spirit of MPICH2's mpirun
  Each task gets its RANK and the job SIZE

  $ drun -n 10 -c 1 -m 10m -r 2 hostname
  [3@alg221] alg221
  [2@alg224] alg224
  ...
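The slides only name RANK and SIZE; a minimal sketch of a task script, assuming drun delivers them as environment variables (the variable names and delivery mechanism are assumptions, not documented behavior):

  import os

  # assumption: drun exports each task's rank and the job size as env vars
  rank = int(os.environ.get('RANK', '0'))
  size = int(os.environ.get('SIZE', '1'))
  print 'task %d of %d on %s' % (rank, size, os.uname()[1])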
DRun use case: transcoding 70... audio files to AAC
DRun + Mesos
Spark*: Lightning-Fast Cluster Computing

Word Count in Spark:

  val file = spark.textFile("hdfs://...")
  file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

* http://www.spark-project.org/
Spark
  [Figure: logistic regression running time, Spark vs. Hadoop]
Spark papers
  Spark: Cluster Computing with Working Sets. Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica. USENIX HotCloud 2010, June 2010.
  Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica. Technical Report UCB/EECS-2011-82, July 2011.
Spark
  Implemented in Scala, ~10,000 lines of code
  Tied to the Scala runtime; our stack is Python / C
DPark
  A Python clone of Spark, written in Python
  Python: 7 ..., Hive: 15 ... :-)
  No dependency on Hadoop
DPark core: RDD (Resilient Distributed Dataset)
  [Diagram: an RDD is partitioned into Splits]
Creating an RDD
  dpark.parallelize(range(100), 10)
  dpark.textfile('/mfs/weblog/')
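As a runnable sketch, using the context object the later demo shows (local mode as the default is an assumption; method spellings follow the slides):

  from dpark import DparkContext

  dpark = DparkContext()                    # assumed default: run locally
  nums = dpark.parallelize(range(100), 10)  # 10 splits from a Python list
  logs = dpark.textfile('/mfs/weblog/')     # splits from files on MooseFS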
Transformations derive new RDDs from existing ones
  FileRDD --map()--> MappedRDD --reduce()--> ShuffledRDD
  (each RDD carries its own Splits)
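The same chain as code, reusing the dpark context from above (a sketch; method spellings follow the slides):

  lines = dpark.textfile('/mfs/weblog/')              # FileRDD
  pairs = lines.map(lambda l: (l.split(' ')[0], 1))   # MappedRDD
  counts = pairs.reducebykey(lambda a, b: a + b)      # ShuffledRDD
  # everything so far is lazy: only the lineage has been recorded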
Actions such as count(), collect(), save() trigger execution: the lineage forms a DAG, which is cut into Stages at Shuffle boundaries (Shuffle and Result stages), and every Split of a stage is processed by one Task
  [Diagram: FileRDD -map()-> MappedRDD -reduce()-> ShuffledRDD -collect()-> Result]
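Continuing the sketch above, an action launches the actual job (count(), collect() and save() are the actions the slide names; collectasmap() also appears in the word-count demo below):

  n = counts.count()          # builds the DAG, cuts it into stages, runs the tasks
  d = counts.collectasmap()   # gathers {key: count} back to the driver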
DPark on Mesos
  [Diagram: the DPark app registers with the Mesos master; each Mesos slave runs a DPark executor hosting Workers, plus Cache and Broadcast data]
Word Count

  from dpark import DparkContext
  from operator import add

  dpark = DparkContext()
  files = dpark.textfile('word.txt')
  words = files.flatmap(str.split).map(lambda x: (x, 1))
  cnt = words.reducebykey(add).collectasmap()

Run locally, or on the cluster via Mesos:

  $ python wordcount.py
  $ python wordcount.py -m mesos
>>> dpark = DparkContext('mesos')
>>> d = dpark.parallelize(range(1000), 10)
>>> print d.reduce(add)
[dpark] 2011-12-02 17:10:47,670 Got a job with 10 tasks
[dpark] 2011-12-02 17:11:04,926 Finished 5 (progress: 1/10) in 6.64s
...
499500
K-Means
  [Figure: K-Means iterations]
  points = dpark.textfile('kmeans.txt').map(parsevector).cache()
  centers = [Vector() for i in range(k)]
  for it in range(n):
      ps = points.map(lambda p: (closestcenter(p, centers), (p, 1)))
      ncenters = ps.reducebykey(mergepoints).collectasmap()
      centers = ncenters.values()
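The helpers are not shown in the talk; one plausible set of definitions, purely illustrative (names match the code above, implementations are assumptions):

  def parsevector(line):
      # one point per line: whitespace-separated floats
      return [float(x) for x in line.split()]

  def closestcenter(p, centers):
      # index of the nearest center by squared euclidean distance
      d2 = lambda a, b: sum((x - y) ** 2 for x, y in zip(a, b))
      return min(range(len(centers)), key=lambda i: d2(p, centers[i]))

  def mergepoints((s1, c1), (s2, c2)):
      # combine (sum_vector, count) pairs of one cluster
      return ([x + y for x, y in zip(s1, s2)], c1 + c2)

Note that the loop keeps (sum, count) pairs as the new centers; a complete version would divide the sum by the count to get the mean.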
Computing pairwise similarities over rating data:

  def cos((a, b)): return cos_sim(a, b)

  ratings = dpark.csvfile('r.csv').map(parse).groupbykey()
  blocks = ratings.glom().cache()
  sims = blocks.cartesian(blocks).flatmap(cos)
  final = sims.reducebykey(lambda x, y: x + y)

  cos_sim: implemented in C
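cos_sim itself is only named; a pure-Python stand-in for illustration (the real one is in C, and the data shape used here, {user: rating} dicts, is an assumption):

  import math

  def cos_sim(u, v):
      # u, v: sparse rating vectors as {user: rating} dicts (assumed shape)
      num = sum(r * v[k] for k, r in u.iteritems() if k in v)
      den = (math.sqrt(sum(r * r for r in u.itervalues())) *
             math.sqrt(sum(r * r for r in v.itervalues())))
      return num / den if den else 0.0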
DPark vs. Hadoop
  ...
  Performance-critical parts can be written in C
  Can run on PyPy
DPark
  ...
Thank you! Questions?