2013-03-18 27 views
2

我有一個Cassandra數據庫分裂在多個節點上。當豬,是由豬「崩潰」創建具有以下例外的Hadoop節點上的MapReduce工作查詢它:Pig + Cassandra消息長度超過

 
2013-03-18 00:57:19,374 WARN org.apache.hadoop.mapred.Child: Error running child 
java.lang.RuntimeException: org.apache.thrift.TException: Message length exceeded: 674 
     at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:384) 
     at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:390) 
     at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:313) 
     at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143) 
     at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138) 
     at org.apache.cassandra.hadoop.ColumnFamilyRecordReader.getProgress(ColumnFamilyRecordReader.java:103) 
     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.getProgress(PigRecordReader.java:169) 
     at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.getProgress(MapTask.java:514) 
     at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:539) 
     at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67) 
     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143) 
     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) 
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) 
     at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:396) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 
     at org.apache.hadoop.mapred.Child.main(Child.java:249) 
Caused by: org.apache.thrift.TException: Message length exceeded: 674, readLength: 192 
     at org.apache.thrift.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:393) 
     at org.apache.thrift.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363) 
     at org.apache.cassandra.thrift.Column.read(Column.java:535) 
     at org.apache.cassandra.thrift.ColumnOrSuperColumn.read(ColumnOrSuperColumn.java:507) 
     at org.apache.cassandra.thrift.KeySlice.read(KeySlice.java:408) 
     at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:12905) 
     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) 
     at org.apache.cassandra.thrift.Cassandra$Client.recv_get_range_slices(Cassandra.java:734) 
     at org.apache.cassandra.thrift.Cassandra$Client.get_range_slices(Cassandra.java:718) 
     at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:346) 
     ... 17 more 

脫穎而出是org.apache.thrift.TException: Message length exceeded: 674的一個。每次Pig查詢啓動時,異常中吐出的消息長度都會有所不同。從任務在hadoop節點上初始化的那一刻開始,觸發異常的時間不到一秒鐘。

卡桑德拉大約有1GB的數據。用於導致此異常的Pig查詢如下:

 
rows = LOAD 'cassandra://[keyspace here]/[cf here]' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS([column definitions here]); 
testvals = foreach rows generate mycolumn.$1; 
names = limit testvals 57343; 
dump names; 

爲什麼57343限制你問? 57343以下的任何數字都可以讓Pig作業成功完成,任何大於57343的數字都會使其崩潰。 Cassandra附帶的Pig示例也會以相同的異常退出。 另外,在Cassandra中使用較小的數據集讓Pig成功完成工作。

我在Thrift抱怨消息長度時發現了一些類似的錯誤,但通常這是在超出cassandra.yaml中指定的最大消息長度時。在這種情況下,cassandra.yaml中的消息長度設置爲64MB以測試它是否有幫助,但仍然發生相同的異常。此外,該例外指出,即使在例外情況中,消息的長度太長,在這種情況下消息本身只有674字節!

我的嘗試:

  • 增加thrift_max_message_length_in_mbthrift_framed_transport_size_in_mb值cassandra.yaml
  • 重建節儉罐子
  • 降的Cassandra的密鑰空間,並填充它

設置:

  • 的Hadoop 1.0.4
  • 卡桑德拉1.2.2
  • 豬0.11.0

TL; DR 豬+卡桑德拉崩潰上更大的數據集(org.apache.thrift.TException: Message length exceeded: 674)。較小的數據集或較大數據集的較小子集可以正常工作。

編輯 卡桑德拉日誌顯示沒有錯誤。它按照工作要求提供切片,而Cassandra做這件事時,工作就會消失。

+1

ConfigHelper.setThriftMaxMessageLengthInMb(); – 2013-04-19 08:15:00

回答

0

如果此列家族使用寬行或有很多列,那麼您可能想嘗試傳遞widerows選項。

set cassandra.input.widerows true; 
data = load 'cassandra://[keyspace here]/[cf here]/?widerows=true' 
      using org.apache.cassandra.hadoop.pig.CassandraStorage();