2016-09-29 45 views
0

我想將對象轉換爲org.apache.hadoop.hbase.client。放入DoFn。 我不斷收到此錯誤:com.google.cloud.dataflow.sdk.util.IllegalMutationException在DoFn轉換爲org.apache.hadoop.hbase.client.Put

com.google.cloud.dataflow.sdk.util.IllegalMutationException: DoFn DataBuckettingToBigTableConnectorFn mutated value {"totalColumns":36,"row":"HealthTechnology#MedicalSpecialties#1470010560000#gnip","families":{"SENT":[{"qualifier":"sector","vlen":16,"tag":[],"timestamp":1475153818546},{"qualifier":"industry","vlen":18,"tag":[],"timestamp":1475153818546},{"qualifier":"updateTime","vlen":8,"tag":[],"timestamp":1475153818546},{"qualifier":"timeBucket","vlen":8,"tag":[],"timestamp":1475153818546}]}} after it was output (new value was {"totalColumns":36,"row":"HealthTechnology#MedicalSpecialties#1470010560000#gnip","families":{"SENT":[{"qualifier":"sector","vlen":16,"tag":[],"timestamp":1475153818576},{"qualifier":"industry","vlen":18,"tag":[],"timestamp":1475153818576},{"qualifier":"updateTime","vlen":8,"tag":[],"timestamp":1475153818576},{"qualifier":"timeBucket","vlen":8,"tag":[],"timestamp":1475153818576}]}}). Values must not be mutated in any way after being output. 
Caused by: com.google.cloud.dataflow.sdk.util.IllegalMutationException: Value {"totalColumns":36,"row":"HealthTechnology#MedicalSpecialties#1470010560000#gnip","families":{"SENT":[{"qualifier":"sector","vlen":16,"tag":[],"timestamp":1475153818546},{"qualifier":"industry","vlen":18,"tag":[],"timestamp":1475153818546},{"qualifier":"updateTime","vlen":8,"tag":[],"timestamp":1475153818546},{"qualifier":"timeBucket","vlen":8,"tag":[],"timestamp":1475153818546}]}} mutated illegally, new value was {"totalColumns":36,"row":"HealthTechnology#MedicalSpecialties#1470010560000#gnip","families":{"SENT":[{"qualifier":"sector","vlen":16,"tag":[],"timestamp":1475153818576},{"qualifier":"industry","vlen":18,"tag":[],"timestamp":1475153818576},{"qualifier":"updateTime","vlen":8,"tag":[],"timestamp":1475153818576},{"qualifier":"timeBucket","vlen":8,"tag":[],"timestamp":1475153818576}]}}. Encoding was kg4SNkhlYWx0aFRlY2hub2xvZ3kjTWVkaWNhbFNwZWNpYWx0aWVzIzE0NzAwMTA1NjAwMDAjZ25pcBorCikKBFNFTlQSBnNlY3RvchjQ3t-a0LTPAiIQSGVhbHRoVGVjaG5vbG9neRovCi0KBFNFTlQSCGluZHVzdHJ5GNDe35rQtM8CIhJNZWRpY2FsU3BlY2lhbHRpZXMaJwolCgRTRU5UEgp1cGRhdGVUaW1lGNDe35rQtM8CIggAAAFXdgTwiBonCiUKBFNFTlQSCnRpbWVCdWNrZXQY0N7fmtC0zwIiCAAAAVZDdQ4AGh8KHQoEU0VOVBIGc291cmNlGNDe35rQtM8CIgRHbmlwGiMKIQoEU0VOVBIEbWVhbhjQ3t-a0LTPAiIKAAAAD6HuTKnCExorCikKBFNFTlQSDG1lYW5OZWdhdGl2ZRjQ3t-a0LTPAiIKAAAAD6HuTKnCExomCiQKBFNFTlQSDG1lYW5Qb3NpdGl2ZRjQ3t-a0LTPAiIFAAAAAAAaKwopCgRTRU5UEhFzdGFuZGFyZERldmlhdGlvbhjQ3t-a0LTPAiIFAAAAAAAaMwoxCgRTRU5UEhlzdGFuZGFyZERldmlhdGlvbk5lZ2F0aXZlGNDe35rQtM8CIgUAAAAAABozCjEKBFNFTlQSGXN0YW5kYXJkRGV2aWF0aW9uUG9zaXRpdmUY0N7fmtC0zwIiBQAAAAAAGiIKIAoEU0VOVBIIdmFyaWFuY2UY0N7fmtC0zwIiBQAAAAAAGioKKAoEU0VOVBIQdmFyaWFuY2VOZWdhdGl2ZRjQ3t-a0LTPAiIFAAAAAAAaKgooCgRTRU5UEhB2YXJpYW5jZVBvc2l0aXZlGNDe35rQtM8CIgUAAAAAABo1CjMKBFNFTlQSFnNjb3JlRm9sbG93ZXJzV2VpZ2h0ZWQY0N7fmtC0zwIiCgAAAA-h7kypwhMaNQozCgRTRU5UEhVzY29yZU1heFNjb3JlV2VpZ2h0ZWQY0N7fmtC0zwIiCwAAAA_8coFbOYAAGjQKMgoEU0VOVBIac2NvcmVTaWduRm9sbG93ZXJzV2VpZ2h0ZWQY0N7fmtC0zwIiBQAAAAH2GicKJQoEU0VOVBIIbWF4U2NvcmUY0N7fmtC0zwIiCgAAAA9eEbNWPe0aMQovCgRTRU5UEhRudW1iZXJPZk9ic2VydmF0aW9ucxjQ3t-a0LTPAiIIAAAAAAAAAAEaOQo3CgRTRU5UEhxudW1iZXJPZk9ic2VydmF0aW9uc05lZ2F0aXZlGNDe35rQtM8CIggAAAAAAAAAARo5CjcKBFNFTlQSHG51bWJlck9mT2JzZXJ2YXRpb25zUG9zaXRpdmUY0N7fmtC0zwIiCAAAAAAAAAAAGjUKMwoEU0VOVBIYbnVtYmVyT2ZCdWxsT2JzZXJ2YXRpb25zGNDe35rQtM8CIggAAAAAAAAAABo9CjsKBFNFTlQSIG51bWJlck9mQnVsbFBvc2l0aXZlT2JzZXJ2YXRpb25zGNDe35rQtM8CIggAAAAAAAAAABo9CjsKBFNFTlQSIG51bWJlck9mQnVsbE5lZ2F0aXZlT2JzZXJ2YXRpb25zGNDe35rQtM8CIggAAAAAAAAAABo1CjMKBFNFTlQSGG51bWJlck9mQmVhck9ic2VydmF0aW9ucxjQ3t-a0LTPAiIIAAAAAAAAAAAaPQo7CgRTRU5UEiBudW1iZXJPZkJlYXJQb3NpdGl2ZU9ic2VydmF0aW9ucxjQ3t-a0LTPAiIIAAAAAAAAAAAaPQo7CgRTRU5UEiBudW1iZXJPZkJlYXJOZWdhdGl2ZU9ic2VydmF0aW9ucxjQ3t-a0LTPAiIIAAAAAAAAAAAaJAoiCgRTRU5UEgVzY29yZRjQ3t-a0LTPAiIKAAAAD6HuTKnCExosCioKBFNFTlQSDXNjb3JlTmVnYXRpdmUY0N7fmtC0zwIiCgAAAA-h7kypwhMaJwolCgRTRU5UEg1zY29yZVBvc2l0aXZlGNDe35rQtM8CIgUAAAABABosCioKBFNFTlQSDHNjb3JlU3F1YXJlZBjQ3t-a0LTPAiILAAAAEiYBlRXVSeUaNAoyCgRTRU5UEhRzY29yZVNxdWFyZWROZWdhdGl2ZRjQ3t-a0LTPAiILAAAAEiYBlRXVSeUaLgosCgRTRU5UEhRzY29yZVNxdWFyZWRQb3NpdGl2ZRjQ3t-a0LTPAiIFAAAAAQAaJwolCgRTRU5UEgxzdW1Gb2xsb3dlcnMY0N7fmtC0zwIiBgAAAAAGIRoyCjAKBFNFTlQSEnN1bVNjb3JlWEZvbGxvd2VycxjQ3t-a0LTPAiILAAAADsZYjS-kpXIaLAoqCgRTRU5UEhFzdW1TaWduWEZvbGxvd2VycxjQ3t-a0LTPAiIGAAAAAcK2, now kg4SNkhlYWx0aFRlY2hub2xvZ3kjTWVkaWNhbFNwZWNpYWx0aWVzIzE0NzAwMTA1NjAwMDAjZ25pcBorCikKBFNFTlQSBnNlY3RvchiAyeGa0LTPAiIQSGVhbHRoVGVjaG5vbG9neRovCi0KBFNFTlQSCGluZHVzdHJ5GIDJ4ZrQtM8CIhJNZWRpY2FsU3BlY2lhbHRpZXMaJwolCgRTRU5UEgp1cGRhdGVUaW1lGIDJ4ZrQtM8CIggAAAFXdgTwiBonCiUKBFNFTlQSCnRpbWVCdWNrZXQYgMnhmtC0zwIiCAAAAVZDdQ4AGh8KHQoEU0VOVBIGc291cmNlGIDJ4ZrQtM8CIgRHbmlwGiMKIQoEU0VOVBIEbWVhbhiAyeGa0LTPAiIKAAAAD6HuTKnCExorCikKBFNFTlQSDG1lYW5OZWdhdGl2ZRiAyeGa0LTPAiIKAAAAD6HuTKnCExomCiQKBFNFTlQSDG1lYW5Qb3NpdGl2ZRiAyeGa0LTPAiIFAAAAAAAaKwopCgRTRU5UEhFzdGFuZGFyZERldmlhdGlvbhiAyeGa0LTPAiIFAAAAAAAaMwoxCgRTRU5UEhlzdGFuZGFyZERldmlhdGlvbk5lZ2F0aXZlGIDJ4ZrQtM8CIgUAAAAAABozCjEKBFNFTlQSGXN0YW5kYXJkRGV2aWF0aW9uUG9zaXRpdmUYgMnhmtC0zwIiBQAAAAAAGiIKIAoEU0VOVBIIdmFyaWFuY2UYgMnhmtC0zwIiBQAAAAAAGioKKAoEU0VOVBIQdmFyaWFuY2VOZWdhdGl2ZRiAyeGa0LTPAiIFAAAAAAAaKgooCgRTRU5UEhB2YXJpYW5jZVBvc2l0aXZlGIDJ4ZrQtM8CIgUAAAAAABo1CjMKBFNFTlQSFnNjb3JlRm9sbG93ZXJzV2VpZ2h0ZWQYgMnhmtC0zwIiCgAAAA-h7kypwhMaNQozCgRTRU5UEhVzY29yZU1heFNjb3JlV2VpZ2h0ZWQYgMnhmtC0zwIiCwAAAA_8coFbOYAAGjQKMgoEU0VOVBIac2NvcmVTaWduRm9sbG93ZXJzV2VpZ2h0ZWQYgMnhmtC0zwIiBQAAAAH2GicKJQoEU0VOVBIIbWF4U2NvcmUYgMnhmtC0zwIiCgAAAA9eEbNWPe0aMQovCgRTRU5UEhRudW1iZXJPZk9ic2VydmF0aW9ucxiAyeGa0LTPAiIIAAAAAAAAAAEaOQo3CgRTRU5UEhxudW1iZXJPZk9ic2VydmF0aW9uc05lZ2F0aXZlGIDJ4ZrQtM8CIggAAAAAAAAAARo5CjcKBFNFTlQSHG51bWJlck9mT2JzZXJ2YXRpb25zUG9zaXRpdmUYgMnhmtC0zwIiCAAAAAAAAAAAGjUKMwoEU0VOVBIYbnVtYmVyT2ZCdWxsT2JzZXJ2YXRpb25zGIDJ4ZrQtM8CIggAAAAAAAAAABo9CjsKBFNFTlQSIG51bWJlck9mQnVsbFBvc2l0aXZlT2JzZXJ2YXRpb25zGIDJ4ZrQtM8CIggAAAAAAAAAABo9CjsKBFNFTlQSIG51bWJlck9mQnVsbE5lZ2F0aXZlT2JzZXJ2YXRpb25zGIDJ4ZrQtM8CIggAAAAAAAAAABo1CjMKBFNFTlQSGG51bWJlck9mQmVhck9ic2VydmF0aW9ucxiAyeGa0LTPAiIIAAAAAAAAAAAaPQo7CgRTRU5UEiBudW1iZXJPZkJlYXJQb3NpdGl2ZU9ic2VydmF0aW9ucxiAyeGa0LTPAiIIAAAAAAAAAAAaPQo7CgRTRU5UEiBudW1iZXJPZkJlYXJOZWdhdGl2ZU9ic2VydmF0aW9ucxiAyeGa0LTPAiIIAAAAAAAAAAAaJAoiCgRTRU5UEgVzY29yZRiAyeGa0LTPAiIKAAAAD6HuTKnCExosCioKBFNFTlQSDXNjb3JlTmVnYXRpdmUYgMnhmtC0zwIiCgAAAA-h7kypwhMaJwolCgRTRU5UEg1zY29yZVBvc2l0aXZlGIDJ4ZrQtM8CIgUAAAABABosCioKBFNFTlQSDHNjb3JlU3F1YXJlZBiAyeGa0LTPAiILAAAAEiYBlRXVSeUaNAoyCgRTRU5UEhRzY29yZVNxdWFyZWROZWdhdGl2ZRiAyeGa0LTPAiILAAAAEiYBlRXVSeUaLgosCgRTRU5UEhRzY29yZVNxdWFyZWRQb3NpdGl2ZRiAyeGa0LTPAiIFAAAAAQAaJwolCgRTRU5UEgxzdW1Gb2xsb3dlcnMYgMnhmtC0zwIiBgAAAAAGIRoyCjAKBFNFTlQSEnN1bVNjb3JlWEZvbGxvd2VycxiAyeGa0LTPAiILAAAADsZYjS-kpXIaLAoqCgRTRU5UEhFzdW1TaWduWEZvbGxvd2VycxiAyeGa0LTPAiIGAAAAAcK2. 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:167) ~[google-cloud-dataflow-java-sdk-all-1.7.0.jar:na] 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:162) ~[google-cloud-dataflow-java-sdk-all-1.7.0.jar:na] 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:113) ~[google-cloud-dataflow-java-sdk-all-1.7.0.jar:na] 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.verifyOutputUnmodified(ParDo.java:1338) ~[google-cloud-dataflow-java-sdk-all-1.7.0.jar:na] 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1306) ~[google-cloud-dataflow-java-sdk-all-1.7.0.jar:na] 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) ~[google-cloud-dataflow-java-sdk-all-1.7.0.jar:na] 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) ~[google-cloud-dataflow-java-sdk-all-1.7.0.jar:na] 

