2017-06-22

I'm developing a Spark Streaming Java application (Scala seems to be a better fit for Spark, but unfortunately my time is limited, and it would cost me too much time to learn it well enough to start with it). I need to use a third-party class inside a transformation function, and that class is not serializable. All the solutions I've found for this problem create a singleton static object per executor JVM, and all the examples are in Scala, using a static wrapper method that implements the singleton pattern. In my application I use Spring DI, so I initialize a singleton bean and then assign it to a static class field, which is essentially the same as the Scala solution. The problem is that a strange exception is thrown when the third-party non-serializable object is used in the Spark Streaming transformation:

Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at scala.Option.foreach(Option.scala:257) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1226) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1071) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1488) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1467) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1467) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1467) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$saveAsTextFiles$1$$anonfun$11.apply(DStream.scala:925) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$saveAsTextFiles$1$$anonfun$11.apply(DStream.scala:923) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
at scala.util.Try$.apply(Try.scala:192) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException: unexpected exception type 
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1582) 
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1154) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1817) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) 
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
... 3 more 
Caused by: java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1148) 
... 117 more 
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization 

So can someone tell me what the correct way is to use a non-serializable object on the executor nodes (without initializing it on every execution)?

Here is the class where I use the problematic dependency (DatabaseReader). It is executed on the executor nodes:

import com.maxmind.geoip2.DatabaseReader; 
import com.maxmind.geoip2.model.CityResponse; 
import com.qello.models.Event; 
import lombok.NonNull; 
import lombok.RequiredArgsConstructor; 

import java.net.InetAddress; 

@RequiredArgsConstructor 
public class GeoEnricher extends AbstractEnricher { 

    @NonNull 
    private static DatabaseReader geoIpDatabaseReader; 

    public GeoEnricher(Enricher nextEnricher, DatabaseReader geoIpDatabaseReader) { 
     super(nextEnricher); 
     this.geoIpDatabaseReader = geoIpDatabaseReader; 
    } 

    @Override 
    public void doEnrich(Event event) { 
     try { 
      CityResponse cityResponse = geoIpDatabaseReader.city(InetAddress.getByName(event.getIp())); 
      if (cityResponse != null) { 
       event.setContinent(cityResponse.getContinent().getName()); 
       event.setCountry(cityResponse.getCountry().getName()); 
       event.setLatitude(cityResponse.getLocation().getLatitude());   
       event.setLongitude(cityResponse.getLocation().getLongitude()); 
       event.setIpTimezone(cityResponse.getLocation().getTimeZone()); 
      } 
     } catch (Exception e) { 
     } 
    } 
} 

Here is how I get the enricher instance in the main method:

Enricher enricher = applicationContext.getBean(Enricher.class); 

And here is the transformation that uses it (the events are consumed from a Kinesis stream and then transformed):

kinesisStream 
    .flatMap(bytes -> { 
     return objectMapper.readValue(bytes, BatchEvent.class); 
    }) 
    .mapToPair(batchEvent -> new Tuple2<>(batchEvent.getEvents().get(0).getDeviceId(), batchEvent.getEvents())) 
    .map(deviceIdEvents -> { 
     for(Event event : deviceIdEvents._2) { 
      enricher.enrich(event); 
     } 

     return deviceIdEvents; 
    }) 
+1

On Scala for Spark, if you have 23 minutes: https://www.youtube.com/watch?v=LBoSgiLV_NQ – maasg

+1

Spring DI won't help you here, because the executors don't run the Spring DI initialization that instantiates your singleton. Do you have the code? There may be other approaches. – maasg

+0

@maasg Spring DI is not invoked on the executors. I edited the post with the class that causes the problem. –

Answers

1

In the worst case, you can use the non-serializable service by instantiating it on each executor, every time data is processed. We can do that using mapPartitions, like this:

kinesisStream
    .flatMap(...)
    .mapToPair(...)
    .mapPartitions(iterator -> {
        // Instantiate the service locally, on the executor.
        // It should be possible to create an instance *without* requiring DI.
        Enricher enricher = new EnricherService(...);
        List<Tuple2<String, List<Event>>> result = new ArrayList<>();
        while (iterator.hasNext()) {
            Tuple2<String, List<Event>> deviceIdEvents = iterator.next();
            for (Event event : deviceIdEvents._2) {
                enricher.enrich(event); // this is strongly discouraged. Avoid mutable state!
            }
            result.add(deviceIdEvents);
        }
        return result.iterator();
    })

A better alternative is to use a lazy singleton object that is initialized on each executor when it is first accessed.

Pseudo-code:

...stream setup... 
.mapPartitions(iterator -> { 
    Enricher enricher = EnricherSingleton.getInstance(); // get a local instance 
    ...same as above...  
}) 

The advantage of the singleton alternative is that it avoids the overhead of creating a new instance each time, provided the service instance is thread-safe, or that enough care is taken to provide a thread-safe alternative (e.g. a ThreadLocal variable). In any case, the mapPartitions operation amortizes the cost of creating the service over the potentially large number of data points contained in each partition, and execution remains parallelized.
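As a concrete illustration of that lazy, per-JVM singleton, the initialization-on-demand holder idiom gives thread-safe lazy initialization for free in Java. This is only a sketch: `ExpensiveService` is a hypothetical stand-in for the non-serializable dependency (in the question's case, the MaxMind `DatabaseReader`).

```java
public class EnricherSingleton {

    // Hypothetical stand-in for the expensive, non-serializable service.
    public static class ExpensiveService {
        ExpensiveService() {
            // imagine a costly setup step here, e.g. loading a database file
        }
    }

    // The JVM initializes Holder lazily, on first access to INSTANCE,
    // and exactly once, even if several task threads race to get it.
    private static class Holder {
        static final ExpensiveService INSTANCE = new ExpensiveService();
    }

    public static ExpensiveService getInstance() {
        return Holder.INSTANCE;
    }
}
```

Inside `mapPartitions`, each executor JVM would call `EnricherSingleton.getInstance()` and receive the same instance for every partition it processes, while the driver never constructs (or serializes) the service at all.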

If none of these options is possible (think of a DI framework that can only inject dependencies on the driver), we can use the service locally, at the cost of losing all parallelism in our process.

Enricher enricher = applicationContext.getBean(Enricher.class); // lives on the driver only
...stream setup...
.foreachRDD(rdd -> {
    List<Event> events = rdd.collect(); // All data is sent to the driver. This is a bottleneck.
    List<Event> enrichedEvents = events.stream()
        .map(event -> enricher.enrich(event)) // suggesting a map function instead of mutating
                                              // in place (assuming enrich returns the event);
                                              // the mutation used in the question would work as well
        .collect(Collectors.toList());
    // do something with enrichedEvents
});
+0

The first option is not acceptable – object initialization takes a long time. The second option is very similar to what I am already doing (static initialization), and it keeps throwing the same exception. The third option is not acceptable either – we are talking about huge amounts of data, and we cannot lose parallelism. The bad thing here is that the exception is not descriptive, and I cannot see where the actual problem is. –

+0

@HristoAngelov Can I see the static initialization code you are using? It *must* be lazy. The initialization cannot happen on the driver. If you are using DI for it, that is probably what's wrong. – maasg

+0

@HristoAngelov Looking at your code (*), the class that needs to become a singleton is 'GeoEnricher'; then apply option (2) to use it. (* Which should not have been posted as an answer, but as an edit to your question, by the way.) – maasg

0

I did a lazy initialization. The code is below:

private static DatabaseReader databaseReader = null; 

public static DatabaseReader getInstance() { 
    try { 
     if(databaseReader == null) { 
      databaseReader = new DatabaseReader.Builder(new File(SparkFiles.get("GeoLite2-City.mmdb"))) 
        .withCache(new CHMCache()) 
        .fileMode(Reader.FileMode.MEMORY) 
        .build(); 
     } 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 

    return databaseReader; 
} 
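One caveat this getter does not cover: multiple tasks in the same executor JVM can call it concurrently, so two `DatabaseReader` instances could be built when both threads see `databaseReader == null`. Double-checked locking with a `volatile` field is a common fix. A generic sketch follows; the `Service` class is a hypothetical stand-in for `DatabaseReader` so that the snippet compiles on its own.

```java
public class SafeLazy {

    // Hypothetical stand-in for the expensive, non-serializable reader.
    public static class Service {
    }

    // volatile is required for double-checked locking to be correct on the JVM
    private static volatile Service instance = null;

    public static Service getInstance() {
        Service local = instance; // a single volatile read on the fast path
        if (local == null) {
            synchronized (SafeLazy.class) {
                local = instance; // re-check under the lock
                if (local == null) {
                    instance = local = new Service();
                }
            }
        }
        return local;
    }
}
```

Alternatively, simply marking the whole `getInstance()` method `synchronized` is correct too, at the cost of taking the lock on every call.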