0
我如何訪問數據幀(DataFrame)中的嵌套字段?(.proto、ScalaPB)以下是我的數據幀模式:
root
|-- name: string (nullable = true)
|-- addresses: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- street: string (nullable = true)
| | |-- city: string (nullable = true)
我想輸出名稱(name)和城市(city)。以下是我的 Spark Streaming 應用程序,它目前輸出的是名稱和地址(addresses),但我希望輸出的是名稱和城市。感謝您的幫助,謝謝。
/** Streaming console consumer for `Person` messages read from Kafka.
  *
  * Subscribes to the `person` topic on `localhost:9092`, parses each record's
  * `value` bytes with `Person.parseFrom`, and writes the `name` and `addresses`
  * columns to the console sink.
  */
object PersonConsumer {
  import org.apache.spark.sql.SparkSession
  // Presumably the ScalaPB-generated classes for the demo .proto (provides `Person`).
  import com.example.protos.demo._

  /** Starts the streaming query and blocks until it terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    // Supplies the encoders used by `map` and the `$"..."` column syntax below.
    import spark.implicits._

    val kafkaRecords = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "person")
      .load()

    // Pull the raw payload out of each Kafka row, then decode it into a Person.
    val people = kafkaRecords
      .map(row => row.getAs[Array[Byte]]("value"))
      .map(bytes => Person.parseFrom(bytes))

    // NOTE(review): to emit only the city of each address, selecting
    // $"addresses.city" instead of $"addresses" should yield the array of city
    // values per person -- confirm against the Spark version in use.
    val nameAndAddresses = people.select($"name", $"addresses")
    nameAndAddresses.printSchema()

    val query = nameAndAddresses.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}