2017-06-06 82 views
1

我有這樣的路徑下保存的這2個文件:PySpark覆蓋加入sc.addPyFile

C:\代碼\ SAMPLE1 \ main.py

def method(): 
    return "this is sample method 1" 

C:\代碼\ SAMPLE2 \ main.py

def method(): 
    return "this is sample method 2" 

然後我運行此:

from pyspark import SparkContext 
from pyspark.sql import SparkSession 

sc = SparkContext() 
spark = SparkSession(sc) 

sc.addPyFile("~/code/sample1/main.py") 
main1 = __import__("main") 
print(main1.method()) # this is sample method 1 

sc.addPyFile("~/code/sample2/main.py") # Error 

錯誤是

Py4JJavaError:調用o21.addFile時發生錯誤。文件C:\ Users \ hans.yulian \ AppData \ Local \ Temp \ spark-5da165cf-410f-4576-8124-0ab23aba6aa3 \ userFiles-25a7ca23-84fb-42b7-95d9-206867fb9dfd \ main.py存在並且與/C:/Users/hans.yulian/Documents/spark-test/main2/main.py的內容不匹配

這意味着它的臨時文件中已經有「main.py」文件文件夾和內容不同。我不知道是否有這種情況下,任何解決辦法,但對我來說,我有以下的限制:

  1. 的文件名還是要「main.py」,只有文件夾可以 不同
  2. 這沒關係以某種方式清除臨時文件夾添加AGA
  3. 在另一個文件中,我有唯一的解決方法是通過在main.py的前面附加隨機 字符串,例如abcdemain.pyfghijmain.py,那麼我會import main = __import __(「abcdemain」), ,但是這個人是不是真的最好

回答

1

雖然在技術上是可行的,通過設置spark.files.overwrite"true"

from pyspark import SparkConf, SparkContext 

sc = SparkContext(conf=SparkConf().set("spark.files.overwrite", "true")) 

,並在簡單的情況下,會給出正確的結果

def f(*_):                 
    from main import method 
    return [method()] 

sc.addFile("/path/to/sample1/main.py") 
sc.parallelize([], 3).mapPartitions(f).collect() 
['this is sample method 1', 
'this is sample method 1', 
'this is sample method 1'] 
sc.addFile("/path/to/sample2/main.py") 

sc.parallelize([], 3).mapPartitions(f).collect() 
['this is sample method 2', 
'this is sample method 2', 
'this is sample method 2'] 

它在實踐中將不可靠,即使您在每個訪問模塊上使用reload,也會使您的應用程序難以推理。由於Spark可能會隱式地緩存某些對象,或者透明地重新啓動Python工作器,所以在不同節點可以看到源的不同狀態的情況下,您可能會很容易結束。