2016-07-05 11 views
1

我有一個在Zookeeper中寫入的配置信息。我使用Apache Curator來閱讀Curator Watcher的配置(如果有更好的解決方案來閱讀它,我很樂意使用它),所以如果在Zookeeper中更改配置,我會收到新配置。我在Spark中使用這個配置。我怎樣才能將它分享給同一應用程序的所有火花執行者?在多個Spark Executors上共享Zookeeper配置

謝謝!

LE:

謝謝Dikei,

在下面的代碼,在這裏,你會做觀察者實現?我是新來的星星,我不確定每個工作人員會發生什麼。

謝謝!

final JavaDStream<ElementMessage> nodeMessageStream = mapWithStateDistinctAndFiltered.flatMap(pair -> pair._2.buildElementMessages()) 
      .filter(f -> f != null); 

    nodeMessageStream.foreachRDD(rdd -> { 
     rdd.foreachPartition(r -> { 
      final ElementRecordRestClient rest = new ElementRecordRestClient(
        startProps.getProperty(InputPropertyKey.WEPAPP_URL.toString())); 
      r.forEachRemaining(message -> { 
       rest.createObject(message.toElementRecord()); 
      }); 
     }); 
    }); 
+0

謝謝你的回答。我編輯了初始文章。 – Vlad

回答

0

我在這種情況下要做的是在主節點上運行Curator Watcher,並使用Spark的廣播變量向所有執行器廣播配置。只要配置發生變化,您就會停止當前的流式上下文,並使用新配置啓動一個新的流式上下文。這將確保您的結果始終一致。

另一種方式是讀取foreachPartition lambda函數內的zookeeper配置。但是由於每個分區獨立讀取配置,相同RDD的不同分區可能會得到不同的配置,這可能不符合您的預期。

+0

有趣的解決方案,第一個,但停止和開始流困擾我。這意味着什麼,我怎樣才能停止和開始流(手動除外)?謝謝! – Vlad

+0

你必須通過調用當前上下文對象上的stop來停止處理來手動完成。然後創建一個新的流式上下文並通過調用'start'來啓動它。如果您的輸入源像一個持久隊列那樣工作,那麼當新的流式上下文開始時,它將在上一個上下文停止的地方恢復。 – Dikei

+0

謝謝你的幫助。 – Vlad