2016-08-20 93 views
12

有沒有辦法使用模式將消息從轉換爲?用戶記錄的模式文件:從SqlNetworkWordCount exampleKafka, Spark and Avro - Part 3, Producing and consuming Avro messages使用模式將AVRO消息與Spark轉換爲DataFrame

{ 
    "fields": [ 
    { "name": "firstName", "type": "string" }, 
    { "name": "lastName", "type": "string" } 
    ], 
    "name": "user", 
    "type": "record" 
} 

和代碼片段在短信讀取。

object Injection { 
    val parser = new Schema.Parser() 
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json")) 
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema) 
} 

... 

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => { 
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
    import sqlContext.implicits._ 

    val df = rdd.map(message => Injection.injection.invert(message._2).get) 
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF() 

    df.show() 
}) 

case class User(firstName: String, lastName: String) 

不知何故,我找不到另一種方式比使用案例類將AVRO消息轉換爲DataFrame。是否有可能使用該模式?我正在使用Spark 1.6.2Kafka 0.10

完整的代碼,以防您感興趣。

import com.twitter.bijection.Injection 
import com.twitter.bijection.avro.GenericAvroCodecs 
import kafka.serializer.{DefaultDecoder, StringDecoder} 
import org.apache.avro.Schema 
import org.apache.avro.generic.GenericRecord 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.streaming.{Seconds, StreamingContext, Time} 
import org.apache.spark.{SparkConf, SparkContext} 

object ReadMessagesFromKafka { 
    object Injection { 
    val parser = new Schema.Parser() 
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json")) 
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema) 
    } 

    def main(args: Array[String]) { 
    val brokers = "127.0.0.1:9092" 
    val topics = "test" 

    // Create context with 2 second batch interval 
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]") 
    val ssc = new StreamingContext(sparkConf, Seconds(2)) 

    // Create direct kafka stream with brokers and topics 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams, topicsSet) 

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => { 
     val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
     import sqlContext.implicits._ 

     val df = rdd.map(message => Injection.injection.invert(message._2).get) 
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF() 

     df.show() 
    }) 

    // Start the computation 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

/** Case class for converting RDD to DataFrame */ 
case class User(firstName: String, lastName: String) 

/** Lazily instantiated singleton instance of SQLContext */ 
object SQLContextSingleton { 
    @transient private var instance: SQLContext = _ 

    def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
     instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
} 

回答

2

OP可能解決這個問題,但以備將來參考我相當普遍地解決了這個問題,因此認爲在這裏發佈可能會有所幫助。

所以,一般來說,你應該在Avro的模式轉換到火花StructType並轉換你的對象在你的RDD到行[任何],然後用:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row}>,<schema as StructType> 

爲了將Avro的模式轉換我用spark-avro像這樣:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] 

的RDD的皈依更棘手的..如果你的模式是簡單的,你可以很可能只是做一個簡單的地圖..這樣的事情:

rdd.map(obj=>{ 
    val seq = (obj.getName(),obj.getAge() 
    Row.fromSeq(seq)) 
    }) 

在這個例子中,對象有2個字段的名稱和年齡。

重要的是確保行中的元素與之前的StructType中的字段的順序和類型相匹配。

在我的情況下,我有一個複雜得多的對象,我希望一般處理它以支持未來的模式更改,所以我的代碼更加複雜。

由OP建議的方法也應該對一些casese工作,但將難以意味着對複雜對象(不是原始的或案件級)

另一個技巧是,如果一個類中有一個類你應該這個類轉換成一排,這樣的包裝類將被轉換爲類似:

Row(Any,Any,Any,Row,...) 

,你也可以看看我前面就如何對象轉換爲行提到的火花Avro的項目。我使用了一些我自己的邏輯

如果有人閱讀本文需要進一步幫助,請在評論中詢問我,我將盡力幫助

3

請看一看這個 https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/AvroSuite.scala

所以不是

val df = rdd.map(message => Injection.injection.invert(message._2).get) 
.map(record => User(record.get("firstName").toString,records.get("lastName").toString)).toDF() 

你可以試試這個

val df = spark.read.avro(message._2.get) 
+0

'spark-avro 2.0.1'需要一個路徑作爲輸入並且不能處理Array [Byte]。因此'spark.read.avro(message._2)'會引發類型不匹配。 –

+1

如何複製多條消息並將其寫入/ tmp/目錄並從中讀取?如果您使用的是Spark 2.0,這將工作: spark.read.format(「com.databricks.spark.avro」)。schema(DataType.fromJson(「path/to/schema.json」)。asInstanceOf [StructType] ).load(「/ tmp/」).show() –

1

對於任何有興趣處理此問題的人,可以在不需要停止和重新部署Spark應用程序的情況下處理模式更改(假設您應用程序邏輯可以處理這個)看到這個question/answer

相關問題