我們有一個spark插件應用程序(以下是代碼),它可以從kafka獲取數據,並在將數據插入MongoDB之前對每條消息進行一些轉換。我們有一箇中間件應用程序,將消息(批量)推送到Kafka並等待來自Spark流應用程序的確認(針對每條消息)。如果在將消息發送到Kafka後的特定時間段(5秒)內中間件沒有收到確認,則中間件應用程序會重新發送消息。火花流應用程序能夠接收大約50-100條消息(在一批中),並在5秒內發送所有消息的確認。但是,如果中間件應用程序推送超過100條消息,則由於延遲發送確認的火花流,導致中間件應用程序重新發送消息。在我們目前的實施中,我們每次發送確認時都會創建生產者,這需要3-4秒。在Spark Streaming中重用kafka製作者
package com.testing
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame }
import java.util.HashMap
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.joda.time._
import org.joda.time.format._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.mongodb.util.JSON
import scala.io.Source._
import java.util.Properties
import java.util.Calendar
import scala.collection.immutable
import org.json4s.DefaultFormats
object Sample_Streaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Sample_Streaming")
.setMaster("local[4]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))
val props = new HashMap[String, Object]()
val bootstrap_server_config = "127.0.0.100:9092"
val zkQuorum = "127.0.0.101:2181"
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val TopicMap = Map("sampleTopic" -> 1)
val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)
val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", "connectionURI")
.option("spark.mongodb.input.collection", "schemaCollectionName")
.load()
val outSchema = schemaDf.schema
var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema)
KafkaDstream.foreachRDD(rdd => rdd.collect().map { x =>
{
val jsonInput: JValue = parse(x)
/*Do all the transformations using Json libraries*/
val json4s_transformed = "transformed json"
val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
val df = sqlContext.read.schema(outSchema).json(rdd)
df.write.option("spark.mongodb.output.uri", "connectionURI")
.option("collection", "Collection")
.mode("append").format("com.mongodb.spark.sql").save()
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("topic_name", null, "message_received")
producer.send(message)
producer.close()
}
}
)
// Run the streaming job
ssc.start()
ssc.awaitTermination()
}
}
因此,我們嘗試了foreachRDD之外創建生產者和重複使用爲整個批次間隔的另一種方法(以下爲代碼)。這似乎有所幫助,因爲我們每次發送確認時都不會創建制作人。但由於某種原因,當我們在火花UI上監控應用程序時,流媒體應用程序的內存消耗正在穩步增加,這並非如此。我們嘗試在spark-submit中使用--num-executors 1選項來限制由紗線啓動的執行程序的數量。
object Sample_Streaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Sample_Streaming")
.setMaster("local[4]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))
val props = new HashMap[String, Object]()
val bootstrap_server_config = "127.0.0.100:9092"
val zkQuorum = "127.0.0.101:2181"
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val TopicMap = Map("sampleTopic" -> 1)
val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)
val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", "connectionURI")
.option("spark.mongodb.input.collection", "schemaCollectionName")
.load()
val outSchema = schemaDf.schema
val producer = new KafkaProducer[String, String](props)
KafkaDstream.foreachRDD(rdd =>
{
rdd.collect().map (x =>
{
val jsonInput: JValue = parse(x)
/*Do all the transformations using Json libraries*/
val json4s_transformed = "transformed json"
val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
val df = sqlContext.read.schema(outSchema).json(rdd)
df.write.option("spark.mongodb.output.uri", "connectionURI")
.option("collection", "Collection")
.mode("append").format("com.mongodb.spark.sql").save()
val message = new ProducerRecord[String, String]("topic_name", null, "message_received")
producer.send(message)
producer.close()
}
)
}
)
// Run the streaming job
ssc.start()
ssc.awaitTermination()
}
}
我的問題是:
如何監控火花應用程序的內存消耗,我們目前正在人工監視應用程序,每5分鐘,直到耗盡可用內存在我們的集羣(2節點每個16GB)?
在使用Spark streaming和kafka時,業內遵循的最佳實踐是什麼?
謝謝您的輸入,我會執行您的建議並回復您 – Sid
我回去分析您的建議,他們啓發了我。我們正在將所有的轉變轉移到foreachRDD之外。根據你的建議,我們也正在考慮刪除承認文件並替換檢查點。我們正在做的一件事是從多個主題採購。這些消息可能已損壞(沒有我們正在查找的信息),在這種情況下,我們想要過濾掉這些消息。我搜索了一些關於如何基於元素的存在來過濾DStream RDD(在JSON字符串中)的例子。你能對此有所瞭解嗎? – Sid
我能弄明白。這裏是我是如何篩選RDD VAL filteredDStream = kafkaDstream.filter {X => VAL JSON:JValue =解析(x)的 VAL驗證=(JSON \ 「KEY1」)= JNothing && ( !JSON \ 「KEY2」)= JNothing && (JSON \ 「KEY3」)= JNothing 驗證 } VAL transformedVibrationStream = filteredDStream.transform {RDD => rdd.map {ELEM => VAL JSON: JValue = parse(elem) /*做所有轉換*/ compact(render(json_transfor med)) } – Sid