2016-03-25 88 views
4

我想在pyspark中使用matplotlib.bblpath或shapely.geometry庫。如何在pyspark中獲取python庫?

當我嘗試導入任何人的,我得到下面的錯誤:

>>> from shapely.geometry import polygon 
Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
ImportError: No module named shapely.geometry 

我知道這個模塊不存在,但我想知道如何能夠將這些包被帶到我pyspark庫。

+1

'PIP安裝shapely' –

+0

我想安裝在pyspark中,而不是在我的本地機器上。這個命令在pyspark shell中不起作用。 – nakulchawla09

+0

這是http://stackoverflow.com/q/29495435/1711188 –

回答

7

在Spark情況下嘗試使用:

SparkContext.addPyFile("module.py") # also .zip 

,從docs報價:

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

+1

的一個可能的副本我能夠添加此依賴項。當我進行火花提交時,有沒有辦法做到這一點。 我正在做一個file.py的spark-submit,在那個文件中我應該在做addPyFile(「module.py」),還是有辦法通過向spark-submit命令添加一個參數來添加依賴關係 – nakulchawla09

+0

從Spark doc(https://spark.apache.org/docs/1.1.0/submitting-applications。html)通過參數添加一個py文件似乎是可行的(把它放在搜索路徑中)。不過,我不知道PySpark的submition API有什麼不同。 – armatita

+0

好吧我會在一個參數和我的文件中嘗試它。兩種方式看看有什麼作用。 – nakulchawla09

1

這是在獨立(即筆記本電腦/臺式機)或在集羣環境中(例如AWS EMR )?

  1. 如果你的筆記本/臺式機上,pip install shapely應該只是罰款。您可能需要檢查您的默認Python環境的環境變量。例如,如果您通常使用Python 3,但使用Python 2作爲pyspark,那麼您將不會適合pyspark。

  2. 如果在集羣環境中,例如在AWS EMR,您可以嘗試:

    import os 
    
    def myfun(x):` 
         os.system("pip install shapely") 
         return x 
    rdd = sc.parallelize([1,2,3,4]) ## assuming 4 worker nodes 
    rdd.map(lambda x: myfun(x)).collect() 
    ## call each cluster to run the code to import the library 
    

「我知道該模塊是不存在的,但我想知道這些包是如何能帶到我的pyspark圖書館。「

在EMR上,如果您希望使用任何其他庫和配置來預先準備pyspark,則可以使用引導步驟進行這些調整。除此之外,如果不在Scala中編譯Spark(如果您不熟悉SBT,那將是一件很痛苦的事情),您不能「添加」庫到pyspark。

+0

與此問題是,如果節點3正在使用,則無法在節點3上安裝軟件包。 – user48956

+0

您可以在EMR啓動時使用bash腳本(希望您在AWS上使用EMR)來安裝所有需要的庫。這是「引導程序安裝步驟」 – Jon

+0

@ user48956在更新所需內容之前,您不得導入任何可能會更新的第三方程序包。 –

4

這就是我如何在我們的AWS EMR羣集中工作(它在任何其他羣集中也應如此)。我創建了下面的shell腳本並執行它作爲一個自舉操作:

#!/bin/bash 
# shapely installation 
wget http://download.osgeo.org/geos/geos-3.5.0.tar.bz2 
tar jxf geos-3.5.0.tar.bz2 
cd geos-3.5.0 && ./configure --prefix=$HOME/geos-bin && make && make install 
sudo cp /home/hadoop/geos-bin/lib/* /usr/lib 
sudo /bin/sh -c 'echo "/usr/lib" >> /etc/ld.so.conf' 
sudo /bin/sh -c 'echo "/usr/lib/local" >> /etc/ld.so.conf' 
sudo /sbin/ldconfig 
sudo /bin/sh -c 'echo -e "\nexport LD_LIBRARY_PATH=/usr/lib" >> /home/hadoop/.bashrc' 
source /home/hadoop/.bashrc 
sudo pip install shapely 
echo "Shapely installation complete" 
pip install https://pypi.python.org/packages/74/84/fa80c5e92854c7456b591f6e797c5be18315994afd3ef16a58694e1b5eb1/Geohash-1.0.tar.gz 
# 
exit 0 

注意:除了作爲運行該腳本可以獨立於集羣中的每個節點執行的自舉操作。我測試了兩種情況。

下面是一個示例pyspark和勻稱的代碼(火花SQL UDF),以確保上述命令是否按預期工作:

Python 2.7.10 (default, Dec 8 2015, 18:25:23) 
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2 
Type "help", "copyright", "credits" or "license" for more information. 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /__/.__/\_,_/_/ /_/\_\ version 1.6.1 
     /_/ 

Using Python version 2.7.10 (default, Dec 8 2015 18:25:23) 
SparkContext available as sc, HiveContext available as sqlContext. 
>>> from pyspark.sql.functions import udf 
>>> from pyspark.sql.types import StringType 
>>> from shapely.wkt import loads as load_wkt 
>>> def parse_region(region): 
...  from shapely.wkt import loads as load_wkt 
...  reverse_coordinate = lambda coord: ' '.join(reversed(coord.split(':'))) 
...  coordinate_list = map(reverse_coordinate, region.split(', ')) 
...  if coordinate_list[0] != coordinate_list[-1]: 
...   coordinate_list.append(coordinate_list[0]) 
...  return str(load_wkt('POLYGON ((%s))' % ','.join(coordinate_list)).wkt) 
... 
>>> udf_parse_region=udf(parse_region, StringType()) 
16/09/06 22:18:34 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 
16/09/06 22:18:34 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 
>>> df = sqlContext.sql('select id, bounds from <schema.table_name> limit 10') 
>>> df2 = df.withColumn('bounds1', udf_parse_region('bounds')) 
>>> df2.first() 
Row(id=u'0089d43a-1b42-4fba-80d6-dda2552ee08e', bounds=u'33.42838509594465:-119.0533447265625, 33.39170168789402:-119.0203857421875, 33.29992542601392:-119.0478515625', bounds1=u'POLYGON ((-119.0533447265625 33.42838509594465, -119.0203857421875 33.39170168789402, -119.0478515625 33.29992542601392, -119.0533447265625 33.42838509594465))') 
>>> 

感謝, 侯賽因Bohra