I ended up going with the following solution (Python):
import os
import tarfile
from io import BytesIO
from pyspark.sql import SparkSession
# Get the spark app.
spark = SparkSession.builder.appName("my-spark-app").getOrCreate()
# Get the executor working directories.
spark_home = os.environ.get('SPARK_HOME')
if spark_home:
    num_workers = 0
    with open(os.path.join(spark_home, 'conf', 'slaves'), 'r') as f:
        for line in f:
            num_workers += 1

    if num_workers:
        executor_logs_path = '/where/to/store/executor_logs'

        def _map(worker):
            '''Returns a list with one tuple of the name and the tar.gz of the worker log
            directory, in binary format, for the corresponding worker.
            '''
            flo = BytesIO()
            with tarfile.open(fileobj=flo, mode="w:gz") as tar:
                tar.add(os.path.join(spark_home, 'work'), arcname='work')
            return [('worker_%d_dir.tar.gz' % worker, flo.getvalue())]

        def _reduce(worker1, worker2):
            '''Appends the worker names and their log tar.gz's into one list.
            '''
            worker1.extend(worker2)
            return worker1

        os.makedirs(executor_logs_path)
        logs = spark.sparkContext.parallelize(range(num_workers), num_workers).map(_map).reduce(_reduce)
        with tarfile.open(os.path.join(executor_logs_path, 'logs.tar'), 'w') as tar:
            for name, data in logs:
                info = tarfile.TarInfo(name=name)
                info.size = len(data)
                tar.addfile(tarinfo=info, fileobj=BytesIO(data))
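For reference, the resulting logs.tar contains one worker_<n>_dir.tar.gz member per worker, each holding that worker's work/ directory. A minimal sketch for unpacking it on the driver machine (the extract_dir path is just a placeholder):

import os
import tarfile

logs_tar = os.path.join('/where/to/store/executor_logs', 'logs.tar')
extract_dir = '/where/to/store/executor_logs/extracted'  # placeholder output directory

with tarfile.open(logs_tar, 'r') as outer:
    for member in outer.getmembers():
        # Each member is the worker_<n>_dir.tar.gz produced by _map() above.
        target = os.path.join(extract_dir, member.name.replace('.tar.gz', ''))
        os.makedirs(target, exist_ok=True)
        with tarfile.open(fileobj=outer.extractfile(member), mode='r:gz') as inner:
            inner.extractall(target)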
A couple of concerns, though:
- Not sure whether the map-reduce approach is the best way to collect the logs
- The files (tarballs) are created in memory, so depending on your application the driver could run out of memory if they get too large (see the first sketch after this list for a variant that avoids this)
- There may be a better way to determine the number of workers (see the second sketch after this list)
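On the second point, one way around building the tarball in memory is to have each worker write its archive straight to storage that both the workers and the driver can reach, and only ship the file paths back through the map-reduce step. A sketch, assuming a shared mount visible at the same location on every node (the /mnt/shared/executor_logs path is hypothetical):

def _map_to_shared(worker):
    '''Variant of _map() that writes the tarball to a shared filesystem
    instead of returning its bytes through the driver.
    '''
    shared_logs_path = '/mnt/shared/executor_logs'  # hypothetical shared mount
    os.makedirs(shared_logs_path, exist_ok=True)
    out_path = os.path.join(shared_logs_path, 'worker_%d_dir.tar.gz' % worker)
    with tarfile.open(out_path, mode='w:gz') as tar:
        tar.add(os.path.join(spark_home, 'work'), arcname='work')
    return [out_path]

# Only the file paths travel back to the driver, so nothing large is held in memory.
paths = spark.sparkContext.parallelize(range(num_workers), num_workers).map(_map_to_shared).reduce(_reduce)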
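On the last point, a trick I have seen for getting the executor count without reading conf/slaves is to ask the SparkContext for its block manager status; note this goes through the internal _jsc handle, so it is not a stable public API and may change between Spark versions:

sc = spark.sparkContext
# getExecutorMemoryStatus() has one entry per block manager, including the driver,
# so subtract 1 to get the number of executors.
num_workers = sc._jsc.sc().getExecutorMemoryStatus().size() - 1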