2015-08-13 19 views
9

Ho alcuni DataFrames Panda piuttosto grandi e mi piacerebbe utilizzare i nuovi mapping di massa di SQL per caricarli su un Microsoft SQL Server tramite SQL Alchemy. Il metodo pandas.to_sql, sebbene sia bello, è lento.Inserimento di massa Un DataFrame di Pandas Utilizzo di SQLAlchemy

sto avendo problemi a scrivere il codice ...

Mi piacerebbe essere in grado di passare questa funzione un dataframe panda che sto chiamando table, un nome di schema sto chiamando schema, e un nome di tabella che sto chiamando name. Idealmente, la funzione 1.) cancellerà la tabella se già esiste. 2.) creare una nuova tabella 3.) creare un mapper e 4.) inserire bulk utilizzando i dati di mapper e panda. Sono bloccato sulla parte 3.

Ecco il mio codice (ovviamente approssimativo). Sto lottando con come far funzionare la funzione di mapper con le mie chiavi primarie. Non ho davvero bisogno di chiavi primarie ma la funzione mapper lo richiede.

Grazie per gli approfondimenti.

from sqlalchemy import create_engine Table, Column, MetaData 
from sqlalchemy.orm import mapper, create_session 
from sqlalchemy.ext.declarative import declarative_base 
from pandas.io.sql import SQLTable, SQLDatabase 

def bulk_upload(table, schema, name): 
    e = create_engine('mssql+pyodbc://MYDB') 
    s = create_session(bind=e) 
    m = MetaData(bind=e,reflect=True,schema=schema) 
    Base = declarative_base(bind=e,metadata=m) 
    t = Table(name,m) 
    m.remove(t) 
    t.drop(checkfirst=True) 
    sqld = SQLDatabase(e, schema=schema,meta=m) 
    sqlt = SQLTable(name, sqld, table).table 
    sqlt.metadata = m 
    m.create_all(bind=e,tables=[sqlt])  
    class MyClass(Base): 
     return 
    mapper(MyClass, sqlt)  

    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records')) 
    return 
+0

Sembra che tu stia ricreando da solo la funzione 'to_sql', e dubito che questo sarà più veloce. Il collo di bottiglia che scrive i dati in SQL si trova principalmente nei driver python ('pyobdc' nel tuo caso), e questo è qualcosa che non puoi evitare con l'implementazione di cui sopra. Inoltre, 'to_sql' non usa l'ORM, che è considerato più lento di CORE sqlalchemy anche quando si utilizza l'inserimento in blocco (http://docs.sqlalchemy.org/en/latest/faq/performance.html # im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow) – joris

+0

Inoltre, se 'to_sql' è troppo lento e non è possibile migliorarlo (ad esempio modificando la connessione i parametri, il driver utilizzato (ad es. pymssql), la velocità di internet, rimuovendo i vincoli sulla tabella, ecc.), un'alternativa più veloce è scrivere i dati in csv e caricarli nella tabella SQL. – joris

+0

@joris Grazie. Sembra che le "operazioni alla rinfusa" elencate qui siano un po 'errate. http://docs.sqlalchemy.org/en/rel_1_0/orm/persistence_techniques.html#bulk-operations Quello che ho davvero bisogno di fare è inviare il file di dati pandas ad un file di testo e scrivere l'operazione BULK INSERT in questo modo ... http: //stackoverflow.com/questions/29638136/how-to-speed-up-with-bulk-insert-to-ms-server-from-python-with-pyodbc-from-csv – Charles

risposta

14

Ho avuto un problema simile con pd.to_sql che impiega ore per caricare i dati. Il sotto codice bulk ha inserito gli stessi dati in pochi secondi.

from sqlalchemy import create_engine 
import psycopg2 as pg 
#load python script that batch loads pandas df to sql 
import cStringIO 

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>' 
engine = create_engine(address) 
connection = engine.raw_connection() 
cursor = connection.cursor() 

#df is the dataframe containing an index and the columns "Event" and "Day" 
#create Index column to use as primary key 
df.reset_index(inplace=True) 
df.rename(columns={'index':'Index'}, inplace =True) 

#create the table but first drop if it already exists 
command = '''DROP TABLE IF EXISTS localytics_app2; 
CREATE TABLE localytics_app2 
(
"Index" serial primary key, 
"Event" text, 
"Day" timestamp without time zone, 
);''' 
cursor.execute(command) 
connection.commit() 

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function 
output = cStringIO.StringIO() 
#ignore the index 
df.to_csv(output, sep='\t', header=False, index=False) 
#jump to start of stream 
output.seek(0) 
contents = output.getvalue() 
cur = connection.cursor() 
#null values become '' 
cur.copy_from(output, 'localytics_app2', null="")  
connection.commit() 
cur.close() 
+4

Sembra interessante. Provato con un DB Oracle, dice cx_Oracle.Cursor oggetto non ha attributo 'copy_from'. Il metodo copy_from sembra essere una cosa postgres. Qualche idea su un metodo agnostico DB? –

+0

questa è una bella soluzione in-mem. solo un punto è la variabile 'contents' non utilizzata. potrebbe anche lasciarlo cadere poiché fa una lettura attraverso l'intero buffer di stringa che può diventare grande a seconda della dimensione del db. testato senza e funziona bene. – marko

