我有類似的問題,我通過使用Spark-JobServer解決。
與火花Jobserver(SJS)主要做法通常是創建一個擴展他們的SparkSQLJob比如在下面的例子中一個特殊的工作:
object ExecuteQuery extends SparkSQLJob {
override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = sqlContext.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}
然而,這種方法與卡桑德拉很好地工作,您必須使用spark-cassandra-connector,然後加載數據,您將有兩種選擇:
1)之前調用通過REST這個ExecuteQuery
,你必須傳遞你想從卡桑德拉查詢到星火完整的數據。對於這一點,你會做這樣的事情(改編自spark-cassandra-connector documentation代碼):
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "words", "keyspace" -> "test"))
.load()
然後把它註冊爲一個表,以SparkSQL能夠訪問它:
df.registerAsTempTable("myTable") // As a temporary table
df.write.saveAsTable("myTable") // As a persistent Hive Table
只有之後,您將能夠使用ExecuteQuery
從myTable
進行查詢。
2)由於在某些使用情況下第一個選項可能效率低下,因此還有另一種選擇。
spark-cassandra連接器有一個特殊的CassandraSQLContext,可用於直接從Spark中查詢C *表。它可用於像:
val cc = new CassandraSQLContext(sc)
val df = cc.sql("SELECT * FROM keyspace.table ...")
然而,使用不同類型的上下文有火花JobServer,您需要擴展SparkContextFactory
和上下文創建的時刻(這可以通過一個POST請求來實現用它到/contexts
)。在SJS Gitub上可以看到一個特殊的上下文工廠的例子。你還必須創建一個SparkCassandraJob
,擴展SparkJob
(但這部分非常easy)。
最後,ExecuteQuery
工作必須適應使用新的類。這將是這樣的:
object ExecuteQuery extends SparkCassandraJob {
override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = cc.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}
之後,ExecuteQuery
工作可以通過休息與POST請求執行。
結論
這裏我用的第一選擇,因爲我需要在HiveContext
可用的高級查詢(窗口功能,例如),這是不是在CassandraSQLContext
可用。但是,如果你不需要這些操作,我推薦第二種方法,即使它需要一些額外的編碼來爲SJS創建一個新的ContextFactory。