2017-09-14 61 views
1

我已經編寫了一個結構化流式聚合,它從卡夫卡源中獲取事件,執行簡單計數並將它們寫回到卡桑德拉數據庫。代碼如下所示:結構化流式聚合返回錯誤的值

val data = stream 
    .groupBy(functions.to_date($"timestamp").as("date"), $"type".as("type")) 
    .agg(functions.count("*").as("value")) 

val query: StreamingQuery = data 
    .writeStream 
    .queryName("group-by-type") 
    .format("org.apache.spark.sql.streaming.cassandra.CassandraSinkProvider") 
    .outputMode(OutputMode.Complete()) 
    .option("checkpointLocation", config.getString("checkpointLocation") + "/" + "group-by-type") 
    .option("keyspace", "analytics") 
    .option("table", "aggregations") 
    .option("partitionKeyColumns", "project,type") 
    .option("clusteringKeyColumns", "date") 
    .start() 

問題是計數剛剛結束每一批。所以我會看到卡桑德拉的數量下降。計數不應該一天下降,我該如何實現這一目標?

編輯: 我一直在使用的窗口聚集太多,同樣的事情

+0

你試過更新模式嗎? –

+0

是的,我曾嘗試過,同樣的行爲 –

回答

1

因此,在這種情況下,錯誤本來就不是我的查詢或星火嘗試。 要找出問題出在哪裏,我使用了控制檯接收器,並且沒有發現問題。

的問題是在我的卡桑德拉下沉看起來像這樣:

class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink { 
    override def addBatch(batchId: Long, data: DataFrame): Unit = { 
    data.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save() 
    } 
} 

它使用Datastax星火卡桑德拉連接器寫入數據幀。

問題是變量data包含流式數據集。在Spark提供的ConsoleSink中,DataSet在寫入之前被複制到一個靜態DataSet中。所以我改變了它,現在它工作。完成的版本如下所示:

class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink { 
    override def addBatch(batchId: Long, data: DataFrame): Unit = { 
    val ds = data.sparkSession.createDataFrame(
     data.sparkSession.sparkContext.parallelize(data.collect()), 
     data.schema 
    ) 
    ds.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save() 
    } 
}