2016-03-01 21 views
0

使用下面的配置我能夠samza連接到卡夫卡經紀人如何samza連接到其他系統,以及如何編寫systemFactory類

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory 
systems.kafka.samza.msg.serde=json 
systems.kafka.consumer.zookeeper.connect=localhost:2181/ 
systems.kafka.producer.bootstrap.servers=localhost:9092 

但我有關於SystemFactory類的一些疑慮。如何編寫我們自己的systemfactory類?和SystemFactoryClass的目的是什麼?請給我一些想法

回答

0

你不需要實現你的KafkaSystemFactory。你剛纔實現StreamTask

例子:

public class MyTaskClass implements StreamTask { 

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { 
    // process message 
    } 
} 

配置:

# This is the class above, which Samza will instantiate when the job is run 
task.class=com.example.samza.MyTaskClass 

# Define a system called "kafka" (you can give it any name, and you can define 
# multiple systems if you want to process messages from different sources) 
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory 

# The job consumes a topic called "PageViewEvent" from the "kafka" system 
task.inputs=kafka.PageViewEvent 

# Define a serializer/deserializer called "json" which parses JSON messages 
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory 

# Use the "json" serializer for messages in the "PageViewEvent" topic 
systems.kafka.streams.PageViewEvent.samza.msg.serde=json 

欲瞭解更多信息:Documentation

3

您可以通過擴展SystemFactory接口編寫自己的系統,工廠類和實現其三個抽象函數getConsumer,getProducergetAdmin。在每個功能中,以getConsumer爲例,您需要創建一個系統客戶,另一個定製類的實例擴展爲SystemConsumer並定義系統應如何使用。通過這樣做,您的Samza工作將知道如何在需要時獲得系統的admin/consumer/producer

實施例(Scala中):

class YourSystemFactory extends SystemFactory { 
    override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = { 
    new YourSystemConsumer(
     getAdmin(systemName, config).asInstanceOf[YourSystemAdmin], 
     config.get("someParam")) 
    } 

    override def getAdmin(systemName: String, config: Config): SystemAdmin = { 
    new YourSystemAdmin(
     config.get("someParam"), 
     config.get("someOtherParam")) 
    ) 
    } 

    override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = { 
    new YourSystemProducer(
     getAdmin(systemName, config).asInstanceOf[YourSystemAdmin], 
     config.get("someParam")) 
    } 
} 

在你的配置:

# Your system params 
systems.your.samza.factory=your.package.YourSystemFactory 
systems.your.consumer.param=value 
systems.your.producer.param=value 
相關問題