0
我有一個從spooldir上的文件讀取數據的過程過程將數據加載到MySQL數據庫中。將有多種類型的文件可以通過相同的flume流程進行處理。如何重置每個批次Flume的定製接收器類中的變量
我已經創建了一個自定義接收器Java類(擴展AbstractSink),它在初始/第一次讀取之後更新本地變量(sInterfaceType)以決定文件中的數據格式。 一旦文件處理完成,我必須重置它,以便它必須從識別下一批/接口文件開始。
我試着在stop()中做,但它沒有幫助。有人這樣做嗎?
我的接收器類看起來是這樣的:
public class MyFlumeSink2 extends AbstractSink implements Configurable {
private String sInterfaceType; //tells file format of current load
public MyFlumeSink2() {
//my initialization of variables
}
public void configure(Context context) {
//read context variables
}
public void start() {
//create db connection
}
@Override
public void stop() {
//destroy connection
sInterfaceType = ""; //This doesn't help me
super.stop();
}
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
if((sInterfaceType=="" || sInterfaceType==null))
{
//Read first line & set sInterfaceType
}else
//Insert data in MySQL
transaction.commit();
}
}
如何檢測(在接收器級別)文件是否已完成處理? – frb
我正在考慮在完成一個批處理或文件後調用stop()方法。但看起來情況並非如此。所以我不知道你的問題的答案。 – KiranM