
I am currently working on a PySpark job (Spark 2.2.0) that is intended to train a Latent Dirichlet Allocation model on a set of documents. The input documents are provided as a CSV file located on Google Cloud Storage. How do I properly parallelize the pyspark job across multiple nodes and avoid memory issues?

The following code runs successfully on a single-node Google Cloud Dataproc cluster (4 vCPUs / 15 GB of memory) with a small subset of documents (~6,500), a small number of topics (10) and a small number of iterations (100). However, any attempt with more documents or with higher values for the number of topics or the number of iterations quickly leads to memory issues and job failures. Also, when submitting this job to a 4-node cluster, I can see that only one worker node is actually doing any work (30% CPU usage), which makes me think the code is not properly optimized for parallel processing.

Code

import pyspark
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA

conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# CSV schema declaration
csv_schema = StructType([StructField("doc_id", StringType(), True),            # id of the document
                         StructField("cleaned_content", StringType(), True)])  # cleaned text content (used for LDA)

# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)

print("{} document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"

print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"

# Step 2: Extract words (split_row is a tokenization helper defined elsewhere in the job)
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))

# Step 3: Generate count vectors (bag-of-words) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)

# Instantiate LDA model
lda = LDA(k=100,                          # number of topics
          optimizer="online",             # 'online' or 'em'
          maxIter=100,
          featuresCol="features",
          topicConcentration=0.01,        # beta
          optimizeDocConcentration=True,  # optimize alpha during training
          learningOffset=2.0,
          learningDecay=0.8,
          checkpointInterval=10,
          keepLastCheckpoint=True)

# Step 4: Train LDA model on the corpus (this is the long part of the job)
lda_model = lda.fit(dataset)

# Save the trained LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")
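
One note on the parameters above: checkpointInterval only has an effect if a checkpoint directory has been set on the SparkContext. A minimal sketch of setting one, assuming a writable Cloud Storage location (the bucket path below is only a placeholder):

# Placeholder path: checkpointInterval is ignored unless a checkpoint
# directory has been configured on the SparkContext.
spark.sparkContext.setCheckpointDir("gs://your-bucket/lda-checkpoints")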

Below are examples of the warnings and errors that have already been encountered:

WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB. 
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB. 
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
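
The last three messages suggest raising spark.yarn.executor.memoryOverhead. A minimal sketch of how that could be done, assuming the properties are set before the SparkSession is created (the values below are purely illustrative, not taken from the actual job):

# Illustrative values only: give each YARN container more off-heap headroom.
# These properties must be set before the SparkContext/SparkSession is created.
conf = (pyspark.SparkConf()
        .setAppName("lda-training")
        .set("spark.yarn.executor.memoryOverhead", "2048")  # in MB
        .set("spark.executor.memory", "5g"))
spark = SparkSession.builder.config(conf=conf).getOrCreate()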

Questions

  • Is there any optimization that can be done to the code itself to ensure its scalability?
  • How can we get Spark to distribute the job across all worker nodes and hopefully avoid memory issues?

Answer


If your input data is small in size, then size-based partitioning will produce too few partitions for scalability, even if your pipeline ends up doing intensive computation on that small data. Since your getNumPartitions() prints 1, this indicates that Spark will use at most 1 executor core to process the data, which is why you only see a single worker node doing any work.

You can try changing your initial spark.read.csv line to include a repartition at the end:

doc_df = spark.read.csv(path="gs://...", ...).repartition(32) 

Then you can verify it did what you expected by seeing the later getNumPartitions() line print 32.
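
A minimal sketch of that check, reusing the read and the partition-count print from the job above (32 is just the example value from this answer):

doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema).repartition(32)

print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# Should now print "32" instead of "1"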


I can indeed verify that increasing the number of partitions allows Spark to distribute the work to multiple workers. That said, I am still running into a lot of memory issues (both containers killed by YARN for exceeding memory limits and java.lang.OutOfMemoryError: Java heap space). The CSV file used as input weighs ~600MB. Do you have any suggestions on how to estimate the right number of partitions with respect to the size of the worker nodes (number of CPU cores + amount of memory)? –
