2015-11-19 24 views
0

我正在使用spark 1.5.0和java 7.如何將JavaPairDStream寫入Redis?

輸入來自kafka,其形式爲不同的json對象,其中包含type字段。例如:

{'type': 'alpha', ...} 
{'type': 'beta', ...} 
... 

我創建從對應於每個事件類型的計數這個輸入數據JavaPairDStream<String, Integer>

我想將此數據存儲到redis。我怎麼能這樣做呢?

回答

2

使用的foreachRDDforEach功能來實現這一如下:

wordCounts.foreachRDD(
    new Function<JavaPairRDD<String, Integer>, Void>() { 
     public Void call(JavaPairRDD<String, Integer> rdd) { 
      rdd.foreach(
       new VoidFunction<Tuple2<String,Integer>>() { 
        public void call(Tuple2<String,Integer> wordCount) { 
         System.out.println(wordCount._1() + ":" + wordCount._2()); 
         JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost"); 
         Jedis jedis = pool.getResource(); 
         jedis.select(0); 
         jedis.set(wordCount._1(), wordCount._2().toString()); 
        } 
       } 
      ); 
      return null; 
     } 
    } 
); 
+0

我希望有人會之前你指出這一點,但我覺得有序集合對你的情況不是簡單的鍵(您正在使用使用更有益'SET' )。 – Niloct

0

創建一個新的連接池,每一個RDD是非常低效的。我建議創建的每個分區一個連接:

wordCount.mapPartitions(p->{ 
Jedis jd = new Jedis(getJedisConfig()); 
while (p->hasNext()) { 
    Tuple2<String,Integer> data = p.next(); 
    String word = data._1(); 
    Integer cnt = data._2(); 
    jd.set(word,count); // or any other format of save to Redis 
} 
} 
)