Ho circa 60 GB di file JSON che sto analizzando utilizzando Python e quindi inserendo in un database MySQL utilizzando il connettore Python-MySQL. Ogni file JSON è di circa 500 MBBasso numero di scritture InnoDB al secondo: da AWS EC2 a MySQL RDS utilizzando Python
Ho utilizzato un'istanza di AWS r3.xlarge EC2 con un volume secondario per contenere i 60 GB di dati JSON.
Sto quindi utilizzando un'istanza MySQL RDS r3.xlarge di AWS. Queste istanze si trovano tutte nella stessa area e zona di disponibilità. L'istanza EC2 utilizza il seguente script Python per caricare il JSON, analizzarlo e quindi inserirlo nel MySQL RDS. Il mio pitone:
import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os
os.chdir("./json_data")
for file in glob.glob("*.json"):
with open(file, 'rU') as data_file:
results = json.load(data_file)
print('working on file:', file)
cnx = mysql.connector.connect(user='', password='',
host='')
cursor = cnx.cursor(buffered=True)
DB_NAME = 'DB'
def create_database(cursor):
try:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
except mysql.connector.Error as err:
print("Failed creating database: {}".format(err))
exit(1)
try:
cnx.database = DB_NAME
except mysql.connector.Error as err:
if err.errno == errorcode.ER_BAD_DB_ERROR:
create_database(cursor)
cnx.database = DB_NAME
else:
print(err)
exit(1)
add_overall_data = ("INSERT INTO master"
"(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
"VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")
add_polyline = ("INSERT INTO polyline"
"(Overview_polyline, request_no)"
"VALUES (%(Overview_polyline)s, %(request_no)s)")
add_summary = ("INSERT INTO summary"
"(summary, request_no)"
"VALUES (%(summary)s, %(request_no)s)")
add_warnings = ("INSERT INTO warnings"
"(warnings, request_no)"
"VALUES (%(warnings)s, %(request_no)s)")
add_waypoint_order = ("INSERT INTO waypoint_order"
"(waypoint_order, request_no)"
"VALUES (%(waypoint_order)s, %(request_no)s)")
add_leg_data = ("INSERT INTO leg_data"
"(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)"
"VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
error_messages = []
for result in results:
if result["status"] == "OK":
for leg in result['routes'][0]['legs']:
try:
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": leg['dtf']['value'],
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": leg['start_address'],
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": leg['end_address']
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
except KeyError, e:
error_messages.append(e)
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": "000",
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": 'unknown',
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": 'unknown'
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for overview_polyline in result['routes']:
params = {
"request_no": request_no,
"Overview_polyline": overview_polyline['overview_polyline']['points']
}
cursor.execute(add_polyline, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for summary in result['routes']:
params = {
"request_no": request_no,
"summary": summary['summary']
}
cursor.execute(add_summary, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for warnings in result['routes']:
params = {
"request_no": request_no,
"warnings": str(warnings['warnings'])
}
cursor.execute(add_warnings, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for waypoint_order in result['routes']:
params = {
"request_no": request_no,
"waypoint_order": str(waypoint_order['waypoint_order'])
}
cursor.execute(add_waypoint_order, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for steps in result['routes'][0]['legs'][0]['steps']:
params = {
"request_no": request_no,
"leg_dt": steps['dt']['value'],
"leg_ds": steps['ds']['value'],
"leg_O_l": steps['start_location']['lat'],
"leg_O_ln": steps['start_location']['lng'],
"leg_D_l": steps['end_location']['lat'],
"leg_D_ln": steps['end_location']['lng'],
"leg_html_inst": steps['html_instructions'],
"leg_polyline": steps['polyline']['points'],
"leg_travel_mode": steps['travel_mode']
}
cursor.execute(add_leg_data, params)
cnx.commit()
print('error messages:', error_messages)
cursor.close()
cnx.close()
print('finished' + file)
Utilizzando htop sul Linux istanza posso vedere il seguente:
Per quanto riguarda il database MySQL, utilizzando MySQL Workbench posso vedere che:
Questo script python è stato utilizzato per giorni ma ho inserito solo il 20% dei dati in MySQL.
Le mie domande: come identificare il collo di bottiglia? È lo script Python? Sembra che stia usando una quantità di memoria bassa - posso aumentare questo? Ho controllato la dimensione InnoDB buffer pool secondo (How to improve the speed of InnoDB writes per second of MySQL DB) e trovato ad essere grande:
SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
| 11674845184 |
+---------------------------+
Dal momento che sto utilizzando un'istanza EC2 RDS e nella stessa regione non credo ci sia un collo di bottiglia della rete . Indicatori su dove dovrei cercare i maggiori risparmi sarebbero i benvenuti!
EDIT
Credo di aver inciampato sul problema. Per efficienza durante l'analisi, scrivo separatamente ogni livello di JSON. Tuttavia, devo quindi eseguire una query per abbinare una parte annidata di JSON con il suo livello più alto. Questa query ha un sovraccarico basso quando si utilizzano piccoli database. Ho notato che la velocità degli inserti è diminuita drasticamente su questo db. Questo perché sta cercando un db più grande e in continua crescita per connettere correttamente i dati JSON.
Non sono sicuro di come risolvere questo altro che aspettare fuori ....
Hai menzionato che EC2 e RDS si trovano nella stessa regione; sono anche nella stessa zona di disponibilità? In caso contrario, potrebbe essere un modo abbastanza semplice per vedere ulteriori miglioramenti. –
Sì, considerato quello. Sono entrambi nella stessa zona di disponibilità – LearningSlowly
Hai provato a utilizzare gli IOP forniti nell'istanza RDS? – mickzer