2016-01-13 29 views
0

這是我的查詢,如果當前數據ID在卡桑德拉數據庫存在或不存在:在卡夫卡爲什麼我的數據插入到我的Cassandra數據庫中有時會穩定並且有時很慢?

row = session.execute("SELECT * FROM articles where id = %s", [id]) 

解決消息,則確定在卡桑德拉數據庫中是否存在該消息,如果它不存在,那麼它應該執行一個插入操作,如果它存在的話,它不應該被插入到數據中。

messages = consumer.get_messages(count=25) 

if len(messages) == 0: 
    print 'IDLE' 
    sleep(1) 
    continue 

for message in messages: 
    try: 
     message = json.loads(message.message.value) 
     data = message['data'] 
     if data: 
      for article in data: 
       source = article['source'] 
       id = article['id'] 
       title = article['title'] 
       thumbnail = article['thumbnail'] 
       #url = article['url'] 
       text = article['text'] 
       print article['created_at'],type(article['created_at']) 
       created_at = parse(article['created_at']) 
       last_crawled = article['last_crawled'] 
       channel = article['channel']#userid 
       category = article['category'] 
       #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0) 
       scheduled_for=(datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0) 
       row = session.execute("SELECT * FROM articles where id = %s", [id]) 
       if len(list(row))==0: 
       #id parse base62 
        ids = [id[0:2],id[2:9],id[9:16]] 
        idstr='' 
        for argv in ids: 
         num = int(argv) 
         idstr=idstr+encode(num) 
        url='http://weibo.com/%s/%s?type=comment' % (channel,idstr) 
        session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category)) 
        session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source,'article', scheduled_for, id)) 
        log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category)) 

    except Exception, e: 
     log.exception(e) 
     #log.info('error %s %s' % (message['url'],body)) 
     print e 
     continue 

我有一個ID只有一個獨特的錶行,我想要這樣。只要爲唯一ID添加不同的scheduled_for次數,我的系統就會崩潰。加上這個if len(list(row))==0:是正確的想法,但我的系統在此之後非常緩慢。

這是我的表說明:

DROP TABLE IF EXISTS schedules; 

CREATE TABLE schedules (
source text, 
type text, 
scheduled_for timestamp, 
id text, 
PRIMARY KEY (source, type, scheduled_for, id) 
); 

這scheduled_for是可變的。這裏也是一個具體的例子:

Hao article 2016-01-12 02:09:00+0800 3930462206848285 
Hao article 2016-01-12 03:09:00+0801 3930462206848285 
Hao article 2016-01-12 04:09:00+0802 3930462206848285 
Hao article 2016-01-12 05:09:00+0803 3930462206848285 

這裏是我的文章CQL模式:

CREATE TABLE crawler.articles (
    source text, 
    created_at timestamp, 
    id text, 
    category text, 
    channel text, 
    last_crawled timestamp, 
    text text, 
    thumbnail text, 
    title text, 
    url text, 
    PRIMARY KEY (source, created_at, id) 
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC) 
AND bloom_filter_fp_chance = 0.01 
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}' 
AND comment = '' 
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'} 
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} 
AND dclocal_read_repair_chance = 0.1 
AND default_time_to_live = 604800 
AND gc_grace_seconds = 864000 
AND max_index_interval = 2048 
AND memtable_flush_period_in_ms = 0 
AND min_index_interval = 128 
AND read_repair_chance = 0.0 
AND speculative_retry = '99.0PERCENTILE'; 

CREATE INDEX articles_id_idx ON crawler.articles (id); 
CREATE INDEX articles_url_idx ON crawler.articles (url); 
+1

你可以提供你的表格模式的文章?這樣我們可以看到主鍵和所有細節。 –

+0

我在上面的文章中添加了文章計劃。感謝您的回覆! – peter

回答

1

在您的架構和方式看你使用它,我可以假設在ID字段二級索引創建的問題並減慢查詢速度。你可以查看更多的細節,爲什麼二級索引在很多地方只是使用谷歌搜索不好(這個source是一個好開始,也是DataStax documentation page)。基本上,當您在5節點集羣中使用二級索引時,必須擊中每個節點以查找要查找的項目,並且在使用主鍵時,每個節點都知道哪個節點存儲數據。

如果您使用高基數數據(添加更多項目時性能下降),並且您使用的ID對每篇文章都不相同,則二級索引尤其不好。當你使用低基數時,例如按星期幾指數(如果你的類別數量有限,你可以知道一週只有7天,所以你可以預測指數表的大小)或類別。 。

我會建議創建一個表,article_by_id這將是您的文章表的反向索引。如果操作返回true(意味着插入過去,因此以前沒有出現過此記錄),則可以對articles表執行常規INSERT操作,如果它返回false(表示數據未插入,因爲它已經存在),你可以跳過INSERT到articles表。

這裏是表(我建議,而不是使用文本ID UUID,但我創建的表根據您的文章表):

CREATE TABLE article_by_id (
    id text, 
    source text, 
    created_at timestamp, 
    PRIMARY KEY (id) 
) WITH comment = 'Article by id.'; 

這樣,你總是可以找到根據您的鑰匙的所有部分只是ID。如果ID是您的輸入參數,則從此表中選擇將爲您提供source和created_at。

這裏是插入查詢將返回true或false:

INSERT INTO article_by_id(id, source, created_at) VALUES (%s,%s, %s) IF NOT EXISTS; 

而更多的小費,如果你能找到根據您的實體部分非可變數據比鍵你不需要第二個表。例如,如果source和created_at唯一標識系統中的文章並且永遠不會更改,則可以刪除標識並使用原始表。

+0

偉大的答案謝謝。你認爲什麼是最好的文章主鍵? – peter

+1

您可以將它保留在文章表(PK源,然後通過created_at和id結尾以確保唯一性)。此表非常適合範圍查詢,並且您可以將多個數據源中的數據分佈到多個分區中,從而防止長行和熱數據。我只是建議使用附加的ID表來更容易地找到它。否則,如果每篇文章的URL唯一,則可以完全忽略ID,並使用URL而不是ID。然後在所有地方將ID更改爲URL。 –

+0

也是另一個問題,也許你可以幫助:如果我們有下面的表結構,我怎麼能通過「source ='abc'和created_at> ='2016-01-01 00:00:00'」查詢?現在這是一個問題,因爲cassandra不允許查詢非索引字段。 CREATE TABLE文章( ID文本, 源文本, created_at時間戳, 類文本, 頻道文本, last_crawled時間戳, 文本文字, 縮略圖文本, 標題文本, URL文本, PRIMARY KEY( id) ) – peter

相關問題