2017-07-31 27 views
0

Spark 2.1,具有原始計數(*)的結構化流,sum(field)在parquet文件上工作正常,但過濾不起作用。 示例代碼:Spark結構化流媒體和過濾器

Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /___/ .__/\_,_/_/ /_/\_\ version 2.1.0.2.6.0.3-8 
     /_/ 

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131) 
Type in expressions to have them evaluated. 
Type :help for more information. 

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

import org.apache.spark.sql.types._ 

val userSchema = new StructType() 
    .add("caseId", StringType) 
    .add("ts", LongType) 
    .add("rowtype", StringType) 
    .add("rowordernumber", IntegerType) 
    .add("parentrowordernumber", IntegerType) 
    .add("fieldname", StringType) 
    .add("valuestr", StringType) 

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2") 

csvDF.createOrReplaceTempView("tmptable") 
val aggDF = spark.sql("select count(*) from tmptable where rowtype='3600'") 

aggDF 
    .writeStream 
    .outputMode("complete") 
    .format("console") 
    .start() 

aggDF 
.writeStream 
.queryName("aggregates") // this query name will be the table name 
.outputMode("complete") 
    .format("memory") 
    .start() 
spark.sql("select * from aggregates").show() 


// Exiting paste mode, now interpreting. 

+--------+ 
|count(1)| 
+--------+ 
+--------+ 

import org.apache.spark.sql.types._ 
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(caseId,StringType,true), StructField(ts,LongType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true)) 
csvDF: org.apache.spark.sql.DataFrame = [caseId: string, ts: bigint ... 5 more fields] 
aggDF: org.apache.spark.sql.DataFrame = [count(1): bigint] 

------------------------------------------- 
Batch: 0 
------------------------------------------- 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
+--------+ 
|count(1)| 
+--------+ 
|  0| 
+--------+ 

而且我已經試過的NoSQL風格過濾: VAL aggDF = csvDF.filter( 「行類型== '3600'」)AGG(計數( 「caseId」))

沒有成功,我已經檢查了實木複合地板的文件,也有一些行,其中行類型=「3600」

[[email protected] ~]# spark-sql 
SPARK_MAJOR_VERSION is set to 2, using Spark2 
spark-sql> select count(*) from tab1 where rowtype='3600' ; 
433698463 
+0

你能檢查什麼數據,你已經來了嗎?比如說,使用這個查詢:'select rowtype,count(*)from tmptable group by rowtype'。 – nonsleepr

+0

感謝您的提示! rowtype,count(*)爲rowtype返回空值,我再次檢查了avro-parquet文件,發現在avro-parquet列名中是rowType,而不是rowtype。這導致了rowtype列中的空值。 – Triffids

回答

1

當您的數據是靜態的時,您不需要指定自己的模式。在這種情況下,Spark可以自己計算拼花數據集的模式。例如:

case class Foo(lowercase: String, upperCase: String) 
val df = spark.createDataset(List(Foo("abc","DEF"), Foo("ghi","JKL"))) 
df.write.parquet("/tmp/parquet-test") 
val rdf = spark.read.parquet("/tmp/parquet-test") 
rdf.printSchema 
// root 
// |-- lowercase: string (nullable = true) 
// |-- upperCase: string (nullable = true) 

在這個階段,隨後的SQL查詢將工作忽略的情況:

rdf.createOrReplaceTempView("rdf") 
spark.sql("select uppercase from rdf").collect 
// Array[org.apache.spark.sql.Row] = Array([DEF], [JKL]) 

星火有一個選項spark.sql.caseSensitive啓用/禁用區分大小寫(默認值是true),但它似乎它只能用於寫入。

嘗試做同樣與流會導致異常:

java.lang.IllegalArgumentException: Schema must be specified when creating a streaming 
    source DataFrame. If some files already exist in the directory, then depending 
    on the file format you may be able to create a static DataFrame on that directory 
    with 'spark.read.load(directory)' and infer schema from it. 

這留給您以下選項:

  1. 提供自己的模式,你沒有(要知道,雖然它是區分大小寫的)。
  2. 按照異常的建議和從已經存儲在文件夾中的數據導出的模式:
val userSchema = spark.read.parquet("/tmp/parquet-test").schema 
val streamDf = spark.readStream.schema(userSchema).parquet("/tmp/parquet-test") 
  • 泰爾火花通過設置spark.sql.streaming.schemaInference反正推斷模式到true
  • spark.sql("set spark.sql.streaming.schemaInference=true") 
    val streamDf = spark.readStream.parquet("/tmp/parquet-test") 
    streamDf.createOrReplaceTempView("stream_rdf") 
    val query = spark.sql("select uppercase, count(*) from rdf group by uppercase") 
        .writeStream 
        .format("console") 
        .outputMode("complete") 
        .start 
    
    +0

    我在流式數據框上有個例外: val csvDF = spark.readStream.parquet(「/ folder1/folder2」) java.lang.IllegalArgumentException:在創建流源DataFrame時必須指定架構。如果目錄中已經存在一些文件,那麼根據文件格式,您可以使用'spark.read.load(目錄)'在該目錄上創建一個靜態DataFrame並從中推斷模式。 – Triffids

    +0

    對不起,沒有在流上測試它,有一個選項可以在流上啓用模式推斷。我編輯了我的答案來解釋這一點。 – nonsleepr

    +0

    謝謝,第二個選項工作正常。也許你可以回答下一個問題:是否可以在readStream.parquet()中傳遞文件掩碼?我們的ETL過程有時會將小拼圖合併爲更大的拼圖,因爲hdfs上的小文件效率不高。但刪除的文件導致流過程錯誤(錯誤StreamExecution:查詢終止,錯誤java.io.FileNotFoundException:文件不存在)。也許有可能只讀新文件? – Triffids

    0

    問題是行類型的列名,在Avro的實木複合地板實際列名是「行類型」。修復

    .add("rowType", StringType) 
    

    解決了這個問題。