是否可以配置Spark和spark-streaming-kafka-0-10庫來讀取多個Kafka分區或整個Kafka主題使用單個任務而不是爲每個可用的Kafka分區創建不同的Spark任務?spark-streaming-kafka-0-10:如何限制Spark分區的數量
請原諒我對這些技術的粗略理解;我認爲我對Spark和Kafka仍然是新手。體系結構和設置大多隻是爲了探索並瞭解這些技術如何協同工作。
我有一個四個虛擬主機,一個有Spark主,每個都有一個Spark工作者。根據Spotify's Docker image,其中一位主持人也在經營卡夫卡經紀人。每個主機有四個內核和大約8 GB未使用的RAM。
卡夫卡經紀人有206個主題,每個主題有10個分區。所以總共有2,060個分區供應用程序讀取。
我正在使用spark-streaming-kafka-0-10庫(當前是試驗版)通過Spark Streaming作業訂閱Kafka中的主題。我現在用的是SubscribePattern
類從星火訂閱所有206個主題:
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
SubscribePattern[String, String](Pattern.compile("(pid\\.)\\d+"),
kafkaParams)
)
當我提交這份工作星火主人,它看起來像16個執行人啓動,一個集羣中的每個核心。它也看起來像每個卡夫卡分區得到自己的任務,總共2060個任務。我認爲我的16個執行者集羣在執行這麼多任務時遇到了麻煩,因爲在1500到1800個任務完成之後,這個任務在不同的點上一直處於故障狀態。
我發現a tutorial by Michael Noll from 2014使用火花流 - 卡夫卡0-8庫來控制消費者線程的每個主題數哪些地址:
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val consumerThreadsPerInputDstream = 3
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
感謝您使用上相當細緻。你碰到一個問題,我其實並沒有完全弄明白。你的意思是,新的直接API中,對應於一個分區的任務(卡夫卡分區/ RDD分區),做數據及其處理的同時閱讀。我一直在想,爲了確定我的執行者,最終是我的節點和集羣。 – MaatDeamon
@Maatdeamon是的。當執行器被分配到一個分區進行操作時,驅動程序爲每個分區分配偏移量以供讀取。 –
我認爲這個問題確定了他自己的職位。我仍然有點困惑。請你看看這裏https://stackoverflow.com/questions/45526554/spark-streaming-kafka-direct-streaming-parallelism我的文章,並嘗試回答呢? – MaatDeamon