2016-10-10 63 views
0

I created the following script to download images from an API endpoint, and it works as expected. The problem is that it is slow, because every request has to wait for the previous one. What is the correct way to keep the steps for each item I want to fetch sequential, but run the items themselves in parallel? The data comes from an online service called servicem8. What I want to achieve:

  • Fetch all possible job ids => keep the name and other information
  • Fetch the customer for each job
  • Fetch every attachment of a job and keep its name

These three steps should be completed for every job. Since the jobs don't have to wait on each other, I could run them in parallel per job.

Update

What I don't understand is how you can make sure the calls that belong to one item stay bundled together: per item I have to make, say, three dependent calls, and it is only across items that I can parallelize. For example, when I want to:

  • fetch an item (fetch name => fetch description => fetch id)

So it is the fetching of the items themselves that I want to make parallel?
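For what it's worth, one common pattern for exactly this is a thread pool: the dependent calls stay sequential inside a single worker function, and the pool runs one worker per item in parallel. Below is a minimal sketch using the standard library; the credentials, the job_uuids list, and the per-job endpoint are placeholders/assumptions, not the author's actual code:

import requests
from concurrent.futures import ThreadPoolExecutor

AUTH = ("user@example.com", "password")  # placeholder credentials

def fetch_job_bundle(job_uuid):
    # The three dependent calls stay sequential inside one worker ...
    job = requests.get(
        "https://api.servicem8.com/api_1.0/job/{}.json".format(job_uuid),
        auth=AUTH).json()
    company = requests.get(
        "https://api.servicem8.com/api_1.0/Company/{}.json".format(job["company_uuid"]),
        auth=AUTH).json()
    attachments = requests.get(
        "https://api.servicem8.com/api_1.0/Attachment.json?%24filter=related_object_uuid%20eq%20{}".format(job_uuid),
        auth=AUTH).json()
    return job, company, attachments

job_uuids = []  # hypothetical: fill with the uuids of the jobs to process
with ThreadPoolExecutor(max_workers=5) as pool:
    # ... while the pool runs several bundles at the same time.
    results = list(pool.map(fetch_job_bundle, job_uuids))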

The current code I have works, but is quite slow:

import requests
import dateutil.parser
import shutil
import os

user = "[email protected]"
passw = "test"

print("Read json")
url = "https://api.servicem8.com/api_1.0/job.json"
r = requests.get(url, auth=(user, passw))

print("finished reading jobs.json file")
scheduled_jobs = []
if r.status_code == 200:
    for item in r.json():
        scheduled_date = item['job_is_scheduled_until_stamp']
        try:
            parsed_date = dateutil.parser.parse(scheduled_date)
            if parsed_date.year == 2016 and parsed_date.month == 10 and parsed_date.day == 10:
                url_customer = "https://api.servicem8.com/api_1.0/Company/{}.json".format(item['company_uuid'])
                c = requests.get(url_customer, auth=(user, passw))
                cus_name = c.json()['name']
                scheduled_jobs.append([item['uuid'], item['generated_job_id'], cus_name])
        except ValueError:
            pass

    for job in scheduled_jobs:
        print("fetch for job {}".format(job))
        url = "https://api.servicem8.com/api_1.0/Attachment.json?%24filter=related_object_uuid%20eq%20{}".format(job[0])
        r = requests.get(url, auth=(user, passw))
        for attachment in r.json():
            if attachment['active'] == 1 and attachment['file_type'] != '.pdf':
                print("fetch for attachment {}".format(attachment))
                url_staff = "https://api.servicem8.com/api_1.0/Staff.json?%24filter=uuid%20eq%20{}".format(
                    attachment['created_by_staff_uuid'])
                s = requests.get(url_staff, auth=(user, passw))
                for staff in s.json():
                    tech = "{}_{}".format(staff['first'], staff['last'])

                url = "https://api.servicem8.com/api_1.0/Attachment/{}.file".format(attachment['uuid'])
                r = requests.get(url, auth=(user, passw), stream=True)
                if r.status_code == 200:
                    creation_date = dateutil.parser.parse(attachment['timestamp']).strftime("%d.%m.%y")
                    if not os.path.exists(os.getcwd() + "/{}/{}".format(job[2], job[1])):
                        os.makedirs(os.getcwd() + "/{}/{}".format(job[2], job[1]))
                    path = os.getcwd() + "/{}/{}/SC -O {} {}{}".format(
                        job[2], job[1], creation_date, tech.upper(), attachment['file_type'])
                    print("writing file to path {}".format(path))
                    with open(path, 'wb') as f:
                        r.raw.decode_content = True
                        shutil.copyfileobj(r.raw, f)
else:
    print(r.text)
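As an aside, even the per-job Company lookup inside the date-filter loop above could be batched: grequests.map (used in the update below) returns responses in the same order as the requests went in, so each response can be zipped back onto the item it belongs to. A hedged sketch, where scheduled_items is a hypothetical list of jobs that already passed the date filter:

import grequests

customer_urls = [
    "https://api.servicem8.com/api_1.0/Company/{}.json".format(item['company_uuid'])
    for item in scheduled_items  # hypothetical: items that passed the date filter
]
reqs = (grequests.get(u, auth=(user, passw)) for u in customer_urls)
# map() keeps input order, so response i belongs to scheduled_items[i]
for item, resp in zip(scheduled_items, grequests.map(reqs)):
    if resp is not None and resp.status_code == 200:
        scheduled_jobs.append([item['uuid'], item['generated_job_id'], resp.json()['name']])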

Update [14/10]: I updated the code to the approach below, given some of the hints; many thanks for those. The only thing left to optimize, I guess, is the attachment download, but it works fine now. An interesting thing I learned is that you can't create a CON folder on a Windows machine :-) I didn't know that.

I used pandas as well, trying to avoid some of the loops over my lists, but I'm not sure it's as fast as it could be. The longest part is actually reading in the full json files. I read them completely because I couldn't find a way to tell the API to only return my jobs from September 2016 onward. The API's query filter only seems to work with eq/lt/gt.
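If the filter really does support gt/lt on timestamp fields, a range query might let the server do the September cut instead of downloading every job. This is an untested assumption about the $filter syntax and date format, worth verifying against the ServiceM8 docs:

import requests

# Assumed $filter syntax; verify the date format the API expects.
params = {"$filter": "job_is_scheduled_until_stamp gt '2016-09-01 00:00:00'"}
jobs = requests.get("https://api.servicem8.com/api_1.0/job.json",
                    params=params, auth=(user, passw), headers=headers)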

import requests
import dateutil.parser
import shutil
import os
import pandas as pd
import grequests

user = ""
passw = ""

FOLDER = os.getcwd()
headers = {"Accept-Encoding": "gzip, deflate"}

urls = [
    'https://api.servicem8.com/api_1.0/job.json',
    'https://api.servicem8.com/api_1.0/Attachment.json',
    'https://api.servicem8.com/api_1.0/Staff.json',
    'https://api.servicem8.com/api_1.0/Company.json'
]

# Create a set of unsent requests and send them all at the same time:
print("Read json files")
rs = (grequests.get(u, auth=(user, passw), headers=headers) for u in urls)
jobs, attachments, staffs, companies = grequests.map(rs)

# create dataframes
df_jobs = pd.DataFrame(jobs.json())
df_attachments = pd.DataFrame(attachments.json())
df_staffs = pd.DataFrame(staffs.json())
df_companies = pd.DataFrame(companies.json())

scheduled_jobs = []

if jobs.status_code == 200:
    print("finished reading json file")
    for job in jobs.json():
        scheduled_date = job['job_is_scheduled_until_stamp']
        try:
            parsed_date = dateutil.parser.parse(scheduled_date)
            if parsed_date.year == 2016 and parsed_date.month == 9:
                cus_name = df_companies[df_companies.uuid == job['company_uuid']].iloc[0]['name'].upper()
                cus_name = cus_name.replace('/', '')
                scheduled_jobs.append([job['uuid'], job['generated_job_id'], cus_name])
        except ValueError:
            pass
    print("{} jobs to fetch".format(len(scheduled_jobs)))

    for job in scheduled_jobs:
        print("fetch for job attachments {}".format(job))
        for attachment in attachments.json():
            if attachment['related_object_uuid'] == job[0]:
                if attachment['active'] == 1 and attachment['file_type'] != '.pdf' and attachment['attachment_source'] != 'INVOICE_SIGNOFF':
                    for staff in staffs.json():
                        if staff['uuid'] == attachment['created_by_staff_uuid']:
                            tech = "{}_{}".format(staff['first'].split()[-1].strip(), staff['last'])

                    creation_timestamp = dateutil.parser.parse(attachment['timestamp'])
                    creation_date = creation_timestamp.strftime("%d.%m.%y")
                    creation_time = creation_timestamp.strftime("%H_%M_%S")

                    path = FOLDER + "/{}/{}/SC_-O_D{}_T{}_{}{}".format(
                        job[2], job[1], creation_date, creation_time, tech.upper(), attachment['file_type'])

                    # fetch the attachment only if we don't already have it
                    if not os.path.isfile(path):
                        url = "https://api.servicem8.com/api_1.0/Attachment/{}.file".format(attachment['uuid'])
                        r = requests.get(url, auth=(user, passw), stream=True)
                        if r.status_code == 200:
                            if not os.path.exists(FOLDER + "/{}/{}".format(job[2], job[1])):
                                os.makedirs(FOLDER + "/{}/{}".format(job[2], job[1]))
                            print("writing file to path {}".format(path))
                            with open(path, 'wb') as f:
                                r.raw.decode_content = True
                                shutil.copyfileobj(r.raw, f)
                    else:
                        print("file already exists")
else:
    print(jobs.text)
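Since the full Attachment and Staff lists are already in memory, the nested scans in the loop above could also be replaced by lookup tables built once, turning each per-job scan into a dictionary access. A small sketch of that restructuring (behavior-preserving in intent, not tested against the live API):

from collections import defaultdict

# Build the indexes once, outside the job loop.
staff_by_uuid = {s['uuid']: s for s in staffs.json()}
attachments_by_job = defaultdict(list)
for a in attachments.json():
    attachments_by_job[a['related_object_uuid']].append(a)

for job in scheduled_jobs:
    for attachment in attachments_by_job.get(job[0], []):
        staff = staff_by_uuid.get(attachment['created_by_staff_uuid'])
        if staff is not None:
            tech = "{}_{}".format(staff['first'].split()[-1].strip(), staff['last'])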
+0

Be careful which approach you choose, because the ServiceM8 API is rate limited and too many simultaneous requests result in "HTTP/1.1 429 Too Many Requests". What you can do, however, is progressively parse the attachment links rather than downloading them as you go, and build a file of urls from them; you can then use any number of methods to download them concurrently from that file. If you have this line: 'r = requests.get(url, auth=(user, passw), stream=True)', the response's 'r.url' will contain the direct "https://data-cdn.servicem8.com/...." link, which is not subject to the rate limit. – hmedia1
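Taking that suggestion at face value, one possible shape is to resolve each attachment to its redirected r.url first and only download the collected links afterwards. A sketch under those assumptions (the direct-CDN behaviour is as described in the comment above, not verified here, and attachment_uuids is a hypothetical list):

import requests

direct_links = []
for attachment_uuid in attachment_uuids:  # hypothetical: uuids gathered earlier
    url = "https://api.servicem8.com/api_1.0/Attachment/{}.file".format(attachment_uuid)
    r = requests.get(url, auth=(user, passw), stream=True)
    direct_links.append(r.url)  # the resolved https://data-cdn.servicem8.com/... link
    r.close()  # don't read the body yet; we only wanted the redirect target

with open("attachment_urls.txt", "w") as f:
    f.write("\n".join(direct_links))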

+0

Two other simple steps that would improve the efficiency a lot: **1.** Don't call the Attachment API for every job uuid; just fetch the whole attachment list in one request and filter its related_object_uuid values against the job uuids you already retrieved in one hit. **2.** Once an attachment has been downloaded successfully, store its uuid in a file or database and skip any iteration whose uuid has already been processed; that way, every run of the attachment downloader quickly retrieves only the new attachments. – hmedia1
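The second point can be done with a small cache file of processed uuids; a minimal sketch (the file name is arbitrary):

import os

CACHE = "downloaded_uuids.txt"
done = set()
if os.path.isfile(CACHE):
    with open(CACHE) as f:
        done = {line.strip() for line in f}

for attachment in attachments.json():
    if attachment['uuid'] in done:
        continue  # already fetched on a previous run
    # ... download and save the file as before, then record it:
    with open(CACHE, "a") as f:
        f.write(attachment['uuid'] + "\n")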

+0

.... continued ... The approach you're using at the moment runs an API request for every file before testing whether that file already exists. – hmedia1

Answer

0

The general idea is to use asynchronous URL requests, and there is a Python module called grequests for exactly that: https://github.com/kennethreitz/grequests

From the documentation:

import grequests 
urls = [ 
    'http://www.heroku.com', 
    'http://python-tablib.org', 
    'http://httpbin.org', 
    'http://python-requests.org', 
    'http://fakedomain/', 
    'http://kennethreitz.com' 
] 
#Create a set of unsent Requests: 
rs = (grequests.get(u) for u in urls) 
#Send them all at the same time: 
grequests.map(rs) 

And the response:

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, None, <Response [200]>]
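Regarding the follow-up question below: grequests.map returns the responses in the same order as the requests that went in, so any metadata needed for the filename can simply be zipped back onto the responses. A short sketch with placeholder urls:

import grequests

urls = ["http://httpbin.org/get?id={}".format(i) for i in range(3)]  # placeholders
rs = (grequests.get(u) for u in urls)
for url, resp in zip(urls, grequests.map(rs)):
    if resp is not None:
        print(url, resp.status_code)  # url identifies which item this response answers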

+0

Could you give an example of how to get from my version to the grequests version? I need the information from the other http requests to build the filename of the file to be saved. – Koen