2015-04-15 201 views

Spark: reading a CSV file with a header

I have a CSV file with 90 columns and about 28,000 rows. I want to load it and split it into a training set (75%) and a test set (25%). I am using the following code:

Code:

val data = sc.textFile(datadir + "/dados_frontwave_corte_pedra_ferramenta.csv")
  .map(line => line.split(","))
  .filter(line => line.length > 1)
  .collect();

// Building the model 
val numIterations = 20; 
val model = LinearRegressionWithSGD.train(data, numIterations); 

I get the following error on "data":

type mismatch; found : Array[Array[String]] required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] 

What is the problem? I searched for it on the internet and could not find a clear answer.

I found this Spark Example and changed my code as follows:

val data = sc.textFile(datadir + "/dados_frontwave_corte_pedra_ferramenta.csv")
val parsedData = data.map { line =>
  val parts = line.split(",")
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(",").map(x => x.toDouble).toArray))
};

That error is gone, but when I run the code it produces the following error:

15/04/15 16:53:52 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NumberFormatException: For input string: ""12316""
    at sun.misc.FloatingDecimal.readJavaFormatString(Unknown Source)
    at sun.misc.FloatingDecimal.parseDouble(Unknown Source)
    at scala.collection.immutable.StringLike$class.toDouble(StringOps.scala:31)
    ...

A sample of the input file:

"","Ferramenta","Pedra","ensaio","Nrasgo","Vcorte","Vrotacional","PCorte","Tempo","Fh","Fr","Energia","Caudal","Vib_disco","Vib_maquina","xx","yy","zz","Fonte","id","rocha_classe","rocha_tipo","Resistência_mecânica_à_compressão","Res._mec._à_compr._após_teste_de_gelividade","Resistência_mecânica_à_flexão","Massa_volúmica_aparente","Absorção_de_água_à_P._At.N.","Porosidade_aberta","Coef._de_dilatação_linear_térmica_val._máx","Resistência_ao_desgaste","Resistência_ao_choque_altura_minima_de_queda","Resistência_ao_gelo","Al2O3","CaO","H2O.","K2O","MgO","MnO","Na2O","P2O5","SiO2","TiO2","microclina","plagioclase","quartzo","page_id","rocha_nome_2","P.R._.L.O.I..","plagioclase_.oligoclase.albite.","feldspato_potássico_.microclina.","feldspato_potássico_.essencialmente_microclina.","biotite","rocha_nome_3","oligoclase","plagioclase_.andesina.","horneblenda","feldspato_potássico","nefelina","aegirina_e_aegirina.augite","esfena","piroxena","olivina","horneblenda_verde","plagioclase_.oligoclase.","CO2","clorite","cloritóide","quartzo.feldspato","SO3","cloritóide.clorite","calcite","dolomite","serpentina_.antigorite.crisótilo.","mica_.biotite.moscovite.","feldspato","Fe2O3","plagioclase_ácida","cristobalite","rocha_nome_1","Ferramentas","Binder","LM","graf","WC","T","sigma","epsilon","m","E","H" 
"1","A-010-13","estremoz","ECE-E1",5,26,1430,5,6.08,-0.0981,57,720,23.5,0.9,3.5,162,197.2,5,"ECE-A-010-13-Estremoz_1",2,"sedimentares ","calcário",960,767,276,2711,0.07,0.18,11.5,3.4,57.5,48,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"A-010/13","A-038/11","LM 156",0,0,800,425,0.0062,1.09,159,2085 
"2","A-010-13","estremoz","ECE-E1",5,26,1430,5,5.9,-0.0981,63,720,23.5,0.9,3.5,157,197.2,5,"ECE-A-010-13-Estremoz_1",2,"sedimentares ","calcário",960,767,276,2711,0.07,0.18,11.5,3.4,57.5,48,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"A-010/13","A-038/11","LM 156",0,0,800,425,0.0062,1.09,159,2085 

Maybe you can have a look at using spark-csv: https://github.com/databricks/spark-csv, it does the parsing for you. – sgvd
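Following that suggestion, a minimal sketch of loading the file with spark-csv. This assumes the package is on the classpath and a Spark version with the DataFrameReader API (1.4+); the header and inferSchema options are taken from the spark-csv README, and datadir is the same path variable as in the question:

// Assumes spark-csv on the classpath, e.g. started with:
//   spark-shell --packages com.databricks:spark-csv_2.10:1.2.0
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")      // treat the first line as column names
  .option("inferSchema", "true") // parse numeric columns as numbers (later spark-csv releases)
  .load(datadir + "/dados_frontwave_corte_pedra_ferramenta.csv")

This sidesteps the manual quote handling discussed below, since spark-csv unquotes fields itself.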


You have double quotes around your fields; you need to trim them! If you give us a line or two of the input format, I would be glad to help. – eliasah


The first three lines of the input file have been added to the question. – Mohammad

Answers

Maybe it is a strange solution, but try this:

val parts = line.split(",").map(x => x.replace("\"", "")).filter(x => x.length > 0) 

Do you mean: val parts = line.split(",").map(x => x.replace("\"", "")) instead of val parts = line.split(",")? – Mohammad


Exactly; you need to do something with those quotes. – znurgl


I did, but the only change is the following one-line error: java.lang.NumberFormatException: empty String – Mohammad
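That empty-String failure comes from the runs of consecutive commas (missing values) in the sample rows: after the quotes are removed, those cells are empty strings, which toDouble rejects, and filtering them out (as the answer's snippet does) would shift feature positions between rows. A plain-Scala sketch of one way to handle this; the 0.0 default for missing or non-numeric cells is an illustrative choice, not something from the thread:

// One raw line shaped like the question's data: quoted strings, empty cells.
val line = "\"1\",\"A-010-13\",5,26,,3.5"

val parts = line.split(",", -1)         // limit -1 keeps trailing empty fields
  .map(_.replace("\"", "").trim)

// Guarded parse: empty or non-numeric cells fall back to a default value.
def toDoubleOrElse(s: String, default: Double = 0.0): Double =
  if (s.isEmpty) default
  else try s.toDouble catch { case _: NumberFormatException => default }

val numeric = parts.map(s => toDoubleOrElse(s))

Substituting a default keeps every row's feature vector the same length and in the same column order, which filter(x => x.length > 0) does not.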

In the first example, the method train expects an RDD, and you are passing it an Array.

collect is an action, not a transformation. Removing the call to collect should fix your problem.

This should work:

val data = sc.textFile(datadir + "/dados_frontwave_corte_pedra_ferramenta.csv")
  .map(line => line.split(","))
  .filter(line => line.length > 1);

// Building the model 
val numIterations = 20; 
val model = LinearRegressionWithSGD.train(data, numIterations);
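For completeness: the thread never shows the 75%/25% split the question asks for, and the snippet above still passes an RDD of string arrays rather than LabeledPoints to train. A hedged end-to-end sketch combining the quote handling discussed in the other answer with RDD.randomSplit; the label column index (4) and the 0.0 default for empty cells are illustrative assumptions, not choices made in the thread:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

val raw = sc.textFile(datadir + "/dados_frontwave_corte_pedra_ferramenta.csv")
val header = raw.first()               // the file's first line is the header row
val rows = raw.filter(_ != header)

val parsed = rows.map { line =>
  val parts = line.split(",", -1).map(_.replace("\"", "").trim)
  def num(s: String): Double =
    if (s.isEmpty) 0.0
    else try s.toDouble catch { case _: NumberFormatException => 0.0 }
  // Hypothetical choice: column 4 as the label, later columns as features.
  LabeledPoint(num(parts(4)), Vectors.dense(parts.drop(5).map(num)))
}

// The 75% / 25% train/test split asked for in the question.
val Array(train, test) = parsed.randomSplit(Array(0.75, 0.25), seed = 42L)
val model = LinearRegressionWithSGD.train(train, 20)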