從[email protected]轉發如何使用Apache Apex對Kafka 0.9運算符進行單元測試?
我想使用支持0.9版本協議的新Kafka運算符運行單元測試代碼。
在這個過程中,我包含了Malhar-Kafka庫版本(3.3.1-incubating),並使用Apex-engine(版本3.3.0)作爲測試/提供。
編譯工作正常,但我的單元測試無法正確運行「java.lang.ClassNotFoundException:com.datatorrent.lib.util.KryoCloneUtils」異常。
運行與Apex引擎集成的Kafka 0.9運算符的單元測試的推薦方式是什麼?我假設Malhar-的contrib庫卡夫卡運營商是不是符合0.9 ..
單元測試代碼是這樣的:
類CassandraEventDetailsStreamingApp擴展AbstractKafkaInputOperator中的代碼下面的代碼片段。
lma.getController()方法出現異常。
@Test
public void testApplication() throws IOException, Exception {
try {
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
conf.addResource(this.getClass().getResourceAsStream("/dag-test-props.xml"));
lma.prepareDAG(new CassandraEventDetailsStreamingApp(), conf);
LocalMode.Controller lc = lma.getController();
lc.run();
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}