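To use a prepared statement in Cassandra and write data from Spark Structured Streaming while the stream is still being processed, you need:
- import com.datastax.driver.core.Session
- import com.datastax.spark.connector.cql.CassandraConnector
Then, build your connector: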
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
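With both the session and the connector available, you can now call the prepared statement function you wrote in the Statements Scala class:
connector.withSessionDo { session =>
  Statements.PreparedStatement()
}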
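Finally, write the data to Cassandra with the following function. Here cql is the function that binds the variables to the prepared statement so that it can be executed: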
private def processRow(value: Commons.UserEvent) = {
  connector.withSessionDo { session =>
    session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream))
  }
}
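The Statements class itself is not shown here, so the following is only a minimal sketch of what such a helper could look like, assuming the DataStax Java driver 3.x (the com.datastax.driver.core package imported above). The names StatementsSketch, UserEvent, and bindInsert, the table my_keyspace.user_events, and the String column types are all hypothetical. Unlike Statements.cql above, bindInsert takes the session explicitly.

```scala
import com.datastax.driver.core.{BoundStatement, Session}

// Hypothetical stand-in for Commons.UserEvent; the field types are assumptions.
case class UserEvent(
  device_id: String,
  category: String,
  window_time: String,
  m1_sum_downstream: String,
  m2_sum_downstream: String
)

// Hypothetical helper, not the author's Statements class.
object StatementsSketch {
  // Assumed keyspace and table; one "?" placeholder per bound column.
  val insertCql: String =
    """INSERT INTO my_keyspace.user_events
      |  (device_id, category, window_time, m1_sum_downstream, m2_sum_downstream)
      |VALUES (?, ?, ?, ?, ?)""".stripMargin

  // Prepares the insert on the given session and binds one event's fields to it.
  def bindInsert(session: Session, e: UserEvent): BoundStatement =
    session
      .prepare(insertCql)
      .bind(e.device_id, e.category, e.window_time, e.m1_sum_downstream, e.m2_sum_downstream)
}
```

Inside connector.withSessionDo, this would be used as session.execute(StatementsSketch.bindInsert(session, event)). Preparing inside every call is the simplest form; keeping the PreparedStatement around per session may be preferable for heavy write loads.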
Of course, you must call this function (processRow) from a ForeachWriter:
// This Foreach sink writer writes the output to Cassandra.
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
  // Returning true tells Spark to process the rows of this partition.
  override def open(partitionId: Long, version: Long): Boolean = true
  // Called once per row: bind the values and execute the insert.
  override def process(value: Commons.UserEvent): Unit = {
    processRow(value)
  }
  // Nothing to release here; the connector manages the session.
  override def close(errorOrNull: Throwable): Unit = {}
}
val query =
  ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start()