
UPDATE: It turns out this has something to do with the way the Databricks Spark CSV reader creates the DataFrame. In the example below, which does not work, I read the people and address CSVs using the Databricks CSV reader, then write the resulting DataFrames to HDFS in Parquet format. A simple join of the two Spark DataFrames then fails with "org.apache.spark.sql.AnalysisException: Cannot resolve column name".
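The failing read path looked roughly like this (a minimal sketch in spark-csv 1.x syntax; the header/inferSchema options are assumptions, since the original read code isn't shown):

// Presumed read path -- Databricks spark-csv 1.x on Spark 1.4.
// The "header" and "inferSchema" options are assumptions about how the CSVs were loaded.
DataFrame addressDF = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("header", "true")       // treat the first line as column names
    .option("inferSchema", "true")  // infer column types from the data
    .load("/Users/sfelsheim/data/address.csv");

// Write the DataFrame out to HDFS in Parquet format.
addressDF.write().parquet("hdfs://localhost:9000/datalake/sample/address.parquet");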

If I instead change the code to create the DataFrames from RDDs:

JavaRDD<Address> address = context.textFile("/Users/sfelsheim/data/address.csv").map(
    new Function<String, Address>() {
        // Parse one CSV line into an Address bean.
        public Address call(String line) throws Exception {
            String[] parts = line.split(",");

            Address addr = new Address();
            addr.setAddrId(parts[0]);
            addr.setCity(parts[1]);
            addr.setState(parts[2]);
            addr.setZip(parts[3]);

            return addr;
        }
    });

(and similarly for people.csv), then write the resulting DataFrames to HDFS in Parquet format, the join works as expected. A sketch of the rest of that workaround follows.
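For completeness, the write side of the workaround would look roughly like this (a sketch, assuming Address is a serializable JavaBean so Spark can derive the schema by reflection):

// Build a DataFrame from the bean RDD via JavaBean reflection, then persist as Parquet.
DataFrame addressDF = sqlContext.createDataFrame(address, Address.class);
addressDF.write().parquet("hdfs://localhost:9000/datalake/sample/address.parquet");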

I am reading exactly the same CSV files in both cases.


ORIGINAL QUESTION: I am running into an issue trying to perform a simple join of two DataFrames created from two different Parquet files on HDFS.


[main] INFO org.apache.spark.SparkContext - Running Spark version 1.4.1

Using HDFS from Hadoop 2.7.0


Here is a sample to illustrate.

public void testStrangeness(String[] args) { 
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("joinIssue"); 
    JavaSparkContext context = new JavaSparkContext(conf); 
    SQLContext sqlContext = new SQLContext(context); 

    DataFrame people = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/people.parquet"); 
    DataFrame address = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/address.parquet"); 

    people.printSchema(); 
    address.printSchema(); 

    // yeah, works 
    DataFrame cartJoin = address.join(people); 
    cartJoin.printSchema(); 

    // boo, fails 
    DataFrame joined = address.join(people, 
      address.col("addrid").equalTo(people.col("addressid"))); 

    joined.printSchema(); 
} 

Contents of people.csv:

first,last,addressid 
your,mom,1 
fred,flintstone,2 

Contents of address.csv:

addrid,city,state,zip 
1,sometown,wi,4444 
2,bedrock,il,1111 

people.printSchema(); 

Results in...

root 
|-- first: string (nullable = true) 
|-- last: string (nullable = true) 
|-- addressid: integer (nullable = true) 

address.printSchema(); 

Results in...

root 
|-- addrid: integer (nullable = true) 
|-- city: string (nullable = true) 
|-- state: string (nullable = true) 
|-- zip: integer (nullable = true) 


DataFrame cartJoin = address.join(people); 
cartJoin.printSchema(); 

The Cartesian join works...

root 
|-- addrid: integer (nullable = true) 
|-- city: string (nullable = true) 
|-- state: string (nullable = true) 
|-- zip: integer (nullable = true) 
|-- first: string (nullable = true) 
|-- last: string (nullable = true) 
|-- addressid: integer (nullable = true) 

Fine, printSchema() works for that join. This join, however...

DataFrame joined = address.join(people, 
address.col("addrid").equalTo(people.col("addressid"))); 

...produces the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot resolve column name "addrid" among (addrid, city, state, zip); 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159) 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158) 
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:558) 
    at dw.dataflow.DataflowParser.testStrangeness(DataflowParser.java:36) 
    at dw.dataflow.DataflowParser.main(DataflowParser.java:119) 

I tried changing it so that people and address both have a common key attribute (addressid) and using:

address.join(people, "addressid"); 

But I got the same result.

Any ideas??

Thanks

Answers


It turns out the problem was that the CSV files were in UTF-8 format with a BOM. The Databricks CSV implementation does not handle UTF-8 with a BOM. I converted the files to UTF-8 without a BOM and everything works fine.
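This also explains the confusing error message: the BOM bytes are presumably read as part of the first column name, so the schema actually contains "\uFEFFaddrid", which prints identically to "addrid" but does not compare equal to it. A minimal illustration:

public class BomDemo {
    public static void main(String[] args) {
        // A header parsed from a UTF-8-with-BOM file absorbs the BOM
        // into the first column name (illustrative value, not actual Spark output).
        String header = "\uFEFFaddrid";
        System.out.println(header);                   // prints "addrid" -- the BOM is invisible
        System.out.println(header.equals("addrid"));  // false, so col("addrid") cannot resolve
        System.out.println(header.length());          // 7, not 6
    }
}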


Can you explain the BOM in this case? – dmux


BOM is the byte order mark: http://stackoverflow.com/questions/2223882/whats-different-between-utf-8-and-utf-8-without-bom –


I was able to fix this problem by using Notepad++. Under the "Encoding" menu, I switched it from "Encode in UTF-8-BOM" to "Encode in UTF-8".
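If an editor isn't practical (e.g. for many files or on a server), the BOM can also be stripped programmatically; a minimal sketch (StripBom is a hypothetical helper written for illustration, not part of either answer):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class StripBom {
    // Rewrites a file in place without its leading UTF-8 BOM (EF BB BF), if present.
    public static void stripBom(Path path) throws IOException {
        byte[] bytes = Files.readAllBytes(path);
        if (bytes.length >= 3
                && (bytes[0] & 0xFF) == 0xEF
                && (bytes[1] & 0xFF) == 0xBB
                && (bytes[2] & 0xFF) == 0xBF) {
            Files.write(path, Arrays.copyOfRange(bytes, 3, bytes.length));
        }
    }

    public static void main(String[] args) throws IOException {
        stripBom(Paths.get(args[0]));
    }
}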
