2015-05-13 91 views
11

它是由星火文檔約Scheduling Within an Application理解:如何在PySpark中的獨立線程在一個Sparkcontext中運行多個作業?

內的規定星火應用(例如SparkContext),並行任務可如果他們是從單獨的線程提交的同時運行。通過「作業」,在本節中,我們的意思是Spark操作(例如保存,收集)以及需要運行以評估該操作的任何任務。斯巴克的調度是線程安全的,並支持這種使用情況,使服務於多個請求(針對多個用戶如查詢)的應用程序。」

我能找到在斯卡拉和Java一樣的幾個示例代碼。 能有人舉例說明了如何使用PySpark來實現這個功能嗎?

+0

自從得到答案嗎?我試圖做同樣的事情,並認爲它實際上是不可能的,直到更好的鎖定被添加到'SparkContext's。 –

+0

@MikeSukmanowsky你是什麼意思?這篇文檔沒有提到特定的Spark API,它似乎適用於所有這些API。使用任何API時運行的實際代碼是Scala代碼以及Java和Python的一些接口代碼。 – Dici

+0

你能提供這個聲明來自哪裏的鏈接嗎? – Jon

回答

2

今天,我問我同樣的情況,多處理模塊提供ThreadPool,它爲您產生了一些線程,因此可以並行運行這些工作。實例化函數,然後創建池,然後在想要迭代的範圍上創建它。

在我的情況下,我計算了不同數量的中心(超參數調整)的這些WSSSE數字,以獲得「良好」的k均值聚類...就像它在MLSpark documentation中概述的那樣。如果沒有進一步的解釋,這裏有一些細胞從我IPython的工作表:

from pyspark.mllib.clustering import KMeans 
import numpy as np 

c_points被12dim陣列:

>>> c_points.cache() 
>>> c_points.take(3) 
[array([ 1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]), 
array([-2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]), 
array([ 7, -1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])] 

在下面,每個i我計算這一WSSSE值並返回它作爲一個元組:

def error(point, clusters): 
    center = clusters.centers[clusters.predict(point)] 
    return np.linalg.norm(point - center) 

def calc_wssse(i): 
    clusters = KMeans.train(c_points, i, maxIterations=20, 
     runs=20, initializationMode="random") 
    WSSSE = c_points\ 
     .map(lambda point: error(point, clusters))\ 
     .reduce(lambda x, y: x + y) 
    return (i, WSSSE) 

這裏開始有趣的部分:

from multiprocessing.pool import ThreadPool 
tpool = ThreadPool(processes=4) 

運行:

wssse_points = tpool.map(calc_wssse, range(1, 30)) 
wssse_points 

給出:

[(1, 195318509740785.66), 
(2, 77539612257334.33), 
(3, 78254073754531.1), 
... 
] 
+0

...出於好奇,我通過'%timeit'做了基準測試。串行執行(具有「脈動」行爲)花費了53.2秒,而具有4個線程的並行方式花費了16.2秒。所以,真的有區別。並行的更活躍的階段,並且總是有一些在隊列中。 –

+0

這不會增加競賽條件的可能性嗎? – Jon

+0

它可能取決於您操作的數據。既然你管理線程,你必須確保你不要自己提出競爭條件。 – Minutis

7

我也陷入了同樣的問題,所以我創建了一個很小的自包含的例子。我使用python的線程模塊創建多個線程並同時提交多個spark任務。

請注意,默認情況下,spark會按先進先出(FIFO)運行作業:http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application。在下面的例子中,我將其更改爲公平調度

# Prereqs: 
# set 
# spark.dynamicAllocation.enabled   true 
# spark.shuffle.service.enabled   true 
    spark.scheduler.mode     FAIR 
# in spark-defaults.conf 

import threading 
from pyspark import SparkContext, SparkConf 

def task(sc, i): 
    print sc.parallelize(range(i*10000)).count() 

def run_multiple_jobs(): 
    conf = SparkConf().setMaster('local[*]').setAppName('appname') 
    # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application 
    conf.set('spark.scheduler.mode', 'FAIR') 
    sc = SparkContext(conf=conf) 
    for i in range(4): 
    t = threading.Thread(target=task, args=(sc, i)) 
    t.start() 
    print 'spark task', i, 'has started' 


run_multiple_jobs() 

輸出:

spark task 0 has started 
spark task 1 has started 
spark task 2 has started 
spark task 3 has started 
30000 
0 
10000 
20000 
+0

任何想法是這是做到這一點的最佳方式?特別是如果你在一個羣集。由於sc仍然在master上,master會將其分發到worker節點上,所以我想這是否是最好的方法。 – nEO

+0

關於使用線程庫有一些很好的註釋[這裏](https://www.shanelynn.ie/using-python-threading-for-multiple-results-queue/),尤其是用於返回結果線程計算。 –

相關問題