0
我正在使用Spark流來連續讀取來自kafka的數據並執行一些統計。我每秒都在流。如何查找每個DStream中RDD中所有值的總和?
所以我有一秒批(dstreams)。此dstream內的每個RDD都包含一個JSON。
這是我有我的DSTREAM:
kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'})
raw = kafkaStream.map(lambda kafkaS: kafkaS[1])
clean = raw.map(lambda xs:json.loads(xs))
一個RDDS在我乾淨 DSTREAM看起來是這樣的:
{u'epochseconds': 1458841451, u'protocol': 6, u'source_ip': u'192.168.1.124', \
u'destination_ip': u'149.154.167.120', u'datetime': u'2016-03-24 17:44:11', \
u'length': 1589, u'partitionkey': u'partitionkey', u'packetcount': 10,\
u'source_port': 43375, u'destination_port': 443}
而且我喜歡在30-150這樣的RDDS每個DStream。
現在,我想要做的是,獲得'長度'的總和或在每個DStream中說'packetcounts'。也就是說,
rdd1.length + rdd2.length + ... + LastRDDInTheOneSecondBatch.length
我試了一下:
add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b)
我得到了什麼:
頻率,而不是總和。
(17, 6)
(6, 24)
我該怎麼做纔能有總和而不是密鑰的頻率?
作品,謝謝!只是一個額外的問題,我想從clean添加2個參數到add,比如說('partitionkey','timestamp'),以及剛剛計算的'length'參數。我怎麼做? – HackCode