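Python multithreading over a record set
I have a database record set (about 1,000 rows) that I am currently iterating over, running an additional database query for each record to pull in more data.
Doing this pushes the overall processing time up to roughly 100 seconds.
What I want to do is split this work across 2-4 processes.
I am using Python 2.7 to stay compatible with AWS Lambda.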
    def handler(event, context):
        try:
            records = connection.get_users()
            mandrill_client = open_mandrill_connection()
            mandrill_messages = get_mandrill_messages()
            mandrill_template = 'POINTS weekly-report-to-user'
            start_time = time.time()
            messages = build_messages(mandrill_messages, records)
            print("OVERALL: %s seconds ---" % (time.time() - start_time))
            send_mandrill_message(mandrill_client, mandrill_template, messages)
            connection.close_database_connection()
            return "Process Completed"
        except Exception as e:
            print(e)
This is the function I want to run in threads:
    def build_messages(messages, records):
        for record in records:
            record = dict(record)
            stream = get_user_stream(record)
            data = compile_loyalty_stream(stream)
            messages['to'].append({
                'email': record['email'],
                'type': 'to'
            })
            messages['merge_vars'].append({
                'rcpt': record['email'],
                'vars': [
                    {
                        'name': 'total_points',
                        'content': record['total_points']
                    },
                    {
                        'name': 'total_week',
                        'content': record['week_points']
                    },
                    {
                        'name': 'stream_greek',
                        'content': data['el']
                    },
                    {
                        'name': 'stream_english',
                        'content': data['en']
                    }
                ]
            })
        return messages
I tried importing the multiprocessing library:
    from multiprocessing.pool import ThreadPool
I created a pool inside the try block and mapped the function over it:
    pool = ThreadPool(4)
    messages = pool.map(build_messages_in, itertools.izip(itertools.repeat(mandrill_messages), records))

    def build_messages_in(a_b):
        build_msg(*a_b)

    def build_msg(a, b):
        return build_messages(a, b)

    def get_user_stream(record):
        response = []
        i = 0
        for mod, mod_id, act, p, act_created in izip(record['models'], record['model_ids'], record['actions'],
                                                     record['points'], record['action_creation']):
            information = get_reference(mod, mod_id)
            if information:
                response.append({
                    'action': act,
                    'points': p,
                    'created': act_created,
                    'info': information
                })
                if (act == 'invite_friend') \
                        or (act == 'donate') \
                        or (act == 'bonus_500_general') \
                        or (act == 'bonus_1000_general') \
                        or (act == 'bonus_500_cancel') \
                        or (act == 'bonus_1000_cancel'):
                    response[i]['info']['date_ref'] = act_created
                    response[i]['info']['slug'] = 'attiki'
                if (act == 'bonus_500_general') \
                        or (act == 'bonus_1000_general') \
                        or (act == 'bonus_500_cancel') \
                        or (act == 'bonus_1000_cancel'):
                    response[i]['info']['title'] = ''
                i += 1
        return response
Finally, I removed the for loop from the build_messages function.
The result I get is: 'NoneType' object is not iterable.
Is this the right way to do this?
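One thing visible in the code above is that build_messages_in calls build_msg but never returns its result, so pool.map would collect a list of None values; that may be where the error comes from, although the traceback location is not given. Below is a minimal sketch of a per-record approach, not a confirmed fix. The names build_one, fetch_stream and build_messages_threaded are hypothetical, fetch_stream is only a stand-in for get_user_stream plus compile_loyalty_stream, and the sketch assumes the database connection can safely be used from several threads.

```python
from multiprocessing.pool import ThreadPool


def fetch_stream(record):
    # Hypothetical stand-in for compile_loyalty_stream(get_user_stream(record)).
    return {'el': 'el:' + record['email'], 'en': 'en:' + record['email']}


def build_one(record):
    """Build the 'to' and 'merge_vars' entries for a single record."""
    record = dict(record)
    data = fetch_stream(record)
    to_entry = {'email': record['email'], 'type': 'to'}
    merge_entry = {
        'rcpt': record['email'],
        'vars': [
            {'name': 'total_points', 'content': record['total_points']},
            {'name': 'total_week', 'content': record['week_points']},
            {'name': 'stream_greek', 'content': data['el']},
            {'name': 'stream_english', 'content': data['en']},
        ],
    }
    # The worker must return its result; otherwise map() collects None.
    return to_entry, merge_entry


def build_messages_threaded(messages, records, workers=4):
    """Run build_one over all records in a thread pool and merge the results."""
    pool = ThreadPool(workers)
    try:
        # map() returns results in the same order as the input records.
        results = pool.map(build_one, records)
    finally:
        pool.close()
        pool.join()
    # Merge in the calling thread so workers never mutate the shared dict.
    for to_entry, merge_entry in results:
        messages['to'].append(to_entry)
        messages['merge_vars'].append(merge_entry)
    return messages
```

In the handler, the call `build_messages(mandrill_messages, records)` would then become `build_messages_threaded(mandrill_messages, records)`. Each worker returns its own pieces instead of appending to the shared messages dict, which keeps the 'to' and 'merge_vars' lists aligned.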
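@GhostCat I submitted the question and forgot to write down what I had tried. There is some code here that works, and below it some code that should work but doesn't. Basically, I am trying to multiprocess the build_messages function. – mallix
Great. Now let the experts help you ;-) – GhostCat
You haven't said where you get this error. From what I can gather, this is using the 'mailchimp' API, and I assume the longest wait is for the API response? – roganjosh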