2017-07-06 11 views
2

我有一種情況,從兩個獨立的遠程配置單元的服務器比較兩個不同的表源和目的地,可我們能使用兩個SparkSessions像我以下嘗試: -我們能否能夠使用多張sparksessions訪問兩個不同的配置單元的服務器

val spark = SparkSession.builder().master("local") 
    .appName("spark remote") 
    .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false") 
    .config("javax.jdo.option.ConnectionUserName", "hiveroot") 
    .config("javax.jdo.option.ConnectionPassword", "hivepassword") 
    .config("hive.exec.scratchdir", "/tmp/hive/${user.name}") 
    .config("hive.metastore.uris", "thrift://192.168.175.160:9083") 
    .enableHiveSupport() 
    .getOrCreate() 

SparkSession.clearActiveSession() 
SparkSession.clearDefaultSession() 

val sparkdestination = SparkSession.builder() 
    .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.42:3306/metastore?useSSL=false") 
    .config("javax.jdo.option.ConnectionUserName", "hiveroot") 
    .config("javax.jdo.option.ConnectionPassword", "hivepassword") 
    .config("hive.exec.scratchdir", "/tmp/hive/${user.name}") 
    .config("hive.metastore.uris", "thrift://192.168.175.42:9083") 
    .enableHiveSupport() 
    .getOrCreate() 

我試着用SparkSession.clearActiveSession() and SparkSession.clearDefaultSession(),但它不工作,拋出下面的錯誤:

Hive: Failed to access metastore. This class should not accessed in runtime. 
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient 

有沒有我們可以達到訪問使用多個SparkSessions或代蜂巢表任何其他方式。

感謝

回答

1

SparkSessiongetOrCreate method

,其中指出

得到一個現有[SparkSession]或者,如果沒有現有的, 創建一個基於一個新的在此構建器中設置的選項。

此方法首先檢查是否存在有效的線程本地 SparkSession,如果是,則返回該線程。然後檢查 是否有一個有效的全局默認SparkSession,如果是,則返回 那一個。如果不存在有效的全局默認SparkSession,則方法 會創建一個新的SparkSession,並將新創建的 SparkSession指定爲全局默認值。 如果返回現有的SparkSession,則此構建器中指定的配置選項將應用於現有的 SparkSession。

這就是它返回第一次會話及其配置的原因。

請通過docs找出替代方法來創建會話..


我的工作< 2火花版本。所以,我不知道如何與出配置的碰撞究竟創建新的會話..

但是,在這裏是有用的測試案例即SparkSessionBuilderSuite.scala做但─ DIY ..

實例方法該測試用例

test("use session from active thread session and propagate config options") { 
    val defaultSession = SparkSession.builder().getOrCreate() 
    val activeSession = defaultSession.newSession() 
    SparkSession.setActiveSession(activeSession) 
    val session = SparkSession.builder().config("spark-config2", "a").getOrCreate() 

    assert(activeSession != defaultSession) 
    assert(session == activeSession) 
    assert(session.conf.get("spark-config2") == "a") 
    assert(session.sessionState.conf == SQLConf.get) 
    assert(SQLConf.get.getConfString("spark-config2") == "a") 
    SparkSession.clearActiveSession() 

    assert(SparkSession.builder().getOrCreate() == defaultSession) 
    SparkSession.clearDefaultSession() 
    } 
+0

我無法在SparkSession中找到newSession()方法。有沒有例子請。 – Vignesh

+0

我的代碼研究了構建火花並給出了上面的指針。你必須檢查文檔方法。事實上,我正在研究火花的<2版本。請檢查像'setActiveSession'等方法... –

2

我用這種方式和工作星火完美罰款2.1

val sc = SparkSession.builder() 
      .config("hive.metastore.uris", "thrift://dbsyz1111:10000") 
      .enableHiveSupport() 
      .getOrCreate() 

// Createdataframe 1 from by reading the data from hive table of metstore 1 
val dataframe_1 = sc.sql("select * from <SourcetbaleofMetaStore_1>") 

// Resetting the existing Spark Contexts 
SparkSession.clearActiveSession() 
SparkSession.clearDefaultSession() 

//Initialize Spark session2 with Hive Metastore 2 
val spc2 = SparkSession.builder() 
       .config("hive.metastore.uris", "thrift://dbsyz2222:10004") 
       .enableHiveSupport() 
       .getOrCreate() 

// Load dataframe 2 of spark context 1 into a new dataframe of spark context2, By getting schema and data by converting to rdd API 
val dataframe_2 = spc2.createDataFrame(dataframe_1.rdd, dataframe_1.schema) 

dataframe_2.write.mode("Append").saveAsTable(<targettableNameofMetastore_2>) 
相關問題