我載入我的CSV數據框使用我,然後轉換爲數據集,但它顯示像這行這個如何使用Scala在Spark中使用DataSet?
多個標記:
- 無法找到存儲在數據集型編碼器。通過導入
spark.implicits._支持原始類型(Int,字符串等)和產品類型(案例類別)。將來版本中將添加對序列化其他類型的支持。
- 方法的參數不夠:(隱式證據$ 2:
org.apache.spark.sql.Encoder [DataSet.spark.aacsv])org.apache.spark.sql.Dataset [DataSet.spark.aacsv] 。未指定值參數證據$ 2
如何解決此問題? 我的代碼 -
case class aaCSV(
a: String,
b: String
)
object WorkShop {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv")
df.printSchema()
df.show()
val googleDS = df.as[aaCSV]
googleDS.show()
}
}
現在我改變的主要功能是這樣 -
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
sa.printSchema()
sa.show()
}
但它拋出錯誤 - 異常線程 「main」 org.apache.spark.sql.AnalysisException:不能給定輸入列:[_c1,_c2,_c5,_c4,_c6,_c3,_c0]解析'Adj_Close
';第1行pos 7。我該怎麼辦 ?
現在我執行我的方法使用基於給定的時間間隔使用火花調度。但我提到這個鏈接 - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application。請幫助我們。
「沒有足夠的論據法」 ......有什麼辦法?你的代碼在哪裏? –
嗯。請不要使用註釋代碼。編輯你的問題並適當地格式化它。謝謝 –
@Sarathkumar Vulchi:在將df轉換爲ds之前,您是否可以嘗試添加此行'sqlContext.implicits._'。 – Shankar