2017-10-17 63 views
0

我嘗試使用Apache束執行管線,但試圖把一些輸出標籤時,我得到一個錯誤:阿帕奇梁 - 無法推斷在DOFN編碼器與多輸出標籤

import com.google.cloud.Tuple; 
import com.google.gson.Gson; 
import com.google.gson.reflect.TypeToken; 
import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; 
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.transforms.DoFn; 
import org.apache.beam.sdk.transforms.ParDo; 
import org.apache.beam.sdk.transforms.windowing.FixedWindows; 
import org.apache.beam.sdk.transforms.windowing.Window; 
import org.apache.beam.sdk.values.TupleTag; 
import org.apache.beam.sdk.values.TupleTagList; 
import org.joda.time.Duration; 

import java.lang.reflect.Type; 
import java.util.Map; 
import java.util.stream.Collectors; 

/** 
* The Transformer. 
*/ 
class Transformer { 
    final static TupleTag<Map<String, String>> successfulTransformation = new TupleTag<>(); 
    final static TupleTag<Tuple<String, String>> failedTransformation = new TupleTag<>(); 

    /** 
    * The entry point of the application. 
    * 
    * @param args the input arguments 
    */ 
    public static void main(String... args) { 
     TransformerOptions options = PipelineOptionsFactory.fromArgs(args) 
       .withValidation() 
       .as(TransformerOptions.class); 

     Pipeline p = Pipeline.create(options); 

     p.apply("Input", PubsubIO 
       .readMessagesWithAttributes() 
       .withIdAttribute("id") 
       .fromTopic(options.getTopicName())) 
       .apply(Window.<PubsubMessage>into(FixedWindows 
         .of(Duration.standardSeconds(60)))) 
       .apply("Transform", 
         ParDo.of(new JsonTransformer()) 
           .withOutputTags(successfulTransformation, 
             TupleTagList.of(failedTransformation))); 

     p.run().waitUntilFinish(); 
    } 

    /** 
    * Deserialize the input and convert it to a key-value pairs map. 
    */ 
    static class JsonTransformer extends DoFn<PubsubMessage, Map<String, String>> { 

     /** 
     * Process each element. 
     * 
     * @param c the processing context 
     */ 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      String messagePayload = new String(c.element().getPayload()); 
      try { 
       Type type = new TypeToken<Map<String, String>>() { 
       }.getType(); 
       Gson gson = new Gson(); 
       Map<String, String> map = gson.fromJson(messagePayload, type); 
       c.output(map); 
      } catch (Exception e) { 
       LOG.error("Failed to process input {} -- adding to dead letter file", c.element(), e); 
       String attributes = c.element() 
         .getAttributeMap() 
         .entrySet().stream().map((entry) -> 
           String.format("%s -> %s\n", entry.getKey(), entry.getValue())) 
         .collect(Collectors.joining()); 
       c.output(failedTransformation, Tuple.of(attributes, messagePayload)); 
      } 

     } 
    } 
} 

錯誤所示出的是:

異常在線程 「主要」 java.lang.IllegalStateException:無法 換取Transform.out1 [PCollection]默認編碼器。更正以下根本原因之一 :未手動指定編碼器; 你可以使用.setCoder()。推測 CoderRegistry中的編碼器失敗:無法爲V提供編碼器。使用已註冊的CoderProvider創建 編碼器失敗。有關詳細故障,請參閱抑制 例外。使用來自 的默認輸出編碼器,產生PTransform失敗:無法爲V提供編碼器。 使用註冊的編碼器提供程序構建編碼器失敗。

我試過不同的方法來解決這個問題,但我想我只是不明白是什麼問題。我知道,這些線路會發生錯誤:

.withOutputTags(successfulTransformation,TupleTagList.of(failedTransformation)) 

,但我不明白的是哪一部分,在誤差哪些部分需要特定的編碼器,什麼是「V」(從「無法提供編碼器V「)。

爲什麼會發生錯誤?我也試着看看Apache Beam的文檔,但他們似乎沒有解釋這種用法,也不瞭解關於編碼器的部分。

感謝

回答

2

首先,我建議如下 - 變化:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<>(); 
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<>(); 

到這一點:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<Map<String, String>>() {}; 
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<Tuple<String, String>>() {}; 

這將有助於該編碼器推斷確定側輸出的類型。另外,你有沒有正確註冊CoderProviderTuple

+0

在當前階段,更改似乎沒有做任何事情。不,我沒有爲Tuple註冊一個CoderProvider,因爲我不清楚我應該這樣做的方式,您是否可以提供更多信息? –

+0

我用自己製作的類替換了'Tuple',它擴展了Serializable,你的答案解決了我的問題。爲什麼?我所做的和你所建議的有什麼不同?你也猜猜Google Cloud的'Tuple'爲什麼不擴展'Serializable'並且不能被繼承嗎? –

+1

我對Google Cloud的元組不熟悉。你能鏈接到它來自哪裏?一般來說,使用Serializable將比使用自定義編碼器效率更低,或者因爲Java序列化而導致類似Avro的效率。請參閱[註冊編碼器]的文檔(https://beam.apache.org/documentation/programming-guide/#data-encoding-and-type-safety)。 –