我需要創建一個火花流從幾個主題讀取,以及每個主題使用不同的解碼器(每個主題包含不同的Avro編碼obect):火花流每卡夫卡主題不同值解碼器
def decode_avro(message):
schem = avro.schema.parse(open("error_list.avsc").read())
bytes_reader = io.BytesIO(message)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schem)
return reader.read(decoder)
ssc = StreamingContext(sc, 2)
kvs = KafkaUtils.createDirectStream(ssc, [topic, topic2], {
"metadata.broker.list": brokers}, valueDecoder = decode_avro)
我不知道是否有可能爲每個主題指定不同的解碼器回調,或者是否有可能知道解碼器功能的主題名稱(這樣我就可以使用avro模式文件的主題名稱和解碼所有信息在同一功能)
謝謝
我面臨同樣的挑戰。我看到這個問題已經超過1歲。你怎麼繞過這個障礙? –
我們終於沒有使用這個aproach(即使沒有使用卡夫卡的時刻)。關於一個try/catch系統,如果引發一個異常,它會跳轉到下面的解碼器。是一個醜陋的解決方案,但我沒有找到更好的解決方案! – dhalfageme
好的感謝您的更新。我找到了一個合適的解決方案,因此我將在此添加它作爲答案。 –