
I am trying to write a Twitter stream to an Elasticsearch 2.3 index using the provided Elasticsearch2 connector. The Elasticsearch connector works in the IDE but not on the local cluster.

Running the job from IntelliJ works fine, but when I run the jar on the local cluster, the job fails with the following error:

05/09/2016 13:26:58 Job execution switched to status RUNNING. 
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to SCHEDULED 
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to DEPLOYING 
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to RUNNING 
05/09/2016 13:26:59 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED 
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes! 
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172) 
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) 
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) 
    at java.lang.Thread.run(Thread.java:745) 

05/09/2016 13:26:59 Job execution switched to status FAILING. 
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes! 
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172) 
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) 
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) 
    at java.lang.Thread.run(Thread.java:745) 
05/09/2016 13:26:59 Job execution switched to status FAILED. 

------------------------------------------------------------ 
The program finished with the following exception: 

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. 
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381) 
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355) 
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65) 
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541) 
    at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69) 
    at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) 
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) 
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248) 
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860) 
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327) 
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187) 
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238) 
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes! 
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172) 
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) 
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) 
    at java.lang.Thread.run(Thread.java:745) 

My Scala code:

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("node.name", "node-1")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2/bin")

// try several addresses for the local Elasticsearch node, all on transport port 9300
val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getLocalHost(), 9300))
transports.add(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300))

stream.addSink(new ElasticsearchSink(config, transports, new ElasticSearchSinkTwitter()))

What is the difference between launching the program from the IDE and submitting it to the local cluster that could cause this?


Did you solve this problem? I am running into the same issue. Please share what you found. – Kumar

Answer


Problems like this are usually caused by the different ways dependencies are handled when a job runs inside the IDE (IntelliJ, Eclipse) versus when it is submitted to Flink as a fat jar.

I ran into the same problem the other day; the TaskManager log file showed the following root cause:

java.lang.IllegalArgumentException: An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]
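Lucene discovers its codecs (including the Lucene50 postings format) through Java's ServiceLoader, i.e. via META-INF/services files, so a common culprit is a fat-jar build that drops or overwrites those files when merging dependencies. Purely as an illustration, and assuming the job is packaged with the maven-shade-plugin (the original post does not show its build setup), a configuration that merges the service files might look like this:

<build> 
    <plugins> 
        <plugin> 
            <groupId>org.apache.maven.plugins</groupId> 
            <artifactId>maven-shade-plugin</artifactId> 
            <version>2.4.1</version> <!-- version is illustrative --> 
            <executions> 
                <execution> 
                    <phase>package</phase> 
                    <goals> 
                        <goal>shade</goal> 
                    </goals> 
                    <configuration> 
                        <transformers> 
                            <!-- merge META-INF/services files instead of keeping only the first one found --> 
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> 
                        </transformers> 
                    </configuration> 
                </execution> 
            </executions> 
        </plugin> 
    </plugins> 
</build> 

With the service files merged, the Lucene50 entry should remain visible inside the jar, which is consistent with the error message above. This is only a sketch of the usual remedy for that SPI error; the workaround that actually helped in my case is below.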

Searching for this error, I found that the following answer solved the problem:

https://stackoverflow.com/a/38354027/3609571

The fix was to add the following dependency to my pom.xml:

<dependency> 
    <groupId>org.apache.lucene</groupId> 
    <artifactId>lucene-core</artifactId> 
    <version>5.4.1</version> 
</dependency> 

Note that the order of the dependencies matters in this case. It only worked when the lucene-core dependency was placed at the top; adding it at the end did not work for me. So this is more of a "hack" than a proper fix.
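For illustration only, the resulting order might look like the sketch below; the Flink connector coordinates and the ${flink.version} property are assumptions, since the rest of my pom.xml is not shown here:

<dependencies> 
    <!-- lucene-core placed first: this ordering is what made the fix work --> 
    <dependency> 
        <groupId>org.apache.lucene</groupId> 
        <artifactId>lucene-core</artifactId> 
        <version>5.4.1</version> 
    </dependency> 
    <!-- assumed Elasticsearch 2 connector coordinates; version should match the Flink release --> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-connector-elasticsearch2_2.10</artifactId> 
        <version>${flink.version}</version> 
    </dependency> 
    <!-- remaining Flink and Twitter dependencies follow --> 
</dependencies> 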
