Creating a StructType from a string in Spark Streaming
In Spark Structured Streaming, I want to create a StructType from a String.
In the example below, Spark's read method only accepts a StructType as the schema. How can I create a StructType from a String? I want to convert the employeeSchema string into a StructType.
import org.apache.spark.SparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

public class EmployeeSchemaLoader {

    public static void main(String[] args) throws AnalysisException {
        String master = "local[*]";
        SparkSession sparkSession = SparkSession
                .builder().appName(EmployeeSchemaLoader.class.getName())
                .master(master).getOrCreate();

        // The schema as a string; this is what should become a StructType
        String employeeSchema = "StructType(\n" +
                "StructField(firstName,StringType,true),\n" +
                "StructField(lastName,StringType,true),\n" +
                "StructField(addresses,\n" +
                "ArrayType(\n" +
                "StructType(\n" +
                "StructField(city,StringType,true), \n" +
                "StructField(state,StringType,true)\n" +
                "),\n" +
                "true),\n" +
                "true) \n" +
                ")";

        SparkContext context = sparkSession.sparkContext();
        context.setLogLevel("ERROR");
        SQLContext sqlCtx = sparkSession.sqlContext();

        Dataset<Row> employeeDataset = sparkSession.read()
                //.schema(employeeSchema) // Accepts only a StructType
                .json("simple_employees.json");
        employeeDataset.printSchema();
        employeeDataset.createOrReplaceTempView("employeeView");
        sparkSession.catalog().listTables().show();
        sqlCtx.sql("select * from employeeView").show();
    }
}
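As far as I know, Spark has no public parser for this `StructType(StructField(...))` text (it resembles what `StructType.toString` prints), but it can parse a DDL schema string such as `firstName STRING, addresses ARRAY<STRUCT<city: STRING, state: STRING>>`, for example via `StructType.fromDDL(...)` or, in Spark 2.3 and later, directly via `DataFrameReader.schema(String)`. Below is a minimal sketch of a converter from the question's format to DDL, assuming the string only uses `StructType`, `ArrayType` and a few simple types. `SchemaStringToDdl` and `toDdl` are hypothetical names, and the nullability flags are parsed but dropped, so every column in the result is nullable. If you control how the string is written, storing it in DDL form in the first place avoids the parser entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spark): rewrites the "StructType(StructField(...))"
// text from the question as a Spark DDL schema string. Only StructType, ArrayType and
// the simple types listed below are handled. Nullability flags are parsed but dropped,
// so every column in the generated DDL is nullable.
public class SchemaStringToDdl {
    private static final Map<String, String> SIMPLE_TYPES = Map.of(
            "StringType", "STRING", "IntegerType", "INT", "LongType", "BIGINT",
            "DoubleType", "DOUBLE", "BooleanType", "BOOLEAN");

    private final String s; // input with all whitespace removed
    private int pos = 0;    // current parse position in s

    private SchemaStringToDdl(String input) {
        this.s = input.replaceAll("\\s+", "");
    }

    // Converts a top-level "StructType(...)" string into "col TYPE, col TYPE, ...".
    public static String toDdl(String schemaString) {
        SchemaStringToDdl p = new SchemaStringToDdl(schemaString);
        p.expect("StructType(");
        String columns = String.join(", ", p.fields(" "));
        p.expect(")");
        if (p.pos != p.s.length()) throw new IllegalArgumentException("Trailing text at " + p.pos);
        return columns;
    }

    // Parses consecutive StructField(name,type,nullable) entries and stops before the
    // closing ')'. sep is " " for top-level columns and ": " inside STRUCT<...>.
    private List<String> fields(String sep) {
        List<String> out = new ArrayList<>();
        while (s.startsWith("StructField(", pos)) {
            expect("StructField(");
            String name = readToken();
            expect(",");
            String type = dataType();
            expect(",");
            readToken(); // nullable flag: read and discarded
            expect(")");
            out.add(name + sep + type);
            if (s.startsWith(",", pos)) pos++; // skip the comma between fields
        }
        return out;
    }

    // Parses one data type and returns its DDL spelling.
    private String dataType() {
        if (s.startsWith("StructType(", pos)) {
            expect("StructType(");
            String inner = String.join(", ", fields(": "));
            expect(")");
            return "STRUCT<" + inner + ">";
        }
        if (s.startsWith("ArrayType(", pos)) {
            expect("ArrayType(");
            String element = dataType();
            expect(",");
            readToken(); // containsNull flag: read and discarded
            expect(")");
            return "ARRAY<" + element + ">";
        }
        String name = readToken();
        String ddl = SIMPLE_TYPES.get(name);
        if (ddl == null) throw new IllegalArgumentException("Unsupported type: " + name);
        return ddl;
    }

    // Reads characters up to (not including) the next ',', '(' or ')'.
    private String readToken() {
        int start = pos;
        while (pos < s.length() && ",()".indexOf(s.charAt(pos)) < 0) pos++;
        return s.substring(start, pos);
    }

    private void expect(String literal) {
        if (!s.startsWith(literal, pos)) {
            throw new IllegalArgumentException("Expected '" + literal + "' at position " + pos);
        }
        pos += literal.length();
    }

    public static void main(String[] args) {
        String employeeSchema = "StructType(StructField(firstName,StringType,true),"
                + "StructField(lastName,StringType,true),"
                + "StructField(addresses,ArrayType(StructType("
                + "StructField(city,StringType,true), StructField(state,StringType,true)),"
                + "true),true))";
        System.out.println(toDdl(employeeSchema));
        // prints: firstName STRING, lastName STRING, addresses ARRAY<STRUCT<city: STRING, state: STRING>>
    }
}
```

With Spark on the classpath, the result could then be plugged into the reader from the question, e.g. `sparkSession.read().schema(StructType.fromDDL(SchemaStringToDdl.toDdl(employeeSchema))).json("simple_employees.json")`.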
If the schema is a string, it can be read in when the program starts up. – Manjesh
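If you want to do it that way, create an Array[String] of column names, loop over the array converting each name into a StructField, and finally wrap the fields in a StructType. That should do it! –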
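The column-name approach from the comment above can be sketched with Spark's Java factory methods `DataTypes.createStructField` and `DataTypes.createStructType`. This assumes every column is a nullable string, since the comment only provides names, and it needs spark-sql on the classpath; `ColumnNamesToSchema` and `fromColumnNames` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper: turns an array of column names into a flat StructType.
// Assumes every column is a nullable string, since only the names are known.
public class ColumnNamesToSchema {

    public static StructType fromColumnNames(String[] columnNames) {
        List<StructField> fields = new ArrayList<>();
        for (String name : columnNames) {
            // One StructField per column: name, data type, nullable
            fields.add(DataTypes.createStructField(name, DataTypes.StringType, true));
        }
        // Wrap the collected fields in a StructType
        return DataTypes.createStructType(fields);
    }

    public static void main(String[] args) {
        StructType schema = fromColumnNames(new String[] {"firstName", "lastName"});
        System.out.println(schema.treeString());
    }
}
```

This only produces flat schemas. A nested column such as `addresses` would need a nested `createStructType(...)` wrapped in `DataTypes.createArrayType(elementType, true)`, so the names alone are not enough for the schema in the question.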
I have written a custom JSON-to-StructType program in Java... I will put it on Git in a while... – Manjesh
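Before writing a custom JSON format, note that Spark already has one for schemas: `StructType.json()` serializes a schema to a JSON string and `DataType.fromJson(String)` parses it back. A minimal sketch of storing and restoring a schema that way follows; it assumes spark-sql on the classpath and uses `StructType.fromDDL` (available in Spark 2.3 and later) only to build the sample schema. `SchemaJsonRoundTrip` is a hypothetical name.

```java
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper: restores a schema from the JSON that StructType.json() produces.
public class SchemaJsonRoundTrip {

    public static StructType fromJson(String json) {
        // DataType.fromJson returns a general DataType; a struct schema must be cast.
        return (StructType) DataType.fromJson(json);
    }

    public static void main(String[] args) {
        StructType original = StructType.fromDDL(
                "firstName STRING, lastName STRING, "
                + "addresses ARRAY<STRUCT<city: STRING, state: STRING>>");
        // Serialize the schema; this string could be kept in a file or config
        String json = original.json();
        System.out.println(json);
        // Rebuild the StructType from the stored JSON and compare
        StructType restored = fromJson(json);
        System.out.println(restored.equals(original));
    }
}
```

Keeping the schema as Spark's own JSON means the program never has to parse a custom string format at startup.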