我在databricks的網站上發現了這個博客。它展示瞭如何利用Spark SQL的API來消費和轉換來自Apache Kafka的複雜數據流。
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
有解釋UDF如何被用來解串器行的部分:
object MyDeserializerWrapper {
val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
我用java,所以只好寫下面的示例UDF,以檢查它怎麼能在Java中被稱爲:
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
@Override
public String call(byte[] bytes) throws Exception {
String s = new String(bytes);
return "_" + s;
}
};
現在我可以使用這個UDF在結構化流字數舉例如下:
Dataset<String> words = df
//converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
// .selectExpr("CAST(value AS STRING)")
.select(callUDF("mode", col("value")))
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
對我來說,下一步是爲節儉反序列化寫一個UDF。我會盡快發佈。