2016-07-03

I'm new to the Kafka/Flink/Scala/sbt combination, and am trying to troubleshoot a Kafka + Flink example built with Scala and sbt. The setup I'm after:

  • A multi-topic Kafka queue
  • A Flink streaming job, packaged as a Scala jar
  • The Scala jar reads data from one topic, processes it, then pushes the data to another topic

So far:

  • Able to set up Kafka and Flink correctly.
  • Able to read the Kafka queue using the Kafka.jar example that ships with the Flink binary.

Able to create a word-count jar (thanks ipoteka).
Now trying to build a streaming word-count jar, but running into sbt issues.
I'm trying to create a sample wordcount.jar before attempting the actual Kafka/Spark streaming example, but have hit sbt problems. Any idea what I may be overlooking?
Also let me know if I have any unnecessary declarations.
I'd also appreciate it if someone could share a simple program that reads from/writes to a Kafka queue.

Project setup -

|- project/plugins.sbt 
|- build.sbt 
|- src/main/scala/WordCount.scala 

build.sbt

name := "Kakfa-Flink Project" 

version := "1.0" 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" 

// Updated : Correction pointed by ipoteka 
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0.0" 

libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.0.0" 

libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.0.0" 

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.0.0" 

// for jar building 
mainClass in compile := Some("StreamWordCount") 

plugins.sbt

// *** creating fat jar 
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1") 

WordCount.scala

package prog 

import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.api.scala.DataStream 
import org.apache.flink.streaming.api.windowing.time.Time 

object WordCount { 

    type WordCount = (String, Int) 

    def main(lines: DataStream[String], stopWords: Set[String], window: Time): DataStream[WordCount] = { 
    lines 
     .flatMap(line => line.split(" ")) 
     .filter(word => !word.isEmpty) 
     .map(word => word.toLowerCase) 
     .filter(word => !stopWords.contains(word)) 
     .map(word => (word, 1)) 
     .keyBy(0) 
     .timeWindow(window) 
     .sum(1) 
    } 

} 
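As a side note, the pipeline logic in WordCount above can be sanity-checked on plain Scala collections before it is wired to a Flink DataStream. A minimal sketch (the stop-word set, sample input, and the `WordCountCheck` object name are made up for illustration):

```scala
// Pure-Scala re-implementation of the word-count steps above,
// runnable without Flink or Kafka.
object WordCountCheck {
  def countWords(lines: Seq[String], stopWords: Set[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))          // split each line into words
      .filter(_.nonEmpty)             // drop empty tokens
      .map(_.toLowerCase)             // normalize case
      .filterNot(stopWords.contains)  // drop stop words
      .groupBy(identity)              // group identical words
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val counts = countWords(Seq("the quick fox", "the lazy dog"), Set("the"))
    println(counts("quick")) // 1
  }
}
```

This checks the transformation chain in isolation; the Flink version replaces `groupBy`/`map` with `keyBy`, `timeWindow`, and `sum` over a stream.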

StreamWordCount.scala

package prog 

import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 

import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.api.scala.DataStream 
import org.apache.flink.streaming.api.windowing.time.Time 



object Main { 
    def main(args: Array[String]) { 

    type WordCount = (String, Int) 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val properties = new Properties() 
    properties.setProperty("bootstrap.servers", "localhost:9092") 
    properties.setProperty("zookeeper.connect", "localhost:2181") 
    properties.setProperty("group.id", "test") 
    val stream = env 
     .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties)) 
     .flatMap(line => line.split(" ")) 
     .filter(word => !word.isEmpty) 
     .map(word => word.toLowerCase) 
     .filter(word => !stopWords.contains(word)) 
     .map(word => (word, 1)) 
     .keyBy(0) 
     .timeWindow(window) 
     .sum(1) 
     .print 

    env.execute("Flink Kafka Example") 
    } 
} 

Error while building the jar (updated)

[[email protected] ex]$ /opt/sbt/bin/sbt package 
    [error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:4: object connectors is not a member of package org.apache.flink.streaming 
[error] import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082 
[error]         ^
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:18: not found: type Properties 
[error]  val properties = new Properties() 
[error]      ^
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:23: not found: type FlinkKafkaConsumer082 
[error]  .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties)) 
[error]     ^
[error] three errors found 
[error] (compile:compileIncremental) Compilation failed 
[error] Total time: 31 s, completed Jul 3, 2016 9:02:18 PM 

Answer


Where did you get those versions from? There is no Kafka release 1.0.0 that I can see. Have a look at Maven (check the SBT tab):

libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0.0" 

I'd also suggest you check all the other versions. For example, the current Spark release is 1.6.2.
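Beyond the version check, the question's first compile error (`object connectors is not a member of package org.apache.flink.streaming`) points at a missing dependency: Flink ships its Kafka connector as a separate artifact. A sketch of the relevant build.sbt lines follows; the artifact names and versions here are assumptions, so verify them against Maven Central for your Flink install:

```scala
name := "Kafka-Flink Project"

version := "1.0"

scalaVersion := "2.10.6"  // assumed; match it to your Flink build

libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.0.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.0.0"

// The Kafka connector is a separate artifact; without it the
// org.apache.flink.streaming.connectors.kafka package does not exist.
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0.0"
```

For the remaining errors: `not found: type Properties` is fixed by adding `import java.util.Properties` to StreamWordCount.scala, and note that in the Flink 1.0.x connector the consumer class is `FlinkKafkaConsumer08`; `FlinkKafkaConsumer082` belonged to the older 0.9/0.10 connector line.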


Thanks, that worked. I removed the Spark release once I realized it wasn't required. Now trying to create a jar for the Kafka reader program, but running into new issues. – coredump


@coredump Feel free to accept this answer, and to ask a new question if no similar existing question helps you. – ipoteka


Thanks for resolving the original problem – coredump