Apache Spark broadcast variable gives an error with mapPartitions

I am new to Python and Spark. Here I am trying to broadcast a Spark Rtree index. When I try to query the broadcast index inside a mapPartitions function, it fails with the following errors.
On Windows:
File "avlClass.py", line 42, in avlFileLine
    for j in bv.intersection([x_meters-buffer_size,y_meters-buffer_size,x_meters+buffer_size,y_meters+buffer_size]):
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 440, in intersection
    p_mins, p_maxs = self.get_coordinate_pointers(coordinates)
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 294, in get_coordinate_pointers
    dimension = self.properties.dimension
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 883, in get_dimension
    return core.rt.IndexProperty_GetDimension(self.handle)
WindowsError: exception: access violation reading 0x00000004
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
On Linux:
ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
File: avlClass.py
import fiona
from shapely.geometry import shape,Point, LineString, Polygon
from shapely.ops import transform
from rtree import index
from numpy import math
import os
import pyproj
from functools import partial
from pyspark import SparkContext, SparkConf
class avlClass(object):
    def __init__(self, name):
        self.name = name

    def create_index(self):
        # Read the ESRI shapefile
        shapeFileName = 'C:\\shapefiles\\Road.shp'
        polygons = [pol for pol in fiona.open(shapeFileName, 'r')]
        p = index.Property()
        p.dimension = 2
        self_idx = index.Index(property=p)
        # Create index entries
        for pos, features in enumerate(polygons):
            self_idx.insert(pos, LineString(features['geometry']['coordinates']).bounds)
        return self_idx

    def avlFileLine(self, iter, bv):
        for line in iter:
            splits = line.split(',')
            lat = float(splits[2])
            long = float(splits[3])
            print lat, long
            x = 'No'
            # Test the index from broadcast variable bv
            buffer_size = 10
            x_meters = -9511983.32151
            y_meters = 4554613.80307
            for j in bv.intersection([x_meters - buffer_size, y_meters - buffer_size, x_meters + buffer_size, y_meters + buffer_size]):
                x = "FOUND"
            yield lat, long, heading_radians, x
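For reference, the query that avlFileLine runs against the broadcast value follows the standard Rtree insert/intersection pattern. A minimal, self-contained sketch of that pattern outside Spark (the bounding boxes below are made up for illustration):

from rtree import index

# Build a small 2D index (rtree's default dimension is 2)
idx = index.Index()
idx.insert(0, (-9511990.0, 4554600.0, -9511980.0, 4554620.0))  # (minx, miny, maxx, maxy)
idx.insert(1, (-9500000.0, 4550000.0, -9499990.0, 4550010.0))

# Query a buffered box around a point, as avlFileLine does
buffer_size = 10
x_meters = -9511983.32151
y_meters = 4554613.80307
hits = list(idx.intersection([x_meters - buffer_size, y_meters - buffer_size,
                              x_meters + buffer_size, y_meters + buffer_size]))
print hits  # only the first box overlaps the query window, so this prints [0]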
File: avlSpark.py
import fiona
from shapely.geometry import shape,Point, LineString, Polygon
from shapely.ops import transform
from rtree import index
from numpy import math
import os
import pyproj
from functools import partial
from pyspark import SparkContext, SparkConf
from avlClass import avlClass
if __name__ == '__main__':
    conf = SparkConf().setAppName('AVL_Spark_Job')
    conf = SparkConf().setMaster('local[*]')
    sc = SparkContext(conf=conf)
    sc.addPyFile("avlClass.py")

    test_avlClass = avlClass("Test")
    print test_avlClass.name
    idx = test_avlClass.create_index()

    # Test the created index
    buffer_size = 10
    x_meters = -9511983.32151
    y_meters = 4554613.80307
    for j in idx.intersection([x_meters - buffer_size, y_meters - buffer_size, x_meters + buffer_size, y_meters + buffer_size]):
        print "FOUND"  # index worked

    # Broadcast the index for the partitions
    idx2 = sc.broadcast(idx)
    FileName = 'c:\\test\\file1.txt'
    avlFile = sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line, idx2.value))

    for line in avlFile.take(10):
        print line
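One detail worth keeping in mind (not stated explicitly in the post) is that sc.broadcast serializes its argument with pickle before shipping it to the executors, so the Rtree index has to survive a pickle round trip for idx2.value to be usable inside avlFileLine. A quick local check of that, without Spark, could look like the following sketch; it reuses create_index() from avlClass.py above:

import pickle
from avlClass import avlClass

test_avlClass = avlClass("Test")
idx = test_avlClass.create_index()

# sc.broadcast pickles the value; simulate that round trip locally
try:
    restored = pickle.loads(pickle.dumps(idx))
    buffer_size = 10
    x_meters = -9511983.32151
    y_meters = 4554613.80307
    hits = list(restored.intersection([x_meters - buffer_size, y_meters - buffer_size,
                                       x_meters + buffer_size, y_meters + buffer_size]))
    print "pickle round trip OK, hits:", hits
except Exception as e:
    print "index did not survive pickling:", e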
1. Pass idx2.value as the argument (bv) to the mapped function, like this: File = sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line, idx2.value)) –
2. I'm just seeing this on a tablet and will be at my computer later, but usually you would edit your question to add this :) Also, I see you passing idx2.value into avlFileLine, but I don't see where it is actually declared and assigned. – JimLohse
3. The errors in index.py come from the Rtree module I downloaded. Before writing this for Spark, I tried it in plain Python and it worked there. –
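Since the Windows traceback ends inside core.rt (the ctypes layer of the Rtree module), one possible workaround sketch, not verified against the original data, is to broadcast only the plain, picklable bounding-box tuples and rebuild the index once per partition instead of broadcasting the Index object itself. The names below (bounds_list, avlFileLineLocal) are hypothetical:

from rtree import index

def avlFileLineLocal(lines, bounds_list):
    # Rebuild the Rtree index inside the partition from plain (minx, miny, maxx, maxy) tuples
    local_idx = index.Index()
    for pos, bounds in enumerate(bounds_list):
        local_idx.insert(pos, bounds)
    for line in lines:
        splits = line.split(',')
        lat = float(splits[2])
        lon = float(splits[3])
        x = 'No'
        buffer_size = 10
        x_meters = -9511983.32151
        y_meters = 4554613.80307
        for j in local_idx.intersection([x_meters - buffer_size, y_meters - buffer_size,
                                         x_meters + buffer_size, y_meters + buffer_size]):
            x = "FOUND"
        yield lat, lon, x

# Driver side (sketch): broadcast the bounds, not the Index object
# bounds_list = [LineString(f['geometry']['coordinates']).bounds
#                for f in fiona.open(shapeFileName, 'r')]
# bounds_bv = sc.broadcast(bounds_list)
# avlFile = sc.textFile(FileName).mapPartitions(
#     lambda lines: avlFileLineLocal(lines, bounds_bv.value))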