2015-12-12 26 views
0

我正在嘗試寫入來自spark的redis。但是我得到一個編譯時錯誤,說「類BinaryJedis中的變量管道無法在redis.clients.jedis.Jedis中訪問」。我的代碼如下(部分顯示):Redis on spark:無法訪問類BinaryJedis中的變量管道

import org.sedis._ 
    import redis.clients.jedis._ 
    ... 
    val myRDD = KafkaUtils.createStream(ssc, zkQuorum, group, topic).map(_._2).window(Seconds(300), Seconds(10)) 
    myRDD.foreachRDD(rdd => {rdd.foreachPartition(it =>{ 
     val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000)) 
     pool.withJedisClient { client => 
     val pipeline = client.pipeline() 
     it.foreach { 
      case (a,b,c) => pipeline.hmset(a,Map("b" -> b, "c" -> c)) 
     } 
     } 

    })}) 

我得到的錯誤如下:

error: variable pipeline in class BinaryJedis cannot be accessed in redis.clients.jedis.Jedis 
    Access to protected variable pipeline not permitted because enclosing object MainExample in package examples is not a subclass of class BinaryJedis in package jedis where target is defined 
    val pipeline = client.pipeline() 

...我已經尋找一個解決方案,但沒能找到。 有人可以幫我嗎? 在此先感謝。

+0

上述問題已解決。見下文 – Asibe

回答

0

上面的問題可以通過刪除分區並用寫入redis的每個數據元組的jedis新實例替換Jedis池來解決,如下所示。這對我有效。

import org.sedis._ 
import redis.clients.jedis._ 
... 
val myRDD = KafkaUtils.createStream(ssc, zkQuorum, group, topic) 
         .map(_._2) 
         .window(Seconds(300), Seconds(10)) 

myRDD.foreachRDD(rdd => { 
     rdd.foreach(case(a, b, c) =>{ 
      val jedis = new Jedis("localhost", 6379) 
      val pipeline = jedis.pipelined 
      pipeline.hmset(a,Map("b" -> b, "c" -> c)) 
    }) 
})