2014-11-21 87 views
5

我想在spark中執行我的數據的geoip查找。爲此,我使用MaxMind的geoIP數據庫。如何在spark中執行初始化?

我想要做的是在每個分區上初始化一次geoip數據庫對象,然後用它來查找與IP地址相關的城市。

spark對每個節點都有一個初始化階段,還是應該檢查一個實例變量是否未定義?如果是,請在繼續之前對它進行初始化?例如。類似的信息(這是蟒蛇,但我希望有一個解決方案階):

class IPLookup(object): 
    database = None 

    def getCity(self, ip): 
     if not database: 
     self.database = self.initialise(geoipPath) 
    ... 

當然,這樣做需要的火花將連載整個對象,該文檔警告反對的東西。

回答

1

這似乎是廣播變量的一個很好的用法。你有沒有看過該功能的文檔,如果你有它不能滿足你的要求?

+1

我試過使用廣播變量。但它沒有奏效。可能是因爲com.maxmind.geoip.LookupService不可序列化。我嘗試使用SparContext.addFile方法,而且工作正常。添加文件GeoIPCity.dat和GeoIPASNum.dat – 2015-03-10 18:46:16

5

在火花,每分區中的操作可以是使用做:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false) 

此映射器將在元件的一個迭代執行每個分區一次函數f。這個想法是,設置資源(如數據庫連接)的成本將通過迭代器中多個元素的使用而抵消。

例子:

val logsRDD = ??? 
logsRDD.mapPartitions{iter => 
    val geoIp = new GeoIPLookupDB(...) 
    // this is local map over the iterator - do not confuse with rdd.map 
    iter.map(elem => (geoIp.resolve(elem.ip),elem)) 
} 
+0

一個很好的解決方案,但在這種情況下,我想在多個操作中重用該對象,所以廣播變量對我來說看起來更有用。 – jbrown 2014-11-25 10:14:56

0

由於@bearrito提到的 - 你可以使用加載地理數據庫,然後從驅動器播放。 另一個需要考慮的選擇是提供可用於查找的外部服務。它可能是內存緩存,如Redis/Memcached/Tacheyon或常規數據存儲。