+2

cStringIO è stato deprecato per python3. Se si utilizza python3, è possibile utilizzare: import io; output = io.StringIO() – ansonw

8

Questo potrebbe essere stato risposto da allora, ma ho trovato la soluzione mediante la raccolta risposte diverse su questo sito e l'allineamento con il doc di SQLAlchemy.

  1. La tabella deve esistere già in db1; con un indice impostato con auto_increment su.
  2. Classe La corrente deve essere allineata con il dataframe importato nel CSV e la tabella nel db1.

Spero che questo aiuti chi arriva qui e vuole mescolare Panda e SQLAlchemy in modo rapido.

from urllib import quote_plus as urlquote 
import sqlalchemy 
from sqlalchemy import create_engine 
from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy import Column, Integer, String, Numeric 
from sqlalchemy.orm import sessionmaker 
import pandas as pd 


# Set up of the engine to connect to the database 
# the urlquote is used for passing the password which might contain special characters such as "/" 
engine = create_engine('mysql://root:%[email protected]/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False) 
conn = engine.connect() 
Base = declarative_base() 

#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc. 
class Current(Base): 
    __tablename__ = 'tableName' 

    id = Column(Integer, primary_key=True) 
    Date = Column(String(500)) 
    Type = Column(String(500)) 
    Value = Column(Numeric()) 

    def __repr__(self): 
     return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value) 

# Set up of the table in db and the file to import 
fileToRead = 'file.csv' 
tableToWriteTo = 'tableName' 

# Panda to create a lovely dataframe 
df_to_be_written = pd.read_csv(fileToRead) 
# The orient='records' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks. 
listToWrite = df_to_be_written.to_dict(orient='records') 

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True) 
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True) 

# Open the session 
Session = sessionmaker(bind=engine) 
session = Session() 

# Inser the dataframe into the database in one bulk 
conn.execute(table.insert(), listToWrite) 

# Commit the changes 
session.commit() 

# Close the session 
session.close() 
+0

Ho trovato questo articolo SQLAlchemy molto utile per migliorare la velocità degli inserti: http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000- row-with-the-orm-and-it-s-really-slow –

+0

Grazie per questo, mi hai reso la vita molto più facile. Puoi spiegare il punto di 'def __repr __ (self)'? –

1

Poiché si tratta di un carico di lavoro O I/pesante è anche possibile utilizzare il modulo python threading attraverso multiprocessing.dummy. Questo ha accelerato le cose per me:

import math 
from multiprocessing.dummy import Pool as ThreadPool 

... 

def insert_df(df, *args, **kwargs): 
    nworkers = 4 

    chunksize = math.floor(df.shape[0]/nworkers) 
    chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)] 
    chunks.append((chunksize * nworkers, df.shape[0])) 
    pool = ThreadPool(nworkers) 

    def worker(chunk): 
     i, j = chunk 
     df.iloc[i:j, :].to_sql(*args, **kwargs) 

    pool.map(worker, chunks) 
    pool.close() 
    pool.join() 


.... 

insert_df(df, "foo_bar", engine, if_exists='append') 
2

Sulla base @ansonw risposte:

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'): 
    # Create Table 
    df[:0].to_sql(table, engine, if_exists=if_exists) 

    # Prepare data 
    output = cStringIO.StringIO() 
    df.to_csv(output, sep=sep, header=False, encoding=encoding) 
    output.seek(0) 

    # Insert data 
    connection = engine.raw_connection() 
    cursor = connection.cursor() 
    cursor.copy_from(output, table, sep=sep, null='') 
    connection.commit() 
    cursor.close() 

ho inserire 200000 linee in 5 secondi, invece di 4 minuti

+0

Chi ha downvoted dovrebbe spiegare un po 'perché. – javadba

+0

Non ho fatto un downvote, ma in realtà non sembra una soluzione che utilizzi i panda come desiderato: più processi + panda + sqlalchemy. Solitamente durante l'ingestione, in particolare con set di dati più grandi, ci sarà una posizione temporanea per archiviare i dati nel database e quindi eseguire il massaggio di tali dati (delete/back-populate) prima di un inserimento/aggiornamento. –

1

miei Postgres soluzione specifica al di sotto auto-crea la tabella del database utilizzando il tuo dataframe pandas, ed esegue un veloce inserimento di massa utilizzando il postgres COPY my_table FROM ...

import io 

import pandas as pd 
from sqlalchemy import create_engine 

def write_to_table(df, db_engine, table_name, if_exists='fail'): 
    string_data_io = io.StringIO() 
    df.to_csv(string_data_io, sep='|', index=False) 
    pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine) 
    table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df, 
           index=False, if_exists=if_exists) 
    table.create() 
    string_data_io.seek(0) 
    string_data_io.readline() # remove header 
    with db_engine.connect() as connection: 
     with connection.connection.cursor() as cursor: 
      copy_cmd = "COPY %s FROM STDIN HEADER DELIMITER '|' CSV" % table_name 
      cursor.copy_expert(copy_cmd, string_data_io) 
     connection.connection.commit() 
Problemi correlati