2009-08-25 20 views
8

Sto utilizzando SQLAlchemy con un backend Postgres per eseguire un inserimento o aggiornamento di massa. Per provare a migliorare le prestazioni, sto tentando di eseguire il commit solo una volta ogni mille righe o così:Come faccio a fare un inserimento o aggiornamento di massa con SQLAlchemy?

trans = engine.begin() 
    for i, rec in enumerate(records): 
    if i % 1000 == 0: 
     trans.commit() 
     trans = engine.begin() 
    try: 
     inserter.execute(...) 
    except sa.exceptions.SQLError: 
     my_table.update(...).execute() 
trans.commit() 

Tuttavia, questo non funziona. Sembra che quando l'INSERT fallisce, lascia le cose in uno stato strano che impedisce che l'UPDATE accada. Sta automaticamente ripristinando la transazione? Se è così, può essere fermato? Non voglio che la mia intera transazione venga annullata in caso di problemi, ed è per questo che cerco di cogliere l'eccezione in primo luogo.

Il messaggio di errore che sto ottenendo, BTW, è "sqlalchemy.exc.InternalError: (InternalError) la transazione corrente viene interrotta, i comandi ignorati fino alla fine del blocco della transazione) chiamata.

risposta

5

Si sta verificando un comportamento strano Postgresql: se si verifica un errore in una transazione, impone il rollback dell'intera transazione. Lo considero un bug del design di Postgres; in alcuni casi ci vuole un bel po 'di contorsionismo SQL.

Una soluzione è quella di eseguire prima l'AGGIORNAMENTO. Rileva se ha effettivamente modificato una riga guardando cursor.rowcount; se non ha modificato alcuna riga, non esisteva, così fa l'INSERT. (Questo sarà più veloce se si aggiorna più frequentemente di quanto si inserisce, naturalmente.)

Un'altra soluzione è quella di utilizzare i punti di salvataggio:

SAVEPOINT a; 
INSERT INTO ....; 
-- on error: 
ROLLBACK TO SAVEPOINT a; 
UPDATE ...; 
-- on success: 
RELEASE SAVEPOINT a; 

Questo ha un serio problema per il codice di produzione di qualità: è necessario rilevare l'errore con precisione. Presumibilmente ti aspetti di eseguire un controllo dei vincoli univoco, ma potresti colpire qualcosa di inaspettato e potrebbe essere quasi impossibile distinguere in modo affidabile l'errore previsto da quello inatteso. Se questo raggiunge la condizione di errore in modo errato, porterà a problemi oscuri in cui nulla verrà aggiornato o inserito e non verrà visualizzato alcun errore. Stai molto attento con questo. Puoi restringere il caso di errore osservando il codice di errore di Postgresql per assicurarti che sia il tipo di errore che ti aspetti, ma il potenziale problema è ancora lì.

Infine, se si desidera eseguire batch-insert-or-update, in realtà si desidera eseguire molti di questi in pochi comandi, non un elemento per comando. Ciò richiede SQL più complicato: SELEZIONA annidato all'interno di un INSERT, filtrando gli elementi giusti da inserire e aggiornare.

+1

"Se si verifica un errore in una transazione, impone il rollback dell'intera transazione. Lo considero un bug di progettazione Postgres." - Non è questo il punto delle transazioni? Da [Wikipedia] (http: //en.wikipedia.org/wiki/Database_transaction): "Le transazioni forniscono una proposizione" tutto o niente ", affermando che ogni unità di lavoro eseguita in un database deve essere completa nella sua interezza o non avere alcun effetto". – spiffytech

+0

@Spiffytech Buona risposta. Questo in realtà mi ha fatto LOL. –

4

Questo errore proviene da PostgreSQL. PostgreSQL non consente di eseguire comandi nella stessa transazione se un comando crea un errore. Per risolvere questo problema è possibile utilizzare transazioni nidificate (implementate utilizzando punti di salvataggio SQL) tramite conn.begin_nested(). Ecco qualcosa che potrebbe funzionare. Ho fatto in modo che il codice usasse connessioni esplicite, fattorizzava la parte chunking e faceva in modo che il codice usasse il gestore di contesto per gestire correttamente le transazioni.

from itertools import chain, islice 
def chunked(seq, chunksize): 
    """Yields items from an iterator in chunks.""" 
    it = iter(seq) 
    while True: 
     yield chain([it.next()], islice(it, chunksize-1)) 

conn = engine.commit() 
for chunk in chunked(records, 1000): 
    with conn.begin(): 
     for rec in chunk: 
      try: 
       with conn.begin_nested(): 
        conn.execute(inserter, ...) 
      except sa.exceptions.SQLError: 
       conn.execute(my_table.update(...)) 

Questo non avrà comunque prestazioni stellari a causa del sovraccarico della transazione nidificato. Se si desidera ottenere prestazioni migliori, provare a individuare quali righe creeranno errori in anticipo con una query selezionata e utilizzare il supporto executemany (execute può richiedere un elenco di dict se tutti gli inserti utilizzano le stesse colonne). Se è necessario gestire gli aggiornamenti simultanei, è comunque necessario eseguire la gestione degli errori sia riprovando che ricadendo a uno a uno inserimenti.

Problemi correlati