2016-08-11 200 views
1

我正在使用spark 2.0.0。這裏是我的代碼:Logical Plan to DataFrame/Dataset Apache spark

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases 
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 

object WikiDataframe { 

    def getDataframe(sparkSession: SparkSession): DataFrame = { 

    val df = sparkSession.read.option("header", "true").option("inferSchema", "true").csv(FILE_LOCATION) 

    df.registerTempTable("pageviews_by_second") 

    df 
    } 

    def main(args: Array[String]) { 
    val sparkSession = SparkSession 
     .builder() 
     .appName("Spark SQL Example") 
     .master("local") 
     .getOrCreate() 

    val pageViewsDF = WikiDataframe.getDataframe(sparkSession) 

    val query: DataFrame = sparkSession.sql("select Date from (select * from pageviews_by_second) a") 

    var logicalQuery: LogicalPlan = query.queryExecution.logical 

    println("logicalQuery : " + logicalQuery); 

    import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases 

    println("Eliminating sub queries"); 

    logicalQuery = EliminateSubqueryAliases.apply(logicalQuery) 

    } 
} 

我堅持上執行logicalQuery。如果可能,我想獲得dataframedataset。 任何幫助,將不勝感激

+0

什麼是與星火應用程序的問題?你想從...得到一個'數據集'?什麼?從'logicalQuery'?它甚至還沒有執行過程的一半。請詳細說明一下,然後我可以給你更多的提示如何繼續。 –

回答

0

u需要在org.apache.spark.sql寫一個類,並有東西像下面

def apply(Sqlctx: SparkSession, Plan: LogicalPlan): DataFrame = { 
    Dataset.ofRows(Sqlctx, Plan) 
}