A Complete Big Data Solution
ABOUT ME: 唐建法 / TJ, Senior Solutions Architect at MongoDB; co-founder of the MongoDB Chinese Community

AGENDA: Introduction to Spark; Spark and MongoDB; case study and demo
[Diagram: the Hadoop/Spark ecosystem: HDFS for storage; YARN, Mesos, and Spark Standalone for resource management; Hadoop MapReduce and Spark as compute engines; Hive, Pig, Spark SQL, Spark Streaming, and DataFrame as higher-level interfaces]
HDFS and MongoDB, common points:
- Horizontal scaling, supporting TB to PB data volumes
- Low cost, commodity x86 hardware
- Automatic multi-copy data replication
- Support for unstructured data

Differences:
- HDFS: coarse-grained storage; no indexes; write once, read many; non-interactive, offline, minute-level SLA
- MongoDB: fine-grained, structured storage; secondary indexes; mixed reads and writes; interactive, real-time online, millisecond-level SLA
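To make the access-pattern difference concrete, here is a minimal sketch (the log path and field values are illustrative assumptions, and the MongoDB side uses the Spark connector introduced below): finding one host's errors means a full file scan on HDFS, while MongoDB can serve the same lookup from a secondary index on host.

// HDFS: locating one host's errors requires scanning the log files.
val hdfsErrors = sc.textFile("hdfs:///logs/2016/07/*")   // path is an assumption
  .filter(line => line.contains("\"host\": \"xyz\""))

// MongoDB: the $match is pushed down to the server and can use an index on 'host'.
import com.mongodb.spark._
import org.bson.Document
val mongoErrors = MongoSpark.load(sc)
  .withPipeline(Seq(Document.parse("{ $match: { host: 'xyz' } }")))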
{ ts: 2016-07-31 23:50:50, host: xyz, error: 404, body: { } }
{ ts: 2016-07-31 23:49:23, host: def, error: 019, body: { } }
{ ts: 2016-07-31 23:49:22, host: xyz, error: null, body: { } }
...
{ ts: 2016-07-01 02:04:12, host: abc, error: 500, body: { } }
...

[Diagram: the same log documents stored in HDFS and in MongoDB]
[Diagram: the Spark RDD lifecycle: Parallelize, then Transform (map, filter, union, intersect, ...), then Action, producing a Result]
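A minimal sketch of that lifecycle (the values are made up for illustration): parallelize a local collection into an RDD, chain lazy transformations, then trigger the computation with an action.

// Parallelize: distribute a local collection across the cluster as an RDD.
val numbers = sc.parallelize(1 to 100)

// Transform: lazy operations that only describe new RDDs.
val evens = numbers.filter(_ % 2 == 0)
val doubled = evens.map(_ * 2)

// Action: triggers the actual computation and returns a result to the driver.
val result = doubled.collect()
println(result.take(5).mkString(", "))   // 4, 8, 12, 16, 20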
MongoDB Connector for Spark: https://github.com/mongodb/mongo-spark
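As a minimal sketch of what the connector exposes (assuming a spark-shell session configured with the input/output URIs shown later in the demo): a collection can be loaded either as an RDD of BSON documents or as a DataFrame.

import com.mongodb.spark._

// Load the collection named in spark.mongodb.input.uri as an RDD[org.bson.Document].
val rdd = MongoSpark.load(sc)
println(rdd.count())

// Or load it as a DataFrame (Spark 1.6 SQLContext), with the schema inferred by sampling.
val df = MongoSpark.load(sqlContext)
df.printSchema()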
[Diagram: airline fare pre-computation: 1000+ airport pairs (LAX-NYC, LAX-BOS, BOS-LAX, PVG-SZX, ...), each priced per stay duration across 365 departure dates, with the results stored in MongoDB]
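A rough back-of-the-envelope for the size of that job (the pair and date counts come from the slide; the number of stay durations is an assumption for illustration):

// Scale of the fare pre-computation.
val airportPairs = 1000       // from the slide: 1000+ city pairs
val departureDates = 365      // a year of departure dates
val stayDurations = 30        // assumed: stays of 1 to 30 nights
println(airportPairs.toLong * departureDates * stayDurations)   // about 11 million fares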
[Diagram: system architecture: sales channels (Call Center, Mobile, B2C, B2T, B2M) call the Inventory API and Fare API; their implementations sit in front of a Fare Cache and Seat Inventory, which are populated by Spark jobs (Spark Master, Spark Submit, drivers)]
[Flowchart: the batch pricing job: START, load reference data (flight schedule, base price, price rules), broadcast the variables, submit the batch job, the master splits it into parallel compute jobs each taking an input and producing an output, collect the results, END; see the code below]
// Initialization: dependencies include base prices, pricing rules and some reference data
Map<String, Object> dependencies = MyDependencyManager.loadDependencies();

// Broadcast the dependencies once so every executor can read them
Broadcast<Map<String, Object>> broadcastDeps = javaSparkContext.broadcast(dependencies);

// Create the job RDD from MongoDB, pre-filtered by an aggregation pipeline
JavaMongoRDD<Document> cabinsRdd = MongoSpark.load(javaSparkContext).withPipeline(pipeline);

// For each (cabin, date, airport) tuple, calculate the price
JavaRDD<Document> pricedRdd = cabinsRdd.map(calcPriceFunction);

// Write the computed prices back into MongoDB
MongoSpark.save(pricedRdd);
[Charts: Legacy vs. Spark+Mongo: throughput rises (left scale 0-3500) while latency drops (right scale 0-350)]
# curl -OL http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz
# mkdir -p ~/spark
# tar -xvf spark-1.6.0-bin-hadoop2.6.tgz -C ~/spark --strip-components=1
# cd ~/spark
# ./bin/spark-shell \
    --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/flights.av" \
    --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/flights.output" \
    --packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0

import com.mongodb.spark._
import org.bson.Document

MongoSpark.load(sc).take(10).foreach(println)
MongoSpark.load(sc)
  .map(doc => (doc.getString("flight"), doc.getLong("seats")))
  .reduceByKey((x, y) => x + y)
  .take(10)
  .foreach(println)
import org.bson.Document

MongoSpark.load(sc)
  .withPipeline(Seq(Document.parse("{ $match: { orig: 'KMG' } }")))
  .map(doc => (doc.getString("flight"), doc.getLong("seats")))
  .reduceByKey((x, y) => x + y)
  .take(10)
  .foreach(println)
Tuning notes:
- chunk size (MB): total data size / chunk size = number of chunks = number of RDD partitions = number of Spark tasks
- CPU: plan on 1-2 cores per Spark task
- I/O between Spark and MongoDB: co-locate the nodes where possible to reduce it
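A quick way to sanity-check that arithmetic from the shell (a sketch; the exact partition count depends on the connector version and its partitioner settings):

// Each chunk becomes one RDD partition, and each partition becomes one Spark task.
val rdd = MongoSpark.load(sc)
println(rdd.partitions.length)   // roughly: total data size / chunk size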
MongoDB + Spark

Faster. More agile.

Questions?