我正在學習關於Apache Cassandra 3.x.x的所有內容,我正在嘗試開發一些東西來玩。問題是,我想將數據存儲到包含這些列的表卡桑德拉:將Cassandra的UnixTimestamp轉換爲TIMEUUID
id (UUID - Primary Key) | Message (TEXT) | REQ_Timestamp (TIMEUUID) | Now_Timestamp (TIMEUUID)
REQ_Timestamp具有當郵件離開客戶端在前端電平的時間。另一方面,Now_Timestamp是消息最終存儲在Cassandra中的時間。我需要兩個時間戳,因爲我想測量處理來自其原始請求所需的時間,直到數據安全存儲爲止。
創建Now_Timestamp很簡單,我只是使用now()函數並自動生成TIMEUUID。 REQ_Timestamp出現問題。我該如何將Unix Timestamp轉換爲TIMEUUID以便Cassandra可以存儲它?這甚至有可能嗎?
我後端的體系結構是這樣的:我從前端獲取JSON中的數據到處理它並將其存儲在Kafka中的Web服務。然後,Spark Streaming作業將該Kafka日誌放入Cassandra中。
這是我的WebService,將數據放入卡夫卡。
@Path("/")
public class MemoIn {
@POST
@Path("/in")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.TEXT_PLAIN)
public Response goInKafka(InputStream incomingData){
StringBuilder bld = new StringBuilder();
try {
BufferedReader in = new BufferedReader(new InputStreamReader(incomingData));
String line = null;
while ((line = in.readLine()) != null) {
bld.append(line);
}
} catch (Exception e) {
System.out.println("Error Parsing: - ");
}
System.out.println("Data Received: " + bld.toString());
JSONObject obj = new JSONObject(bld.toString());
String line = obj.getString("id_memo") + "|" + obj.getString("id_writer") +
"|" + obj.getString("id_diseased")
+ "|" + obj.getString("memo") + "|" + obj.getLong("req_timestamp");
try {
KafkaLogWriter.addToLog(line);
} catch (Exception e) {
e.printStackTrace();
}
return Response.status(200).entity(line).build();
}
}
這裏是我的卡夫卡作家
package main.java.vcemetery.webservice;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
public class KafkaLogWriter {
public static void addToLog(String memo)throws Exception {
// private static Scanner in;
String topicName = "MemosLog";
/*
First, we set the properties of the Kafka Log
*/
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// We create the producer
Producer<String, String> producer = new KafkaProducer<>(props);
// We send the line into the producer
producer.send(new ProducerRecord<>(topicName, memo));
// We close the producer
producer.close();
}
}
最後這裏就是我有我的星火流工作
public class MemoStream {
public static void main(String[] args) throws Exception {
Logger.getLogger("org").setLevel(Level.ERROR);
Logger.getLogger("akka").setLevel(Level.ERROR);
// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkExample").setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
/* Se crea un array con los tópicos a consultar, en este caso solamente un tópico */
Collection<String> topics = Arrays.asList("MemosLog");
final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
// Split each bucket of kafka data into memos a splitable stream
JavaDStream<String> stream = kafkaStream.map(record -> (record.value().toString()));
// Then, we split each stream into lines or memos
JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator());
/*
To split each memo into sections of ids and messages, we have to use the code \\ plus the character
*/
JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator());
sections.print();
sections.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
//We establish the connection with Cassandra
Cluster cluster = null;
try {
cluster = Cluster.builder()
.withClusterName("VCemeteryMemos") // ClusterName
.addContactPoint("127.0.0.1") // Host IP
.build();
} finally {
if (cluster != null) cluster.close();
}
while(partitionOfRecords.hasNext()){
}
});
});
ssc.start();
ssc.awaitTermination();
}
}
預先感謝您。
當我問這個問題時,我實際上是在引用同一個文檔。關於如何在客戶端進行轉換的任何想法?我被困在這裏。 –
取決於您使用的客戶端。它是datastax javadriver嗎?也許你可以展示你的一些代碼,你已經在做什麼。 –
我的後端架構是這樣的:我從前端獲取JSON中的數據到一個處理它並將其存儲在Kafka中的Web服務。然後,Spark Streaming作業將該Kafka日誌放入Cassandra中。我將使用迄今爲止編寫的WebService/Kafka代碼和Spark代碼編輯我的原始文章。 –