Low InnoDB writes per second - AWS EC2 to MySQL RDS using Python

2015-12-01 23 views

15

I am using Python to parse roughly 60GB of JSON files and then inserting the data into a MySQL database using the Python-MySQL connector. Each JSON file is roughly 500MB.

I have been using an AWS r3.xlarge EC2 instance with a secondary volume to store the 60GB of JSON data.

I then use an AWS RDS r3.xlarge MySQL instance. The instances are in the same region and availability zone. The EC2 instance runs the following Python script to load the JSON, parse it, and insert it into the MySQL RDS. My Python:

import glob
import json
import os

import mysql.connector
from mysql.connector import errorcode

os.chdir("./json_data")

DB_NAME = 'DB'


def create_database(cursor):
    try:
        cursor.execute(
            "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
    except mysql.connector.Error as err:
        print("Failed creating database: {}".format(err))
        exit(1)


for file in glob.glob("*.json"):
    with open(file, 'r') as data_file:
        results = json.load(data_file)
    print('working on file:', file)

    cnx = mysql.connector.connect(user='', password='', host='')
    cursor = cnx.cursor(buffered=True)

    # Select the target database, creating it on first use.
    try:
        cnx.database = DB_NAME
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            create_database(cursor)
            cnx.database = DB_NAME
        else:
            print(err)
            exit(1)

    add_overall_data = (
        "INSERT INTO master "
        "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a) "
        "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, "
        "%(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")

    add_polyline = (
        "INSERT INTO polyline (Overview_polyline, request_no) "
        "VALUES (%(Overview_polyline)s, %(request_no)s)")

    add_summary = (
        "INSERT INTO summary (summary, request_no) "
        "VALUES (%(summary)s, %(request_no)s)")

    add_warnings = (
        "INSERT INTO warnings (warnings, request_no) "
        "VALUES (%(warnings)s, %(request_no)s)")

    add_waypoint_order = (
        "INSERT INTO waypoint_order (waypoint_order, request_no) "
        "VALUES (%(waypoint_order)s, %(request_no)s)")

    add_leg_data = (
        "INSERT INTO leg_data "
        "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, "
        "leg_html_inst, leg_polyline, leg_travel_mode) "
        "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, "
        "%(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, "
        "%(leg_polyline)s, %(leg_travel_mode)s)")

    # Lookup used to recover request_no after each INSERT into master,
    # so that the child-table rows can reference it.
    query = ("SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s "
             "AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s")

    error_messages = []
    for result in results:
        if result["status"] == "OK":
            for leg in result['routes'][0]['legs']:
                leg_key = (leg['start_location']['lat'],
                           leg['start_location']['lng'],
                           leg['end_location']['lat'],
                           leg['end_location']['lng'],
                           leg['_sent_time_stamp'])
                try:
                    params = {
                        "_sent_time_stamp": leg['_sent_time_stamp'],
                        "dt": leg['dt']['value'],
                        "ds": leg['ds']['value'],
                        "dtf": leg['dtf']['value'],
                        "O_l": leg['start_location']['lat'],
                        "O_ln": leg['start_location']['lng'],
                        "O_Ls": leg['O_Ls'],
                        "O_a": leg['start_address'],
                        "D_l": leg['end_location']['lat'],
                        "D_ln": leg['end_location']['lng'],
                        "d_a": leg['end_address']
                    }
                    cursor.execute(add_overall_data, params)
                    cursor.execute(query, leg_key)
                    request_no = cursor.fetchone()[0]
                except KeyError as e:
                    # Retry with placeholders for the missing keys.
                    error_messages.append(e)
                    params = {
                        "_sent_time_stamp": leg['_sent_time_stamp'],
                        "dt": leg['dt']['value'],
                        "ds": leg['ds']['value'],
                        "dtf": "000",
                        "O_l": leg['start_location']['lat'],
                        "O_ln": leg['start_location']['lng'],
                        "O_Ls": leg['O_Ls'],
                        "O_a": 'unknown',
                        "D_l": leg['end_location']['lat'],
                        "D_ln": leg['end_location']['lng'],
                        "d_a": 'unknown'
                    }
                    cursor.execute(add_overall_data, params)
                    cursor.execute(query, leg_key)
                    request_no = cursor.fetchone()[0]

            # Each child insert below repeats the master lookup, using
            # the coordinates of the last leg processed above.
            for overview_polyline in result['routes']:
                params = {
                    "request_no": request_no,
                    "Overview_polyline":
                        overview_polyline['overview_polyline']['points']
                }
                cursor.execute(add_polyline, params)
                cursor.execute(query, leg_key)
                request_no = cursor.fetchone()[0]

            for summary in result['routes']:
                params = {
                    "request_no": request_no,
                    "summary": summary['summary']
                }
                cursor.execute(add_summary, params)
                cursor.execute(query, leg_key)
                request_no = cursor.fetchone()[0]

            for warnings in result['routes']:
                params = {
                    "request_no": request_no,
                    "warnings": str(warnings['warnings'])
                }
                cursor.execute(add_warnings, params)
                cursor.execute(query, leg_key)
                request_no = cursor.fetchone()[0]

            for waypoint_order in result['routes']:
                params = {
                    "request_no": request_no,
                    "waypoint_order": str(waypoint_order['waypoint_order'])
                }
                cursor.execute(add_waypoint_order, params)
                cursor.execute(query, leg_key)
                request_no = cursor.fetchone()[0]

            for steps in result['routes'][0]['legs'][0]['steps']:
                params = {
                    "request_no": request_no,
                    "leg_dt": steps['dt']['value'],
                    "leg_ds": steps['ds']['value'],
                    "leg_O_l": steps['start_location']['lat'],
                    "leg_O_ln": steps['start_location']['lng'],
                    "leg_D_l": steps['end_location']['lat'],
                    "leg_D_ln": steps['end_location']['lng'],
                    "leg_html_inst": steps['html_instructions'],
                    "leg_polyline": steps['polyline']['points'],
                    "leg_travel_mode": steps['travel_mode']
                }
                cursor.execute(add_leg_data, params)

        cnx.commit()
    print('error messages:', error_messages)
    cursor.close()
    cnx.close()
    print('finished', file)

On the Linux instance, htop shows the following:

[htop of python process]

For the MySQL database, MySQL Workbench shows:

[MySQL Workbench output]

This Python script has been running for several days now, but only about 20% of the data has been inserted into MySQL.

My questions - how can I identify the bottleneck? Is it the Python script? It appears to be using only a small amount of memory - should I increase that? I have checked the InnoDB buffer pool size (per How to improve the speed of InnoDB writes per second of MySQL DB) and found it to be large:

SELECT @@innodb_buffer_pool_size; 
+---------------------------+ 
| @@innodb_buffer_pool_size | 
+---------------------------+ 
|               11674845184 | 
+---------------------------+ 

Since the instances are in the same region, I don't believe there is a network bottleneck between the EC2 instance and RDS. Pointers on where I should look for the biggest savings would be very welcome!

EDIT

I think I may have stumbled on the problem. To be efficient during parsing, I write each level of the JSON separately. However, I then have to execute a query to match the nested part of the JSON with its higher level. This query has low overhead when the database is small, but I've noticed that the speed of the inserts has dropped dramatically on this DB. This is because it has to search an ever-larger, growing database to correctly join the JSON data.

I'm not sure how to solve this other than waiting it out....
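One possible way out, sketched below on the assumption that master.request_no is an AUTO_INCREMENT primary key (the table definitions aren't shown here), would be to read the generated id straight from the cursor instead of re-querying master after every insert:

# Hedged sketch, not the original code: assumes request_no is an
# AUTO_INCREMENT primary key on master, so Connector/Python exposes
# the generated id directly via the cursor.
cursor.execute(add_overall_data, params)
request_no = cursor.lastrowid  # id generated by the INSERT above
# ...then reuse request_no in the polyline/summary/warnings/leg
# inserts without searching the ever-growing master table.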

+1

You mention the EC2 and RDS are in the same region; are they also in the same availability zone? If not, that could be an easy place to look for further improvement. –

+0

Yes - confirmed. They are both in the same availability zone – LearningSlowly

+0

Have you tried using provisioned IOPS on the RDS instance? – mickzer

Answers

1

I can't see any table definitions in the Python script.... but when we do large data operations, we always disable any database indexes while loading into MySQL - and if you have any constraints / foreign keys enforced, they should also be disabled while you load.

Autocommit is disabled by default when connecting through Connector/Python.

But I can't see any commit options in your code at present.

Summary of options:

Disable / remove (for the load):

- indexes
- constraints - foreign keys - triggers

In your loader program:

- disable autocommit
- commit every N records (N will depend on your available buffer size) - see the sketch below
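A minimal sketch of that loading pattern, assuming Connector/Python and using rows and add_overall_data as placeholders for whatever the loader actually iterates over:

import mysql.connector

cnx = mysql.connector.connect(user='', password='', host='', database='DB')
cursor = cnx.cursor()

# Relax checking for the duration of the load. (ALTER TABLE ...
# DISABLE KEYS only affects MyISAM tables; for InnoDB the equivalent
# is dropping secondary indexes and re-creating them afterwards.)
cursor.execute("SET unique_checks = 0")
cursor.execute("SET foreign_key_checks = 0")

BATCH = 10000  # the "N" above; tune to your buffer sizes
for i, params in enumerate(rows, start=1):  # rows: parsed JSON records
    cursor.execute(add_overall_data, params)
    if i % BATCH == 0:
        cnx.commit()  # one commit per N rows, not one per row
cnx.commit()

cursor.execute("SET foreign_key_checks = 1")
cursor.execute("SET unique_checks = 1")
cnx.commit()
cursor.close()
cnx.close()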

1

My English is poor.

If I were doing this job, I would:

  1. Use Python to convert the JSON to txt

  2. Use the mysqlimport tool to import the txt into MySQL (a sketch of step 1 follows this list)
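A rough sketch of step 1, flattening one leg per tab-separated line (the file and column choices are illustrative; mysqlimport loads a file into the table whose name matches the file's base name):

import csv
import json

with open('data.json') as jf, open('master.txt', 'w', newline='') as out:
    writer = csv.writer(out, delimiter='\t')
    for result in json.load(jf):
        if result['status'] != 'OK':
            continue
        for leg in result['routes'][0]['legs']:
            writer.writerow([leg['_sent_time_stamp'],
                             leg['dt']['value'],
                             leg['ds']['value'],
                             leg['start_location']['lat'],
                             leg['start_location']['lng'],
                             leg['end_location']['lat'],
                             leg['end_location']['lng']])

# Step 2, from the shell:
#   mysqlimport --local --fields-terminated-by='\t' DB master.txt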

If you must do Python + MySQL all in one, I suggest using

INSERT INTO table VALUES (1), (2) ... (xxx) 
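With Connector/Python, one way to get that multi-row form without building the string by hand is executemany(), which rewrites simple INSERT ... VALUES statements into a single multi-row INSERT. A minimal sketch, with a hypothetical batch:

# executemany() batches these rows into one multi-row INSERT.
batch = [{"summary": "I-80 W", "request_no": 1},
         {"summary": "US-101 N", "request_no": 2}]  # hypothetical rows
cursor.executemany(add_summary, batch)
cnx.commit()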

Also, why do the 'SELECT request_no FROM master' queries occur multiple times? That value should be read from the JSON.

My English is very poor, so..

0

Given this information, it looks like both the script and the database are mostly idle. It would be premature to tune anything at the MySQL level.

You need more visibility into what your program is doing.

Start by logging how much time each query takes, how many errors you encounter, and so on.

Those SELECTs may need an index in order to perform well, if they turn out to be the problem.
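For example, a minimal sketch of both steps - timing the repeated lookup and, if it dominates, adding a composite index matching its WHERE clause (the index name is illustrative, and this assumes the columns are of indexable types):

import time

t0 = time.perf_counter()
cursor.execute(query, leg_key)
request_no = cursor.fetchone()[0]
print('lookup took %.4fs' % (time.perf_counter() - t0))

# Composite index covering the columns in the SELECT's WHERE clause.
cursor.execute("CREATE INDEX idx_master_lookup ON master "
               "(O_l, O_ln, D_l, D_ln, _sent_time_stamp)")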