2017-04-04

I am trying hard to learn Python asyncio.
I can understand what the event loop means and what coroutines are....
To a lesser degree, what futures and tasks mean.
The way I picture it, the event loop somehow turns coroutines into tasks, schedules them in some kind of queue, and then executes them one by one.
My question is about the method run_in_executor.
I am trying to understand some Python code in order to port it to C++.
In this code, as far as I understand:
the author starts one extra thread >>>>

futures.append(executor.submit(do_work, symbol, day, files[symbol]))

then this new thread creates an event loop >>

csv.append(day, decompress(day, fetch_day(symbol, day)))

Why doesn't asyncio.run_in_executor increase the number of tasks in the loop?

def fetch_day(symbol, day):
    local_data = threading.local()
    loop = getattr(local_data, 'loop', asyncio.new_event_loop())
    asyncio.set_event_loop(loop)
    loop = asyncio.get_event_loop()  # first event loop
    loop.set_debug(True)
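A side note on this snippet: `local_data = threading.local()` is created fresh inside every call to `fetch_day`, so the `getattr` default never finds a cached loop, and `asyncio.new_event_loop()` is evaluated (allocating a loop) on every call. A minimal sketch of what per-thread loop caching presumably intends, using module-level storage (the names `_local` and `get_thread_loop` are mine, not from duka):

```python
import asyncio
import threading

# Module-level thread-local storage: survives across calls, separate per thread.
_local = threading.local()

def get_thread_loop():
    loop = getattr(_local, "loop", None)
    if loop is None:
        # Create a loop only the first time this particular thread asks.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        _local.loop = loop
    return loop
```

With this, each worker thread created by the executor reuses its own loop instead of allocating a new one for every day it fetches.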

This then schedules 24 tasks:

def create_tasks(symbol, day):

    start = 0

    if is_dst(day): 
     start = 1 

    url_info = { 
     'currency': symbol, 
     'year': day.year, 
     'month': day.month - 1, 
     'day': day.day 
    } 
    tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(0, 24)] 

    # if is_dst(day): 
    #  next_day = day + datetime.timedelta(days=1) 
    #  url_info = { 
    #   'currency': symbol, 
    #   'year': next_day.year, 
    #   'month': next_day.month - 1, 
    #   'day': next_day.day 
    #  } 
    #  tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0)))) 
    return tasks

    tasks = create_tasks(symbol, day)
    #z=asyncio.Task.all_tasks[0]##############
    loop.run_until_complete(asyncio.wait(tasks))
    #y=asyncio.Task.all_tasks[0]##############

Each task (24 get tasks in total):

async def get(url):  # each task; 24 get tasks in total
    loop = asyncio.get_event_loop()  # i dont know if same loop or new one
    buffer = BytesIO()
    id = url[35:].replace('/', " ")
    start = time.time()
    Logger.info("Fetching {0}".format(id))
    for i in range(ATTEMPTS):
        try:
            #z=asyncio.Task.all_tasks[0]##############
            res = await loop.run_in_executor(None, lambda: requests.get(url, stream=True))
            # if this is the same loop, why doesn't the number of tasks increase
            # with each of the 24 gets? We should have 48 futures in total.

"Here I do not know whether this creates a new loop, or uses the same loop of the extra thread mentioned above."
Does calling run_in_executor on the loop create a new thread?

I want to know whether it creates a new loop,
or just uses the same loop from the one extra thread.

If it uses the same loop >>>>
then why doesn't the number of task handles in the loop increase after the run_in_executor coroutine runs?

"My understanding was that the run_in_executor coroutine creates a new task that is added to the loop, so why doesn't it increase the loop's task count?"

Another thought is that it only uses threads, with a separate set of futures that is independent of the loop's tasks.

How can I find out the number of futures awaiting in the loop?
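For what it's worth, there is a way to see this directly: `run_in_executor` does not create a `Task` at all. It returns an `asyncio.Future` that wraps the executor's `concurrent.futures.Future`, and only `Task` objects appear in the loop's task set. A minimal sketch (on modern Python, `asyncio.all_tasks()` replaces the old `asyncio.Task.all_tasks()`):

```python
import asyncio

def blocking():
    # Runs on a thread of the default ThreadPoolExecutor, not the loop thread.
    return 42

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor hands back a Future, not a Task...
    fut = loop.run_in_executor(None, blocking)
    # ...so the loop's task set still only contains main() itself.
    print(isinstance(fut, asyncio.Task), len(asyncio.all_tasks()))
    print(await fut)

asyncio.run(main())
```

In duka's case, each of the 24 `get` coroutines is a real task, but the `requests.get` call each one hands to `run_in_executor` is only a future living in the thread pool, which is why the count stays at 24 instead of 48.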

The code is from the duka project on GitHub. Here is the code:

main.py
#!/usr/bin/env python3.5 
import sys 
import logging 
import argparse 
from datetime import date, timedelta 

#from duka.app import app 
#from duka.core import valid_date, set_up_signals 
#from duka.core.utils import valid_timeframe, TimeFrame 
from app import app 
from core import valid_date, set_up_signals 
from core.utils import valid_timeframe, TimeFrame 
VERSION = '0.2.1' 


def main(): 
    parser = argparse.ArgumentParser(prog='duka', usage='%(prog)s [options]') 
    parser.add_argument('-v', '--version', action='version', 
         version='Version: %(prog)s-{version}'.format(version=VERSION)) 
    parser.add_argument('symbols', metavar='SYMBOLS', type=str, nargs='?', 
         help='symbol list using format EURUSD EURGBP', default=["GBPJPY"]) 
    parser.add_argument('-d', '--day', type=valid_date, help='specific day format YYYY-MM-DD (default today)', 
         default=date.today() - timedelta(1)) 
    parser.add_argument('-s', '--startdate', type=valid_date, help='start date format YYYY-MM-DD (default today)') 
    parser.add_argument('-e', '--enddate', type=valid_date, help='end date format YYYY-MM-DD (default today)') 
    parser.add_argument('-t', '--thread', type=int, help='number of threads (default 20)', default=5) 
    parser.add_argument('-f', '--folder', type=str, help='destination folder (default .)', default='.') 
    parser.add_argument('-c', '--candle', type=valid_timeframe, 
         help='use candles instead of ticks. Accepted values M1 M2 M5 M10 M15 M30 H1 H4', 
         default=TimeFrame.TICK) 
    parser.add_argument('--header', action='store_true', help='include CSV header (default false)', default=False) 
    args = parser.parse_args() 

    if args.startdate is not None: 
     start = args.startdate 
    else: 
     start = args.day 

    if args.enddate is not None: 
     end = args.enddate 
    else: 
     end = args.day 

    # Configure logging to show the name of the thread 
    # where the log message originates. 
    logging.basicConfig(
     level=logging.DEBUG, 
     format='%(threadName)10s %(name)18s: %(message)s', 
     stream=sys.stderr, 
    ) 
    #logging.basicConfig(level=logging.DEBUG) 

    set_up_signals() 
    app(args.symbols, start, end, args.thread, args.candle, args.folder, args.header) 


if __name__ == '__main__': 
    main() 

app.py
import concurrent 
import threading 
import time 
from collections import deque 
from datetime import timedelta, date 

#from ..core import decompress, fetch_day, Logger 
#from ..core.csv_dumper import CSVDumper 
#from ..core.utils import is_debug_mode, TimeFrame 
from core import decompress, fetch_day, Logger 
from core.csv_dumper import CSVDumper 
from core.utils import is_debug_mode, TimeFrame 

SATURDAY = 5 
day_counter = 0 


def days(start, end): 
    if start > end: 
     return 
    end = end + timedelta(days=1) 
    today = date.today() 
    while start != end: 
     if start.weekday() != SATURDAY and start != today: 
      yield start 
     start = start + timedelta(days=1) 


def format_left_time(seconds): 
    if seconds < 0: 
     return "--:--:--" 
    m, s = divmod(seconds, 60) 
    h, m = divmod(m, 60) 
    return "%d:%02d:%02d" % (h, m, s) 


def update_progress(done, total, avg_time_per_job, threads): 
    progress = 1 if total == 0 else done/total 
    progress = int((1.0 if progress > 1.0 else progress) * 100) 
    remainder = 100 - progress 
    estimation = (avg_time_per_job * (total - done)/threads) 
    if not is_debug_mode(): 
     print('\r[{0}] {1}% Left : {2} '.format('#' * progress + '-' * remainder, progress, 
                format_left_time(estimation)), end='') 


def how_many_days(start, end): 
    return sum(1 for _ in days(start, end)) 


def avg(fetch_times): 
    if len(fetch_times) != 0: 
     return sum(fetch_times)/len(fetch_times) 
    else: 
     return -1 


def name(symbol, timeframe, start, end): 
    ext = ".csv" 

    for x in dir(TimeFrame): 
     if getattr(TimeFrame, x) == timeframe: 
      ts_str = x 

    name = symbol + "_" + ts_str + "_" + str(start) 

    if start != end: 
     name += "_" + str(end) 

    return name + ext 


def app(symbols, start, end, threads, timeframe, folder, header): 
    if start > end: 
     return 
    lock = threading.Lock() 
    global day_counter 
    total_days = how_many_days(start, end) 

    if total_days == 0: 
     return 

    last_fetch = deque([], maxlen=5) 
    update_progress(day_counter, total_days, -1, threads) 

    def do_work(symbol, day, csv): 
     global day_counter 
     star_time = time.time() 
     Logger.info("Fetching day {0}".format(day)) 
     try: 
      csv.append(day, decompress(day, fetch_day(symbol, day))) 
     except Exception as e: 
      print("ERROR for {0}, {1} Exception : {2}".format(day, symbol, str(e))) 
     elapsed_time = time.time() - star_time 
     last_fetch.append(elapsed_time) 
     with lock: 
      day_counter += 1 
     Logger.info("Day {0} fetched in {1}s".format(day, elapsed_time)) 

    futures = [] 

    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: 

     files = {symbol: CSVDumper(symbol, timeframe, start, end, folder, header) for symbol in symbols} 

     for symbol in symbols: 
      for day in days(start, end): 
       futures.append(executor.submit(do_work, symbol, day, files[symbol]))  # >>>>>>>>>> first extra thread 

     for future in concurrent.futures.as_completed(futures): 
      if future.exception() is None: 
       update_progress(day_counter, total_days, avg(last_fetch), threads) 
      else: 
       Logger.error("An error happen when fetching data : ", future.exception()) 

     Logger.info("Fetching data terminated") 
     for file in files.values(): 
      file.dump() 

    update_progress(day_counter, total_days, avg(last_fetch), threads) 

fetch.py
import asyncio 
import datetime 
import threading 
import time 
from functools import reduce 
from io import BytesIO, DEFAULT_BUFFER_SIZE 

import requests 

#from ..core.utils import Logger, is_dst 
from core.utils import Logger, is_dst 

URL = "https://www.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5" 
ATTEMPTS = 5 


async def get(url):  # each task; 24 get tasks in total 
    loop = asyncio.get_event_loop()  # i dont know if same loop or new one 
    buffer = BytesIO() 
    id = url[35:].replace('/', " ") 
    start = time.time() 
    Logger.info("Fetching {0}".format(id)) 
    for i in range(ATTEMPTS): 
     try: 
      #z=asyncio.Task.all_tasks[0]############## 
      res = await loop.run_in_executor(None, lambda: requests.get(url, stream=True))  # if this is the same loop, why doesn't the number of tasks increase with each of the 24 gets? we should have 48 futures in total 
      if res.status_code == 200: 
       for chunk in res.iter_content(DEFAULT_BUFFER_SIZE): 
        buffer.write(chunk) 
       Logger.info("Fetched {0} completed in {1}s".format(id, time.time() - start)) 
       if len(buffer.getbuffer()) <= 0: 
        Logger.info("Buffer for {0} is empty ".format(id)) 
       return buffer.getbuffer() 
      else: 
       Logger.warn("Request to {0} failed with error code : {1} ".format(url, str(res.status_code))) 
     except Exception as e: 
      Logger.warn("Request {0} failed with exception : {1}".format(id, str(e))) 
      time.sleep(0.5 * i) 

    raise Exception("Request failed for {0} after ATTEMPTS attempts".format(url)) 


def create_tasks(symbol, day): 

    start = 0 

    if is_dst(day): 
     start = 1 

    url_info = { 
     'currency': symbol, 
     'year': day.year, 
     'month': day.month - 1, 
     'day': day.day 
    } 
    tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(0, 24)] 

    # if is_dst(day): 
    #  next_day = day + datetime.timedelta(days=1) 
    #  url_info = { 
    #   'currency': symbol, 
    #   'year': next_day.year, 
    #   'month': next_day.month - 1, 
    #   'day': next_day.day 
    #  } 
    #  tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0)))) 
    return tasks 


def fetch_day(symbol, day): 
    local_data = threading.local() 
    loop = getattr(local_data, 'loop', asyncio.new_event_loop()) 
    asyncio.set_event_loop(loop) 
    loop = asyncio.get_event_loop()  # first event loop 
    loop.set_debug(True) 
    tasks = create_tasks(symbol, day) 
    #z=asyncio.Task.all_tasks[0]############## 
    loop.run_until_complete(asyncio.wait(tasks)) 
    #y=asyncio.Task.all_tasks[0]############## 
    def add(acc, task): 
     acc.write(task.result()) 
     return acc 

    return reduce(add, tasks, BytesIO()).getbuffer() 

Other code:

In this code, run_in_executor makes the list of futures for the blocking calls.
So in the previous code, where are these tasks?

import asyncio 
import concurrent.futures 
import logging 
import sys 
import time 


def blocks(n): 
    log = logging.getLogger('blocks({})'.format(n)) 
    log.info('running') 
    time.sleep(0.1) 
    log.info('done') 
    return n ** 2 


async def run_blocking_tasks(executor): 
    log = logging.getLogger('run_blocking_tasks') 
    log.info('starting') 

    log.info('creating executor tasks') 
    loop = asyncio.get_event_loop() 
    blocking_tasks = [ 
     loop.run_in_executor(executor, blocks, i) 
     for i in range(6) 
    ] 
    log.info('waiting for executor tasks') 
    completed, pending = await asyncio.wait(blocking_tasks) 
    results = [t.result() for t in completed] 
    log.info('results: {!r}'.format(results)) 

    log.info('exiting') 


if __name__ == '__main__': 
    # Configure logging to show the name of the thread 
    # where the log message originates. 
    logging.basicConfig(
     level=logging.INFO, 
     format='%(threadName)10s %(name)18s: %(message)s', 
     stream=sys.stderr, 
    ) 

    # Create a limited thread pool. 
    executor = concurrent.futures.ThreadPoolExecutor(
     max_workers=3, 
    ) 

    event_loop = asyncio.get_event_loop() 
    try: 
     event_loop.run_until_complete(
      run_blocking_tasks(executor) 
     ) 
    finally: 
     event_loop.close() 
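The same distinction holds in this example: `blocking_tasks` is a list of futures from `run_in_executor`, and the blocking work runs on the pool's threads while the loop merely awaits the futures. A small sketch that makes the thread hand-off visible (the pool size and the `pool` name prefix are arbitrary choices here, not from the original example):

```python
import asyncio
import concurrent.futures
import threading

def which_thread(n):
    # Executed inside an executor worker thread, not the event-loop thread.
    return (n, threading.current_thread().name)

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=2, thread_name_prefix="pool") as ex:
        futures = [loop.run_in_executor(ex, which_thread, i) for i in range(4)]
        done, _ = await asyncio.wait(futures)
        return sorted(f.result() for f in done)

for n, name in asyncio.run(main()):
    print(n, name)
```

Every result reports a worker-thread name from the pool, never the main (event-loop) thread, even though the loop is what awaited the futures.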



What is the question? Don't add all the code, just add the parts relevant to the question. – Udi

Answer


requests is not compatible with asyncio. Use aiohttp instead:

import aiohttp 
import asyncio 
import async_timeout 

async def fetch(session, url): 
    with async_timeout.timeout(10): 
     async with session.get(url) as response: 
      return await response.text() 

async def main(loop): 
    async with aiohttp.ClientSession(loop=loop) as session: 
     html = await fetch(session, 'http://python.org') 
     print(html) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main(loop)) 
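With an async client like aiohttp, every request really is a `Task` on the single loop, so the task count does grow the way the question expects. A runnable sketch mirroring the fan-out of `create_tasks`, with a stand-in coroutine (`fetch_hour` fakes the HTTP call with a short sleep; with aiohttp it would `await session.get(...)` instead):

```python
import asyncio

async def fetch_hour(hour):
    # Stand-in for an aiohttp request; with a real client this would await I/O.
    await asyncio.sleep(0.01)
    return hour

async def fetch_day_async():
    # ensure_future turns each coroutine into a Task on the running loop, so
    # 24 hour-tasks (plus this coroutine's own task) are scheduled at once.
    tasks = [asyncio.ensure_future(fetch_hour(h)) for h in range(24)]
    running = len(asyncio.all_tasks())
    results = await asyncio.gather(*tasks)
    return running, results

running, results = asyncio.run(fetch_day_async())
print(running, results[:3])
```

Contrast this with `run_in_executor`: there the 24 blocking `requests.get` calls are futures held by the thread pool, invisible to the loop's task set.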

Thanks for your reply... I am trying to understand what happens when a new thread is created from the loop, and where its futures are..... I use Visual Studio, and when I debug and add a watch on the loop, I can see that all 24 tasks complete, but as new threads are built and new futures are created, the _ready list never increases its handles >>>>>>> that is what I was looking for.... thanks again –


Please provide a **short**, simple piece of code made specifically to help you understand. – Udi