但是如果我使用不同的類比org.apache.hadoop.hbase.client.Put(任意自定義類型)的錯誤不會發生

這裏目前的DoFn processElementMethod:

@Override 
public void processElement(ProcessContext c) throws Exception { 
     KV<Key, DataflowBucketedData> element = c.element(); 
     if(element == null) 
      return; 
     Key itemKey = element.getKey(); 
     DataflowBucketedData val = element.getValue(); 

     String key; 
     switch (element.getKey().getKeyType()){ 
      case Industry: 
       key = INDUSTRY_KEY_UTILS.createKey(itemKey.getSectorCode(), itemKey.getIndustryCode(), val.getTimeBucket().getTime(), val.getSource().name().toLowerCase()); 
       break; 
      case Sector: 
      key = SECTOR_KEY_UTILS.createKey(itemKey.getSectorCode(), val.getTimeBucket().getTime(), val.getSource().name().toLowerCase()); 
      break; 
     case Symbol: 
      key = SYMBOL_KEY_UTILS.createKey(itemKey.getSymbol(), val.getTimeBucket().getTime(), val.getSource().name().toLowerCase()); 
      break; 
     default: 
      key = null; 
      break; 
    } 
    if(!Strings.isNullOrEmpty(key)) { 
     Put p = createEntity(key, itemKey.getSymbol(), itemKey.getSectorCode(), itemKey.getIndustryCode(), val); 
     c.output(p); 
    } 
} 

