I am currently working on a PySpark job (Spark 2.2.0) which is intended to train a Latent Dirichlet Allocation model on a set of documents. The input documents are provided as a CSV file located on Google Cloud Storage. How do I properly parallelize the PySpark job across multiple nodes and avoid memory issues?
The following code runs successfully on a single-node Google Cloud Dataproc cluster (4 vCPUs / 15 GB of memory) with a small subset of documents (~6,500), a low number of topics (10), and a low number of iterations (100). However, other attempts with a larger set of documents or higher values for either the number of topics or the number of iterations quickly led to memory issues and job failures. Also, when submitting this job to a 4-node cluster, I could see only one worker node actually doing any work (~30% CPU usage), which makes me think the code is not properly optimized for parallel processing.
Code
import pyspark
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA

conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# CSV schema declaration
csv_schema = StructType([
    StructField("doc_id", StringType(), True),           # id of the document
    StructField("cleaned_content", StringType(), True),  # cleaned text content (used for LDA)
])

# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)
print("{} document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"
print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"

# Step 2: Extract words (split_row is a tokenization helper defined elsewhere in the job)
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))

# Step 3: Generate count vectors (bag of words) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)

# Instantiate the LDA model
lda = LDA(k=100,                          # number of topics
          optimizer="online",             # 'online' or 'em'
          maxIter=100,
          featuresCol="features",
          topicConcentration=0.01,        # beta
          optimizeDocConcentration=True,  # alpha
          learningOffset=2.0,
          learningDecay=0.8,
          checkpointInterval=10,
          keepLastCheckpoint=True)

# Step 4: Train the LDA model on the corpus (this is the long part of the job)
lda_model = lda.fit(dataset)

# Save the LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")
Below are examples of the warning and error messages that have been encountered:
WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
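The YARN messages above point at the off-heap overhead allowance rather than the executor heap itself. As a sketch, these are the submit-time settings I would experiment with; every value here is an illustrative assumption for a 4-node / 4-vCPU / 15 GB cluster, not a tuned recommendation (on Dataproc the same settings can be passed via `--properties` on `gcloud dataproc jobs submit pyspark`):

```shell
# Sketch: raise YARN memory overhead and default parallelism.
# All numbers are assumed starting points to iterate on, not final values.
spark-submit \
  --master yarn \
  --num-executors 4 \
  --executor-cores 4 \
  --executor-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.default.parallelism=32 \
  --conf spark.sql.shuffle.partitions=32 \
  lda_training.py
```

Lowering `--executor-memory` while raising `spark.yarn.executor.memoryOverhead` keeps the total per-container request under the YARN limit, which is what the "6.1 GB of 6 GB physical memory used" message is complaining about.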
Questions
- Is there any optimization that can be done to the code itself to ensure its scalability?
- How can we get Spark to distribute the job across all worker nodes and hopefully avoid memory issues?
I can indeed verify that increasing the number of partitions allows Spark to distribute the work to multiple workers. With that in mind, I am still running into a lot of memory issues (either containers killed by YARN for exceeding memory limits, or java.lang.OutOfMemoryError: Java heap space). The input CSV file weighs ~600 MB. Do you have any recommendation on how to estimate the right number of partitions with respect to the size of the worker nodes (number of CPU cores + amount of memory)? –
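For estimating a partition count, a common rule of thumb (an assumption on my part, not something prescribed by the Spark docs) is to take the larger of "a couple of tasks per core" and "no partition much bigger than ~128 MB". A minimal sketch:

```python
import math

def estimate_partitions(input_bytes, total_cores,
                        target_partition_bytes=128 * 1024 * 1024,
                        min_tasks_per_core=2):
    """Rule-of-thumb partition count: enough partitions that each core gets
    at least a few tasks, and no single partition exceeds the assumed
    ~128 MB target (chosen to match common HDFS block-size conventions)."""
    by_size = math.ceil(input_bytes / target_partition_bytes)
    by_cores = min_tasks_per_core * total_cores
    return max(by_size, by_cores)

# ~600 MB CSV on a 4-node cluster with 4 vCPUs each (16 cores total)
print(estimate_partitions(600 * 1024 * 1024, 16))  # -> 32
```

The resulting count could then be applied right after the CSV load (e.g. `doc_df = doc_df.repartition(32)`), since the single input file otherwise yields the 1 partition seen above; iterate on the number if tasks still spill or containers still get killed.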