2016-12-16 141 views
3

我期待在Spark 1.6上使用DataFrames API構建Spark Streaming應用程序。在我陷入兔洞之前,我希望有人能夠幫助我理解DataFrames如何處理具有不同模式的數據。在運行Spark Streaming應用程序時處理模式更改

這個想法是消息將通過Avro模式流入卡夫卡。我們應該能夠以向後兼容的方式發展架構,而無需重新啓動流應用程序(應用程序邏輯仍然可以工作)。

使用模式註冊表反編譯消息的新版本以及使用KafkaUtils創建直接流和AvroKafkaDecoder(來自Confluent)消息中嵌入的模式標識似乎並不重要。這讓我儘可能擁有DStream。

問題1: 在該DStream中,將會有具有不同版本模式的對象。所以,當我將每一個轉換成一個Row對象時,我應該傳入一個最新的reader模式來正確地遷移數據,並且我需要將最新模式傳遞到sqlContext.createDataFrame(rowRdd,schema)調用中。 DStream中的對象是GenericData.Record類型,並且據我所知,沒有簡單的方法可以確定哪個是最新版本。我看到兩種可能的解決方案,一種是調用模式註冊表以獲取每個微博上最新版本的模式。另一種是修改解碼器以附加模式標識。然後我可以迭代rdd來查找最高的id並從本地緩存中獲取模式。

我希望有人已經以可重用的方式很好地解決了這個問題。

問題#2: Spark將爲每個分區提供一個與Kafka不同的執行程序。當一個執行者接收到與其他「最新」模式不同的應用程序時,我的應用程序會發生什麼。由一個執行器創建的DataFrame在同一個時間窗口中將具有與另一個不同的模式。我實際上並不知道這是否是一個真正的問題。我在查看數據流時遇到問題,以及哪種操作會導致問題。如果這是一個問題,那就意味着執行者之間需要共享一些數據,這聽起來既複雜又低效。

我需要擔心這個嗎?如果我這樣做,如何解決模式差異?

感謝, --Ben

回答

2

我相信我已經解決了這一點。我正在使用Confluent的模式註冊表和KafkaAvroDecoder。簡化代碼如下所示:

// Get the latest schema here. This schema will be used inside the 
// closure below to ensure that all executors are using the same 
// version for this time slice. 
val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) 
val m = sr.getLatestSchemaMetadata(subject) 
val schemaId = m.getId 
val schemaString = m.getSchema 

val outRdd = rdd.mapPartitions(partitions => { 
    // Note: we cannot use the schema registry from above because this code 
    // will execute on remote machines, requiring the schema registry to be 
    // serialized. We could use a pool of these. 
    val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) 
    val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry) 
    val parser = new Schema.Parser() 
    val avroSchema = parser.parse(schemaString) 
    val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema) 

    partitions.map(input => { 
    // Decode the message using the latest version of the schema. 
    // This will apply Avro's standard schema evolution rules 
    // (for compatible schemas) to migrate the message to the 
    // latest version of the schema. 
    val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record] 
    // Convert record into a DataFrame with columns according to the schema 
    avroRecordConverter(record).asInstanceOf[Row] 
    }) 
}) 

// Get a Spark StructType representation of the schema to apply 
// to the DataFrame. 
val sparkSchema = AvroSchemaConverter.toSqlType(
     new Schema.Parser().parse(schemaString) 
    ).dataType.asInstanceOf[StructType] 
sqlContext.createDataFrame(outRdd, sparkSchema) 
相關問題