我正在努力學習 Python asyncio。
我大致理解事件循環（event loop）和協程（coroutine）是什麼……
但對 future 和 task 的含義理解得較少。
我的理解是：事件循環會把協程包裝成任務（task），以某種隊列的方式調度它們，然後逐一執行。
我的問題是關於 run_in_executor 這個方法的。
我想讀懂一段 Python 代碼，以便把它改寫成 C++。
就我對這段代碼的理解：
作者先創建了一個額外的線程 >>>>
futures.append(executor.submit(do_work, symbol, day, files[symbol]))
然後這個新線程創建了事件循環 >>
csv.append(day, decompress(day, ***fetch_day(symbol, day)***))
爲什麼 loop.run_in_executor 不會增加事件循環中的任務數量？
`def fetch_day(symbol, day):
local_data = threading.local()
loop = getattr(local_data, 'loop', asyncio.new_event_loop())
asyncio.set_event_loop(loop)
***loop = asyncio.get_event_loop()***#first event loop
loop.set_debug(True)`
然後它調度了 24 個任務 >>
`def create_tasks(symbol, day):
start = 0
if is_dst(day):
start = 1
url_info = {
'currency': symbol,
'year': day.year,
'month': day.month - 1,
'day': day.day
}
tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(0, 24)]
# if is_dst(day):
# next_day = day + datetime.timedelta(days=1)
# url_info = {
# 'currency': symbol,
# 'year': next_day.year,
# 'month': next_day.month - 1,
# 'day': next_day.day
# }
# tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0))))
return tasks`
` tasks = create_tasks(symbol, day)
#z=asyncio.Task.all_tasks[0]##############
***loop.run_until_complete(asyncio.wait(tasks))***
#y=asyncio.Task.all_tasks[0]##############`
每個任務（get 協程）：
async def get(url):#each task with total24 of get tasks ***loop = asyncio.get_event_loop()***#i dont know if same loop or new one buffer = BytesIO() id = url[35:].replace('/', " ") start = time.time() Logger.info("Fetching {0}".format(id)) for i in range(ATTEMPTS): try: #z=asyncio.Task.all_tasks[0]############## ***res = await loop.run_in_executor(None, lambda: requests.get(url, stream=True))***#this loop if same loop why not increase number of tasks with each get run of the 24 gets>>we should have 48 futures in total
「在這裏，我不知道它是創建了一個新的事件循環，還是使用了第 1 個額外線程中的同一個事件循環」
調用事件循環的 run_in_executor 方法會創建新的線程。
我想知道它是否創建了新的事件循環，
還是只使用第 1 個額外線程中的同一個事件循環。
如果使用的是同一個事件循環 >>>>>
那麼爲什麼調用 run_in_executor 之後，事件循環中的任務數量沒有增加？
「我的理解是 run_in_executor 協程會創建新的任務並加入事件循環，那爲什麼事件循環的任務數沒有增加？」
另一種猜測是：線程使用的是另一組與事件循環任務相互獨立的 future。
我怎樣才能知道事件循環中正在等待的 future 數量？
代碼來自 GitHub 上的 duka 項目。
以下是 duka 的主要代碼：
main.py
#!/usr/bin/env python3.5
import sys
import logging
import argparse
from datetime import date, timedelta
#from duka.app import app
#from duka.core import valid_date, set_up_signals
#from duka.core.utils import valid_timeframe, TimeFrame
from app import app
from core import valid_date, set_up_signals
from core.utils import valid_timeframe, TimeFrame
VERSION = '0.2.1'


def main():
    """Parse the command line and start the download via app().

    Positional SYMBOLS default to ["GBPJPY"]. The date range defaults to the
    single --day value (yesterday unless given); --startdate / --enddate
    override either end of it.
    """
    parser = argparse.ArgumentParser(prog='duka', usage='%(prog)s [options]')
    parser.add_argument('-v', '--version', action='version',
                        version='Version: %(prog)s-{version}'.format(version=VERSION))
    # nargs='*' rather than '?': with '?' a single symbol given on the command
    # line arrives as a plain string, and app() would then iterate over its
    # characters instead of over symbols. With '*' it is always a list.
    parser.add_argument('symbols', metavar='SYMBOLS', type=str, nargs='*',
                        help='symbol list using format EURUSD EURGBP', default=["GBPJPY"])
    parser.add_argument('-d', '--day', type=valid_date,
                        help='specific day format YYYY-MM-DD (default yesterday)',
                        default=date.today() - timedelta(1))
    parser.add_argument('-s', '--startdate', type=valid_date,
                        help='start date format YYYY-MM-DD (default --day)')
    parser.add_argument('-e', '--enddate', type=valid_date,
                        help='end date format YYYY-MM-DD (default --day)')
    parser.add_argument('-t', '--thread', type=int, help='number of threads (default 5)', default=5)
    parser.add_argument('-f', '--folder', type=str, help='destination folder (default .)', default='.')
    parser.add_argument('-c', '--candle', type=valid_timeframe,
                        help='use candles instead of ticks. Accepted values M1 M2 M5 M10 M15 M30 H1 H4',
                        default=TimeFrame.TICK)
    parser.add_argument('--header', action='store_true', help='include CSV header (default false)', default=False)
    args = parser.parse_args()

    # The worker pool needs at least one thread, and the progress estimate
    # in app.update_progress() divides by the thread count.
    if args.thread < 1:
        parser.error('--thread must be at least 1')

    start = args.startdate if args.startdate is not None else args.day
    end = args.enddate if args.enddate is not None else args.day

    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )
    set_up_signals()
    app(args.symbols, start, end, args.thread, args.candle, args.folder, args.header)


