2015-12-23 46 views
3

My Spark Streaming應用程序將數據存儲在MongoDB中。MongoDB和Spark中的連接太多

不幸的是每個星火工人打開太多的連接,同時將其存儲在MongoDB中

enter image description here

以下是我的代碼星火 - 蒙戈DB代碼:

public static void main(String[] args) { 

    int numThreads = Integer.parseInt(args[3]); 
    String mongodbOutputURL = args[4]; 
    String masterURL = args[5]; 

    Logger.getLogger("org").setLevel(Level.OFF); 
    Logger.getLogger("akka").setLevel(Level.OFF); 

// Create a Spark configuration object to establish connection between the application and spark cluster 
    SparkConf sparkConf = new SparkConf().setAppName("AppName").setMaster(masterURL); 

    // Configure the Spark microbatch with interval time 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(60*1000)); 

    Configuration config = new Configuration(); 
    config.set("mongo.output.uri", "mongodb://host:port/database.collection"); 

// Set the topics that should be consumed from Kafka cluster 
    Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
    String[] topics = args[2].split(","); 
    for (String topic: topics) { 
     topicMap.put(topic, numThreads); 
    } 

// Establish the connection between kafka and Spark 
    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, args[0], args[1], topicMap); 

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
     @Override 
     public String call(Tuple2<String, String> tuple2) { 
     return tuple2._2(); 
     } 
    }); 

    JavaPairDStream<Object, BSONObject> save = lines.mapToPair(new PairFunction<String, Object, BSONObject>() { 
     @Override 
     public Tuple2<Object, BSONObject> call(String input) { 
      BSONObject bson = new BasicBSONObject(); 
      bson.put("field1", input.split(",")[0]); 
      bson.put("field2", input.split(",")[1]); 
      return new Tuple2<>(null, bson); 
     } 
    }); 
    // Store the records in database  
    save.saveAsNewAPIHadoopFiles("prefix","suffix" ,Object.class, Object.class, MongoOutputFormat.class, config); 

    jssc.start(); 
    jssc.awaitTermination(); 
    } 

如何控制沒有連接在每個工人?

我是否缺少任何配置參數?

更新1:

我使用的Spark 1.3和Java API。

我無法執行coalesce()但我能夠做到repartition(2)操作。

現在沒有連接得到控制。

但我認爲連接沒有被關閉或不在工作人員重複使用。

請找到下面的截圖:

流間隔1分鐘和2個分區 enter image description here

+1

看起來你正在爲每個分區創建1個MongoDB連接,'save' DStream是否有1000個分區?也許嘗試在'saveAsNewAPIHadoopFiles'之前拋出一個'.coalesce(20)',看看是否可以緩解這個問題。 –

+0

@Ewan感謝您的回覆。請找到有問題的更新1。 –

回答

0

我能夠通過使用foreachRDD來解決問題。

我正在建立連接並在每個DStream後關閉它。

myRDD.foreachRDD(new Function<JavaRDD<String>, Void>() { 
      @Override 
      public Void call(JavaRDD<String> rdd) throws Exception { 
       rdd.foreachPartition(new VoidFunction<Iterator<String>>() { 
        @Override 
        public void call(Iterator<String> record) throws Exception { 
       MongoClient mongo = new MongoClient(server:port); 
       DB db = mongo.getDB(database); 
       DBCollection targetTable = db.getCollection(collection); 
       BasicDBObject doc = new BasicDBObject(); 
       while (record.hasNext()) { 
        String currentRecord = record.next(); 
        String[] delim_records = currentRecord.split(","); 
        doc.append("column1", insert_time); 
        doc.append("column2", delim_records[1]); 
        doc.append("column3",delim_records[0]); 
        targetTable.insert(doc); 
        doc.clear();       
       } 
       mongo.close();     
       } 
       }); 
       return null; 
      } 
     }); 
0

你可以試試地圖分區,它在分區級別而不是創紀錄水平的作品,即任務的一個執行節點將共享一個數據庫連接而不是每個記錄。

另外我想你可以使用預分區(而不是流RDD)。 Spark非常聰明,可以利用它來減少隨機播放。