1

是否可以配置Spark和spark-streaming-kafka-0-10庫來讀取多個Kafka分區或整個Kafka主題使用單個任務而不是爲每個可用的Kafka分區創建不同的Spark任務?spark-streaming-kafka-0-10:如何限制Spark分區的數量

請原諒我對這些技術的粗略理解;我認爲我對Spark和Kafka仍然是新手。體系結構和設置大多隻是爲了探索並瞭解這些技術如何協同工作。

我有一個四個虛擬主機,一個有Spark主,每個都有一個Spark工作者。根據Spotify's Docker image,其中一位主持人也在經營卡夫卡經紀人。每個主機有四個內核和大約8 GB未使用的RAM。

卡夫卡經紀人有206個主題,每個主題有10個分區。所以總共有2,060個分區供應用程序讀取。

我正在使用spark-streaming-kafka-0-10庫(當前是試驗版)通過Spark Streaming作業訂閱Kafka中的主題。我現在用的是SubscribePattern類從星火訂閱所有206個主題:

val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    SubscribePattern[String, String](Pattern.compile("(pid\\.)\\d+"), 
    kafkaParams) 
) 

當我提交這份工作星火主人,它看起來像16個執行人啓動,一個集羣中的每個核心。它也看起來像每個卡夫卡分區得到自己的任務,總共2060個任務。我認爲我的16個執行者集羣在執行這麼多任務時遇到了麻煩,因爲在1500到1800個任務完成之後,這個任務在不同的點上一直處於故障狀態。

我發現a tutorial by Michael Noll from 2014使用火花流 - 卡夫卡0-8庫來控制消費者線程的每個主題數哪些地址:

val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...) 

val consumerThreadsPerInputDstream = 3 
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream) 
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...) 

回答

3

是否有可能配置與星火spark-streaming-kafka-0-10庫可以通過單個任務讀取多個Kafka分區或整個Kafka主題,而不是爲每個可用的Kafka分區創建不同的Spark任務?

你可以通過調用流repartition改變所產生的分區數量,但你失去1:卡夫卡和RDD分區之間的一一對應。

卡夫卡分區生成的任務數量與您擁有16個執行程序的事實無關。執行程序的數量取決於您正在使用的設置和資源管理器。

卡夫卡分區和RDD分區與直接流API之間存在1:1映射,每個執行器都會從卡夫卡獲取這些分區的子集並進行處理,每個分區都是獨立的,並且可以獨立計算。這與基於接收器的API不同,後者在任意執行器上創建單個接收器,並通過節點上的線程使用數據本身。

如果您有206個主題和10個分區,您最好有一個體面大小的集羣,它可以處理生成的任務的負載。您可以控制每個分區生成的最大消息,但您可以更改分區數,除非您想調用repartition轉換的混洗效果。

+0

感謝您使用上相當細緻。你碰到一個問題,我其實並沒有完全弄明白。你的意思是,新的直接API中,對應於一個分區的任務(卡夫卡分區/ RDD分區),做數據及其處理的同時閱讀。我一直在想,爲了確定我的執行者,最終是我的節點和集羣。 – MaatDeamon

+0

@Maatdeamon是的。當執行器被分配到一個分區進行操作時,驅動程序爲每個分區分配偏移量以供讀取。 –

+0

我認爲這個問題確定了他自己的職位。我仍然有點困惑。請你看看這裏https://stackoverflow.com/questions/45526554/spark-streaming-kafka-direct-streaming-parallelism我的文章,並嘗試回答呢? – MaatDeamon

1

第二種方法將是最適合你的需求。只有你必須設置consumerThreadsPerInputDstream = 1,所以只有一個線程將每次讀取操作來創建的,因此單臺機器將每簇參與。