2

I have a data stream coming from a crawler. All of the data goes into Kafka and is then written to Cassandra. Right now the Kafka consumer is very slow, much slower than the producer, and I want them to keep pace with each other. What can I do to achieve that, or what is wrong with my code? Why is my Kafka consumer so much slower than my Kafka producer?

Here is my Kafka consumer code, in Python:

import json
import logging
from datetime import datetime, timedelta

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from dateutil.parser import parse
from kafka.client import KafkaClient
from kafka.consumer.multiprocess import MultiProcessConsumer

logging.basicConfig(filename='consumer.log',
                    format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s',
                    level=logging.DEBUG)

class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')

try:
    # keyspace and kafkatopic are defined elsewhere
    cluster = Cluster(['localhost'])
    session = cluster.connect(keyspace)
    kafka = KafkaClient('localhost')
    consumer = MultiProcessConsumer(kafka, b'default', kafkatopic,
                                    num_procs=16, max_buffer_size=None)

    # Statements are prepared once and reused for every message
    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id IN ?")
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
    article_insert_stmt = session.prepare(
        "INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, "
        "source, category, channel, genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    article_by_created_at_insert_stmt = session.prepare(
        "INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
    article_by_url_insert_stmt = session.prepare(
        "INSERT INTO article_by_url(url, article) VALUES (?, ?)")
    schedules_insert_stmt = session.prepare(
        "INSERT INTO schedules(source, type, scheduled_for, id) VALUES (?, ?, ?, ?)")
    axes_insert_stmt = session.prepare(
        "INSERT INTO axes(article, at, comments, likes, reads, shares) "
        "VALUES (?, ?, ?, ?, ?, ?)")

    while True:
        messages = consumer.get_messages(count=16)
        if len(messages) == 0:
            print 'IDLE'
            continue
        for message in messages:
            try:
                response = json.loads(message.value)
                data = json.loads(response['body'])
                print response['body']
                articles = data['articles']
                idlist = [r['id'] for r in articles]
                if len(idlist) > 0:
                    # Check which of these articles already exist
                    article_rows = session.execute(article_lookup_stmt, [idlist])
                    rows = [r.id for r in article_rows]
                    for article in articles:
                        try:
                            if article['id'] not in rows:
                                article['created_at'] = parse(article['created_at'])
                                scheduled_for = (article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
                                session.execute(article_insert_stmt, (
                                    article['id'], article['thumbnail'], article['title'],
                                    article['url'], article['created_at'], scheduled_for,
                                    article['source'], article['category'],
                                    article['channel'], article['genre']))
                                session.execute(article_by_created_at_insert_stmt,
                                                (article['source'], article['created_at'], article['id']))
                                session.execute(article_by_url_insert_stmt,
                                                (article['url'], article['id']))
                                session.execute(schedules_insert_stmt,
                                                (article['source'], 'article', scheduled_for, article['id']))
                                log.debug('%s %s' % (article['id'], article['created_at']))
                            session.execute(axes_insert_stmt, (
                                article['id'], datetime.utcnow(),
                                article['axes']['comments'], article['axes']['likes'],
                                0, article['axes']['shares']))
                        except Exception as e:
                            print 'error==============:', e
                            continue
            except Exception as e:
                print 'error is:', e
                log.exception(e.message)
except Exception as e:
    log.exception(e.message)

EDIT:

I have also added my profiling results for the code above. The slow line appears to be:

article_rows = session.execute(article_lookup_stmt,[idlist]) 

Sun Feb 14 16:01:01 2016 consumer.out 

     395793 function calls (394232 primitive calls) in 23.074 seconds 

    Ordered by: internal time 

    ncalls tottime percall cumtime percall filename:lineno(function) 
     141 10.695 0.076 10.695 0.076 {select.select} 
    7564 10.144 0.001 10.144 0.001 {method 'acquire' of 'thread.lock' objects} 
     1 0.542 0.542 23.097 23.097 consumer.py:5(<module>) 
    1510 0.281 0.000 0.281 0.000 {method 'recv' of '_socket.socket' objects} 
     38 0.195 0.005 0.195 0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode) 
     13 0.078 0.006 0.078 0.006 {time.sleep} 
    2423 0.073 0.000 0.137 0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__) 
    22112 0.063 0.000 0.095 0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack) 
     3 0.052 0.017 0.162 0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response) 
2006/2005 0.047 0.000 0.055 0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan) 
    1270 0.032 0.000 0.034 0.000 /usr/local/lib/python2.7/threading.py:259(__init__) 
     3 0.024 0.008 0.226 0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics) 
     33 0.024 0.001 0.031 0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple) 
    15374 0.024 0.000 0.024 0.000 {built-in method new of type object at 0x788ee0} 
     141 0.023 0.000 11.394 0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request) 
     288 0.020 0.000 0.522 0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes) 
    2423 0.018 0.000 0.029 0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller) 
     115 0.018 0.000 11.372 0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages) 
    2423 0.018 0.000 0.059 0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers) 
    24548 0.017 0.000 0.017 0.000 {_struct.unpack} 
44228/43959 0.016 0.000 0.016 0.000 {len} 

Thanks, and I look forward to your replies.

+1

As currently written, your question lacks the detail needed to give a proper answer. Use a profiler to pinpoint which parts of the script are slow, then try rewriting those parts to make them faster. See https://docs.python.org/2/library/profile.html for more details. – liori
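
For reference, a minimal sketch of that workflow using only the standard-library cProfile and pstats modules; it assumes the consumer loop is wrapped in a main() function:

import cProfile
import pstats

# Record a full run of the consumer, then rank functions by internal time.
cProfile.run('main()', 'consumer.out')
stats = pstats.Stats('consumer.out')
stats.sort_stats('tottime').print_stats(20)  # top 20 by internal time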

+0

The slow part of my script is processing the messages, one after another. – peter

+1

Your consumer issues 5 Cassandra queries per message. There's no indication of what your producer does, but it seems plausible that 5 synchronous CQL queries take a lot longer than a trivial producer. –
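
One way to overlap those round trips is the Python driver's execute_async(), which returns a future immediately instead of blocking on each statement. A minimal sketch against the prepared statements from the question (the article and axes inserts would be queued the same way):

# Queue several inserts without waiting for each one to complete.
futures = [
    session.execute_async(article_by_created_at_insert_stmt,
                          (article['source'], article['created_at'], article['id'])),
    session.execute_async(article_by_url_insert_stmt,
                          (article['url'], article['id'])),
    session.execute_async(schedules_insert_stmt,
                          (article['source'], 'article', scheduled_for, article['id'])),
]
# Block only once, after all requests are in flight.
for future in futures:
    future.result()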

Answer

2

您可以嘗試運行消費者而不保存到C *,因此您可以觀察它產生的差異。
如果事實證明保存到C *是一個阻塞點(我假設它是這樣),那麼您可以擁有一個線程池(大於16個線程用戶生成線程),其唯一責任是寫入C *。

這樣,您就可以卸載代碼的慢速部分,這隻會在消費者代碼中留下微不足道的部分。
您可以使用from multiprocessing import Pool
更多here
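
A minimal sketch of that shape, assuming one Cluster/Session per worker process (driver sessions cannot be shared across processes) and a hypothetical save_article helper standing in for the five inserts from the question:

import json
from multiprocessing import Pool

from cassandra.cluster import Cluster

session = None  # one Cluster/Session per worker process

def init_worker():
    # Sessions cannot cross process boundaries, so each worker opens
    # its own connection (keyspace as in the question).
    global session
    session = Cluster(['localhost']).connect(keyspace)

def save_article(raw_value):
    # Decode the Kafka message and run the inserts from the question
    # against this worker's session (statements prepared per worker).
    response = json.loads(raw_value)

pool = Pool(processes=32, initializer=init_worker)

def consume_loop(consumer):
    # The consumer now only polls Kafka and hands each message off.
    while True:
        for message in consumer.get_messages(count=16):
            pool.apply_async(save_article, (message.value,))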

+0

Thanks! What if I have already done that and it is still slow? You are right that writing to Cassandra is the bottleneck, but I don't know why it gets so slow as the data grows. – peter

+1

If you are running the code you posted, then you haven't done it yet :) But if you do it, and you notice that you stay at roughly the same throughput no matter how many threads are in your pool, then you will have to tune C*. Since we know Cassandra's most notable feature is being able to keep accepting an unlimited amount of data (as horizontal scale increases), this should be doable; you should post that as a question under the C* tag. –

+0

Thank you very much. Sorry for all the follow-up questions; I have just one more :) If I am already using a multiprocess consumer, why do I still need to use multithreading? Thanks – peter