if __name__ == '__main__':
    main()
app.py
import concurrent
import threading
import time
from collections import deque
from datetime import timedelta, date
#from ..core import decompress, fetch_day, Logger
#from ..core.csv_dumper import CSVDumper
#from ..core.utils import is_debug_mode, TimeFrame
from core import decompress, fetch_day, Logger
from core.csv_dumper import CSVDumper
from core.utils import is_debug_mode, TimeFrame
SATURDAY = 5  # value date.weekday() returns for Saturday (Monday == 0)
day_counter = 0  # finished (symbol, day) jobs; incremented by app()'s workers under a lock


def days(start, end):
    """Yield each date from *start* through *end* inclusive.

    Saturdays and today's date are left out. Nothing is yielded when
    *start* is after *end*.
    """
    if start > end:
        return
    stop = end + timedelta(days=1)
    today = date.today()
    one_day = timedelta(days=1)
    current = start
    while current != stop:
        skipped = current.weekday() == SATURDAY or current == today
        if not skipped:
            yield current
        current += one_day
def format_left_time(seconds):
    """Render a remaining-time estimate given in seconds as ``H:MM:SS``.

    A negative value means "no estimate yet" and is shown as ``--:--:--``.
    Fractional seconds are truncated by the ``%d`` conversions.
    """
    if seconds < 0:
        return "--:--:--"
    whole_minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)
    return "%d:%02d:%02d" % (hours, minutes, secs)
def update_progress(done, total, avg_time_per_job, threads):
    """Redraw the single-line console progress bar.

    Args:
        done: number of finished jobs.
        total: number of expected jobs; 0 is shown as 100 %.
        avg_time_per_job: mean seconds per finished job. avg() yields -1
            before any job finished, and the resulting negative estimate is
            rendered by format_left_time() as ``--:--:--``.
        threads: number of workers the remaining jobs are spread over.

    The bar is printed with a leading carriage return and no newline, so it
    overwrites itself. Nothing is printed while is_debug_mode() is true.
    """
    ratio = done / total if total != 0 else 1
    percent = int(min(ratio, 1.0) * 100)
    bar = '#' * percent + '-' * (100 - percent)
    eta = avg_time_per_job * (total - done) / threads
    if is_debug_mode():
        return
    print('\r[{0}] {1}% Left : {2} '.format(bar, percent, format_left_time(eta)), end='')
def how_many_days(start, end):
    """Count the dates days(start, end) yields (Saturdays and today excluded)."""
    total = 0
    for _ in days(start, end):
        total += 1
    return total
def avg(fetch_times):
    """Return the arithmetic mean of *fetch_times*, or -1 when it is empty."""
    count = len(fetch_times)
    if count == 0:
        return -1
    return sum(fetch_times) / count
def name(symbol, timeframe, start, end):
    """Build the CSV file name ``<symbol>_<TIMEFRAME>_<start>[_<end>].csv``.

    The timeframe label is the name of the TimeFrame attribute whose value
    equals *timeframe*. If several attributes match, the last one in dir()
    order wins, as before. ``_<end>`` is appended only when *end* differs
    from *start*.

    Raises:
        ValueError: if *timeframe* matches no TimeFrame attribute (this used
            to surface as an UnboundLocalError on ``ts_str``).
    """
    ext = ".csv"
    ts_str = None
    for attr in dir(TimeFrame):
        if getattr(TimeFrame, attr) == timeframe:
            ts_str = attr
    if ts_str is None:
        raise ValueError("unknown timeframe: {!r}".format(timeframe))
    file_name = symbol + "_" + ts_str + "_" + str(start)
    if start != end:
        file_name += "_" + str(end)
    return file_name + ext
