2017-09-14 112 views
1

我正在學習關於Apache Cassandra 3.x.x的所有內容,我正在嘗試開發一些東西來玩。問題是,我想將數據存儲到包含這些列的表卡桑德拉:將Cassandra的UnixTimestamp轉換爲TIMEUUID

id (UUID - Primary Key) | Message (TEXT) | REQ_Timestamp (TIMEUUID) | Now_Timestamp (TIMEUUID) 

REQ_Timestamp具有當郵件離開客戶端在前端電平的時間。另一方面,Now_Timestamp是消息最終存儲在Cassandra中的時間。我需要兩個時間戳,因爲我想測量處理來自其原始請求所需的時間,直到數據安全存儲爲止。

創建Now_Timestamp很簡單,我只是使用now()函數並自動生成TIMEUUID。 REQ_Timestamp出現問題。我該如何將Unix Timestamp轉換爲TIMEUUID以便Cassandra可以存儲它?這甚至有可能嗎?

我後端的體系結構是這樣的:我從前端獲取JSON中的數據到處理它並將其存儲在Kafka中的Web服務。然後,Spark Streaming作業將該Kafka日誌放入Cassandra中。

這是我的WebService,將數據放入卡夫卡。

@Path("/") 
public class MemoIn { 

    @POST 
    @Path("/in") 
    @Consumes(MediaType.APPLICATION_JSON) 
    @Produces(MediaType.TEXT_PLAIN) 
    public Response goInKafka(InputStream incomingData){ 
     StringBuilder bld = new StringBuilder(); 
     try { 
      BufferedReader in = new BufferedReader(new InputStreamReader(incomingData)); 
      String line = null; 
      while ((line = in.readLine()) != null) { 
       bld.append(line); 
      } 
     } catch (Exception e) { 
      System.out.println("Error Parsing: - "); 
     } 
     System.out.println("Data Received: " + bld.toString()); 

     JSONObject obj = new JSONObject(bld.toString()); 
     String line = obj.getString("id_memo") + "|" + obj.getString("id_writer") + 
           "|" + obj.getString("id_diseased") 
           + "|" + obj.getString("memo") + "|" + obj.getLong("req_timestamp"); 

     try { 
      KafkaLogWriter.addToLog(line); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     return Response.status(200).entity(line).build(); 
    } 


} 

這裏是我的卡夫卡作家

package main.java.vcemetery.webservice; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import java.util.Properties; 
import org.apache.kafka.clients.producer.Producer; 

public class KafkaLogWriter { 

    public static void addToLog(String memo)throws Exception { 
     // private static Scanner in; 
      String topicName = "MemosLog"; 

      /* 
      First, we set the properties of the Kafka Log 
      */ 
      Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("acks", "all"); 
      props.put("retries", 0); 
      props.put("batch.size", 16384); 
      props.put("linger.ms", 1); 
      props.put("buffer.memory", 33554432); 
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

      // We create the producer 
      Producer<String, String> producer = new KafkaProducer<>(props); 
      // We send the line into the producer 
      producer.send(new ProducerRecord<>(topicName, memo)); 
      // We close the producer 
      producer.close(); 

    } 
} 

最後這裏就是我有我的星火流工作

public class MemoStream { 

    public static void main(String[] args) throws Exception { 
     Logger.getLogger("org").setLevel(Level.ERROR); 
     Logger.getLogger("akka").setLevel(Level.ERROR); 

     // Create the context with a 1 second batch size 
     SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkExample").setMaster("local[2]"); 
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10)); 

     Map<String, Object> kafkaParams = new HashMap<>(); 
     kafkaParams.put("bootstrap.servers", "localhost:9092"); 
     kafkaParams.put("key.deserializer", StringDeserializer.class); 
     kafkaParams.put("value.deserializer", StringDeserializer.class); 
     kafkaParams.put("group.id", "group1"); 
     kafkaParams.put("auto.offset.reset", "latest"); 
     kafkaParams.put("enable.auto.commit", false); 

     /* Se crea un array con los tópicos a consultar, en este caso solamente un tópico */ 
     Collection<String> topics = Arrays.asList("MemosLog"); 

     final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = 
       KafkaUtils.createDirectStream(
         ssc, 
         LocationStrategies.PreferConsistent(), 
         ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
       ); 

     kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value())); 
     // Split each bucket of kafka data into memos a splitable stream 
     JavaDStream<String> stream = kafkaStream.map(record -> (record.value().toString())); 
     // Then, we split each stream into lines or memos 
     JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator()); 
     /* 
     To split each memo into sections of ids and messages, we have to use the code \\ plus the character 
      */ 
     JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator()); 
     sections.print(); 
     sections.foreachRDD(rdd -> { 
      rdd.foreachPartition(partitionOfRecords -> { 
       //We establish the connection with Cassandra 
       Cluster cluster = null; 
       try { 
        cluster = Cluster.builder() 
          .withClusterName("VCemeteryMemos") // ClusterName 
          .addContactPoint("127.0.0.1") // Host IP 
          .build(); 

       } finally { 
        if (cluster != null) cluster.close(); 
       } 
       while(partitionOfRecords.hasNext()){ 


       } 
      }); 
     }); 

     ssc.start(); 
     ssc.awaitTermination(); 

    } 
} 

預先感謝您。

回答

1

Cassandra無法將轉換爲UNIX時間戳。你必須在客戶端進行轉換。

Ref:https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html

+0

當我問這個問題時,我實際上是在引用同一個文檔。關於如何在客戶端進行轉換的任何想法?我被困在這裏。 –

+0

取決於您使用的客戶端。它是datastax javadriver嗎?也許你可以展示你的一些代碼,你已經在做什麼。 –

+0

我的後端架構是這樣的:我從前端獲取JSON中的數據到一個處理它並將其存儲在Kafka中的Web服務。然後,Spark Streaming作業將該Kafka日誌放入Cassandra中。我將使用迄今爲止編寫的WebService/Kafka代碼和Spark代碼編輯我的原始文章。 –

相關問題