2016-05-07 66 views
2

我目前使用npm的cassandra-driver從Node.js服務器查詢我的Cassandra數據庫。因爲我希望能夠編寫更復雜的查詢,所以我想使用Spark SQL而不是CQL。有沒有什麼方法可以創建一個RESTful API(或其他),以便我可以像我當前使用CQL一樣使用Spark SQL?查詢來自Node.js服務器的Spark SQL

換句話說,我希望能夠從我的Node.js服務器發送一個Spark SQL查詢到另一個服務器並返回結果。

有沒有辦法做到這一點?我一直在尋找這個問題的解決方案一段時間,還沒有發現任何東西。

編輯:我能從Spark shell中用Scala和Spark SQL查詢我的數據庫,所以這個位正在工作。我只需要以某種方式連接Spark和我的Node.js服務器。

回答

1

我有類似的問題,我通過使用Spark-JobServer解決。

與火花Jobserver(SJS)主要做法通常是創建一個擴展他們的SparkSQLJob比如在下面的例子中一個特殊的工作:

object ExecuteQuery extends SparkSQLJob { 
    override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = { 
    // Code to validate the parameters received in the request body 
    } 
    override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = { 
    // Assuming your request sent a { "query": "..." } in the body: 
    val df = sqlContext.sql(config.getString("query")) 
    createResponseFromDataFrame(df) // You should implement this 
    } 
} 

然而,這種方法與卡桑德拉很好地工作,您必須使用spark-cassandra-connector,然後加載數據,您將有兩種選擇:

1)之前調用通過REST這個ExecuteQuery,你必須傳遞你想從卡桑德拉查詢到星火完整的數據。對於這一點,你會做這樣的事情(改編自spark-cassandra-connector documentation代碼):

val df = sqlContext 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "words", "keyspace" -> "test")) 
    .load() 

然後把它註冊爲一個表,以SparkSQL能夠訪問它:

df.registerAsTempTable("myTable") // As a temporary table 
df.write.saveAsTable("myTable") // As a persistent Hive Table 

只有之後,您將能夠使用ExecuteQuerymyTable進行查詢。

2)由於在某些使用情況下第一個選項可能效率低下,因此還有另一種選擇。

spark-cassandra連接器有一個特殊的CassandraSQLContext,可用於直接從Spark中查詢C *表。它可用於像:

val cc = new CassandraSQLContext(sc) 
val df = cc.sql("SELECT * FROM keyspace.table ...") 

然而,使用不同類型的上下文有火花JobServer,您需要擴展SparkContextFactory和上下文創建的時刻(這可以通過一個POST請求來實現用它到/contexts)。在SJS Gitub上可以看到一個特殊的上下文工廠的例子。你還必須創建一個SparkCassandraJob,擴展SparkJob(但這部分非常easy)。

最後,ExecuteQuery工作必須適應使用新的類。這將是這樣的:

object ExecuteQuery extends SparkCassandraJob { 
    override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = { 
    // Code to validate the parameters received in the request body 
    } 
    override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = { 
    // Assuming your request sent a { "query": "..." } in the body: 
    val df = cc.sql(config.getString("query")) 
    createResponseFromDataFrame(df) // You should implement this 
    } 
} 

之後,ExecuteQuery工作可以通過休息與POST請求執行。


結論

這裏我用的第一選擇,因爲我需要在HiveContext可用的高級查詢(窗口功能,例如),這是不是在CassandraSQLContext可用。但是,如果你不需要這些操作,我推薦第二種方法,即使它需要一些額外的編碼來爲SJS創建一個新的ContextFactory。