I count the number of daily unique users who log in to the system; the data arrives through Kafka. We maintain the count using the mapWithState function, and every 10 seconds the computed counts are sent back out to Kafka. I need to reset these numbers every day at midnight. Is there a way to clear all of the data held in the mapWithState state, i.e. to reset mapWithState daily in Spark Streaming?
// Imports assume Spark 2.x with the spark-streaming-kafka-0-8 integration
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import scala.Tuple2;

import kafka.serializer.StringDecoder;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class UserCounts {

    private static JavaStreamingContext createContext(String[] args) {
        String brokers = args[1];
        String inputTopic = args[2];
        String outputTopic = args[3];
        String masterNode = args[4];

        // Kafka producer configuration for publishing the computed counts
        HashMap<String, String> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Create the context with a 10 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("UserCounts").setMaster(masterNode);
        //sparkConf.set("spark.driver.bindAddress", "127.0.0.1");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(inputTopic.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        ssc.checkpoint(".");

        // Create direct kafka stream with brokers and inputTopic
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

        JavaDStream<String> lines = messages.map(Tuple2::_2);

        // The user id is the fourth comma-separated field of each record
        JavaDStream<String> userIds = lines.map(x -> {
            String[] fields = x.split(",");
            return fields[3];
        });
        JavaPairDStream<String, Integer> usersDstream = userIds.mapToPair(s -> new Tuple2<>(s, 1));

        // Update the cumulative count per user
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                (userId, one, state) -> {
                    int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                    Tuple2<String, Integer> output = new Tuple2<>(userId, sum);
                    state.update(sum);
                    return output;
                };

        // DStream of cumulative counts that get updated in every batch
        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                usersDstream.mapWithState(StateSpec.function(mappingFunc));
        JavaPairDStream<String, Integer> stateSnapshotStream = stateDstream.stateSnapshots();

        // Each RDD of count() holds a single value: the number of distinct users
        // seen so far. Publish it to the output topic as JSON.
        stateSnapshotStream.count().foreachRDD(rdd -> {
            System.out.println("# events = " + rdd.count());
            String date = String.valueOf(System.currentTimeMillis());
            rdd.foreachPartition(partition -> {
                KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                while (partition.hasNext()) {
                    Long value = partition.next();
                    System.out.println("data >>>>>>>>>>" + value.toString());
                    String data = "{\"timestamp\":\"" + date + "\",\"usersCount\":\"" + value.toString() + "\"}";
                    producer.send(new ProducerRecord<>(outputTopic, null, data));
                }
                producer.close();
            });
        });

        return ssc;
    }

    public static void main(String[] args) throws Exception {
        String checkpointPath = args[0];
        Function0<JavaStreamingContext> createContextFunc = () -> createContext(args);
        JavaStreamingContext ssc =
                JavaStreamingContext.getOrCreate(checkpointPath, createContextFunc);
        ssc.start();
        ssc.awaitTermination();
    }
}
'mapWithState' is not the way to go here. You need to be able to touch every key in every batch in order to reset them at 00:00. You will need to use 'updateStateByKey' for that. –
Could you please share some code sample? I don't get how to trigger the state update at 00:00. –
It doesn't trigger the update at '00:00'; 'updateStateByKey' is triggered at every batch interval you define. You will have to check the current time yourself and reset the state for all keys. –
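A minimal sketch of what that comment describes, meant to replace the mapWithState/stateSnapshots block inside createContext above. It assumes Spark 2.x (org.apache.spark.api.java.Optional) and the 10-second batch interval used in the question; the batchSeconds constant, the LocalTime-based midnight check, and the dailyUserCounts name are illustrative assumptions, not part of the original code. Because updateStateByKey invokes the update function for every key that has state on every batch, even when no new values arrived, the first batch after 00:00 can discard yesterday's sums:

import java.time.LocalTime;
import java.util.List;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Batch interval of the streaming context above (assumption: 10 seconds).
final int batchSeconds = 10;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunc =
        (newValues, state) -> {
            // Illustrative check: true only during the first batch after midnight.
            boolean firstBatchOfDay =
                    LocalTime.now().isBefore(LocalTime.ofSecondOfDay(batchSeconds));
            // Drop keys left over from yesterday that saw no new data today.
            if (firstBatchOfDay && newValues.isEmpty()) {
                return Optional.empty();
            }
            // Start from zero right after midnight, otherwise from the previous sum.
            int sum = firstBatchOfDay ? 0 : state.orElse(0);
            for (Integer v : newValues) {
                sum += v;
            }
            return Optional.of(sum);
        };

// Replaces the mapWithState/stateSnapshots pair; the resulting stream already
// holds the per-user counts, so its count() is the number of daily unique users.
JavaPairDStream<String, Integer> dailyUserCounts = usersDstream.updateStateByKey(updateFunc);

Checkpointing is still required for updateStateByKey (the ssc.checkpoint call above covers that), and the value published to Kafka would then come from dailyUserCounts.count() instead of stateSnapshots().count().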