2016-10-04 78 views
2

我需要創建一個火花流從幾個主題讀取,以及每個主題使用不同的解碼器(每個主題包含不同的Avro編碼obect):火花流每卡夫卡主題不同值解碼器

def decode_avro(message): 
    schem = avro.schema.parse(open("error_list.avsc").read()) 
    bytes_reader = io.BytesIO(message) 
    decoder = avro.io.BinaryDecoder(bytes_reader) 
    reader = avro.io.DatumReader(schem) 
    return reader.read(decoder) 

ssc = StreamingContext(sc, 2) 
kvs = KafkaUtils.createDirectStream(ssc, [topic, topic2], { 
    "metadata.broker.list": brokers}, valueDecoder = decode_avro) 

我不知道是否有可能爲每個主題指定不同的解碼器回調,或者是否有可能知道解碼器功能的主題名稱(這樣我就可以使用avro模式文件的主題名稱和解碼所有信息在同一功能)

謝謝

+0

我面臨同樣的挑戰。我看到這個問題已經超過1歲。你怎麼繞過這個障礙? –

+0

我們終於沒有使用這個aproach(即使沒有使用卡夫卡的時刻)。關於一個try/catch系統,如果引發一個異常,它會跳轉到下面的解碼器。是一個醜陋的解決方案,但我沒有找到更好的解決方案! – dhalfageme

+0

好的感謝您的更新。我找到了一個合適的解決方案,因此我將在此添加它作爲答案。 –

回答

1

我們也有這種情況,我們從不同的主題中讀取不同的消息格式,然後處理每個主題並將輸出存儲到每個源主題的專用存儲中。 去這裏的正確方法是創建多個流。在同一個應用程序中使用相同的Spark上下文對每個主題進行流處理。 每個流都會得到相關的ValueDecoder,如果它們共享相同的格式,您仍然可以從多個主題中讀取。

+1

謝謝你的迴應。我會投票,當我有一段時間我會測試它。 – dhalfageme