My Spark Streaming應用程序將數據存儲在MongoDB中。MongoDB和Spark中的連接太多
不幸的是每個星火工人打開太多的連接,同時將其存儲在MongoDB中
以下是我的代碼星火 - 蒙戈DB代碼:
public static void main(String[] args) {
int numThreads = Integer.parseInt(args[3]);
String mongodbOutputURL = args[4];
String masterURL = args[5];
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
// Create a Spark configuration object to establish connection between the application and spark cluster
SparkConf sparkConf = new SparkConf().setAppName("AppName").setMaster(masterURL);
// Configure the Spark microbatch with interval time
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(60*1000));
Configuration config = new Configuration();
config.set("mongo.output.uri", "mongodb://host:port/database.collection");
// Set the topics that should be consumed from Kafka cluster
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
// Establish the connection between kafka and Spark
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaPairDStream<Object, BSONObject> save = lines.mapToPair(new PairFunction<String, Object, BSONObject>() {
@Override
public Tuple2<Object, BSONObject> call(String input) {
BSONObject bson = new BasicBSONObject();
bson.put("field1", input.split(",")[0]);
bson.put("field2", input.split(",")[1]);
return new Tuple2<>(null, bson);
}
});
// Store the records in database
save.saveAsNewAPIHadoopFiles("prefix","suffix" ,Object.class, Object.class, MongoOutputFormat.class, config);
jssc.start();
jssc.awaitTermination();
}
如何控制沒有連接在每個工人?
我是否缺少任何配置參數?
更新1:
我使用的Spark 1.3和Java API。
我無法執行coalesce()
但我能夠做到repartition(2)
操作。
現在沒有連接得到控制。
但我認爲連接沒有被關閉或不在工作人員重複使用。
請找到下面的截圖:
看起來你正在爲每個分區創建1個MongoDB連接,'save' DStream是否有1000個分區?也許嘗試在'saveAsNewAPIHadoopFiles'之前拋出一個'.coalesce(20)',看看是否可以緩解這個問題。 –
@Ewan感謝您的回覆。請找到有問題的更新1。 –