0
我在scala腳本中編寫簡單的ETL過程,並以'spark-shell -i rawetl.scala'運行,但得到異常「Table not found BaseTable」。我也檢查了文件,它正在選擇正確。ETL Scala腳本得到了異常
下面是示例代碼
import java.io.File
import sqlContext.implicits._
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import sys.process._
case class pageRow(affiliateid : String , pageurl : String, alertedbrandsafety : String, blockedbrandsafety : String, grossimpressions : String, spider_bot : String, invalidbrowser : String ,outlieractivity : String , day : String)
object batch_nht {
def main() {
processRawNHT()
}
def processRawNHT() {
val rawFile = "hadoop fs -ls /tmp/XXX/rawDB/" #| "tail -1" !!
val fileName = rawFile.substring(rawFile.indexOf("/"))
val filePathName = "hdfs://AAAAA:8020" + fileName.trim()
println(filePathName)
val sc = new SparkContext(new SparkConf().setAppName("analyzeBlog"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val inviewraw = sc.textFile(filePathName).map(_.split(",")).map(x=>x.map(_.replace("\"","")))
val base_people = inviewraw.map{r => if(r(13) == null || r(13).trim.isEmpty) (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0)) else (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0))}
val logs_base_page_schemaRDD = base_people.map(p => pageRow(p._1, p._2, p._3, p._4,p._5, p._6, p._7, p._8,p._9)).toDF()
logs_base_page_schemaRDD.registerTempTable("baseTable")
sqlContext.sql("select * from baseTable").collect().foreach(println)
}
}
batch_nht.main()
注:如果我跑了波紋管通過一個火花外殼(無腳本)命令之一,我沒有任何異常正確的輸出。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val inviewraw = sc.textFile("hdfs://AAAAA:8020/tmp/XXX/rawDB/rawFile.csv").map(_.split(",")).map(x=>x.map(_.replace("\"","")))
val base_people = inviewraw.map{r => if(r(13) == null || r(13).trim.isEmpty) (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0)) else (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0))}
case class pageRow(affiliateid : String , pageurl : String, alertedbrandsafety : String, blockedbrandsafety : String, grossimpressions : String, spider_bot : String, invalidbrowser : String ,outlieractivity : String , day : String)
val logs_base_page_schemaRDD = base_people.map(p => pageRow(p._1, p._2, p._3, p._4,p._5, p._6, p._7, p._8,p._9)).toDF()
--create table
logs_base_page_schemaRDD.registerTempTable("baseTable")
sqlContext.sql("select * from baseTable").collect().foreach(println)
請提示出了什麼問題?在腳本中
這只是一個例子。正如我已經提到「我也檢查過文件,它正在選擇正確。」 –
用工作腳本編輯。請它知道它是否有幫助。 –
這真棒。我改變了我的東西,它正在工作。這是我的第一個scala腳本。多謝。 –