我最初的RDD如下所示RDD[(String, List[(String,String)])]
:獲取列表[Serializable接口]而不是列表[(字符串,字符串)
(600,List((22,33),(55,88)))
(700,List((12,13),(15,18),(18,88)))
我想從Redis的緩存數據庫獲得額外的數據每個條目追加。爲此,我使用Sedis
,它是Scala的Jedis
的包裝。這是我的代碼片段:
import org.sedis._
import redis.clients.jedis._
val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
val appended = filtered.map({line => (line._1,
redisPool.withJedisClient { client =>
val additionalData: List[String] = Dress.up(client).hvals("member_id:"+line._1)
line._2.union(additionalData)
})
})
的問題是,appended
是格式RDD[(String, List[Serializable]
而不是RDD[(String, List[(String,String)])]
的。我究竟做錯了什麼? 另外,我在哪裏使用redisPool
內部map
足夠高效或有沒有其他更好的選擇?
什麼是'line._2's類型? –
你說你的初始RDD的類型是'RDD [(String,List [(String,String)])]',但是你提供的示例數據不匹配 - '(600,List(22,33), List(55,88))'是類型'(String,List [(String,String)],List [(String,String)])'...並且第二條記錄再次不同。由編譯器爲您提供的_input_推斷的類型是'RDD [(帶有可序列化的產品)]' –
@TzachZohar:對不起,我的錯。它確實是'List((22,33),(55,88))' – HackerDuck