New to the Kafka/Flink/Scala/SBT combination, and trying to set up the following example using Scala and SBT:
- a multi-topic Kafka queue
- a Flink DataStream job packaged as a Scala jar
- the Scala jar reads data from one topic, processes it, then pushes the data to another topic

So far:
- Kafka and Flink are set up correctly.
- Able to read a Kafka queue using the Kafka.jar example that ships with the Flink binary.
- Able to create a word-count jar (thanks ipoteka).

Now trying to create an example streaming wordcount.jar before attempting the actual Kafka/Spark streaming example, but running into SBT issues. Any idea what I am missing?
Also let me know if I have any unnecessary declarations.
Would also appreciate it if someone could share a simple program that reads from and writes to a Kafka queue.
Project setup -
|- project/plugins.sbt
|- build.sbt
|- src/main/scala/WordCount.scala
build.sbt
name := "Kafka-Flink Project"
version := "1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
// Updated: correction pointed out by ipoteka
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.0.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.0.0"
// for jar building
mainClass in compile := Some("StreamWordCount")
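For reference, a sketch of a corrected `build.sbt`, assuming Flink 1.0.0, where the Kafka 0.8 connector lives in the separate `flink-connector-kafka-0.8` artifact; the `spark-core` line is not needed for a pure Flink job, and `mainClass` should name the fully qualified class (the object below is `Main` in package `prog`, not `StreamWordCount`):

```scala
name := "Kafka-Flink Project"

version := "1.0"

// Flink core + streaming (spark-core is not needed for a Flink job)
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.0.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.0.0"

// Kafka connector -- provides org.apache.flink.streaming.connectors.kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0.0"

// Fully qualified main class
mainClass in compile := Some("prog.Main")
```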
plugins.sbt
// *** creating fat jar
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
WordCount.scala
package prog
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
object WordCount {
type WordCount = (String, Int)
def main(lines: DataStream[String], stopWords: Set[String], window: Time): DataStream[WordCount] = {
lines
.flatMap(line => line.split(" "))
.filter(word => !word.isEmpty)
.map(word => word.toLowerCase)
.filter(word => !stopWords.contains(word))
.map(word => (word, 1))
.keyBy(0)
.timeWindow(window)
.sum(1)
}
}
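The transformation chain above can be sanity-checked on plain Scala collections before wiring it to Flink. This is a sketch of the word-count logic only: `groupBy`/`size` stand in for the `DataStream` operators `keyBy(0)`/`sum(1)`, there is no windowing, and the stop-word set in `main` is a made-up example:

```scala
object WordCountLogic {
  type WordCount = (String, Int)

  // Same chain as the DataStream version, applied to a plain List.
  def countWords(lines: List[String], stopWords: Set[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))
      .filter(word => !word.isEmpty)
      .map(word => word.toLowerCase)
      .filter(word => !stopWords.contains(word))
      .groupBy(identity)                      // stands in for keyBy(0)
      .map { case (w, ws) => (w, ws.size) }   // stands in for sum(1)

  def main(args: Array[String]): Unit = {
    val counts = countWords(List("To be or not to be"), Set("or", "not"))
    // Sort for a deterministic printout
    println(counts.toList.sorted)  // List((be,2), (to,2))
  }
}
```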
StreamWordCount.scala
package prog
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
object Main {
def main(args: Array[String]) {
type WordCount = (String, Int)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
.addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
.flatMap(line => line.split(" "))
.filter(word => !word.isEmpty)
.map(word => word.toLowerCase)
.filter(word => !stopWords.contains(word))
.map(word => (word, 1))
.keyBy(0)
.timeWindow(window)
.sum(1)
.print
env.execute("Flink Kafka Example")
}
}
Error while building the jar (updated)
[[email protected] ex]$ /opt/sbt/bin/sbt package
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:4: object connectors is not a member of package org.apache.flink.streaming
[error] import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
[error] ^
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:18: not found: type Properties
[error] val properties = new Properties()
[error] ^
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:23: not found: type FlinkKafkaConsumer082
[error] .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
[error] ^
[error] three errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 31 s, completed Jul 3, 2016 9:02:18 PM
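All three errors point at the same two gaps: the Kafka connector artifact is not on the classpath (so `org.apache.flink.streaming.connectors.kafka` does not resolve), and `java.util.Properties` is never imported. A sketch of a fixed `StreamWordCount.scala`, assuming Flink 1.0.0 with `"org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0.0"` added to `build.sbt`; in that release the consumer class is `FlinkKafkaConsumer08` (`FlinkKafkaConsumer082` was the 0.9.x/0.10.x-era name). `stopWords` and `window`, which the original references but never defines, are given placeholder values here:

```scala
package prog

import java.util.Properties  // was missing, caused "not found: type Properties"

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
// Resolves once flink-connector-kafka-0.8 is a declared dependency
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    // Placeholders -- the original used these names without defining them
    val stopWords = Set("a", "an", "the")
    val window = Time.seconds(5)

    env
      .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
      .flatMap(line => line.split(" "))
      .filter(word => !word.isEmpty)
      .map(word => word.toLowerCase)
      .filter(word => !stopWords.contains(word))
      .map(word => (word, 1))
      .keyBy(0)
      .timeWindow(window)
      .sum(1)
      .print()

    env.execute("Flink Kafka Example")
  }
}
```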
Thanks, this worked. Removed the Spark dependency once I realized it wasn't required. Now trying to create a jar for the Kafka read program, but hitting new issues. – coredump
@coredump Feel free to accept this answer, and ask a new question if no similar question helps you. – ipoteka
Thanks for solving the original issue. – coredump