3

我們有一個spark插件應用程序(以下是代碼),它可以從kafka獲取數據,並在將數據插入MongoDB之前對每條消息進行一些轉換。我們有一箇中間件應用程序,將消息(批量)推送到Kafka並等待來自Spark流應用程序的確認(針對每條消息)。如果在將消息發送到Kafka後的特定時間段(5秒)內中間件沒有收到確認,則中間件應用程序會重新發送消息。火花流應用程序能夠接收大約50-100條消息(在一批中),並在5秒內發送所有消息的確認。但是,如果中間件應用程序推送超過100條消息,則由於延遲發送確認的火花流,導致中間件應用程序重新發送消息。在我們目前的實施中,我們每次發送確認時都會創建生產者,這需要3-4秒。在Spark Streaming中重用kafka製作者

package com.testing 

import org.apache.spark.streaming._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame } 
import java.util.HashMap 
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord } 
import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 

import org.joda.time._ 
import org.joda.time.format._ 

import org.json4s._ 
import org.json4s.JsonDSL._ 
import org.json4s.jackson.JsonMethods._ 
import com.mongodb.util.JSON 

import scala.io.Source._ 
import java.util.Properties 
import java.util.Calendar 

import scala.collection.immutable 
import org.json4s.DefaultFormats 


object Sample_Streaming { 

    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Sample_Streaming") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val props = new HashMap[String, Object]() 


    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config) 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 

    val TopicMap = Map("sampleTopic" -> 1) 
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2) 

     val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", "connectionURI") 
     .option("spark.mongodb.input.collection", "schemaCollectionName") 
     .load() 

     val outSchema = schemaDf.schema 
     var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema) 

    KafkaDstream.foreachRDD(rdd => rdd.collect().map { x => 
     { 
     val jsonInput: JValue = parse(x) 


     /*Do all the transformations using Json libraries*/ 

     val json4s_transformed = "transformed json" 

     val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil) 
     val df = sqlContext.read.schema(outSchema).json(rdd) 

     df.write.option("spark.mongodb.output.uri", "connectionURI") 
        .option("collection", "Collection") 
        .mode("append").format("com.mongodb.spark.sql").save() 

     val producer = new KafkaProducer[String, String](props) 
     val message = new ProducerRecord[String, String]("topic_name", null, "message_received") 

     producer.send(message) 
     producer.close() 


     } 

    } 

    ) 

    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 

因此,我們嘗試了foreachRDD之外創建生產者和重複使用爲整個批次間隔的另一種方法(以下爲代碼)。這似乎有所幫助,因爲我們每次發送確認時都不會創建制作人。但由於某種原因,當我們在火花UI上監控應用程序時,流媒體應用程序的內存消耗正在穩步增加,這並非如此。我們嘗試在spark-submit中使用--num-executors 1選項來限制由紗線啓動的執行程序的數量。

object Sample_Streaming { 

    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Sample_Streaming") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val props = new HashMap[String, Object]() 


    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config) 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 

    val TopicMap = Map("sampleTopic" -> 1) 
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2) 

     val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", "connectionURI") 
     .option("spark.mongodb.input.collection", "schemaCollectionName") 
     .load() 

     val outSchema = schemaDf.schema 
    val producer = new KafkaProducer[String, String](props) 
    KafkaDstream.foreachRDD(rdd => 
      { 

      rdd.collect().map (x => 
      { 

       val jsonInput: JValue = parse(x) 


       /*Do all the transformations using Json libraries*/ 

       val json4s_transformed = "transformed json" 

       val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil) 
       val df = sqlContext.read.schema(outSchema).json(rdd) 

       df.write.option("spark.mongodb.output.uri", "connectionURI") 
         .option("collection", "Collection") 
         .mode("append").format("com.mongodb.spark.sql").save() 


       val message = new ProducerRecord[String, String]("topic_name", null, "message_received") 

       producer.send(message) 
       producer.close() 


      } 

      ) 
     } 

    ) 

    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 

我的問題是:

  1. 如何監控火花應用程序的內存消耗,我們目前正在人工監視應用程序,每5分鐘,直到耗盡可用內存在我們的集羣(2節點每個16GB)?

  2. 在使用Spark streaming和kafka時,業內遵循的最佳實踐是什麼?

回答

3

卡夫卡是一個經紀人:它爲您提供生產者和消費者的交付保證。實施生產者和消費者之間的'頂級'承認機制是矯枉過正的。確保生產者行爲正確,消費者可以在故障情況下恢復並確保終端2端交付。

關於這項工作,難怪爲什麼它的性能差:處理是按順序進行的,逐個元素直到寫入外部數據庫。這是明顯錯誤,應在嘗試解決任何內存消耗問題之前解決。

這個過程可能像加以改進:

val producer = // create producer 

val jsonDStream = kafkaDstream.transform{rdd => rdd.map{elem => 
    val json = parse(elem) 
    render(doAllTransformations(json)) // output should be a String-formatted JSON object 
    } 
} 

jsonDStream.foreachRDD{ rdd => 
    val df = sqlContext.read.schema(outSchema).json(rdd) // transform the complete collection, not element by element 
    df.write.option("spark.mongodb.output.uri", "connectionURI") // write in bulk, not one by one 
    .option("collection", "Collection") 
    .mode("append").format("com.mongodb.spark.sql").save() 
    val msg = //create message 
    producer.send(msg) 
    producer.flush() // force send. *DO NOT Close* otherwise it will not be able to send any more messages 
} 

這個過程可以進一步改進,如果我們能夠通過case class情況下替換所有的字符串中心化JSON改造。

+0

謝謝您的輸入,我會執行您的建議並回復您 – Sid

+0

我回去分析您的建議,他們啓發了我。我們正在將所有的轉變轉移到foreachRDD之外。根據你的建議,我們也正在考慮刪除承認文件並替換檢查點。我們正在做的一件事是從多個主題採購。這些消息可能已損壞(沒有我們正在查找的信息),在這種情況下,我們想要過濾掉這些消息。我搜索了一些關於如何基於元素的存在來過濾DStream RDD(在JSON字符串中)的例子。你能對此有所瞭解嗎? – Sid

+0

我能弄明白。這裏是我是如何篩選RDD VAL filteredDStream = kafkaDstream.filter {X => VAL JSON:JValue =解析(x)的 VAL驗證=(JSON \ 「KEY1」)= JNothing && ( !JSON \ 「KEY2」)= JNothing && (JSON \ 「KEY3」)= JNothing 驗證 } VAL transformedVibrationStream = filteredDStream.transform {RDD => rdd.map {ELEM => VAL JSON: JValue = parse(elem) /*做所有轉換*/ compact(render(json_transfor med)) } – Sid

相關問題