
Use schema to convert ConsumerRecord value to DataFrame in spark-kafka

I am using Spark 2.0.2 with Kafka 0.11.0, and I am trying to consume messages from Kafka in Spark Streaming. Here is the code:

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = "notes"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:7092",
  "schema.registry.url" -> "http://localhost:7070",
  "group.id" -> "connect-cluster1",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String, String](
  SparkStream.ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  }
}

If the Kafka topic contains records, the output looks like this:

{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"} 
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"} 

So the received messages are Avro-decoded, as can be seen from the ConsumerRecord's value. Now I need the records as a DataFrame, but I do not know where to go from here, even with the schema in hand:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.Schema

val sr: CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = sr.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(sr)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)

This prints the schema as follows:

{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}

Can anyone help me understand how to get a DataFrame from the ConsumerRecord's value? I have looked at other questions, such as Use schema to convert AVRO messages with Spark to DataFrame and Handling schema changes in running Spark Streaming application, but they do not deal with the ConsumerRecord in the first place.


I ran into a similar situation. Were you able to figure this out?

Answers


I'm new to scala/kafka/spark myself, so I'm not sure this fully answers the question, but it helped me. I'm sure there is a better way than this, so hopefully someone with more experience can come along and provide a better answer.

// KafkaRDD
stream.foreachRDD { rdd =>

  // pull the values I'm looking for into a string array on the driver
  val x = rdd.map(row => row.value()).collect()

  // convert to a single-column dataframe
  import spark.implicits._
  val df = x.toSeq.toDF("record")

  // write the data frame to a datastore (MySQL in my case)
  df.write
    .mode(SaveMode.Append)
    .jdbc(url, table, props)
}
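A note on the sketch above: collect() pulls every record back to the driver, so none of the work is distributed. A minimal variant that keeps the mapping on the executors (a sketch only, assuming the same url/table/props placeholders as above):

import org.apache.spark.sql.{SaveMode, SparkSession}

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // get (or lazily create) a session from the RDD's context inside each batch
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    // map to strings on the executors instead of collecting to the driver
    val df = rdd.map(record => record.value().toString).toDF("record")

    df.write
      .mode(SaveMode.Append)
      .jdbc(url, table, props)
  }
}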

You can use the snippet below. Here stream is the DStream of ConsumerRecord returned by the kafka010 KafkaUtils API:

import org.apache.spark.sql.SQLContext

stream.foreachRDD(rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    val topicValueStrings = rdd.map(record => record.value().toString)
    val df = sqlContext.read.json(topicValueStrings)
    df.show()
  })
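Note that read.json infers the schema from the JSON strings in each batch, so it can drift from the registry schema (everything comes back nullable, and types are whatever the sampled JSON suggests). A sketch of applying the registry schema instead, assuming the com.databricks:spark-avro artifact is on the classpath for its SchemaConverters helper, and that avroSchema is the parsed org.apache.avro.Schema from the question:

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

// convert the Avro schema fetched from the schema registry into a Spark StructType
val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

stream.foreachRDD(rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    val topicValueStrings = rdd.map(record => record.value().toString)
    // pass the converted schema explicitly instead of inferring one per batch
    val df = sqlContext.read.schema(sparkSchema).json(topicValueStrings)
    df.show()
  })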