我正在使用Spark Streaming從Kafka中讀取DStream的JSON字符串。我需要以JSON格式將輸入數據保存在S3上。這是我這樣做的方式,但是當批量數據量大約爲5Mb時,執行此代碼需要很長時間。可以優化嗎? 我需要保存在JSON文件中的數據(*.json
),因爲使用這些文件進行讀取如下另一方案:將RDD另存爲JSON文件的問題,考慮到每個RDD的大小不會超過10 Mb
var df = sqlContext.read.json("s3n://" + bucketNameData + "/" + directoryS3 + "/*.json")
因此,爲了RDD另存爲的單層JSON文件,我嘗試了rdd.map(lambda x :json.loads(x)) .coalesce(1, shuffle=True).saveAsTextFile('examples/src/main/resources/demo.json')
,但是它以一種不錯的方式保存數據,並且據我所知,我希望能夠像以上所示的那樣讀取它們(從json文件獲取df
)。因此,我切換到amazonS3Client
,但我覺得它可能會優化。也許我應該將rdd
轉換爲DataFrame左右,然後以某種方式將它保存爲JSON?
val mySet = ssc.sparkContext.broadcast(Map("metadataBrokerList"->metadataBrokerList,
"bucketNameData"->bucketNameData,
"bucketNameCode"->bucketNameCode))
dstreamdata.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.foreachPartition { iter =>
val producer = UtilsTest.createProducer(mySet.value("metadataBrokerList"))
val amazonS3Client = UtilsTest.createS3()
iter.foreach { msg =>
if (msg.nonEmpty) {
// Save messages to S3
val CONTENT_TYPE = "application/json"
val fileContentBytes = msg.getBytes(StandardCharsets.UTF_8)
val fileInputStream = new ByteArrayInputStream(fileContentBytes)
val metadata = new ObjectMetadata()
metadata.setContentType(CONTENT_TYPE)
metadata.setContentLength(fileContentBytes.length)
val datetime = Calendar.getInstance.getTime
val formatter = new SimpleDateFormat("yyyy-MMM-dd-HH-mm-ss")
val setID = formatter.format(datetime)
val filePath = mySet.value("bucketNameData") + "/file_" + setID + ".json"
val putObjectRequest = new PutObjectRequest(mySet.value("bucketNameData"), filePath, fileInputStream, metadata)
amazonS3Client.putObject(putObjectRequest)
//---
謝謝您的回答。如果你使用newHadooopRDD,你是什麼意思?我可以在我的'pom.xml'中集成一些庫嗎? – Dinosaurius
不,它意味着你用'SparkContext.newAPIHadoopRDD()創建RDD' –