2016-05-24

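Saving the raw JSON string with hbc's Twitter4jStatusClient

I am trying to store tweets from the Sample Stream in a database and store the raw JSON alongside them. I am using Twitter4jStatusClient, following the example in the hbc GitHub repository. Since I only store a subset of the information in the database in real time, I would like to keep each tweet's raw JSON so that I can retrieve other information later if needed. However, using Twitter4jStatusClient means the listeners run on a different thread, and here it says that to get the JSON object, the call must be made from the same thread that retrieved it. Is there a way to save the JSON string when using Twitter4jStatusClient? I chose not to use this example because I only want to perform certain actions and save the JSON string if the message is a status. Thanks!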

// Create an appropriately sized blocking queue
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);

// Define our endpoint: By default, delimited=length is set (we need this for our processor)
// and stall warnings are on.
StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
// Specify the language filter for the endpoint
endpoint.addQueryParameter(Constants.LANGUAGE_PARAM, Joiner.on(',').join(Lists.newArrayList("en")));
endpoint.stallWarnings(false);

Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);

// Create a new BasicClient. By default gzip is enabled.
BasicClient client = new ClientBuilder()
    .name("sampleStreamClient")
    .hosts(Constants.STREAM_HOST)
    .endpoint(endpoint)
    .authentication(auth)
    .processor(new StringDelimitedProcessor(queue))
    .build();

// Create an executor service which will spawn threads to do the actual work of parsing the incoming messages and
// calling the listeners on each message
int numProcessingThreads = 4;
ExecutorService service = Executors.newFixedThreadPool(numProcessingThreads);

StatusListener listener = new SampleStreamStatusListener(jsonInserter);

// Wrap our BasicClient with the twitter4j client
t4jClient = new Twitter4jStatusClient(
    client, queue, Lists.newArrayList(listener), service);

Answer


I had a similar problem with Twitter4jStatusClient. Here are some ideas.

Intermediate queue

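You can have a separate thread pool read the raw messages from your queue variable, store them somewhere, and then put them on a new queue, which we will call hbcQueue. You then pass hbcQueue to the Twitter4jStatusClient constructor instead of queue.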

BlockingQueue<String> hbcQueue = new LinkedBlockingQueue<>(10000);
ExecutorService rawJsonSaver = Executors.newFixedThreadPool(numProcessingThreads);
for (int i = 0; i < numProcessingThreads; i++) {
    rawJsonSaver.execute(() -> {
        for (;;) {
            try {
                // Block until the hbc client delivers the next raw message
                String msg = queue.take();
                JSONObject jobj = new JSONObject(msg);
                if (JSONObjectType.determine(jobj) == JSONObjectType.Type.STATUS) {
                    System.out.println(msg); // Save it
                    // put() blocks while hbcQueue is full; add() would throw IllegalStateException
                    hbcQueue.put(msg);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop this worker
                Thread.currentThread().interrupt();
                break;
            } catch (JSONException e) {
                // Skip messages that are not valid JSON
                continue;
            }
        }
    });
}
Twitter4jStatusClient t4jClient = new Twitter4jStatusClient(
    client, hbcQueue, Lists.newArrayList(listener), service);

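However, this of course has a performance cost: the JSON is parsed a second time, and the second concurrent queue adds another set of blocking lock operations.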
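To try the relay idea without a live stream, the loop above can be reduced to a sketch that uses only the JDK. Everything named here is an assumption made for illustration: RawMessageRelay, the POISON end marker (added only so the demo terminates), and the crude string check that stands in for JSONObjectType.determine. It is not part of hbc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class RawMessageRelay {

    // Hypothetical end-of-input marker so the demo loop can stop; a real stream has none.
    static final String POISON = "__STOP__";

    // Takes messages from in, saves those accepted by isStatus, and forwards them to out.
    static void relay(BlockingQueue<String> in, BlockingQueue<String> out,
                      Predicate<String> isStatus, Consumer<String> saver)
            throws InterruptedException {
        while (true) {
            String msg = in.take();
            if (msg.equals(POISON)) {
                return;
            }
            if (isStatus.test(msg)) {
                saver.accept(msg); // store the raw string first
                out.put(msg);      // then hand it on; blocks if out is full
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> in = new LinkedBlockingQueue<>(100);
        BlockingQueue<String> out = new LinkedBlockingQueue<>(100);
        List<String> saved = new ArrayList<>();

        in.put("{\"text\":\"hi\"}");
        in.put("{\"delete\":{}}");
        in.put(POISON);

        // Stand-in filter: treats any message containing a "text" key as a status.
        relay(in, out, m -> m.contains("\"text\""), saved::add);

        System.out.println("saved=" + saved.size() + " forwarded=" + out.size());
    }
}
```

Taking the filter and the saver as parameters keeps the relay independent of how the raw strings are classified or stored, so either can be swapped without touching the loop.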

Re-serialization

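If you plan to process the raw data later in Java, you can use plain Java serialization, because the Status object passed to your StatusListener implements Serializable. This does not re-serialize it back to JSON, but at least you do not have to serialize each field by hand.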

@Override
public void onStatus(final Status status) {
    byte[] serializedStatus;
    try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
         ObjectOutputStream objStream = new ObjectOutputStream(byteStream)) {
        objStream.writeObject(status);
        // Flush so all buffered bytes reach byteStream before it is copied
        objStream.flush();
        serializedStatus = byteStream.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    // store serializedStatus
    // . . .
}
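Reading the stored bytes back is the reverse step, using ObjectInputStream. The following is a minimal round-trip sketch. It assumes a hypothetical FakeStatus class standing in for twitter4j's Status so that it runs without the library. With the real class, deserialization also needs a compatible twitter4j version on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StatusRoundTrip {

    // Hypothetical stand-in for twitter4j.Status; any Serializable object behaves the same way.
    static final class FakeStatus implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final String text;

        FakeStatus(long id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // Same technique as onStatus above: write the object into an in-memory byte array.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objStream = new ObjectOutputStream(byteStream)) {
            objStream.writeObject(value);
        }
        // The object stream is closed here, so every byte has been written.
        return byteStream.toByteArray();
    }

    // Reverse step: rebuild the object from the stored bytes.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream objStream =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return objStream.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] stored = serialize(new FakeStatus(42L, "hello"));
        FakeStatus restored = (FakeStatus) deserialize(stored);
        System.out.println(restored.id + " " + restored.text);
    }
}
```

Only deserialize bytes that you wrote yourself, since Java deserialization of untrusted input is unsafe.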