您可以通過擴展SystemFactory
接口編寫自己的系統,工廠類和實現其三個抽象函數getConsumer
,getProducer
和getAdmin
。在每個功能中,以getConsumer
爲例,您需要創建一個系統客戶,另一個定製類的實例擴展爲SystemConsumer
並定義系統應如何使用。通過這樣做,您的Samza工作將知道如何在需要時獲得系統的admin/consumer/producer
。
實施例(Scala中):
class YourSystemFactory extends SystemFactory {
override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
new YourSystemConsumer(
getAdmin(systemName, config).asInstanceOf[YourSystemAdmin],
config.get("someParam"))
}
override def getAdmin(systemName: String, config: Config): SystemAdmin = {
new YourSystemAdmin(
config.get("someParam"),
config.get("someOtherParam"))
)
}
override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
new YourSystemProducer(
getAdmin(systemName, config).asInstanceOf[YourSystemAdmin],
config.get("someParam"))
}
}
在你的配置:
# Your system params
systems.your.samza.factory=your.package.YourSystemFactory
systems.your.consumer.param=value
systems.your.producer.param=value