2015-12-01 10 views
15

Ho circa 60 GB di file JSON che sto analizzando utilizzando Python e quindi inserendo in un database MySQL utilizzando il connettore Python-MySQL. Ogni file JSON è di circa 500 MBBasso numero di scritture InnoDB al secondo: da AWS EC2 a MySQL RDS utilizzando Python

Ho utilizzato un'istanza di AWS r3.xlarge EC2 con un volume secondario per contenere i 60 GB di dati JSON.

Sto quindi utilizzando un'istanza MySQL RDS r3.xlarge di AWS. Queste istanze si trovano tutte nella stessa area e zona di disponibilità. L'istanza EC2 utilizza il seguente script Python per caricare il JSON, analizzarlo e quindi inserirlo nel MySQL RDS. Il mio pitone:

import json 
import mysql.connector 
from mysql.connector import errorcode 
from pprint import pprint 
import glob 
import os 

os.chdir("./json_data") 

for file in glob.glob("*.json"): 
    with open(file, 'rU') as data_file: 
     results = json.load(data_file) 
     print('working on file:', file) 

    cnx = mysql.connector.connect(user='', password='', 
     host='') 

    cursor = cnx.cursor(buffered=True) 

    DB_NAME = 'DB' 

    def create_database(cursor): 
     try: 
      cursor.execute(
       "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME)) 
     except mysql.connector.Error as err: 
      print("Failed creating database: {}".format(err)) 
      exit(1) 

    try: 
     cnx.database = DB_NAME  
    except mysql.connector.Error as err: 
     if err.errno == errorcode.ER_BAD_DB_ERROR: 
      create_database(cursor) 
      cnx.database = DB_NAME 
     else: 
      print(err) 
      exit(1) 

    add_overall_data = ("INSERT INTO master" 
     "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)" 
     "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)") 

    add_polyline = ("INSERT INTO polyline" 
     "(Overview_polyline, request_no)" 
     "VALUES (%(Overview_polyline)s, %(request_no)s)") 

    add_summary = ("INSERT INTO summary" 
     "(summary, request_no)" 
     "VALUES (%(summary)s, %(request_no)s)") 

    add_warnings = ("INSERT INTO warnings" 
     "(warnings, request_no)" 
     "VALUES (%(warnings)s, %(request_no)s)") 

    add_waypoint_order = ("INSERT INTO waypoint_order" 
     "(waypoint_order, request_no)" 
     "VALUES (%(waypoint_order)s, %(request_no)s)") 

    add_leg_data = ("INSERT INTO leg_data" 
     "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)" 
     "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)") 
    error_messages = [] 
    for result in results: 
     if result["status"] == "OK": 
      for leg in result['routes'][0]['legs']: 
       try: 
        params = { 
        "_sent_time_stamp": leg['_sent_time_stamp'], 
        "dt": leg['dt']['value'], 
        "ds": leg['ds']['value'], 
        "dtf": leg['dtf']['value'], 
        "O_l": leg['start_location']['lat'], 
        "O_ln": leg['start_location']['lng'], 
        "O_Ls": leg['O_Ls'], 
        "O_a": leg['start_address'], 
        "D_l": leg['end_location']['lat'], 
        "D_ln": leg['end_location']['lng'], 
        "d_a": leg['end_address'] 
        } 
        cursor.execute(add_overall_data, params) 
        query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
        O_l = leg['start_location']['lat'] 
        O_ln = leg['start_location']['lng'] 
        D_l = leg['end_location']['lat'] 
        D_ln = leg['end_location']['lng'] 
        _sent_time_stamp = leg['_sent_time_stamp'] 
        cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
        request_no = cursor.fetchone()[0] 
       except KeyError, e: 
        error_messages.append(e) 
        params = { 
        "_sent_time_stamp": leg['_sent_time_stamp'], 
        "dt": leg['dt']['value'], 
        "ds": leg['ds']['value'], 
        "dtf": "000", 
        "O_l": leg['start_location']['lat'], 
        "O_ln": leg['start_location']['lng'], 
        "O_Ls": leg['O_Ls'], 
        "O_a": 'unknown', 
        "D_l": leg['end_location']['lat'], 
        "D_ln": leg['end_location']['lng'], 
        "d_a": 'unknown' 
        } 
        cursor.execute(add_overall_data, params) 
        query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
        O_l = leg['start_location']['lat'] 
        O_ln = leg['start_location']['lng'] 
        D_l = leg['end_location']['lat'] 
        D_ln = leg['end_location']['lng'] 
        _sent_time_stamp = leg['_sent_time_stamp'] 
        cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
        request_no = cursor.fetchone()[0] 
      for overview_polyline in result['routes']: 
       params = { 
       "request_no": request_no, 
       "Overview_polyline": overview_polyline['overview_polyline']['points'] 
       } 
       cursor.execute(add_polyline, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for summary in result['routes']: 
       params = { 
       "request_no": request_no, 
       "summary": summary['summary'] 
       } 
       cursor.execute(add_summary, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for warnings in result['routes']: 
       params = { 
       "request_no": request_no, 
       "warnings": str(warnings['warnings']) 
       } 
       cursor.execute(add_warnings, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for waypoint_order in result['routes']: 
       params = { 
       "request_no": request_no, 
       "waypoint_order": str(waypoint_order['waypoint_order']) 
       } 
       cursor.execute(add_waypoint_order, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for steps in result['routes'][0]['legs'][0]['steps']: 
       params = { 
       "request_no": request_no, 
       "leg_dt": steps['dt']['value'], 
       "leg_ds": steps['ds']['value'], 
       "leg_O_l": steps['start_location']['lat'], 
       "leg_O_ln": steps['start_location']['lng'], 
       "leg_D_l": steps['end_location']['lat'], 
       "leg_D_ln": steps['end_location']['lng'], 
       "leg_html_inst": steps['html_instructions'], 
       "leg_polyline": steps['polyline']['points'], 
       "leg_travel_mode": steps['travel_mode'] 
       } 
       cursor.execute(add_leg_data, params) 
     cnx.commit() 
    print('error messages:', error_messages) 
    cursor.close() 
    cnx.close() 
    print('finished' + file) 

Utilizzando htop sul Linux istanza posso vedere il seguente: htop of python process

Per quanto riguarda il database MySQL, utilizzando MySQL Workbench posso vedere che:

MySQL WorkBench Output

Questo script python è stato utilizzato per giorni ma ho inserito solo il 20% dei dati in MySQL.

Le mie domande: come identificare il collo di bottiglia? È lo script Python? Sembra che stia usando una quantità di memoria bassa - posso aumentare questo? Ho controllato la dimensione InnoDB buffer pool secondo (How to improve the speed of InnoDB writes per second of MySQL DB) e trovato ad essere grande:

SELECT @@innodb_buffer_pool_size; 
+---------------------------+ 
| @@innodb_buffer_pool_size | 
+---------------------------+ 
|    11674845184 | 
+---------------------------+ 

Dal momento che sto utilizzando un'istanza EC2 RDS e nella stessa regione non credo ci sia un collo di bottiglia della rete . Indicatori su dove dovrei cercare i maggiori risparmi sarebbero i benvenuti!

EDIT

Credo di aver inciampato sul problema. Per efficienza durante l'analisi, scrivo separatamente ogni livello di JSON. Tuttavia, devo quindi eseguire una query per abbinare una parte annidata di JSON con il suo livello più alto. Questa query ha un sovraccarico basso quando si utilizzano piccoli database. Ho notato che la velocità degli inserti è diminuita drasticamente su questo db. Questo perché sta cercando un db più grande e in continua crescita per connettere correttamente i dati JSON.

Non sono sicuro di come risolvere questo altro che aspettare fuori ....

+1

Hai menzionato che EC2 e RDS si trovano nella stessa regione; sono anche nella stessa zona di disponibilità? In caso contrario, potrebbe essere un modo abbastanza semplice per vedere ulteriori miglioramenti. –

+0

Sì, considerato quello. Sono entrambi nella stessa zona di disponibilità – LearningSlowly

+0

Hai provato a utilizzare gli IOP forniti nell'istanza RDS? – mickzer

risposta

1

non riesco a vedere alcun definizioni di tabella nello script Python .... Ma quando proviamo e facciamo grandi operazioni sui dati - disabiliteremmo tutti gli indici del database quando caricavamo a MySQL, anche se avete dei vincoli/l'applicazione della chiave esterna - questo dovrebbe essere disabilitato anche durante il caricamento.

L'autocommit è disabilitato per impostazione predefinita durante la connessione tramite Connector/Python.

Ma non riesco a vedere alcun commit - opzioni nel codice che presenti

di riassumere

Disabilita/Rimuovi (per carico)

- Indici
- Vincoli - Chiavi esterne - Attiva

nel programma Caricamento

- Disabilitare autocommit - commettono mai n record (N dipenderà la dimensione del buffer disponibili)

1

mia Englist è povero

se faccio questo lavoro, io

  1. usare Python convertito jSON a txt

  2. uso di utensili mysq imp, importazione txt a MySQL

se si deve fare python + mysql allinone, vi suggerisco di utilizzare

insert table values(1),value(2)...value(xxx) 

perché 'SELECT request_no DA verificarsi master'multiple, dovrebbe essere letto da JSON

mia Englist è molto poor.so ..

0

Data questa informazione, sembra che sia lo script sia il DB siano in gran parte inattivi. Tweaking qualsiasi cosa a livello MySQL sarebbe prematuro.

È necessaria una maggiore visibilità su ciò che sta facendo il programma.

Inizia registrando per quanto tempo ciascuna query richiede, quanti errori si verificano e così via.

Questi SELECTs potrebbero aver bisogno di aggiungere un indice per funzionare bene, se si tratta di un problema.

Problemi correlati