
I am very new to Spark machine learning and I want to pass multiple columns as features. In my code below I only pass the Date column to the features, but now I want to pass both the Userid and Date columns. I tried using a vector, but it only supports the Double data type, while in my case I have an Int and a String. How can I provide multiple columns to setInputCol()?

I would appreciate it if anyone could provide a suggestion, a solution, or a code example that meets my requirement.

Code:

import scala.beans.BeanInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{Row, SQLContext}

case class LabeledDocument(Userid: Double, Date: String, label: Double)

// Read the training data and name the columns.
val training = spark.read.option("inferSchema", true)
  .csv("/root/Predictiondata3.csv")
  .toDF("Userid", "Date", "label")
  .as[LabeledDocument]

// Only the Date column is passed into the feature pipeline here;
// the goal is to pass Userid and Date together.
val tokenizer = new Tokenizer().setInputCol("Date").setOutputCol("words")
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training.toDF())

case class Document(Userid: Integer, Date: String)
val test = sc.parallelize(Seq(Document(4, "04-Jan-18"), Document(5, "01-Jan-17"), Document(2, "03-Jan-17")))
model.transform(test.toDF()).show()

Input data with columns:

Userid,Date,SwipeIntime 
1, 1-Jan-2017,9.30 
1, 2-Jan-2017,9.35 
1, 3-Jan-2017,9.45 
1, 4-Jan-2017,9.26 
2, 1-Jan-2017,9.37 
2, 2-Jan-2017,9.35 
2, 3-Jan-2017,9.45 
2, 4-Jan-2017,9.46 

You will need to use VectorAssembler. It accepts data of numeric, vector, and boolean types. You can use StringIndexer to convert strings into indices. – hadooper


Thanks, I will give it a try and see. – Bhavesh


@hadooper Could you share an example? I tried the code below: val assembler1 = new VectorAssembler().setInputCols(Array("Userid", "Date")).setOutputCol("vec1"); val assemble1 = assembler1.transform(training), and it throws java.lang.IllegalArgumentException: Data type StringType is not supported. – Bhavesh
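
For reference, here is a minimal sketch of what the comment suggests, using the column names from the question. It indexes the string Date column with StringIndexer before assembling, which avoids the StringType error shown above; this is only an illustration under those assumptions, not the poster's final code:

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// Index the string column first; VectorAssembler rejects StringType columns.
val dateIndexer = new StringIndexer().setInputCol("Date").setOutputCol("DateIdx")
val indexedDf = dateIndexer.fit(training.toDF()).transform(training.toDF())

// Now both inputs are numeric, so they can be assembled into one feature vector.
val assembler = new VectorAssembler()
  .setInputCols(Array("Userid", "DateIdx"))
  .setOutputCol("features")
val assembled = assembler.transform(indexedDf)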

Answer


I found a solution; here is what I was able to do.

import scala.beans.BeanInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

case class LabeledDocument(Userid: Double, Date: String, label: Double)

val trainingData = spark.read.option("inferSchema", true)
  .csv("/root/Predictiondata10.csv")
  .toDF("Userid", "Date", "label")
  .as[LabeledDocument]

// Convert the string Date column into a numeric category index.
val DateIndexer = new StringIndexer().setInputCol("Date").setOutputCol("DateCat")
val indexed = DateIndexer.fit(trainingData).transform(trainingData)

// Combine the indexed Date and the Userid into a single feature vector.
val assembler = new VectorAssembler().setInputCols(Array("DateCat", "Userid")).setOutputCol("rawfeatures")
val output = assembler.transform(indexed)

// Turn the assembled vector into its string form so it can be tokenized and hashed below.
val rows = output.select("Userid", "Date", "label", "DateCat", "rawfeatures").collect()
val asTuple = rows.map(a => (a.getInt(0), a.getString(1), a.getDouble(2), a.getDouble(3), a(4).toString()))
val r2 = sc.parallelize(asTuple).toDF("Userid", "Date", "label", "DateCat", "rawfeatures")

val Array(training, testData) = r2.randomSplit(Array(0.7, 0.3))

val tokenizer = new Tokenizer().setInputCol("rawfeatures").setOutputCol("words")
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training.toDF())
model.transform(testData.toDF()).show()
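
A possible refinement, not part of the answer above: StringIndexer and VectorAssembler can themselves be used as Pipeline stages, so the assembled vector feeds the regression directly and the round-trip through collect/parallelize (and the tokenize-then-hash step) is not needed. A minimal sketch, assuming the same column names and the trainingData Dataset from the answer:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression

// Stage 1: index the string Date column.
val dateIndexer = new StringIndexer().setInputCol("Date").setOutputCol("DateCat")
// Stage 2: assemble the numeric columns into the "features" vector LinearRegression expects.
val featureAssembler = new VectorAssembler()
  .setInputCols(Array("DateCat", "Userid"))
  .setOutputCol("features")
// Stage 3: the regression itself.
val linReg = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001)

val fullPipeline = new Pipeline().setStages(Array(dateIndexer, featureAssembler, linReg))
val pipelineModel = fullPipeline.fit(trainingData.toDF())
pipelineModel.transform(trainingData.toDF()).show()

This keeps the data distributed end to end, and the regression receives a proper numeric feature vector rather than a hashed string representation of one.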