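We have been running Camus successfully for about a year. It pulls Avro payloads from Kafka (version 0.82) and stores them as .avro files in HDFS, using only a few Kafka topics. Recently, a new team within our company registered about 60 new topics in our pre-production environment and started sending data to them. That team made some mistakes when routing data to the Kafka topics, and these caused errors when Camus deserialized the payloads from those topics. The Camus job then failed because it exceeded the "skipped other" (SKIPPED_OTHER) error threshold. Camus's behavior after the failure surprised us. I'd like to check with other developers whether what we observed is expected, or whether there is a problem with our implementation. What is the expected commit/rollback behavior of Camus?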
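When a Camus job fails because the "skipped other" threshold is exceeded, we observed the following:
1. All mapper tasks succeeded, so each TaskAttempt was allowed to commit. This means all data written by Camus was copied to the final HDFS location.
2. CamusJob throws an exception while computing the % error rate (this happens after the mappers have committed), which causes the job to fail.
3. Because the job failed (I think), the Kafka offsets are not advanced.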
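To make step 2 concrete, here is a minimal, hypothetical sketch of the arithmetic behind the "job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%" line in the log below. It is not Camus's actual source. The class name, the method name, and the choice of SKIPPED_OTHER + DECODE_SUCCESSFUL as the denominator are assumptions, inferred from the counters (10 + 10 = 20, which matches event-count=20). The 0.1% limit is taken from the log message.

```java
import java.util.Map;

/**
 * Hypothetical sketch (not Camus source): reproduces the skip-rate check
 * that runs after the mappers have already committed their output.
 */
public class SkipRateCheck {

    // Assumed limit, copied from the log message "maximum allowed is 0.1%".
    static final double MAX_PERCENT_SKIPPED_OTHER = 0.1;

    /** Percentage of messages counted as SKIPPED_OTHER out of all messages read (assumed denominator). */
    static double percentSkippedOther(Map<String, Long> counters) {
        long skipped = counters.getOrDefault("SKIPPED_OTHER", 0L);
        long decoded = counters.getOrDefault("DECODE_SUCCESSFUL", 0L);
        long total = skipped + decoded;
        return total == 0 ? 0.0 : 100.0 * skipped / total;
    }

    public static void main(String[] args) {
        // Counter values from the integration-test log.
        Map<String, Long> counters = Map.of("DECODE_SUCCESSFUL", 10L, "SKIPPED_OTHER", 10L);
        double pct = percentSkippedOther(counters);
        // In the real job the mapper output is already in the destination path at this point,
        // so failing here does not undo anything that was written.
        if (pct > MAX_PERCENT_SKIPPED_OTHER) {
            System.out.println("job failed: " + pct + "% messages skipped due to other, maximum allowed is "
                    + MAX_PERCENT_SKIPPED_OTHER + "%");
        }
    }
}
```

With 10 skipped out of 20 messages, the rate is 50.0%, which is far above 0.1%. Because the check runs only after the task commits, the job fails with its output already published.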
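The problem is that our Camus job runs every 5 minutes. So every 5 minutes, data was written to HDFS, the job failed, and the Kafka offsets were not updated. As a result, we kept writing duplicate data until we noticed our disks were full.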
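The following is a hypothetical simulation, not Camus code, of why the duplicates accumulate. It assumes the behavior described above: mapper output is committed before the job-level check, and offsets advance only when the whole job succeeds (the post's own tentative reading of step 3). The record counts match the integration test, and 12 runs correspond to one hour on the 5-minute schedule.

```java
/**
 * Hypothetical simulation (not Camus code): output is committed before the
 * skip-rate check, but the Kafka offset only advances if the job succeeds.
 */
public class DuplicateSimulation {

    static final int GOOD_RECORDS = 10;                 // decode successfully
    static final int BAD_RECORDS = 10;                  // counted as SKIPPED_OTHER
    static final double MAX_PERCENT_SKIPPED_OTHER = 0.1; // limit from the log message

    /** Runs the job {@code runs} times; returns {records written to HDFS, committed offset}. */
    static long[] simulate(int runs) {
        long committedOffset = 0;
        long recordsInHdfs = 0;
        for (int run = 0; run < runs; run++) {
            // Each run reads GOOD_RECORDS + BAD_RECORDS messages starting at committedOffset;
            // while the offset is stuck, these are the same messages every time.
            // Step 1: mappers succeed and their output is moved to the destination path.
            recordsInHdfs += GOOD_RECORDS;
            // Step 2: the skip-rate check runs only after that commit.
            double pctSkipped = 100.0 * BAD_RECORDS / (GOOD_RECORDS + BAD_RECORDS);
            boolean jobFailed = pctSkipped > MAX_PERCENT_SKIPPED_OTHER;
            // Step 3: offsets advance only when the job succeeds.
            if (!jobFailed) {
                committedOffset += GOOD_RECORDS + BAD_RECORDS;
            }
        }
        return new long[] {recordsInHdfs, committedOffset};
    }

    public static void main(String[] args) {
        long[] result = simulate(12); // one hour at a 5-minute schedule
        System.out.println("records written to HDFS: " + result[0] + ", committed offset: " + result[1]);
    }
}
```

After 12 runs, 120 records have been written even though only 10 distinct good records exist, and the committed offset never moves. This matches the steadily growing disk usage described above.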
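I wrote an integration test that confirms this. It sends 10 good records to a topic and 10 records with an unexpected schema to the same topic, then runs a Camus job with only that topic whitelisted. We can see that 10 records are written to HDFS and that the Kafka offsets are not advanced. Below are a log snippet from that test and the properties we used when running the job.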
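Any help is appreciated. I'm not sure whether this is the expected Camus behavior or a problem with our implementation, and I'd like to know the best way to prevent this behavior (duplicated data).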
Thanks, ~Matt
CamusJob properties from the test:
etl.destination.path=/user/camus/kafka/data
etl.execution.base.path=/user/camus/kafka/workspace
etl.execution.history.path=/user/camus/kafka/history
dfs.default.classpath.dir=/user/camus/kafka/libs
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z
mapreduce.output.fileoutputformat.compress=false
mapred.map.tasks=15
kafka.max.pull.hrs=1
kafka.max.historical.days=3
kafka.whitelist.topics=advertising.edmunds.admax
log4j.configuration=true
kafka.client.name=camus
kafka.brokers=<kafka brokers>
max.decoder.exceptions.to.print=5
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=<schema repo url>
etl.run.tracking.post=false
kafka.monitor.time.granularity=10
etl.daily=daily
etl.ignore.schema.errors=false
etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=America/Los_Angeles
mapred.output.compress=false
mapred.map.max.attempts=2
Log snippet showing the mappers succeeding and committing, followed by the job failing because the "other" threshold was exceeded:
[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map
[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing
[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map
[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now
[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0
[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data
[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro
[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro
[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro
[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro
[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0
[LocalJobRunner] - map task executor complete.
[Job] - map 100% reduce 0%
[Job] - Job job_local866350146_0001 completed successfully
[Job] - Counters: 23
File System Counters
FILE: Number of bytes read=117251
FILE: Number of bytes written=350942
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=10
Map output records=15
Input split bytes=793
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=13
Total committed heap usage (bytes)=251658240
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
DECODE_SUCCESSFUL=10
SKIPPED_OTHER=10
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=5907
total
data-read=840
decode-time(ms)=123
event-count=20
mapper-time(ms)=58
request-time(ms)=12114
skip-old=0
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read: 117251
[CamusJob] - FILE: Number of bytes written: 350942
[CamusJob] - FILE: Number of read operations: 0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations: 0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 10
[CamusJob] - Map output records: 15
[CamusJob] - Input split bytes: 793
[CamusJob] - Spilled Records: 0
[CamusJob] - Failed Shuffles: 0
[CamusJob] - Merged Map outputs: 0
[CamusJob] - GC time elapsed (ms): 13
[CamusJob] - Total committed heap usage (bytes): 251658240
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
[CamusJob] - DECODE_SUCCESSFUL: 10
[CamusJob] - SKIPPED_OTHER: 10
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%
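Thanks for the response. For now, our approach is to disable Camus's error checking (we don't want the job to fail for any reason). We will transition away from Camus as quickly as possible. – user2994581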