2017-05-10 48 views
0

在我的代碼中,我嘗試使用env變量在URL處實例化redis-py 連接。問題是,當我使用foreach or foreachPartition時,在#save_on_redis方法中無法識別env變量。Pyspark無法識別env變量作爲參數傳遞給foreach或foreachPartition的方法

我只是嘗試之外創建Redis的關係,但我收到「pickle.PicklingError:無法鹹菜‘鎖定’目標」,因爲火花嘗試運行這兩種方法,在同一時間,所有節點上。

Question: How I can use env variables on the method passed as argument to foreach or foreachPartition ?

import os 
from pyspark.sql import SparkSession 
import redis 

spark = (SparkSession 
     .builder 
     .getOrCreate()) 

print "---------" 
print os.getenv("REDIS_REPORTS_URL") 
print "---------" 

def save_on_redis(row): 
    redis_ = redis.StrictRedis(host=os.getenv("REDIS_REPORTS_URL"), port=6379, db=0) 
    print os.getenv("REDIS_REPORTS_URL") 
    print redis_ 
    redis_.set("#teste#", "fagner") 


df = spark.createDataFrame([(0,1), (0,1), (0,2)], ["id", "score"]) 
df.foreach(save_on_redis) 

回答

0

我建議你讓環境變量在你的驅動程序,並把它作爲一個Python變量工作進程,在那裏你可以使用os.putenv

示例設置環境:

In [1]: import os 

In [2]: a = sc.parallelize(range(20)) 

In [3]: os.getenv('MY_VAR') 
Out[3]: 'some_value' 

In [4]: def f(iter): 
    import os 
    return (str(os.getenv('MY_VAR')),) 
    ...: 

In [5]: a.mapPartitions(f).collect() 
Out[5]: ['None', 'None'] 

In [6]: my_var = os.getenv('MY_VAR') 

In [6]: def f2(iter): 
    import os 
    from subprocess import check_output 
    os.putenv('MY_VAR', my_var) 
    return (check_output('env | grep MY_VAR', shell=True), my_var) 
    ....: 

In [7]: a.mapPartitions(f2).collect() 
Out[7]: 
['MY_VAR=some_value\n', 
'some_value', 
'MY_VAR=some_value\n', 
'some_value'] 

PS。根據this answer,最好直接修改映射對象而不是使用os.putenv

相關問題