2017-03-07 98 views
3

我的任務很簡單 - 我想使用Apache NiFi在HBase(計數器)中增加列值。使用Apache NiFi的Hb​​ase計數器

我有一個作爲rowkey accountid,我想基於一個流值incr/decr平衡列。用NiFi做什麼是最好的方法。

例如帳戶A的餘額= 100的起始值。我將(A,-20)作爲事件。什麼是最好的開箱即用處理器來完成這項工作(餘額= 80)。似乎所有這些都將取代價值。我也開放改變我的模式...

我試着編寫groovy腳本,但在nifi中得到這個錯誤。如果我的基本結構是錯誤的,那只是一個簡單的問題。

2017年3月10日06:38:54067 ERROR [定時器驅動進程線程-6] oanifi.processors.script.ExecuteScript ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d] ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d]由於java.lang.NoClassDefFoundError而無法處理:org/apache/hadoop/conf/Configuration;回滾會話:

import org.apache.nifi.controller.ControllerService 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.TableName 
import org.apache.hadoop.hbase.client.Connection 
import org.apache.hadoop.hbase.client.ConnectionFactory 
import org.apache.hadoop.hbase.client.Get 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.client.Result 
import org.apache.hadoop.hbase.client.ResultScanner 
import org.apache.hadoop.hbase.client.Scan 
import org.apache.hadoop.hbase.client.Table 
import org.apache.hadoop.hbase.util.Bytes 

def lookup = context.controllerServiceLookup 
def HbaseServiceName =HBaseClient.value 
def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { 
    cs -> lookup.getControllerServiceName(cs) == HBaseServiceName 
} 
def conn = lookup.getControllerService(HBaseServiceId)?.getConnection() 
try { 
    flowFile = session.create() 
    def table = conn.getTable(TableName.valueOf("crap")) 
    myfile = flowFile.getAttribute("filename") 
    def p = new Put(Bytes.toBytes("crap")); 
    p.add(Bytes.toBytes("crap"), Bytes.toBytes("cf1"),Bytes.toBytes("SomeValue")) 
    table.put(p); 
    session.transfer(flowFile, REL_SUCCESS) 
} catch(e) { 
    log.error('Scripting error', e) 
    session.transfer(flowFile, REL_FAILURE) 
} 
conn?.close() 

回答

0

你是正確PutHBaseCellPutHBaseJSON把flowfile內容到相應的HBase的目的地。你可能想要做的是使用GetHBase來檢索初始值,使用計數器(教程請參閱here)保留一個運行計數器,然後用正確的值更新HBase單元。您還可以使用DistributedMapCache系統在共享內存空間中獲取/計算/存儲值。

+0

同意。 GetHbase問題(獲取當前值)並添加流式傳輸值,最後做PutHBase(更新) - 是2次操作還是數據回到客戶端(NiFi)。我喜歡增量範例,因爲它速度更快,我不太在意讀取當前值(我只是想增加它)。吞吐量對我來說是一個大問題。我在每個NiFi機器上運行phoenix服務器的想法並且從NiFi調用phoenix-jdbc客戶機。 –

+0

我沒有太多的HBase經驗。你可以使用'ExecuteScript'處理器在Groovy中做到這一點,將HBase庫包含在相關的模塊文件夾中?這裏有一個使用HBase'Increment'對象的[示例](https://github.com/larsgeorge/hbase-book/blob/master/ch04/src/main/java/client/IncrementMultipleExample.java) - 你應該能夠[從腳本引用HBase控制器服務](https://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html)。 – Andy

+0

感謝安迪這似乎符合我的要求(將嘗試),除了涉及做比我們所希望的更多的編碼:)。完成後將發佈解決方案。 –