def app(symbols, start, end, threads, timeframe, folder, header):
    """Download every (symbol, day) pair in [start, end] and write CSV files.

    Each pair is handled by do_work() on a ThreadPoolExecutor with *threads*
    workers. do_work() calls fetch_day(), which creates and runs its own
    asyncio event loop inside that worker thread. One CSVDumper per symbol
    collects the days and is dumped after all work has finished.

    Args:
        symbols: iterable of symbol names, e.g. ["EURUSD"].
        start, end: first and last day (inclusive), as accepted by days().
        threads: number of worker threads (must be >= 1).
        timeframe, folder, header: passed through to CSVDumper.
    """
    # A module-level ``import concurrent`` does not load the ``futures``
    # submodule; import it explicitly instead of relying on some other
    # module having imported it first.
    import concurrent.futures

    if start > end:
        return
    lock = threading.Lock()
    global day_counter
    total_days = how_many_days(start, end)
    if total_days == 0:
        return
    symbols = list(symbols)
    # day_counter counts finished (symbol, day) jobs, so progress has to be
    # measured against the number of jobs, not only the number of days.
    total_jobs = total_days * len(symbols)
    # Reset the module-level counter so a repeated call starts from zero.
    day_counter = 0
    last_fetch = deque([], maxlen=5)  # durations of the 5 most recent jobs
    update_progress(day_counter, total_jobs, -1, threads)

    def do_work(symbol, day, csv):
        """Fetch one day of *symbol* into *csv*; fetch/decode errors are printed."""
        global day_counter
        start_time = time.time()
        Logger.info("Fetching day {0}".format(day))
        try:
            csv.append(day, decompress(day, fetch_day(symbol, day)))
        except Exception as e:
            print("ERROR for {0}, {1} Exception : {2}".format(day, symbol, str(e)))
        elapsed_time = time.time() - start_time
        last_fetch.append(elapsed_time)
        with lock:
            day_counter += 1
        Logger.info("Day {0} fetched in {1}s".format(day, elapsed_time))

    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        files = {symbol: CSVDumper(symbol, timeframe, start, end, folder, header) for symbol in symbols}
        for symbol in symbols:
            for day in days(start, end):
                futures.append(executor.submit(do_work, symbol, day, files[symbol]))
        for future in concurrent.futures.as_completed(futures):
            if future.exception() is None:
                update_progress(day_counter, total_jobs, avg(last_fetch), threads)
            else:
                # Logger is used like a logging.Logger (info/warn/error); pass
                # the exception through a %s placeholder so it is interpolated
                # instead of triggering a "not all arguments converted" error.
                Logger.error("An error happen when fetching data : %s", future.exception())
    Logger.info("Fetching data terminated")
    for file in files.values():
        file.dump()
    update_progress(day_counter, total_jobs, avg(last_fetch), threads)
fetch.py
import asyncio
import datetime
import threading
import time
from functools import reduce
from io import BytesIO, DEFAULT_BUFFER_SIZE
import requests
#from ..core.utils import Logger, is_dst
from core.utils import Logger, is_dst
URL = "https://www.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5"
ATTEMPTS = 5


def _download(url):
    """Blocking helper for get(): GET *url* and read the whole body.

    Meant to run in an executor thread, so both the request and the body
    download happen off the event loop.

    Returns ``(status_code, buffer)``. *buffer* is a new BytesIO holding the
    body when the status is 200, and ``None`` otherwise. The response is
    always closed so its connection is released, including on non-200
    answers and read errors.
    """
    res = requests.get(url, stream=True)
    try:
        if res.status_code != 200:
            return res.status_code, None
        buffer = BytesIO()  # fresh per attempt: a failed read cannot leave partial bytes behind
        for chunk in res.iter_content(DEFAULT_BUFFER_SIZE):
            buffer.write(chunk)
        return res.status_code, buffer
    finally:
        res.close()


async def get(url):
    """Download one hourly file, trying up to ATTEMPTS times.

    create_tasks() schedules one get() per hour, so 24 of these coroutines
    share the event loop that fetch_day() runs in the current worker thread.

    Returns:
        A memoryview over the downloaded bytes (possibly empty).

    Raises:
        Exception: when every attempt failed.
    """
    # This is the loop fetch_day() created and installed for this worker
    # thread with set_event_loop(); it is the loop running this coroutine.
    # No new loop is created here.
    loop = asyncio.get_event_loop()
    label = url[35:].replace('/', " ")  # 35 == len("https://www.dukascopy.com/datafeed/")
    start = time.time()
    Logger.info("Fetching {0}".format(label))
    for i in range(ATTEMPTS):
        try:
            # run_in_executor() submits _download to the loop's default
            # ThreadPoolExecutor and returns an asyncio.Future, not a Task,
            # which is why these calls never show up in asyncio.Task.all_tasks().
            status, buffer = await loop.run_in_executor(None, _download, url)
            if status == 200:
                Logger.info("Fetched {0} completed in {1}s".format(label, time.time() - start))
                if len(buffer.getbuffer()) <= 0:
                    Logger.info("Buffer for {0} is empty ".format(label))
                return buffer.getbuffer()
            else:
                Logger.warn("Request to {0} failed with error code : {1} ".format(url, str(status)))
        except Exception as e:
            Logger.warn("Request {0} failed with exception : {1}".format(label, str(e)))
        # Back off without blocking: time.sleep() here would freeze every
        # other hourly download scheduled on the same loop.
        await asyncio.sleep(0.5 * i)
    raise Exception("Request failed for {0} after {1} attempts".format(url, ATTEMPTS))
