2017-05-25 37 views
0

我所經歷的例子這裏提到:https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java獲取類轉換異常而左連接卡夫卡行旅

我產生輸入數據作爲JSON字符串。

對於主題 - 網頁瀏覽量

{"industry":"eng","user":"bob","page":"index.html"} 

對於主題 - 的UserProfiles

{"experience":"some","region":"europe"} 

我班的樣子:

import com.google.gson.Gson; 
import com.sohi.examples.dto.PageViews; 
import com.sohi.examples.dto.UserProfiles; 
import com.sohi.examples.dto.ViewsByRegion; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.Serde; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.KeyValue; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.*; 

import java.io.IOException; 
import java.util.Properties; 


public class PageViewRegionLambdaExample { 

    public static void main(String[] args) throws IOException { 

     final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092"; 
     final Properties streamsConfiguration = new Properties(); 

     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-region-lambda-example"); 
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR 
     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR 
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); 


     final Serde<String> stringSerde = Serdes.String();//NOSONAR 
     final Serde<Long> longSerde = Serdes.Long();//NOSONAR 

     final KStreamBuilder builder = new KStreamBuilder(); 

     final KStream<String, String> views = builder.stream("PageViews"); 

     final KStream<String, PageViews> viewsByUser = views 
       .map((dummy, record) -> new KeyValue<>(dummy, new Gson().fromJson(record, PageViews.class))) 
       .map((dummy, record) -> { 
          System.out.println(record); 
          return new KeyValue<>(record.getUser(), record); 
         } 
       ); 

     final KTable<String, String> userProfiles = builder.table("UserProfiles", "UserProfilesStore"); 
     final KTable<String, String> userRegions = 
       userProfiles 
         .mapValues(record -> new Gson().fromJson(record, UserProfiles.class).getRegion()); 


     final KTable<Windowed<String>, Long> viewsByRegion = 

       viewsByUser.leftJoin(userRegions, (view, region) -> { 
        ViewsByRegion viewRegion = new ViewsByRegion(); 
        viewRegion.setUser(view.getUser()); 
        viewRegion.setPage(view.getPage()); 
        viewRegion.setRegion(region); 
        return viewRegion; 
       }) 
         .map((user, viewRegion) -> new KeyValue<>(viewRegion.getRegion(), "")) 
         .groupByKey() 
         .count(TimeWindows.of(5 * 60 * 1000L).advanceBy(60 * 1000L), "GeoPageViewsStore"); 


     final KStream<String, Long> viewsByRegionForConsole = viewsByRegion 
       .toStream((windowedRegion, count) -> windowedRegion.toString()); 


     viewsByRegionForConsole.to(stringSerde, longSerde, "PageViewsByRegion"); 

     final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 

     streams.cleanUp(); 
     streams.start(); 

     // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams 
     Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 

    } 


} 

在運行這段代碼,我得到類轉換異常。

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PageViews, partition=0, offset=44 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) 
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer/value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String/value type: com.sohi.examples.dto.PageViews). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters. 
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) 
    ... 2 more 
Caused by: java.lang.ClassCastException: com.sohi.examples.dto.PageViews cannot be cast to java.lang.String 
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78) 
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79) 
    ... 20 more 

Process finished with exit code 0 

我需要幫助來了解如果我創建一個自定義serde的應用程序。

回答

3

您的自定義Serde應該用於處理拓撲操作。在這種情況下,它是 - leftJoin,groupByKey。例如

PageViewsSerde pageViewsSerde = //instantiate your custom Serde impl 

viewsByUser.leftJoin(userRegions, (view, region) -> { 
        ViewsByRegion viewRegion = new ViewsByRegion(); 
        viewRegion.setUser(view.getUser()); 
        viewRegion.setPage(view.getPage()); 
        viewRegion.setRegion(region); 
        return viewRegion; 
       }, 
Serdes.String(), pageViewsSerde 
) 
+0

它解決了這個問題。非常感謝 。 – Sohi