val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(1))
val log = Logger.getLogger("sqsLog")
val sqs = streamContext.receiverStream(new SQSReceiver("queue")
  .at(Regions.US_EAST_1)
  .withTimeout(5))

val jsonRows = sqs.mapPartitions(partitions => {
  val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))
  val txfm = new LogLine2Json
  val log = Logger.getLogger("parseLog")
  val sqlSession = SparkSession
    .builder()
    .getOrCreate()
  val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
  val parsedDate = parsedFormat.format(new java.util.Date())
  val outputPath = "/tmp/spark/presto"

  partitions.map(messages => {
    val sqsMsg = Json.parse(messages)
    System.out.println(sqsMsg)
    val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
    val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
    System.out.println(bucketName)
    System.out.println(key)
    val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
    val stream = obj.getObjectContent()
    scala.io.Source.fromInputStream(stream).getLines().map(line => {
      try {
        val str = txfm.parseLine(line)
        val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str)
        jsonDf.write.mode("append").format("orc").option("compression", "zlib").save(outputPath)
      }
      catch {
        case e: Throwable => { log.info(line); "" }
      }
    }).filter(line => line != "{}")
  })
})

streamContext.start()
streamContext.awaitTermination()
My job is quite simple: we get an S3 key from SQS. The file it points to contains nginx logs, which we parse with our parser, LogLine2Json. It converts the log lines to JSON, and then we write them out in ORC format. Spark says no output operations are registered, so there is nothing to execute, even though I am writing to a file. This is the error I get:
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
  at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
  at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
  at SparrowOrc$.main(sparrowOrc.scala:159)
  at SparrowOrc.main(sparrowOrc.scala)
As far as I understand, Spark needs an action, otherwise it won't work. But I do have this code that writes to an ORC file. Do I need to do something else?
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
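For reference, the check that fails here concerns DStream output operations (print, saveAsTextFiles, foreachRDD, and so on) being registered on the streaming graph before start(); a DataFrame write that happens inside a transformation such as mapPartitions does not count. Below is a minimal, self-contained sketch of that distinction, using a socket source and a local master purely for illustration rather than the SQS receiver from the question:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OutputOpCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("output-op-check")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)

    // map/mapPartitions/filter are transformations: they build up the DStream
    // lineage lazily and register nothing with the DStreamGraph.
    val upper = lines.map(_.toUpperCase)

    // Removing the next line reproduces the exception from the question:
    // "requirement failed: No output operations registered, so nothing to execute".
    upper.print()

    ssc.start()
    ssc.awaitTermination()
  }
}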
There should be a chain of calls on the initial DataFrame that leads to a write or writeStream call. You do write a DataFrame, but because that happens as a side effect, there is nothing that triggers processing of the initial DataFrame. Classic I/O is hard to use here because it operates outside the processing chain that Spark sets up. Spark's Hadoop operations are S3-compatible, I believe: https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html
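One way to act on that advice in the original job is to hang the per-batch work off foreachRDD, which is a DStream output operation and therefore gets registered with the streaming graph, so start() has something to run. A rough sketch along those lines, reusing SQSReceiver, LogLine2Json, Json, sparrowSchema, and outputPath from the question (none of these are standard Spark APIs), and assuming Spark 2.2+ for DataFrameReader.json(Dataset[String]) plus the default AWS credential chain for the S3 client:

import org.apache.spark.sql.{Encoders, SparkSession}

// Build the SparkSession once on the driver instead of inside a partition.
val sqlSession = SparkSession.builder().getOrCreate()
val outputPath = "/tmp/spark/presto"

val sqs = streamContext.receiverStream(
  new SQSReceiver("queue").at(Regions.US_EAST_1).withTimeout(5))

// foreachRDD is an output operation: it is registered with the DStreamGraph,
// so the "No output operations registered" check passes, and its body runs
// once per batch on the driver with an ordinary RDD of SQS messages.
sqs.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Executor side: turn each SQS message into the JSON lines parsed out of
    // the referenced S3 object.
    val jsonLines = rdd.mapPartitions { messages =>
      val s3Client = new AmazonS3Client()   // default credential provider chain
      val txfm = new LogLine2Json
      messages.flatMap { message =>
        val sqsMsg = Json.parse(message)
        val bucket = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
        val key    = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
        val body   = s3Client.getObject(new GetObjectRequest(bucket, key)).getObjectContent()
        scala.io.Source.fromInputStream(body).getLines().map(txfm.parseLine)
      }
    }

    // Driver side: build one DataFrame per batch and append it as ORC.
    val jsonDs = sqlSession.createDataset(jsonLines)(Encoders.STRING)
    val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(jsonDs)
    jsonDf.write.mode("append").format("orc").option("compression", "zlib").save(outputPath)
  }
}

streamContext.start()
streamContext.awaitTermination()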