
How do I keep a connection to an external database open in Cloud Dataflow?

I have an unbounded Dataflow pipeline that reads from Pub/Sub, applies a ParDo, and writes to Cassandra. The pipeline applies only ParDo transforms, so even though the source is unbounded, I use the global window with the default trigger.

In a pipeline like this, how should I keep the connection to Cassandra open?

Currently I set it up in startBundle, like this:

private class CassandraWriter<T> extends DoFn<T, Void> {
    private transient Cluster cluster;
    private transient Session session;
    private transient MappingManager mappingManager;

    @Override
    public void startBundle(Context c) {
        this.cluster = Cluster.builder()
            .addContactPoints(hosts)
            .withPort(port)
            .withoutMetrics()
            .withoutJMXReporting()
            .build();
        this.session = cluster.connect(keyspace);
        this.mappingManager = new MappingManager(session);
    }

    @Override
    public void processElement(ProcessContext c) throws IOException {
        T element = c.element();
        Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
        mapper.save(element);
    }

    @Override
    public void finishBundle(Context c) throws IOException {
        session.close();
        cluster.close();
    }
}

However, this approach creates a new connection for every element.

Another option is to pass the connection as a side input, as in https://github.com/benjumanji/cassandra-dataflow:

public PDone apply(PCollection<T> input) {
    Pipeline p = input.getPipeline();

    CassandraWriteOperation<T> op = new CassandraWriteOperation<T>(this);

    Coder<CassandraWriteOperation<T>> coder =
        (Coder<CassandraWriteOperation<T>>) SerializableCoder.of(op.getClass());

    PCollection<CassandraWriteOperation<T>> opSingleton =
        p.apply(Create.<CassandraWriteOperation<T>>of(op)).setCoder(coder);

    final PCollectionView<CassandraWriteOperation<T>> opSingletonView =
        opSingleton.apply(View.<CassandraWriteOperation<T>>asSingleton());

    PCollection<Void> results = input.apply(ParDo.of(new DoFn<T, Void>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            // use the side input here
        }
    }).withSideInputs(opSingletonView));

    PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable());

    opSingleton.apply(ParDo.of(new DoFn<CassandraWriteOperation<T>, Void>() {
        private static final long serialVersionUID = 0;

        @Override
        public void processElement(ProcessContext c) {
            CassandraWriteOperation<T> op = c.element();
            op.finalize();
        }
    }).withSideInputs(voidView));

    return new PDone();
}

However, with this approach I would have to use windowing, because PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable()); applies a grouping, and that grouping cannot fire on an unbounded PCollection that sits in the global window with the default trigger.
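
For illustration only, here is a minimal, hypothetical sketch of what that would require; the fixed one-minute window is an arbitrary assumption, not something from the original code:

// Hypothetical: window the results so that the grouping inside
// View.asIterable() can be computed on the unbounded stream.
PCollection<Void> windowedResults = results.apply(
    Window.<Void>into(FixedWindows.of(Duration.standardMinutes(1))));

PCollectionView<Iterable<Void>> voidView =
    windowedResults.apply(View.<Void>asIterable());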

In general, how should a PTransform that writes an unbounded PCollection to an external database maintain its connection to that database?

Answers

You are observing correctly that bundle sizes in the streaming/unbounded case are typically smaller than in the batch/bounded case. The actual bundle size depends on many parameters, and sometimes a bundle may contain just a single element.

One way to work around this is to use a per-worker connection pool, stored in static state of your DoFn. You should be able to initialize it during the first call to startBundle and reuse it across bundles. Alternatively, you can create connections on demand and release them back to the pool for reuse once they are no longer needed.

You should make sure the static state is thread-safe, and that you make no assumptions about how Dataflow manages bundles.
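
To make the on-demand variant concrete, here is a rough, hypothetical sketch of a per-worker pool; the class name PooledCassandraWriter, its field names, and the use of java.util.concurrent.ConcurrentLinkedQueue are illustrative assumptions, not part of the answer. In practice the Datastax driver's Session is thread-safe and designed to be shared, so a single shared Session (as in the answer below) is usually sufficient.

public class PooledCassandraWriter<T> extends DoFn<T, Void> {
    // Shared per worker JVM: the driver Cluster and a pool of idle Sessions.
    private static final ConcurrentLinkedQueue<Session> pool = new ConcurrentLinkedQueue<>();
    private static final Object lock = new Object();
    private static Cluster cluster;

    private final String[] hosts;
    private final int port;
    private final String keyspace;

    public PooledCassandraWriter(String[] hosts, int port, String keyspace) {
        this.hosts = hosts;
        this.port = port;
        this.keyspace = keyspace;
    }

    private Session borrowSession() {
        synchronized (lock) {
            if (cluster == null) {
                cluster = Cluster.builder().addContactPoints(hosts).withPort(port).build();
            }
        }
        Session s = pool.poll();                    // reuse an idle session if one exists
        return (s != null) ? s : cluster.connect(keyspace);
    }

    @Override
    public void processElement(ProcessContext c) {
        Session session = borrowSession();
        try {
            // ... write c.element() with the borrowed session ...
        } finally {
            pool.offer(session);                    // return it for reuse instead of closing
        }
    }
}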

Thanks. I was considering statics, but I was a bit reluctant to use them because they are not one of the four documented ways of passing additional (out-of-band) data (https://cloud.google.com/dataflow/FAQ). –

As Davor Bonaci said, using static variables solved the problem:

public class CassandraWriter<T> extends DoFn<T, Void> {
    private static final Logger log = LoggerFactory.getLogger(CassandraWriter.class);

    // Prevent multiple threads from creating multiple cluster connections in parallel.
    private static transient final Object lock = new Object();
    private static transient Cluster cluster;
    private static transient Session session;
    private static transient MappingManager mappingManager;

    private final String[] hosts;
    private final int port;
    private final String keyspace;

    public CassandraWriter(String[] hosts, int port, String keyspace) {
        this.hosts = hosts;
        this.port = port;
        this.keyspace = keyspace;
    }

    @Override
    public void startBundle(Context c) {
        synchronized (lock) {
            if (cluster == null) {
                cluster = Cluster.builder()
                    .addContactPoints(hosts)
                    .withPort(port)
                    .withoutMetrics()
                    .withoutJMXReporting()
                    .build();
                session = cluster.connect(keyspace);
                mappingManager = new MappingManager(session);
            }
        }
    }

    @Override
    public void processElement(ProcessContext c) throws IOException {
        T element = c.element();
        Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
        mapper.save(element);
    }
}
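
For completeness, a hypothetical usage sketch; the variable events, the element type MyEvent, and the connection settings are illustrative names only. The shared Cluster and Session are created once per worker JVM the first time startBundle runs and are then reused by every subsequent bundle:

// "events" stands for the unbounded PCollection<MyEvent> produced earlier in the pipeline.
events.apply(ParDo.of(new CassandraWriter<MyEvent>(hosts, port, keyspace)));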