我的任務很簡單 - 我想使用Apache NiFi在HBase(計數器)中增加列值。使用Apache NiFi的Hbase計數器
我有一個作爲rowkey accountid,我想基於一個流值incr/decr平衡列。用NiFi做什麼是最好的方法。
例如帳戶A的餘額= 100的起始值。我將(A,-20)作爲事件。什麼是最好的開箱即用處理器來完成這項工作(餘額= 80)。似乎所有這些都將取代價值。我也開放改變我的模式...
我試着編寫groovy腳本,但在nifi中得到這個錯誤。如果我的基本結構是錯誤的,那只是一個簡單的問題。
2017年3月10日06:38:54067 ERROR [定時器驅動進程線程-6] oanifi.processors.script.ExecuteScript ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d] ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d]由於java.lang.NoClassDefFoundError而無法處理:org/apache/hadoop/conf/Configuration;回滾會話:
import org.apache.nifi.controller.ControllerService
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.ResultScanner
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.util.Bytes
def lookup = context.controllerServiceLookup
def HbaseServiceName =HBaseClient.value
def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == HBaseServiceName
}
def conn = lookup.getControllerService(HBaseServiceId)?.getConnection()
try {
flowFile = session.create()
def table = conn.getTable(TableName.valueOf("crap"))
myfile = flowFile.getAttribute("filename")
def p = new Put(Bytes.toBytes("crap"));
p.add(Bytes.toBytes("crap"), Bytes.toBytes("cf1"),Bytes.toBytes("SomeValue"))
table.put(p);
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Scripting error', e)
session.transfer(flowFile, REL_FAILURE)
}
conn?.close()
同意。 GetHbase問題(獲取當前值)並添加流式傳輸值,最後做PutHBase(更新) - 是2次操作還是數據回到客戶端(NiFi)。我喜歡增量範例,因爲它速度更快,我不太在意讀取當前值(我只是想增加它)。吞吐量對我來說是一個大問題。我在每個NiFi機器上運行phoenix服務器的想法並且從NiFi調用phoenix-jdbc客戶機。 –
我沒有太多的HBase經驗。你可以使用'ExecuteScript'處理器在Groovy中做到這一點,將HBase庫包含在相關的模塊文件夾中?這裏有一個使用HBase'Increment'對象的[示例](https://github.com/larsgeorge/hbase-book/blob/master/ch04/src/main/java/client/IncrementMultipleExample.java) - 你應該能夠[從腳本引用HBase控制器服務](https://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html)。 – Andy
感謝安迪這似乎符合我的要求(將嘗試),除了涉及做比我們所希望的更多的編碼:)。完成後將發佈解決方案。 –