我設置了一個系統來過濾twitter實時流樣本。顯然,數據庫寫入速度太慢,無法跟上比一些低容量關鍵字更復雜的任何事情。我實現了django-rq作爲一個簡單的排隊系統,以在推送過程中將推文推送到基於redis的隊列中,這很有效。我的問題在另一方面。這個問題的背景是我現在有一個系統正在運行,有150萬推文用於分析,另有375,000個通過redis排隊。按照目前的表現速度,如果我關掉那些我不想要的流,我需要花費3天才能趕上。如果我維持這些流,那麼根據我最近的估計,這需要大約一個月的時間。對大型(ish)數據集進行緩慢的django數據庫操作。
數據庫現在跨兩個主表有數百萬行,寫入速度非常慢。 rq工作者的最佳數量似乎是4,並且平均每秒排列1.6個任務。 (下面列入的是什麼代碼)。我認爲可能問題在於爲每個新的隊列任務打開數據庫連接,因此將CONN_MAX_AGE設置爲60,但這並沒有改進任何東西。
剛剛在localhost上測試了這個,我在Macbook 2011上以超過13次寫入/秒的速度運行了Chrome等,但是該數據庫中只有幾千行,這讓我相信它與尺寸有關。有幾個我正在使用的get_or_create
命令(見下文),這可能會減慢速度,但不能通過使用它們看到任何其他方式 - 我需要檢查用戶是否存在,並且我需要檢查是否推文已經存在(我可能,我懷疑,移動後者嘗試/除非,基於從直播流進來的推文不應該已經存在,原因很明顯)。我會獲得很多性能增益那個?由於這仍在運行,我很想優化代碼,並在那裏獲得一些更快/更高效的工作,這樣我就可以趕上!將運行一個預審員工批量工作? (即我可以批量創建不存在的用戶,或類似的東西?)
我在數字海洋上運行4核/ 8Gb Ram液滴,所以感覺這是一些相當糟糕的性能,並且大概是代碼我在這裏錯了什麼地方?
(我在這裏發佈了這個,而不是代碼審查,因爲我認爲這與Q &格式相關,因爲我試圖解決特定的代碼問題,而不是'我怎麼能這樣做通常更好?')
注:我正在django 1.6工作,因爲這是我已經浮動了一段時間的代碼,並沒有信心升級當時 - 這不是公衆所面臨的,所以除非現在有一個令人信服的理由(比如這個性能問題),否則我不會升級(對於這個項目)。
流監聽器:
class StdOutListener(tweepy.StreamListener):
def on_data(self, data):
# Twitter returns data in JSON format - we need to decode it first
decoded = json.loads(data)
#print type(decoded), decoded
# Also, we convert UTF-8 to ASCII ignoring all bad characters sent by users
try:
if decoded['lang'] == 'en':
django_rq.enqueue(read_both, decoded)
else:
pass
except KeyError,e:
print "Error on Key", e
except DataError, e:
print "DataError", e
return True
def on_error(self, status):
print status
讀取用戶/ Twitter微博/兩者
def read_user(tweet):
from harvester.models import User
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
#We might get weird results where user has changed their details"], so first we check the UID.
#print "MULTIPLE USER DEBUG", tweet["user"]["id_str"]
try:
current_user = User.objects.get(id_str=tweet["user"]["id_str"])
created=False
return current_user, created
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
current_user = User.objects.filter(id_str=tweet["user"]["id_str"])[0]
return current_user, False
if not tweet["user"]["follow_request_sent"]:
tweet["user"]["follow_request_sent"] = False
if not tweet["user"]["following"]:
tweet["user"]["following"] = False
if not tweet["user"]["description"]:
tweet["user"]["description"] = " "
if not tweet["user"]["notifications"]:
tweet["user"]["notifications"] = False
#If that doesn't work"], then we'll use get_or_create (as a failback rather than save())
from dateutil.parser import parse
if not tweet["user"]["contributors_enabled"]:
current_user, created = User.objects.get_or_create(
follow_request_sent=tweet["user"]["follow_request_sent"],
_json = {},
verified = tweet["user"]["verified"],
followers_count = tweet["user"]["followers_count"],
profile_image_url_https = tweet["user"]["profile_image_url_https"],
id_str = tweet["user"]["id_str"],
listed_count = tweet["user"]["listed_count"],
utc_offset = tweet["user"]["utc_offset"],
statuses_count = tweet["user"]["statuses_count"],
description = tweet["user"]["description"],
friends_count = tweet["user"]["friends_count"],
location = tweet["user"]["location"],
profile_image_url= tweet["user"]["profile_image_url"],
following = tweet["user"]["following"],
geo_enabled = tweet["user"]["geo_enabled"],
profile_background_image_url =tweet["user"]["profile_background_image_url"],
screen_name = tweet["user"]["screen_name"],
lang = tweet["user"]["lang"],
profile_background_tile = tweet["user"]["profile_background_tile"],
favourites_count = tweet["user"]["favourites_count"],
name = tweet["user"]["name"],
notifications = tweet["user"]["notifications"],
url = tweet["user"]["url"],
created_at = parse(tweet["user"]["created_at"]),
contributors_enabled = False,
time_zone = tweet["user"]["time_zone"],
protected = tweet["user"]["protected"],
default_profile = tweet["user"]["default_profile"],
is_translator = tweet["user"]["is_translator"]
)
else:
current_user, created = User.objects.get_or_create(
follow_request_sent=tweet["user"]["follow_request_sent"],
_json = {},
verified = tweet["user"]["verified"],
followers_count = tweet["user"]["followers_count"],
profile_image_url_https = tweet["user"]["profile_image_url_https"],
id_str = tweet["user"]["id_str"],
listed_count = tweet["user"]["listed_count"],
utc_offset = tweet["user"]["utc_offset"],
statuses_count = tweet["user"]["statuses_count"],
description = tweet["user"]["description"],
friends_count = tweet["user"]["friends_count"],
location = tweet["user"]["location"],
profile_image_url= tweet["user"]["profile_image_url"],
following = tweet["user"]["following"],
geo_enabled = tweet["user"]["geo_enabled"],
profile_background_image_url =tweet["user"]["profile_background_image_url"],
screen_name = tweet["user"]["screen_name"],
lang = tweet["user"]["lang"],
profile_background_tile = tweet["user"]["profile_background_tile"],
favourites_count = tweet["user"]["favourites_count"],
name = tweet["user"]["name"],
notifications = tweet["user"]["notifications"],
url = tweet["user"]["url"],
created_at = parse(tweet["user"]["created_at"]),
contributors_enabled = tweet["user"]["contributers_enabled"],
time_zone = tweet["user"]["time_zone"],
protected = tweet["user"]["protected"],
default_profile = tweet["user"]["default_profile"],
is_translator = tweet["user"]["is_translator"]
)
#print "CURRENT USER:""], type(current_user)"], current_user
#current_user"], created = User.objects.get_or_create(current_user)
return current_user, created
def read_tweet(tweet, current_user):
import logging
logger = logging.getLogger('django')
from datetime import date, datetime
#print "Inside read_Tweet"
from harvester.models import Tweet
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.db import DataError
#We might get weird results where user has changed their details"], so first we check the UID.
#print tweet_data["created_at"]
from dateutil.parser import parse
tweet["created_at"] = parse(tweet["created_at"])
try:
#print "trying tweet_data["id"
current_tweet =Tweet.objects.get(id_str=tweet["id_str"])
created=False
return current_user, created
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
current_tweet =Tweet.objects.filter(id_str=tweet["id_str"])[0]
try:
current_tweet, created = Tweet.objects.get_or_create(
truncated=tweet["truncated"],
text=tweet["text"],
favorite_count=tweet["favorite_count"],
author = current_user,
_json = {},
source=tweet["source"],
retweeted=tweet["retweeted"],
coordinates = tweet["coordinates"],
entities = tweet["entities"],
in_reply_to_screen_name = tweet["in_reply_to_screen_name"],
id_str = tweet["id_str"],
retweet_count = tweet["retweet_count"],
favorited = tweet["favorited"],
user = tweet["user"],
geo = tweet["geo"],
in_reply_to_user_id_str = tweet["in_reply_to_user_id_str"],
lang = tweet["lang"],
created_at = tweet["created_at"],
place = tweet["place"])
print "DEBUG", current_user, current_tweet
return current_tweet, created
except DataError, e:
#Catchall to pick up non-parsed tweets
print "DEBUG ERROR", e, tweet
return None, False
def read_both(tweet):
current_user, created = read_user(tweet)
current_tweet, created = read_tweet(tweet, current_user)
你確定錯誤來自django嗎?我的問題的理由是因爲twitter限制了您從Streaming API獲取數據的速度。 13寫/秒是巨大的,你得到什麼django?另外,我建議在您將流監聽器調用到所需語言時更改過濾器,而不是在獲取數據後對其進行過濾(單靠這應該會加快速度)。 – Leb
這個問題不在流監聽器中 - 它將設置爲獲取廣泛的地理位置和關鍵字列表,並且可以在週末的某個點上跟上超過100條推/每秒的內存隊列 - 隊列在Redis中有超過350,000個排隊等待寫入數據庫的推文,所以這是寫入方面 - 我可以看到在過去的500秒內完成了多少個推文。 13 /秒是在本地主機上的django上基本爲空的數據庫。 Twitter過濾器沒有(afaik)讓你直接過濾語言,你必須在收到流時做,但很樂意被糾正。 – Withnail