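How to keep a connection to an external database in Cloud Dataflow

I have an unbounded Dataflow pipeline that reads from Pub/Sub, applies a ParDo, and writes to Cassandra. It only applies ParDo transformations, so I use the global window with the default trigger even though the source is unbounded.

In a pipeline like this, how should I keep the connection to Cassandra open?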
Currently I open it in startBundle, like this:
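```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import java.io.IOException;

// Nested in the pipeline class; static so the DoFn does not capture the enclosing instance.
private static class CassandraWriter<T> extends DoFn<T, Void> {
  // Plain connection settings: serializable, shipped to the workers with the DoFn.
  private final String[] hosts;
  private final int port;
  private final String keyspace;

  // Driver objects are not serializable, so they are created on the worker.
  private transient Cluster cluster;
  private transient Session session;
  private transient MappingManager mappingManager;

  CassandraWriter(String[] hosts, int port, String keyspace) {
    this.hosts = hosts;
    this.port = port;
    this.keyspace = keyspace;
  }

  @Override
  public void startBundle(Context c) {
    // Opens a new connection at the start of every bundle.
    this.cluster = Cluster.builder()
        .addContactPoints(hosts)
        .withPort(port)
        .withoutMetrics()
        .withoutJMXReporting()
        .build();
    this.session = cluster.connect(keyspace);
    this.mappingManager = new MappingManager(session);
  }

  @Override
  public void processElement(ProcessContext c) throws IOException {
    T element = c.element();
    @SuppressWarnings("unchecked")
    Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
    mapper.save(element);
  }

  @Override
  public void finishBundle(Context c) throws IOException {
    // Closes the connection again at the end of every bundle.
    session.close();
    cluster.close();
  }
}
```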
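However, this opens a new connection for every bundle, and since bundles in a streaming pipeline tend to be very small, in practice it creates a new connection for (almost) every element.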
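To make the cost concrete, here is a minimal plain-Java simulation of the startBundle/finishBundle lifecycle used by the CassandraWriter above, with no Dataflow or Cassandra dependency. `FakeConnection`, `BundleScopedWriter`, and the bundle shapes are hypothetical stand-ins; the sketch assumes streaming bundles hold a single element each, which is only an approximation of what the runner does.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class BundleScopedConnections {
    // Counts how many (hypothetical) connections have been opened.
    static final AtomicInteger OPENED = new AtomicInteger();

    // Hypothetical stand-in for a Cassandra Cluster/Session pair.
    static final class FakeConnection implements AutoCloseable {
        FakeConnection() { OPENED.incrementAndGet(); }
        void write(String element) { /* pretend to save the element */ }
        @Override public void close() { }
    }

    // Mirrors CassandraWriter: connect in startBundle, disconnect in finishBundle.
    static final class BundleScopedWriter {
        private FakeConnection connection;
        void startBundle() { connection = new FakeConnection(); }
        void processElement(String element) { connection.write(element); }
        void finishBundle() { connection.close(); connection = null; }
    }

    // Feeds every bundle through one writer instance; returns the number of connections opened.
    static int run(List<List<String>> bundles) {
        OPENED.set(0);
        BundleScopedWriter writer = new BundleScopedWriter();
        for (List<String> bundle : bundles) {
            writer.startBundle();
            for (String element : bundle) {
                writer.processElement(element);
            }
            writer.finishBundle();
        }
        return OPENED.get();
    }

    public static void main(String[] args) {
        // Streaming-like: five single-element bundles.
        List<List<String>> streaming =
            List.of(List.of("a"), List.of("b"), List.of("c"), List.of("d"), List.of("e"));
        // Batch-like: the same five elements in one bundle.
        List<List<String>> batch = List.of(List.of("a", "b", "c", "d", "e"));
        System.out.println("streaming: " + run(streaming)); // prints "streaming: 5"
        System.out.println("batch: " + run(batch));         // prints "batch: 1"
    }
}
```

The connection count tracks the number of bundles, not the number of elements, so the problem is specific to the small bundles typical of streaming.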
Another option is to pass it as a side input, as https://github.com/benjumanji/cassandra-dataflow does:
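```java
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PDone;

@SuppressWarnings("unchecked")
public PDone apply(PCollection<T> input) {
  Pipeline p = input.getPipeline();
  // The write operation (connection settings) becomes a one-element PCollection...
  CassandraWriteOperation<T> op = new CassandraWriteOperation<T>(this);
  Coder<CassandraWriteOperation<T>> coder =
      (Coder<CassandraWriteOperation<T>>) SerializableCoder.of(op.getClass());
  PCollection<CassandraWriteOperation<T>> opSingleton =
      p.apply(Create.<CassandraWriteOperation<T>>of(op)).setCoder(coder);
  // ...which every writer receives as a singleton side input.
  final PCollectionView<CassandraWriteOperation<T>> opSingletonView =
      opSingleton.apply(View.<CassandraWriteOperation<T>>asSingleton());
  PCollection<Void> results = input.apply(ParDo.of(new DoFn<T, Void>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      // use the side input here
    }
  }).withSideInputs(opSingletonView));
  // Finalization waits on all writes by taking them as a side input.
  PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable());
  opSingleton.apply(ParDo.of(new DoFn<CassandraWriteOperation<T>, Void>() {
    private static final long serialVersionUID = 0;
    @Override
    public void processElement(ProcessContext c) {
      CassandraWriteOperation<T> op = c.element();
      op.finalize();
    }
  }).withSideInputs(voidView));
  return PDone.in(p);
}
```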
However, with this approach I have to use windowing, because `PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable());` performs a group-by.
In general, how should a PTransform that writes an unbounded PCollection to an external database keep its connection to that database?
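Thanks. I was considering a static, but I am a bit reluctant to use one because it is not one of the four documented ways of [passing additional (out-of-band) data](https://cloud.google.com/dataflow/FAQ#帕爾-附加數據).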
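For comparison, a minimal plain-Java sketch of the "static" idea from the comment: one session per JVM (more precisely, per class loader), created lazily on first use and shared by every writer instance and bundle. `FakeSession`, `SharedSession`, and `SharedSessionWriter` are hypothetical stand-ins for the Cassandra `Session` and the DoFn. Whether Dataflow workers keep such a static alive for the lifetime of the job is an assumption, and this sketch never closes the session, which is the main trade-off of the approach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class StaticSessionHolder {
    // Counts how many (hypothetical) sessions have been opened.
    static final AtomicInteger OPENED = new AtomicInteger();

    // Hypothetical stand-in for com.datastax.driver.core.Session.
    static final class FakeSession {
        FakeSession() { OPENED.incrementAndGet(); }
        void save(String element) { /* pretend to write the element */ }
    }

    // Lazily creates a single shared session; double-checked locking on a volatile field.
    static final class SharedSession {
        private static volatile FakeSession session;

        static FakeSession get() {
            FakeSession s = session;
            if (s == null) {
                synchronized (SharedSession.class) {
                    s = session;
                    if (s == null) {
                        s = new FakeSession();
                        session = s;
                    }
                }
            }
            return s;
        }
    }

    // Writer that borrows the shared session instead of opening one per bundle.
    static final class SharedSessionWriter {
        void startBundle() { /* nothing to open */ }
        void processElement(String element) { SharedSession.get().save(element); }
        void finishBundle() { /* do not close the shared session here */ }
    }

    // Simulates several concurrent workers, each processing many single-element bundles.
    static int runWorkers(int threads, int bundlesPerThread) {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                SharedSessionWriter writer = new SharedSessionWriter();
                for (int b = 0; b < bundlesPerThread; b++) {
                    writer.startBundle();
                    writer.processElement("element-" + b);
                    writer.finishBundle();
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return OPENED.get();
    }

    public static void main(String[] args) {
        System.out.println("sessions opened: " + runWorkers(4, 100)); // prints "sessions opened: 1"
    }
}
```

Unlike the bundle-scoped writer, the number of sessions stays at one no matter how many bundles or threads run, which is why a shared static is tempting even though it is not one of the documented side-data mechanisms.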