ETL Scala script throws an exception

I am writing a simple ETL process in a Scala script and running it with 'spark-shell -i rawetl.scala', but I am getting the exception "Table not found BaseTable". I have also checked the input file, and it is being picked up correctly.

Here is the sample code:

import java.io.File 
import sqlContext.implicits._ 
import scala.io.Source 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 
import sys.process._ 
case class pageRow(affiliateid: String, pageurl: String, alertedbrandsafety: String, blockedbrandsafety: String, grossimpressions: String, spider_bot: String, invalidbrowser: String, outlieractivity: String, day: String)

object batch_nht { 
    def main() { 
     processRawNHT() 
    } 

    def processRawNHT() { 

     val rawFile = "hadoop fs -ls /tmp/XXX/rawDB/" #| "tail -1" !! 
     val fileName = rawFile.substring(rawFile.indexOf("/")) 
     val filePathName = "hdfs://AAAAA:8020" + fileName.trim() 
     println(filePathName) 

     val sc = new SparkContext(new SparkConf().setAppName("analyzeBlog")) 
     val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

     val inviewraw = sc.textFile(filePathName).map(_.split(",")).map(x=>x.map(_.replace("\"",""))) 
     val base_people = inviewraw.map{r => if(r(13) == null || r(13).trim.isEmpty) (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0)) else (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0))} 

     val logs_base_page_schemaRDD = base_people.map(p => pageRow(p._1, p._2, p._3, p._4,p._5, p._6, p._7, p._8,p._9)).toDF() 

     logs_base_page_schemaRDD.registerTempTable("baseTable") 

     sqlContext.sql("select * from baseTable").collect().foreach(println) 
    } 
} 
batch_nht.main() 

Note: if I run the commands below one by one through a spark-shell (without the script), I get the correct output without any exception.

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext.implicits._ 

val inviewraw = sc.textFile("hdfs://AAAAA:8020/tmp/XXX/rawDB/rawFile.csv").map(_.split(",")).map(x=>x.map(_.replace("\"",""))) 
val base_people = inviewraw.map{r => if(r(13) == null || r(13).trim.isEmpty) (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0)) else (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0))} 

case class pageRow(affiliateid: String, pageurl: String, alertedbrandsafety: String, blockedbrandsafety: String, grossimpressions: String, spider_bot: String, invalidbrowser: String, outlieractivity: String, day: String)

val logs_base_page_schemaRDD = base_people.map(p => pageRow(p._1, p._2, p._3, p._4,p._5, p._6, p._7, p._8,p._9)).toDF() 

// create table
logs_base_page_schemaRDD.registerTempTable("baseTable") 
sqlContext.sql("select * from baseTable").collect().foreach(println) 

Please suggest what is going wrong in the script.

Answer


Here is a tested code snippet. Your approach does not work when the logic is written programmatically in Scala: the 'import sqlContext.implicits._' at the top of your script resolves to the shell's built-in SQLContext, so toDF() and registerTempTable("baseTable") operate on that context, while the sqlContext.sql(...) call inside processRawNHT() goes through the new SQLContext you construct there. A temp table is only visible to the SQLContext that registered it, hence "Table not found". The snippet below reuses the shell's predefined sc, drives everything through a single SQLContext, and builds the DataFrame from an explicit schema instead of relying on the implicits:

import java.io.File 
import sqlContext.implicits._ 
import scala.io.Source 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 
import sys.process._ 
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

object batch_nht { 
    def main() { 
     processRawNHT() 
    } 

    def processRawNHT() {
     // `sc` is predefined by spark-shell, so no new SparkContext is created here.
     val rawFile = "hadoop fs -ls /user/cloudera/cards/" #| "tail -1" !!
     val fileName = rawFile.substring(rawFile.indexOf("/")) 
     val filePathName = "hdfs://quickstart.cloudera:8020" + fileName.trim() 
     println(filePathName) 
     val schemaString = "color|suit|pip" 
     val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
     val deck = sc.textFile(filePathName).map(_.split("\\|")) 
     val schema = 
      StructType(
      schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true))) 
     val base_deckRDD = deck.map{r => Row(r(0), r(1), r(2))} 
     val cardsDataFrame = sqlContext.createDataFrame(base_deckRDD, schema) 
     cardsDataFrame.registerTempTable("deck_of_cards") 
     val firstTen = sqlContext.sql("select * from deck_of_cards limit 10") 
     firstTen.map(r => (r(0), r(1), r(2))).collect().foreach(println) 
    } 
} 

batch_nht.main() 
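
If you would rather keep the case-class/toDF() style from the question, the same fix applies: create one SQLContext and import its implicits only after it exists, so the table is registered in and queried from the same context. A minimal sketch, assuming the script still runs under 'spark-shell -i' (so sc is predefined) and trimming the question's pageRow to three of its columns for brevity:

// `sc` comes from spark-shell; do not create a second SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Import the implicits of this particular context, after it is constructed,
// so that toDF() and sql() both talk to the same SQLContext.
import sqlContext.implicits._

// Trimmed version of the question's row type.
case class PageRow(affiliateid: String, pageurl: String, day: String)

val rows = sc.textFile("hdfs://AAAAA:8020/tmp/XXX/rawDB/rawFile.csv")
  .map(_.split(","))
  .map(_.map(_.replace("\"", "")))
  .map(r => PageRow(r(5), r(32), r(0)))
  .toDF()

rows.registerTempTable("baseTable")
// The query now runs against the same context that registered the table.
sqlContext.sql("select * from baseTable").collect().foreach(println)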

This is just an example. As I already mentioned, "I have also checked the input file, and it is being picked up correctly."


Edited with a working script. Please let me know if it helps.


This is awesome. I changed my code accordingly and it is working now. This was my first Scala script. Thanks a lot.