2017-05-24 84 views
1

我有一個由7-8個字段組成的數據集,這些字段的類型是String,Int & Float。Spark - 使用不同數據類型以編程方式創建模式

我試圖用它來創建通過編程的方式架構:

val schema = StructType(header.split(",").map(column => StructField(column, StringType, true))) 

然後通過映射它排樣型:

val dataRdd = datafile.filter(x => x!=header).map(x => x.split(",")).map(col => Row(col(0).trim, col(1).toInt, col(2).toFloat, col(3), col(4) ,col(5), col(6), col(7), col(8))) 

但是,創造數據幀後,當我使用DF.show( )它給整數字段錯誤。

那麼如何創建這樣的模式,我們在數據集中

回答

1

有多個數據類型,你在你的代碼的問題是,你正在分配的所有領域StringType。

假設在標題中只有字段的名稱,那麼你不能猜測類型。

讓我們假設頭部字符串是這樣

val header = "field1:Int,field2:Double,field3:String" 

然後代碼應該是

def inferType(field: String) = field.split(":")(1) match { 
    case "Int" => IntegerType 
    case "Double" => DoubleType 
    case "String" => StringType 
    case _ => StringType 
} 

val schema = StructType(header.split(",").map(column => StructField(column, inferType(column), true))) 

你所得到

root 
|-- field1:Int: integer (nullable = true) 
|-- field2:Double: double (nullable = true) 
|-- field3:String: string (nullable = true) 

在另一方面頭部字符串例子。如果你需要它是一個來自文本的數據框架,我建議你直接從文件本身創建DataFrame。從RDD創建它毫無意義。

val fileReader = spark.read.format("com.databricks.spark.csv") 
    .option("mode", "DROPMALFORMED") 
    .option("header", "true") 
    .option("inferschema", "true") 
    .option("delimiter", ",") 

val df = fileReader.load(PATH_TO_FILE) 
+0

但是標題字符串不是這樣,數據就像 'dfs8768768,65,76.34,234,dfgdg,34.65 dfs8768768,65,76.34,234,dfgdg,34.65' – AJm

+0

然後就不可能從標題中知道數據的類型,因爲它沒有提供。 – elghoto

+0

這是標題的確切數據: '拍賣,競價,bidtime,投標人,bidderrate,openbid,價格,項目,daystolive 8213034715,15,12.373,baman,3,12,20,book1,5 8213034725, 65,21.33,thmpu,2,64,75,watch1,9 8213034735,85,23.3,lovekush,4,45,90,remote1,10 8213034745,115,44.44,jaipanee,3,111,130,s3phone,4' – AJm

1

定義結構類型第一:

val schema1 = StructType(Array(
    StructField("AcutionId", StringType, true), 
    StructField("Bid", IntegerType, false), 
    StructField("BidTime", FloatType, false), 
    StructField("Bidder", StringType, true), 
    StructField("BidderRate", FloatType, false), 
    StructField("OpenBid", FloatType, false), 
    StructField("Price", FloatType, false), 
    StructField("Item", StringType, true), 
    StructField("DaystoLive", IntegerType, false) 
)) 

然後,通過將其轉換爲特定類型的指定將要B存在一個行內的每個柱:

val dataRdd = datafile.filter(x => x!=header).map(x => x.split(",")) 
    .map(col => Row(
    col(0).trim, 
    col(1).trim.toInt, 
    col(2).trim.toFloat, 
    col(3).trim, 
    col(4).trim.toFloat, 
    col(5).trim.toFloat, 
    col(6).trim.toFloat, 
    col(7).trim, 
    col(8).trim.toInt) 
) 

然後施加Schema to the RDD

val auctionDF = spark.sqlContext.createDataFrame(dataRdd,schema1) 
相關問題