Creating a custom InputFormat based on ColumnFamilyInputFormat for Cassandra
I'm working on a project using Cassandra 1.2 and Hadoop 1.2.
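I have already written my normal Cassandra mapper and reducer, but I want to create my own input format class that reads records from Cassandra. I will get the column values I need by splitting each value and picking fields by index, which is why I plan to write a custom format class. But I'm confused and don't know how to go about it: which classes should I extend or implement, and how will I be able to fetch the row key, column names, column values, etc.?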
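On the "how do I fetch the row key, column names and column values" part: with ColumnFamilyInputFormat each record arrives as a `ByteBuffer` row key plus a `SortedMap<ByteBuffer, IColumn>`, and `IColumn.name()` / `IColumn.value()` are `ByteBuffer`s as well, so everything has to be decoded. The sketch below is a stdlib-only stand-in for what `ByteBufferUtil.string()` and `ByteBufferUtil.toInt()` do, assuming UTF-8 text and 4-byte big-endian ints; the class name `CellDecoding` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stdlib-only helper mirroring ByteBufferUtil.string()/toInt(),
// so the decoding of keys, names and values is explicit.
final class CellDecoding {
    private CellDecoding() {}

    // Decode a UTF-8 row key, column name or text value.
    // duplicate() keeps the caller's buffer position unchanged.
    static String utf8(ByteBuffer buf) {
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    // Read a 4-byte big-endian int at the buffer's current position
    // (absolute get, so the position is not advanced).
    static int int32(ByteBuffer buf) {
        return buf.getInt(buf.position());
    }

    // Same rule as the mapper: a column whose name contains "int" holds an
    // integer; anything else is treated as UTF-8 text.
    static String valueAsString(ByteBuffer name, ByteBuffer value) {
        return utf8(name).contains("int")
                ? String.valueOf(int32(value))
                : utf8(value);
    }
}
```

Because both helpers read without moving the buffer position, the same `ByteBuffer` can be decoded more than once, which matters when the key is needed both for parsing and for the output.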
My Mapper class is as follows:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedMap;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends
        Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, Text> {
    private final Text word = new Text();
    private final MyJDBC db = new MyJDBC(); // my own JDBC helper class

    // d1, d2, condition1, condition2, myhits, like and getDateDiffe()
    // are defined elsewhere in my code (not shown here)

    @Override
    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
            Context context) throws IOException, InterruptedException {
        // decode the row key once and reuse it
        String rowKey = ByteBufferUtil.string(key);
        long std_id = Long.parseLong(rowKey);
        long newSavePoint = 0;
        StringBuilder sb = new StringBuilder(); // fresh buffer for every row
        if (columns.isEmpty()) {
            System.out.println("EMPTY ITERATOR");
            sb.append("column_N/A:N/A , ");
        } else {
            for (IColumn cell : columns.values()) {
                String name = ByteBufferUtil.string(cell.name());
                String value;
                if (name.contains("int")) {
                    // integer-valued column: decode the 4-byte int
                    value = String.valueOf(ByteBufferUtil.toInt(cell.value()));
                } else {
                    value = ByteBufferUtil.string(cell.value());
                }
                String[] data = value.split(",");
                // if (data[0].equalsIgnoreCase("login")) {
                Long[] dif = getDateDiffe(d1, d2);
                // logic I want to perform inside my custom input class rather than here;
                // I just want a simple mapper class
                if (condition1 && condition2) {
                    myhits++;
                    sb.append(":\t " + data[0] + " " + data[2] + " " + data[1] /* + " " + data[3] */ + "\n");
                    newSavePoint = d2;
                }
            }
            sb.append("~" + like + "~" + newSavePoint + "~");
        }
        // set the output for every row, so an empty row never re-emits the previous row's text
        word.set(sb.toString().replace("\t", ""));
        db.setInterval(std_id, newSavePoint);
        db.setHits(std_id, like + "");
        context.write(new Text(rowKey), word);
    }
}
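The per-row loop above can be pulled out of `map()` into a plain function that either the mapper or a custom RecordReader could call. This is a hedged sketch: `RowSummarizer`, `Summary` and both function parameters are hypothetical. `isHit` stands in for `condition1 && condition2` and `savePointOf` for however `d2` is derived, since the question does not show either, and the hit count is assumed to be what the question's `like` holds. It takes already-decoded column values and produces the same text layout the mapper emits after stripping tabs.

```java
import java.util.SortedMap;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

// Hypothetical helper holding the mapper's per-row computation.
final class RowSummarizer {
    private RowSummarizer() {}

    // Result of summarizing one row.
    static final class Summary {
        final String text;
        final int hits;
        final long savePoint;

        Summary(String text, int hits, long savePoint) {
            this.text = text;
            this.hits = hits;
            this.savePoint = savePoint;
        }
    }

    // columns: column name -> decoded value (comma-separated fields)
    // isHit: placeholder for the question's condition1 && condition2
    // savePointOf: placeholder for how d2 is obtained from a record
    static Summary summarize(SortedMap<String, String> columns,
                             Predicate<String[]> isHit,
                             ToLongFunction<String[]> savePointOf) {
        StringBuilder sb = new StringBuilder();
        if (columns.isEmpty()) {
            sb.append("column_N/A:N/A , ");
            return new Summary(sb.toString(), 0, 0L);
        }
        int hits = 0;
        long savePoint = 0;
        for (String value : columns.values()) {
            String[] data = value.split(",");
            // skip values with fewer than 3 fields instead of throwing
            // ArrayIndexOutOfBoundsException on data[2]
            if (data.length >= 3 && isHit.test(data)) {
                hits++;
                sb.append(": ").append(data[0]).append(' ')
                  .append(data[2]).append(' ').append(data[1]).append('\n');
                savePoint = savePointOf.applyAsLong(data);
            }
        }
        sb.append('~').append(hits).append('~').append(savePoint).append('~');
        return new Summary(sb.toString(), hits, savePoint);
    }
}
```

Returning the hit count and save point alongside the text keeps them available to whatever still needs them, such as the `MyJDBC` calls.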
I want to slim down my Mapper class and perform the same computation in my custom input format class instead.
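One possible way to do that, offered as an assumption rather than the only design: write your own `InputFormat<Text, Text>` (extending `org.apache.hadoop.mapreduce.InputFormat`) that keeps a `ColumnFamilyInputFormat` as a delegate. It returns the delegate's `getSplits(context)` unchanged and wraps the reader from the delegate's `createRecordReader(split, context)` in your own `RecordReader<Text, Text>`. Because ColumnFamilyInputFormat's records are fixed to `ByteBuffer` keys and `SortedMap<ByteBuffer, IColumn>` values, subclassing it cannot change the types the mapper receives; wrapping can. The real wrapper would also forward `initialize()`, `getProgress()` and `close()` to the delegate. So that the sketch runs without Hadoop or Cassandra on the classpath, `SimpleRecordReader` and `IteratorReader` are hypothetical stand-ins for `RecordReader` and the Cassandra reader; `TransformingReader` shows the delegation itself.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the core of Hadoop's RecordReader contract.
interface SimpleRecordReader<K, V> {
    boolean nextKeyValue();
    K getCurrentKey();
    V getCurrentValue();
}

// Stand-in for the reader ColumnFamilyInputFormat returns: replays entries.
final class IteratorReader<K, V> implements SimpleRecordReader<K, V> {
    private final Iterator<Map.Entry<K, V>> it;
    private Map.Entry<K, V> current;

    IteratorReader(Iterator<Map.Entry<K, V>> it) {
        this.it = it;
    }

    public boolean nextKeyValue() {
        if (!it.hasNext()) {
            current = null;
            return false;
        }
        current = it.next();
        return true;
    }

    public K getCurrentKey() { return current.getKey(); }

    public V getCurrentValue() { return current.getValue(); }
}

// The wrapper: advances the delegate, then converts the raw record into the
// key/value the mapper should see, so map() no longer needs the parsing logic.
final class TransformingReader<K, V, K2, V2> implements SimpleRecordReader<K2, V2> {
    private final SimpleRecordReader<K, V> delegate;
    private final Function<K, K2> keyFn;
    private final Function<V, V2> valueFn;
    private K2 key;
    private V2 value;

    TransformingReader(SimpleRecordReader<K, V> delegate,
                       Function<K, K2> keyFn, Function<V, V2> valueFn) {
        this.delegate = delegate;
        this.keyFn = keyFn;
        this.valueFn = valueFn;
    }

    public boolean nextKeyValue() {
        if (!delegate.nextKeyValue()) {
            return false;
        }
        key = keyFn.apply(delegate.getCurrentKey());
        value = valueFn.apply(delegate.getCurrentValue());
        return true;
    }

    public K2 getCurrentKey() { return key; }

    public V2 getCurrentValue() { return value; }
}
```

With such a format registered via `job.setInputFormatClass(...)`, the mapper could shrink to a `Mapper<Text, Text, Text, Text>` whose `map()` only calls `context.write(key, value)`. The existing `ConfigHelper` input settings would still apply, since the delegate reads them from the same job configuration.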
Please help, I'm hoping for a positive response from fellow Stack Overflow users.