def create_tasks(symbol, day):
    """Schedule one get() task for each hour 0-23 of *day* for *symbol*.

    asyncio.ensure_future() is called without a ``loop`` argument, so the
    tasks attach to the current thread's event loop. fetch_day() installs
    that loop with asyncio.set_event_loop() before calling this function.

    Returns:
        The list of 24 tasks, ordered by hour.
    """
    url_info = {
        'currency': symbol,
        'year': day.year,
        # The URL is built with a zero-based month (January -> "00");
        # presumably that is the datafeed's convention -- TODO confirm.
        'month': day.month - 1,
        'day': day.day,
    }
    # TODO: daylight-saving days are not handled. An unused draft computed a
    # start hour of 1 when is_dst(day) and appended hour 0 of the next day;
    # it was never wired in, so all 24 hours of `day` are requested.
    return [asyncio.ensure_future(get(URL.format(**url_info, hour=hour)))
            for hour in range(24)]
def fetch_day(symbol, day):
    """Download the 24 hourly tick files of *day* for *symbol* and join them.

    Called by app.do_work() inside a ThreadPoolExecutor worker thread. Each
    call creates a new asyncio event loop and installs it as the calling
    thread's current loop; create_tasks() relies on that, because
    asyncio.ensure_future() without ``loop`` uses the current loop. The call
    then runs the hourly get() tasks to completion and closes the loop.

    Returns:
        A memoryview over the hourly payloads concatenated in hour order.

    Raises:
        Whatever the first failed hourly task (in hour order) raised.
    """
    # One new loop per call, always closed below. (A threading.local()
    # created inside this function is new on every call, so it could never
    # cache a loop between calls.)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # NOTE(review): debug mode looks like a leftover from investigating
        # the loop; it adds overhead -- consider removing.
        loop.set_debug(True)
        tasks = create_tasks(symbol, day)
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        # Detach the loop from this worker thread so the thread never keeps a
        # closed loop as its current one, then close it. close() also shuts
        # down the loop's default executor, which get() used through
        # run_in_executor(None, ...).
        asyncio.set_event_loop(None)
        loop.close()

    def add(acc, task):
        # task.result() re-raises the task's exception if its download failed.
        acc.write(task.result())
        return acc

    return reduce(add, tasks, BytesIO()).getbuffer()
另一段代碼：
在這段代碼中，run_in_executor 生成了一個 future 列表（blocking_tasks）。
那麼在前面的代碼中，這些任務在哪裏？
import asyncio
import concurrent.futures
import logging
import sys
import time
def blocks(n):
    """Simulate blocking work: sleep 0.1 s, then return ``n`` squared.

    Logs 'running' before and 'done' after the sleep, on a logger named
    ``blocks(<n>)``.
    """
    logger = logging.getLogger('blocks({})'.format(n))
    logger.info('running')
    time.sleep(0.1)
    logger.info('done')
    squared = n ** 2
    return squared
async def run_blocking_tasks(executor):
    """Run blocks(0) .. blocks(5) on *executor* and log their results.

    Each call is handed to the event loop with run_in_executor(), and
    asyncio.wait() waits for all of them. The results are logged in the
    iteration order of the "done" set that asyncio.wait() returns, which is
    not necessarily 0..5.
    """
    logger = logging.getLogger('run_blocking_tasks')
    logger.info('starting')
    logger.info('creating executor tasks')
    event_loop = asyncio.get_event_loop()
    executor_futures = []
    for value in range(6):
        executor_futures.append(event_loop.run_in_executor(executor, blocks, value))
    logger.info('waiting for executor tasks')
    done, _still_pending = await asyncio.wait(executor_futures)
    logger.info('results: {!r}'.format([fut.result() for fut in done]))
    logger.info('exiting')
if __name__ == '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )
    # Create a limited thread pool.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )
    # Create and install the loop explicitly: calling asyncio.get_event_loop()
    # when no loop exists is deprecated in recent Python versions.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()
        # This pool is passed explicitly rather than being the loop's default
        # executor, so loop.close() does not shut it down.
        executor.shutdown()
`
問題是什麼?不要添加所有代碼,只需添加與問題相關的部分即可。 – Udi