
Apache Spark broadcast variable gives an error in mapPartitions

I am new to Python and Spark. I am trying to broadcast an rtree index in Spark, but when I use the broadcast index inside a mapPartitions function it fails with the errors below.

On Windows:

File "avlClass.py", line 42, in avlFileLine
  for j in bv.intersection([x_meters-buffer_size,y_meters-buffer_size,x_meters+buffer_size,y_meters+buffer_size]):
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 440, in intersection
  p_mins, p_maxs = self.get_coordinate_pointers(coordinates)
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 294, in get_coordinate_pointers
  dimension = self.properties.dimension
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 883, in get_dimension
  return core.rt.IndexProperty_GetDimension(self.handle)
WindowsError: exception: access violation reading 0x00000004
  at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)

On Linux:

ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.net.SocketException: Connection reset 
at java.net.SocketInputStream.read(SocketInputStream.java:196) 
at java.net.SocketInputStream.read(SocketInputStream.java:122) 
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) 
at java.io.BufferedInputStream.read(BufferedInputStream.java:254) 
at java.io.DataInputStream.readInt(DataInputStream.java:387) 

File: avlClass.py

import fiona 
from shapely.geometry import shape,Point, LineString, Polygon 
from shapely.ops import transform 
from rtree import index 
from numpy import math 
import os 
import pyproj 
from functools import partial 
from pyspark import SparkContext, SparkConf 

class avlClass(object):
    def __init__(self, name):
        self.name = name

    def create_index(self):
        # Read the ESRI shapefile
        shapeFileName = 'C:\\shapefiles\\Road.shp'
        polygons = [pol for pol in fiona.open(shapeFileName, 'r')]
        p = index.Property()
        p.dimension = 2
        idx = index.Index(properties=p)
        # Create one index entry per feature, keyed by position,
        # using the bounding box of the feature's geometry
        for pos, features in enumerate(polygons):
            idx.insert(pos, LineString(features['geometry']['coordinates']).bounds)
        return idx

    def avlFileLine(self, iter, bv):
        for line in iter:
            splits = line.split(',')
            lat = float(splits[2])
            long = float(splits[3])
            print lat, long
            x = 'No'

            # Test the index from broadcast variable bv
            buffer_size = 10
            x_meters = -9511983.32151
            y_meters = 4554613.80307
            for j in bv.intersection([x_meters - buffer_size, y_meters - buffer_size,
                                      x_meters + buffer_size, y_meters + buffer_size]):
                x = "FOUND"

            yield lat, long, x

File: avlSpark.py

import fiona 
from shapely.geometry import shape,Point, LineString, Polygon 
from shapely.ops import transform 
from rtree import index 
from numpy import math 
import os 
import pyproj 
from functools import partial 
from pyspark import SparkContext, SparkConf 
from avlClass import avlClass 

if __name__ == '__main__': 
    conf = SparkConf().setAppName('AVL_Spark_Job') 
    conf = SparkConf().setMaster('local[*]') 
    sc= SparkContext(conf=conf) 

    sc.addPyFile("avlClass.py") 
    test_avlClass=avlClass("Test") 

    print test_avlClass.name 
    idx= test_avlClass.create_index() 

    # Test the created index 
    buffer_size=10 
    x_meters=-9511983.32151 
    y_meters=4554613.80307 
    for j in idx.intersection([x_meters-buffer_size, y_meters-buffer_size, x_meters+buffer_size, y_meters+buffer_size]):
        print "FOUND"  # Index worked

    # broadcast Index for Partitions 
    idx2=sc.broadcast(idx) 



    FileName='c:\\test\\file1.txt' 
    avlFile=sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line,idx2.value)) 
    for line in avlFile.take(10):
        print line

Answer


What I see is that you are creating a broadcast variable:

# broadcast Index for Partitions 
idx2=sc.broadcast(idx) 

but then passing its .value into avlFileLine:

avlFile=sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line,idx2.value)) 

But neither idx nor idx2 is an RDD. idx2, as a broadcast variable, will take on whatever class idx is. (I actually asked a question of my own based on yours :)

You are still treating the argument you pass in as a broadcast variable, but then trying to use it as an RDD, presumably a PythonRDD, which, as noted, it is not. A broadcast variable is not an RDD; it simply takes on whatever type is assigned to it. On top of that, you are passing its value (via .value) into avlFileLine.
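
To make the distinction concrete, here is a minimal sketch (the variable names are illustrative, not taken from the code above):

from pyspark import SparkContext

sc = SparkContext("local[*]", "broadcast-vs-rdd")

data = [1, 2, 3]
rdd = sc.parallelize(data)   # an RDD: distributed, has map/filter/collect
bv = sc.broadcast(data)      # a Broadcast: a read-only handle to a plain value

print type(rdd)   # <class 'pyspark.rdd.RDD'>
print type(bv)    # <class 'pyspark.broadcast.Broadcast'>
print bv.value    # [1, 2, 3] -- an ordinary Python list, not an RDD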

So when you call intersection(), it blows up. I am surprised it didn't blow up more cleanly, but I work in Java, where the compiler would catch this; I assume the Python interpreter just runs along happily until it hits a bad memory location, and you get that ugly error message :)

I think the best approach is to rethink your code from the beginning, because this is not the right way to use Spark. I don't know your specific application well, so my best guess is that you need to drop intersection() and look again at the RDD programming guide in the Spark docs for Python. Find a way to apply idx2.value to avlfile, which is an RDD. Avoid any for loops inside the functions you pass: Spark performs the "for" loop for you by applying the function you pass to every element of the RDD. Remember that the result will be another RDD.

In pseudo-Java it would look something like:

SomeArray theArray = avlfile.map({declare inline or call function}).collect(<if the RDD is not too big for collect>) 
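
A rough Python equivalent of that pattern might look like this (just a sketch: process_line is a hypothetical function, and collect() is only safe when the result fits in driver memory):

def process_line(line):
    # Hypothetical per-element logic; replace with whatever you need per line
    splits = line.split(',')
    return float(splits[2]), float(splits[3])

# map() applies process_line to every element; Spark runs the "for" loop
# for you, and the result is another RDD until collect() brings it back
results = avlfile.map(process_line).collect()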

I hope that helps. If you haven't already, a great book is Learning Spark by O'Reilly, along with the sample code for the book; it is the next step up from the Apache Spark docs. Learning Spark can be rented for under $10, and in my case I can get it free through Safari Books as a university student.

There is a steep learning curve to writing Spark programs if you are not used to thinking in functional programming terms. I am not much further along than you, and IMHO you don't fully grasp the Spark programming model yet. I hope all of this helps.


Also, in the original edit of this answer I noted that your calls to SparkConf are wrong. I had to go back a ways in the docs (0.9) to find an example, but you want something like this:

from pyspark import SparkConf, SparkContext 
conf = (SparkConf() 
     .setMaster("local") 
     .setAppName("My app") 
     .set("spark.executor.memory", "1g")) 
sc = SparkContext(conf = conf) 

That is from the standalone programs section. As it stands now, I believe your second assignment to conf overwrites the first.


Summary: I don't understand why you would call RDD functions on a broadcast variable. A broadcast variable is NOT an RDD, but simply a data structure, like a global, that is read (not written) by all the workers. Per the Broadcast class in Scala.

From the docs on Broadcast Variables:

>>> broadcastVar = sc.broadcast([1, 2, 3]) 
<pyspark.broadcast.Broadcast object at 0x102789f10> 

>>> broadcastVar.value 
[1, 2, 3] 
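
As a sketch of the usual pattern, the broadcast value is an ordinary, read-only Python object that worker code consults; you do not call RDD operations on it (the names below are illustrative only):

lookup = sc.broadcast({'a': 1, 'b': 2})  # a plain, picklable dict

def tag_known(partition):
    # lookup.value is just a dict on each worker, read but never written
    for key in partition:
        if key in lookup.value:
            yield key, lookup.value[key]

print sc.parallelize(['a', 'b', 'c']).mapPartitions(tag_known).collect()
# [('a', 1), ('b', 2)]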

To me, calling intersection() on bv, which is not an RDD, does not make any sense.


1. idx2.value is passed as the argument (bv) to the mapped function, like this: avlFile = sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line, idx2.value)) –


2. I'm only seeing this on a tablet and will be at my computer later, but usually you would edit your question to add this :) Also, I see you passing idx2.value into avlFileLine, but I don't see where it is actually declared and assigned. – JimLohse


3. The errors in index.py come from the Rtree module I downloaded. Before writing this for Spark I tried it in plain Python, and it worked there. –