private Put createEntity(String key, String company, String sectorCode, String industryCode, DataflowBucketedData data){ 
    Put put = new Put(Bytes.toBytes(key)); 
    put.setId(key); 
    if(!Strings.isNullOrEmpty(company)) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.SYMBOL, Bytes.toBytes(company)); 
    else { 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.SECTOR, Bytes.toBytes(sectorCode)); 
     if(!Strings.isNullOrEmpty(industryCode)) 
      put.addColumn(Schema.COLUMN_FAMILY, Schema.INDUSTRY, Bytes.toBytes(industryCode)); 
    } 
    if(data.getUpdateTime() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.UPDATE_TIME, Bytes.toBytes(data.getUpdateTime().getTime())); 
    if (data.getTimeBucket() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.TIME_BUCKET, Bytes.toBytes(data.getTimeBucket().getTime())); 

    if (data.getSource() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.SOURCE, Bytes.toBytes(data.getSource().name())); 
    if (data.getMean() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.MEAN, Bytes.toBytes(data.getMean())); 
    if (data.getMeanNegative() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.MEAN_NEGATIVE, Bytes.toBytes(data.getMeanNegative())); 
    if (data.getMeanPositive() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.MEAN_POSITIVE, Bytes.toBytes(data.getMeanPositive())); 
    if (data.getStandardDeviation() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.STANDARD_DEVIATION, Bytes.toBytes(data.getStandardDeviation())); 
    if (data.getStandardDeviationNegative() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.STANDARD_DEVIATION_NEGATIVE, Bytes.toBytes(data.getStandardDeviationNegative())); 
    if (data.getStandardDeviationPositive() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.STANDARD_DEVIATION_POSITIVE, Bytes.toBytes(data.getStandardDeviationPositive())); 
    if (data.getVariance() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.VARIANCE, Bytes.toBytes(data.getVariance())); 
    if (data.getVarianceNegative() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.VARIANCE_NEGATIVE, Bytes.toBytes(data.getVarianceNegative())); 
    if (data.getVariancePositive() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.VARIANCE_POSITIVE, Bytes.toBytes(data.getVariancePositive())); 
    if (data.getScoreFollowersWeighted() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_FOLLOWERS_WEIGHTED, Bytes.toBytes(data.getScoreFollowersWeighted())); 
    if (data.getScoreMaxScoreWeighted() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_MAX_SCORE_WEIGHTED, Bytes.toBytes(data.getScoreMaxScoreWeighted())); 
    if (data.getScoreSignFollowersWeighted() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_SIGN_FOLLOWERS_WEIGHTED, Bytes.toBytes(data.getScoreSignFollowersWeighted())); 
    if (data.getMaxScore() != null) 
     put.addColumn(Schema.COLUMN_FAMILY, Schema.MAX_SCORE, Bytes.toBytes(data.getMaxScore())); 

    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_OBSERVATIONS, Bytes.toBytes(data.getNumberOfObservations())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_OBSERVATIONS_NEGATIVE, Bytes.toBytes(data.getNumberOfObservationsNegative())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_OBSERVATIONS_POSITIVE, Bytes.toBytes(data.getNumberOfObservationsPositive())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_BULL_OBSERVATIONS, Bytes.toBytes(data.getNumberOfBullObservations())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_BULL_POSITIVE_OBSERVATIONS, Bytes.toBytes(data.getNumberOfBullPositiveObservations())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_BULL_NEGATIVE_OBSERVATIONS, Bytes.toBytes(data.getNumberOfBullNegativeObservations())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_BEAR_OBSERVATIONS, Bytes.toBytes(data.getNumberOfBearObservations())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_BEAR_POSITIVE_OBSERVATIONS, Bytes.toBytes(data.getNumberOfBearPositiveObservations())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.NUMBER_OF_BEAR_NEGATIVE_OBSERVATIONS, Bytes.toBytes(data.getNumberOfBearNegativeObservations())); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE, Bytes.toBytes(BigDecimal.valueOf(data.getScore()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_NEGATIVE, Bytes.toBytes(BigDecimal.valueOf(data.getScoreNegative()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_POSITIVE, Bytes.toBytes(BigDecimal.valueOf(data.getScorePositive()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_SQUARED, Bytes.toBytes(BigDecimal.valueOf(data.getScoreSquared()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_SQUARED_NEGATIVE, Bytes.toBytes(BigDecimal.valueOf(data.getScoreSquaredNegative()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SCORE_SQUARED_POSITIVE, Bytes.toBytes(BigDecimal.valueOf(data.getScoreSquaredPositive()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SUM_FOLLOWERS, Bytes.toBytes(BigDecimal.valueOf(data.getSumFollowers()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SUM_SCORE_X_FOLLOWERS, Bytes.toBytes(BigDecimal.valueOf(data.getSumScoreXFollowers()))); 
    put.addColumn(Schema.COLUMN_FAMILY, Schema.SUM_SIGN_X_FOLLOWERS, Bytes.toBytes(BigDecimal.valueOf(data.getSumSignXFollowers()))); 
    return put; 
} 

正如你所看到的,Put對象的修改不會在post輸出中發生。

下面是我用

compile group:'com.google.cloud.dataflow', name:'google-cloud-dataflow-java-sdk-all', version:'1.7.0' 
compile group:'com.google.cloud.bigtable', name:'bigtable-hbase-dataflow', version:'0.9.2' 

任何想法庫?

更新: 我抓起例如在https://github.com/GoogleCloudPlatform/cloud-bigtable-examples/tree/master/java/dataflow-connector-examples ,它也不管用,失敗完全相同的方式:

[main] INFO com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner - Executing pipeline using the DirectPipelineRunner. 
Disconnected from the target VM, address: '127.0.0.1:50713', transport: 'socket' 
Exception in thread "main" com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: com.google.cloud.dataflow.sdk.util.IllegalMutationException: DoFn mutated value {"totalColumns":1,"row":"World","families":{"cf":[{"qualifier":"qualifier","vlen":24,"tag":[],"timestamp":1475158801718}]}} after it was output (new value was {"totalColumns":1,"row":"World","families":{"cf":[{"qualifier":"qualifier","vlen":24,"tag":[],"timestamp":1475158801752}]}}). Values must not be mutated in any way after being output. 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:186) 
    at com.google.cloud.bigtable.dataflow.example.HelloWorldWrite.main(HelloWorldWrite.java:113) 
Caused by: com.google.cloud.dataflow.sdk.util.IllegalMutationException: DoFn mutated value {"totalColumns":1,"row":"World","families":{"cf":[{"qualifier":"qualifier","vlen":24,"tag":[],"timestamp":1475158801718}]}} after it was output (new value was {"totalColumns":1,"row":"World","families":{"cf":[{"qualifier":"qualifier","vlen":24,"tag":[],"timestamp":1475158801752}]}}). Values must not be mutated in any way after being output. 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.verifyOutputUnmodified(ParDo.java:1344) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1306) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) 
    at com.google.cloud.bigtable.dataflow.example.HelloWorldWrite$1.processElement(HelloWorldWrite.java:71) 
Caused by: com.google.cloud.dataflow.sdk.util.IllegalMutationException: Value {"totalColumns":1,"row":"World","families":{"cf":[{"qualifier":"qualifier","vlen":24,"tag":[],"timestamp":1475158801718}]}} mutated illegally, new value was {"totalColumns":1,"row":"World","families":{"cf":[{"qualifier":"qualifier","vlen":24,"tag":[],"timestamp":1475158801752}]}}. Encoding was PRIFV29ybGQaNAoyCgJjZhIJcXVhbGlmaWVyGPC19OLitM8CIhh2YWx1ZV8xNC41NDc5NDY0NDY0NzMyNTY, now PRIFV29ybGQaNAoyCgJjZhIJcXVhbGlmaWVyGMC_9uLitM8CIhh2YWx1ZV8xNC41NDc5NDY0NDY0NzMyNTY. 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:167) 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:162) 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:113) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.verifyOutputUnmodified(ParDo.java:1338) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1306) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) 
    at com.google.cloud.bigtable.dataflow.example.HelloWorldWrite$1.processElement(HelloWorldWrite.java:71) 
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) 
    at com.google.cloud.bigtable.dataflow.example.HelloWorldWrite.main(HelloWorldWrite.java:113) 

回答

2

對不起你的煩惱。這是雲端Bigtable連接器中的一個錯誤。我們爲即將發佈的0.9.3版本解決了這個問題。

你有兩個選擇來解決這個問題:

  1. 升級到0.9.3快照。您需要this pom.xml的存儲庫部分。在發佈官方0.9.3版本之前,我們正在等待另外一次修復的確認。這應該有希望在下週初發生。
  2. 使用0.9.2並在所有Put.addColumn()調用上設置時間戳參數(System.currentTimeMillis())。
+0

@sduskis,謝謝,0.9.3上的ETA是什麼? – user1568967

+0

可能在下週初。我們正在進行額外的內部測試,並與有特定問題的客戶進行對話以確認修復。 –

+0

@sduskis,感謝信息 – user1568967