2015-05-04 38 views
1

我想處理一些在Spark獨立模式下運行的代碼以及在羣集上運行的Spark。基本上,對於RDD中的每個項目,我試圖將其添加到列表中,一旦完成,我想將此列表發送到Solr。如何處理在Apache Spark中的foreach塊之前運行的代碼?

當我在Spark的獨立模式下運行以下代碼時,此功能完全正常,但在集羣上運行相同的代碼時無效。當我在集羣上運行相同的代碼時,就像「發送到Solr」一樣,代碼的一部分在要發送到Solr的列表中填充項目之前執行。我嘗試在foreach之後強制執行solrInputDocumentJavaRDD.collect();,但它似乎沒有任何效果。

// For each RDD 
solrInputDocumentJavaDStream.foreachRDD(
     new Function<JavaRDD<SolrInputDocument>, Void>() { 
      @Override 
      public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception { 

      // For each item in a single RDD 
      solrInputDocumentJavaRDD.foreach(
        new VoidFunction<SolrInputDocument>() { 
         @Override 
         public void call(SolrInputDocument solrInputDocument) { 

         // Add the solrInputDocument to the list of SolrInputDocuments 
         SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument); 
         } 
        }); 

      // Try to force execution 
      solrInputDocumentJavaRDD.collect(); 


      // After having finished adding every SolrInputDocument to the list 
      // add it to the solrServer, and commit, waiting for the commit to be flushed 
      try { 
       if (SolrIndexerDriver.solrInputDocumentList != null 
         && SolrIndexerDriver.solrInputDocumentList.size() > 0) { 
       SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList); 
       SolrIndexerDriver.solrServer.commit(true, true); 
       SolrIndexerDriver.solrInputDocumentList.clear(); 
       } 
      } catch (SolrServerException | IOException e) { 
       e.printStackTrace(); 
      } 


      return null; 
      } 
     } 
); 

我應該怎麼做,使發送到Solr的部分SolrDocuments名單後執行添加到solrInputDocumentList(和作品也是在集羣模式)?

回答

0

正如我在Spark郵件列表中提到的那樣: 我對Solr API並不熟悉,但是假設'SolrIndexerDriver'是單身人士,我猜在羣集上運行時發生的情況是:

SolrIndexerDriver.solrInputDocumentList.add(elem) 

是在SolrIndexerDriver不同的單實例發生在不同的JVM而

SolrIndexerDriver.solrServer.commit 

是發生在驅動程序。

實際上,執行者列表正在填寫,但他們從未提交過,而在驅動程序上則相反。

推薦的方式來處理,這是使用foreachPartition這樣的:

rdd.foreachPartition{iter => 
    // prepare connection 
    Stuff.connect(...) 
    // add elements 
    iter.foreach(elem => Stuff.add(elem)) 
    // submit 
    Stuff.commit() 
} 

這種方式,你可以在每個執行人的當地情況添加的每個分區的數據並提交結果。請注意,此添加/提交必須是線程安全的,以避免數據丟失或損壞。

0

您是否在spark UI中檢查過該行爲以查看此作業的執行計劃。 檢查它是如何分裂成階段和它們的依賴關係。這應該會給你一個想法,希望。

相關問題