UPDATE: It turns out this has something to do with the way the Databricks Spark CSV reader creates the DataFrame. In the example below, which does NOT work, I read the people and address CSVs using the Databricks CSV reader and then write the resulting DataFrames to HDFS in Parquet format. A simple join of the two Spark DataFrames then fails with "org.apache.spark.sql.AnalysisException: Cannot resolve column name".
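The CSV-loading code is not shown in the question, but it was presumably something along these lines; this is a minimal sketch, and the header/inferSchema options are my assumption, not taken from the original post:

DataFrame address = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("header", "true")          // first line holds the column names
    .option("inferSchema", "true")     // yields the integer/string types shown below
    .load("/Users/sfelsheim/data/address.csv");
// write the resulting DataFrame to HDFS in Parquet format
address.write().parquet("hdfs://localhost:9000//datalake/sample/address.parquet");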
I changed the code to create the DataFrame like this:
JavaRDD<Address> address = context.textFile("/Users/sfelsheim/data/address.csv").map(
    new Function<String, Address>() {
        // parse one CSV line into an Address bean
        public Address call(String line) throws Exception {
            String[] parts = line.split(",");
            Address addr = new Address();
            addr.setAddrId(parts[0]);
            addr.setCity(parts[1]);
            addr.setState(parts[2]);
            addr.setZip(parts[3]);
            return addr;
        }
    });
(and similarly for people.csv), then wrote the resulting DataFrames to HDFS in Parquet format, and the join works as expected.
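For completeness, turning that RDD into a DataFrame and writing it out looks roughly like this (a sketch; it assumes Address is a plain serializable JavaBean matching the setters used above):

// reflect the Address bean into a schema, then write the result as Parquet
DataFrame addressDF = sqlContext.createDataFrame(address, Address.class);
addressDF.write().parquet("hdfs://localhost:9000//datalake/sample/address.parquet");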
I am reading exactly the same CSV files in both cases.
I am running into a problem trying to perform a simple join of two DataFrames created from two different Parquet files on HDFS.
[main] INFO org.apache.spark.SparkContext - Running Spark version 1.4.1
Using HDFS from Hadoop 2.7.0
Here is a sample to illustrate.
public void testStrangeness(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("joinIssue");
    JavaSparkContext context = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(context);

    DataFrame people = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/people.parquet");
    DataFrame address = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/address.parquet");

    people.printSchema();
    address.printSchema();

    // yeah, works
    DataFrame cartJoin = address.join(people);
    cartJoin.printSchema();

    // boo, fails
    DataFrame joined = address.join(people,
        address.col("addrid").equalTo(people.col("addressid")));

    joined.printSchema();
}
Contents of people.csv
first,last,addressid
your,mom,1
fred,flintstone,2
Contents of address.csv
addrid,city,state,zip
1,sometown,wi,4444
2,bedrock,il,1111
people.printSchema();
results in...
root
|-- first: string (nullable = true)
|-- last: string (nullable = true)
|-- addressid: integer (nullable = true)
address.printSchema();
results in...
root
|-- addrid: integer (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zip: integer (nullable = true)
DataFrame cartJoin = address.join(people);
cartJoin.printSchema();
The Cartesian join works...
root
|-- addrid: integer (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zip: integer (nullable = true)
|-- first: string (nullable = true)
|-- last: string (nullable = true)
|-- addressid: integer (nullable = true)
Fine. But this join...
DataFrame joined = address.join(people,
address.col("addrid").equalTo(people.col("addressid")));
produces the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot resolve column name "addrid" among (addrid, city, state, zip);
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:558)
at dw.dataflow.DataflowParser.testStrangeness(DataflowParser.java:36)
at dw.dataflow.DataflowParser.main(DataflowParser.java:119)
I tried changing it so that people and address both have a common key attribute (addressid) and used:
address.join(people, "addressid");
but got the same result.
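In hindsight (per the update at the top), the column name stored in the Parquet file written from the spark-csv DataFrame was not what printSchema() suggested. One way to check, sketched here as my own diagnostic rather than taken from the post, is to dump the first code point of every column name; a hidden UTF-8 BOM shows up as U+FEFF on the first header column:

for (String name : address.columns()) {
    // a BOM-prefixed header prints identically but starts with U+FEFF
    System.out.println(String.format("'%s' starts with U+%04X", name, (int) name.charAt(0)));
}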
Any ideas??
Thanks
Can you elaborate on the BOM in this case? – dmux
The BOM is the byte order mark: http://stackoverflow.com/questions/2223882/whats-different-between-utf-8-and-utf-8-without-bom –
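If the BOM is indeed what breaks column resolution here, one workaround (my sketch, not from the thread) is to rename the affected column by position after loading, so that col() can resolve it:

// the first physical column carries the BOM-prefixed name; rename it by position
DataFrame fixedAddress = address.withColumnRenamed(address.columns()[0], "addrid");
DataFrame joined = fixedAddress.join(people,
    fixedAddress.col("addrid").equalTo(people.col("addressid")));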