使用Flink和Java來使我們的推薦系統使用我們的邏輯。如何使用我的模型在Flink中進行分組
所以我有一個數據集:
[user] [item]
100 1
100 2
100 3
100 4
100 5
200 1
200 2
200 3
200 6
300 1
300 6
400 7
所以我都映射到一個元組:
DataSet<Tuple3<Long, Long, Integer>> csv = text.flatMap(new LineSplitter()).groupBy(0, 1).reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<Long, Long>> iterable, Collector<Tuple3<Long, Long, Integer>> collector) throws Exception {
Long customerId = 0L;
Long itemId = 0L;
Integer count = 0;
for (Tuple2<Long, Long> item : iterable) {
customerId = item.f0;
itemId = item.f1;
count = count + 1;
}
collector.collect(new Tuple3<>(customerId, itemId, count));
}
});
後我得到的所有客戶和裏面的ArrayList項目:
DataSet<CustomerItems> customerItems = csv.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple3<Long, Long, Integer>, CustomerItems>() {
@Override
public void reduce(Iterable<Tuple3<Long, Long, Integer>> iterable, Collector<CustomerItems> collector) throws Exception {
ArrayList<Long> newItems = new ArrayList<>();
Long customerId = 0L;
for (Tuple3<Long, Long, Integer> item : iterable) {
customerId = item.f0;
newItems.add(item.f1);
}
collector.collect(new CustomerItems(customerId, newItems));
}
});
現在我需要獲得所有「類似」客戶。但嘗試了很多東西之後,沒有什麼用。
邏輯將是:
for ci : CustomerItems
c1 = c1.customerId
for ci2 : CustomerItems
c2 = ci2.cstomerId
if c1 != c2
if c2.getItems() have any item inside c1.getItems()
collector.collect(new Tuple2<c1, c2>)
我嘗試了使用減少,但我不能上迭代迭代兩個時間(內部循環迴路)。
任何人都可以幫助我嗎?
我做了,但結果是16個新項目。我把所有的代碼和結果放在這裏: https://gist.github.com/prsolucoes/b406ae98ea24120436954967e37103f6 –