2015-10-19 70 views
0

我開始使用spark來學習。我做了一個基於this document的簡單程序。SparkSQL DataFrame:使用緩存時sql查詢不起作用

我的程序從文件(在HDFS集羣上)讀取支付日誌,將其傳輸到一個數據框,並在一些sql查詢中使用這個數據框。我在兩種情況下運行我的程序:使用和不使用cache()方法。我遇到了一個奇怪的問題,因爲描述波紋管:

  1. 不使用高速緩存():

我試圖運行一些查詢和一切都很好。 (log_zw是我的表名)

val num_records = sqlContext.sql("select * from log_zw").count 
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
  • 使用高速緩存()
  • 我也使用上述兩個查詢。第一個查詢返回正確的值,但第二個是而不是,它返回。

    然而,當我在另一種方法查詢它:

    val num_acc1 = log_zw.filter(log_zw("ACN").contains("acc1")).count 
    

    它返回正確的結果。

    我對Spark和集羣計算系統非常陌生,我沒有任何想法,爲什麼它像那樣工作。任何人都可以請向我解釋這個問題,特別是使用SQL查詢和火花方法時的不同。

    編輯:這是模式,它非常簡單。

    root 
    |-- PRODUCT_ID: string (nullable = true) 
    |-- CHANNEL: string (nullable = true) 
    |-- ACN: string (nullable = true) 
    |-- AMOUNT_VND: double (nullable = false) 
    |-- TRANS_ID: string (nullable = true) 
    

    EDIT2:這是使用高速緩存()時,我的代碼:(我跑了一些查詢,結果表明:在代碼中的註釋)

    // read tsv files 
    case class LogZW(
        PRODUCT_ID: String, 
        PLATFORM: String, 
        CHANNEL: String, 
        ACN: String, 
        AMOUNT_VND: Double, 
        TRANS_ID: String) 
    
    def loadLog(filename: String): DataFrame = { 
        sc.textFile(filename).map(line => line.split("\t")).map(p => 
        LogZW(p(1), p(3), p(4), p(5), p(9).toDouble, p(10).substring(0,8))).toDF() 
    } 
    
    // generate schema 
    val schemaString = "PRODUCT_ID PLATFORM CHANNEL ACN AMOUNT_VND TRANS_ID" 
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 
    
    // read all files 
    val HDFSFolder = "hdfs://master:54310/user/lqthang/data/*" 
    val log = loadLog(HDFSFolder) 
    
    // register table 
    log.registerTempTable("log") 
    log.show() 
    
    // select a subset of log table 
    val log_zw = sqlContext.sql("select PRODUCT_ID, CHANNEL, ACN, AMOUNT_VND, TRANS_ID from log where PLATFORM = 'zingwallet' and CHANNEL not in ('CBZINGDEAL', 'VNPT') and PRODUCT_ID not in ('ZingCredit', 'zingcreditdbg') ") 
    
    // register new table 
    log_zw.show() 
    log_zw.registerTempTable("log_zw") 
    
    // cache table 
    log_zw.cache() 
    
    // this query returns incorrect value!! 
    val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
    
    // this query returns correct value! 
    val num_acc2 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count 
    
    // uncache data and try another query 
    log_zw.unpersist() 
    
    // this query also returns the correct value!!! 
    val num_acc2 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
    

    EDIT3:我試着將另一個緩存()方法添加到log數據幀:

    // register table 
    log.registerTempTable("log") 
    log.show() 
    log.cache() 
    

    以下代碼是相同的如上(log_zw.cache())。所以重要的結果是:

    // this query returns the CORRECT value!! 
    val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
    
    +0

    你能告訴我們的模式? –

    +0

    @AlbertoBonsanto我添加了架構。 –

    回答

    0

    我們沒有很多關於數據是什麼的細節,但我注意到你的兩個代碼段做了不同的事情。

    在第一個,你做ACN = 'acc1',但在第二個,你檢查ACN 包含'acc1'。

    所以第二位(帶過濾器),將匹配如果ACN是「ACC1」或「ACC1」或「ACC1」

    換句話說,我敢打賭,如果你添加一個微調到您的SQL查詢你會得到不同的結果。

    那麼試試這個:
    val num_records = sqlContext.sql("select * from log_zw").count val num_acc1 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count

    +0

    感謝您的回答。 使用** trim **給了我一個正確的結果。但是,它不回答我的問題。而且,當不使用** cache()**和** trim **時:sqlContext.sql(「select * from log_zw where ACN ='acc1'」)。count也返回正確的結果。 –

    +0

    我需要更多信息。你可以減少一個樣本,給出不正確的計數(你可以更改私人信息)?這樣我們可以嘗試重現這一點。你在哪裏放了'''cache()'''調用?請更新您的2號碼(使用緩存)。 – kanielc

    +0

    根據你的建議,我添加了整個代碼,並只顯示了一個錯誤結果的樣本。 –