Spark 2.1 Structured Streaming: a plain count(*) or sum(field) over parquet files works fine, but filtering does not. Sample code below. (Spark Structured Streaming and filters)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.3-8
      /_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.types._
val userSchema = new StructType()
.add("caseId", StringType)
.add("ts", LongType)
.add("rowtype", StringType)
.add("rowordernumber", IntegerType)
.add("parentrowordernumber", IntegerType)
.add("fieldname", StringType)
.add("valuestr", StringType)
val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")
csvDF.createOrReplaceTempView("tmptable")
val aggDF = spark.sql("select count(*) from tmptable where rowtype='3600'")
aggDF
.writeStream
.outputMode("complete")
.format("console")
.start()
aggDF
.writeStream
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()
// Exiting paste mode, now interpreting.
+--------+
|count(1)|
+--------+
+--------+
import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(caseId,StringType,true), StructField(ts,LongType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true))
csvDF: org.apache.spark.sql.DataFrame = [caseId: string, ts: bigint ... 5 more fields]
aggDF: org.apache.spark.sql.DataFrame = [count(1): bigint]
-------------------------------------------
Batch: 0
-------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+--------+
|count(1)|
+--------+
| 0|
+--------+
I have also tried the DSL-style filter: val aggDF = csvDF.filter("rowtype == '3600'").agg(count("caseId"))
with no success. I have checked the parquet files, and there are rows where rowtype = '3600':
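For reference, the semantics both filter forms are expected to have can be sketched in plain Scala over a local collection (hypothetical sample data, no Spark involved):

```scala
// Sketch of the intended filter semantics: keep only rows whose
// rowtype column equals "3600", then count them.
case class Record(caseId: String, rowtype: String)

val sample = Seq(
  Record("c1", "3600"),
  Record("c2", "100"),
  Record("c3", "3600")
)

val matched = sample.count(_.rowtype == "3600")
println(matched) // 2
```

If this count comes back 0 on the real data even though matching rows exist, the filter expression itself is not the problem; something about how the column resolves is.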
[[email protected] ~]# spark-sql
SPARK_MAJOR_VERSION is set to 2, using Spark2
spark-sql> select count(*) from tab1 where rowtype='3600' ;
433698463
Could you check what data you are actually getting? Say, with this query: 'select rowtype, count(*) from tmptable group by rowtype'. – nonsleepr
Thanks for the hint! 'select rowtype, count(*) ... group by rowtype' returned null for rowtype. I checked the avro-parquet files again and found that the column in them is named rowType, not rowtype. That is what caused the nulls in the rowtype column. – Triffids
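The failure mode can be illustrated in plain Scala (no Spark, hypothetical column lists): when a user-supplied schema names a column that does not exactly match a field in the parquet files, that column cannot be resolved against the file data and comes back null, so a filter on it never matches.

```scala
// Why the filter matched nothing: the user schema asks for "rowtype",
// but the parquet files actually contain "rowType". Exact-name matching
// leaves the requested column unresolved, and unresolved columns read
// back as null.
val fileColumns   = Seq("caseId", "ts", "rowType") // names in the parquet files
val schemaColumns = Seq("caseId", "ts", "rowtype") // names in the user schema

val unresolved = schemaColumns.filterNot(fileColumns.contains)
println(unresolved) // List(rowtype)
```

With Spark itself the fix is simply to use the exact name in the schema, i.e. .add("rowType", StringType), so the filter becomes rowType='3600'.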