mongodb - spark集群中每个节点都有一个独立数据库,可以实现分布式统计计算吗?

浏览:46日期:2023-07-02

问题描述

我将spark搭建在两台机器上,其中一台即是master又是slave,另一台是slave,两台机器上均装有独立的mongodb数据库。我的主程序让它们统计自身数据库的内容,然后将结果汇总到一台服务器上的数据库里。目前代码是在master节点上提交的。但是我spark-submit之后,好像只统计master节点上的mongodb里的数据了,另一个worker节点没有统计上。请问这是什么原因?代码如下:

val conf = new SparkConf().setAppName('Scala Word Count')val sc = new SparkContext(conf)val config = new Configuration()//以下代码表示只统计本机数据库上的数据,猜测问题可能出在这里config.set('mongo.input.uri', 'mongodb://127.0.0.1:27017/local.test')//统计结果输出到服务器上config.set('mongo.output.uri', 'mongodb://103.25.23.80:60013/test_hao.result')val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])// Input contains tuples of (ObjectId, BSONObject)val countsRDD = mongoRDD.flatMap(arg => { var str = arg._2.get('type').toString str = str.toLowerCase().replaceAll('[.,!?n]', ' ') str.split(' ')}).map(word => (word, 1)).reduceByKey((a, b) => a + b)// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if nullval saveRDD = countsRDD.map((tuple) => { var bson = new BasicBSONObject() bson.put('word', tuple._1) bson.put('count', tuple._2.toString() ) (null, bson)})// Only MongoOutputFormat and config are relevantsaveRDD.saveAsNewAPIHadoopFile('file:///bogus', classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)

问题解答

回答1:

自问自答。原因可能是这样:

val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

这行代码表示这是由driver读取数据库,然后将符合条件的数据载入RDD,由于之前设置了是将127.0.0.1作为输入,也就是从driver的mongodb上读取数据。由于driver就在master上,所以读取的数据也自然就是master上的数据了。

相关文章: