我開始使用spark來學習。我做了一個基於this document的簡單程序。SparkSQL DataFrame:使用緩存時sql查詢不起作用
我的程序從文件(在HDFS集羣上)讀取支付日誌,將其傳輸到一個數據框,並在一些sql查詢中使用這個數據框。我在兩種情況下運行我的程序:使用和不使用cache()方法。我遇到了一個奇怪的問題,因爲描述波紋管:
- 不使用高速緩存():
我試圖運行一些查詢和一切都很好。 (log_zw是我的表名)
val num_records = sqlContext.sql("select * from log_zw").count
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
- 使用高速緩存()
我也使用上述兩個查詢。第一個查詢返回正確的值,但第二個是而不是,它返回。
然而,當我在另一種方法查詢它:
val num_acc1 = log_zw.filter(log_zw("ACN").contains("acc1")).count
它返回正確的結果。
我對Spark和集羣計算系統非常陌生,我沒有任何想法,爲什麼它像那樣工作。任何人都可以請向我解釋這個問題,特別是使用SQL查詢和火花方法時的不同。
編輯:這是模式,它非常簡單。
root
|-- PRODUCT_ID: string (nullable = true)
|-- CHANNEL: string (nullable = true)
|-- ACN: string (nullable = true)
|-- AMOUNT_VND: double (nullable = false)
|-- TRANS_ID: string (nullable = true)
EDIT2:這是使用高速緩存()時,我的代碼:(我跑了一些查詢,結果表明:在代碼中的註釋)
// read tsv files
case class LogZW(
PRODUCT_ID: String,
PLATFORM: String,
CHANNEL: String,
ACN: String,
AMOUNT_VND: Double,
TRANS_ID: String)
def loadLog(filename: String): DataFrame = {
sc.textFile(filename).map(line => line.split("\t")).map(p =>
LogZW(p(1), p(3), p(4), p(5), p(9).toDouble, p(10).substring(0,8))).toDF()
}
// generate schema
val schemaString = "PRODUCT_ID PLATFORM CHANNEL ACN AMOUNT_VND TRANS_ID"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// read all files
val HDFSFolder = "hdfs://master:54310/user/lqthang/data/*"
val log = loadLog(HDFSFolder)
// register table
log.registerTempTable("log")
log.show()
// select a subset of log table
val log_zw = sqlContext.sql("select PRODUCT_ID, CHANNEL, ACN, AMOUNT_VND, TRANS_ID from log where PLATFORM = 'zingwallet' and CHANNEL not in ('CBZINGDEAL', 'VNPT') and PRODUCT_ID not in ('ZingCredit', 'zingcreditdbg') ")
// register new table
log_zw.show()
log_zw.registerTempTable("log_zw")
// cache table
log_zw.cache()
// this query returns incorrect value!!
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
// this query returns correct value!
val num_acc2 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count
// uncache data and try another query
log_zw.unpersist()
// this query also returns the correct value!!!
val num_acc2 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
EDIT3:我試着將另一個緩存()方法添加到log
數據幀:
// register table
log.registerTempTable("log")
log.show()
log.cache()
以下代碼是相同的如上(log_zw.cache()
)。所以重要的結果是:
// this query returns the CORRECT value!!
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
你能告訴我們的模式? –
@AlbertoBonsanto我添加了架構。 –