我想處理一些在Spark獨立模式下運行的代碼以及在羣集上運行的Spark。基本上,對於RDD中的每個項目,我試圖將其添加到列表中,一旦完成,我想將此列表發送到Solr。如何處理在Apache Spark中的foreach塊之前運行的代碼?
當我在Spark的獨立模式下運行以下代碼時,此功能完全正常,但在集羣上運行相同的代碼時無效。當我在集羣上運行相同的代碼時,就像「發送到Solr」一樣,代碼的一部分在要發送到Solr的列表中填充項目之前執行。我嘗試在foreach
之後強制執行solrInputDocumentJavaRDD.collect();
,但它似乎沒有任何效果。
// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
new Function<JavaRDD<SolrInputDocument>, Void>() {
@Override
public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {
// For each item in a single RDD
solrInputDocumentJavaRDD.foreach(
new VoidFunction<SolrInputDocument>() {
@Override
public void call(SolrInputDocument solrInputDocument) {
// Add the solrInputDocument to the list of SolrInputDocuments
SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
}
});
// Try to force execution
solrInputDocumentJavaRDD.collect();
// After having finished adding every SolrInputDocument to the list
// add it to the solrServer, and commit, waiting for the commit to be flushed
try {
if (SolrIndexerDriver.solrInputDocumentList != null
&& SolrIndexerDriver.solrInputDocumentList.size() > 0) {
SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
SolrIndexerDriver.solrServer.commit(true, true);
SolrIndexerDriver.solrInputDocumentList.clear();
}
} catch (SolrServerException | IOException e) {
e.printStackTrace();
}
return null;
}
}
);
我應該怎麼做,使發送到Solr的部分SolrDocuments名單後執行添加到solrInputDocumentList
(和作品也是在集羣模式)?