2017-02-24 84 views
0

我正在使用Kafka和Spark結構化流式處理。我收到以下格式的卡巴消息。在Spark結構化流中處理二進制數據

{"deviceId":"001","sNo":1,"data":"aaaaa"} 
{"deviceId":"002","sNo":1,"data":"bbbbb"} 
{"deviceId":"001","sNo":2,"data":"ccccc"} 
{"deviceId":"002","sNo":2,"data":"ddddd"} 

我讀它像下面。

Dataset<String> data = spark 
     .readStream() 
     .format("kafka") 
     .option("kafka.bootstrap.servers", bootstrapServers) 
     .option(subscribeType, topics) 
     .load() 
     .selectExpr("CAST(value AS STRING)") 
     .as(Encoders.STRING()); 
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo"); 
ds.foreach(event -> 
     processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes()) 
);} 

private void processData(String deviceId,int SNo, byte[] data) 
{ 
    //How to check previous processed Dataset??? 
} 

以我JSON消息 「數據」 是字節[]的字符串形式。我有一個要求,我需要按照「sNo」的順序處理給定「deviceId」的二進制「數據」。因此,對於「deviceId」=「001」,我必須處理「sNo」= 1,然後「sNo」= 2等二進制數據。如何在結構化流式傳輸中檢查之前處理過的數據集的狀態?任何樣本或鏈接都會有很大的幫助。我是Spark的新手,請和我一起裸照。謝謝。

+0

你到目前爲止嘗試了什麼? – Jan

+0

我已更新我的代碼。請檢查。我在做orderBy然後forEach來處理數據。我被卡在processData方法中,如何處理來自流式傳輸接收的數據集的當前和以前的數據。 – user7615505

回答