2015-02-02 9 views
5

Sto provando a eseguire una COPIA di redshift in SQLAlchemy.Redshift L'operazione di COPY non funziona in SQLAlchemy

Il seguente SQL correttamente copie di oggetti dal mio secchio S3 nella mia tabella di Redshift quando eseguo in psql:

COPY posts FROM 's3://mybucket/the/key/prefix' 
WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' 
JSON AS 'auto'; 

Ho diversi file di nome

s3://mybucket/the/key/prefix.001.json 
s3://mybucket/the/key/prefix.002.json 
etc. 

posso verificare che il nuovo le righe sono state aggiunte alla tabella con select count(*) from posts.

Tuttavia, quando eseguo esattamente la stessa espressione SQL in SQLAlchemy, l'esecuzione si completa senza errori, ma nessuna riga viene aggiunta alla mia tabella.

session = get_redshift_session() 
session.bind.execute("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';") 
session.commit() 

Non importa se faccio il sopra o

from sqlalchemy.sql import text 
session = get_redshift_session() 
session.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';")) 
session.commit() 

risposta

6

ho praticamente avuto lo stesso problema, anche se nel mio caso era più:

engine = create_engine('...') 
engine.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';")) 

Facendo un passo attraverso PDB, il problema era ovviamente la mancanza di un .commit() essere invocato. Non so perché session.commit() non funzioni nel tuo caso (forse la sessione "ha perso la traccia" dei comandi inviati?) Quindi potrebbe non risolvere il tuo problema.

Comunque, come explained in the sqlalchemy docs

Dato questo requisito, SQLAlchemy implementa il proprio caratteristica “autocommit”, che funziona in modo completamente uniforme in tutti i backend. Questo si ottiene rilevando istruzioni che rappresentano operazioni che cambiano i dati, cioè INSERT, UPDATE, DELETE [...] Se l'istruzione è un'istruzione di solo testo e il flag non è impostato, viene usata un'espressione regolare per rilevare INSERT, UPDATE , DELETE, nonché una varietà di altri comandi per un particolare back-end.

Quindi, ci sono 2 soluzioni, sia:

  • text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';").execution_options(autocommit=True).
  • Oppure, ottenere una versione fissa del dialetto redshift ... Ho appena opened a PR su di esso
0

Ho avuto successo utilizzando il linguaggio delle espressioni di base e Connection.execute() (in contrasto con l'ORM e sessioni) per copiare i file delimitati da Redshift con il codice qui sotto. Forse potresti adattarlo per JSON.

def copy_s3_to_redshift(conn, s3path, table, aws_access_key, aws_secret_key, delim='\t', uncompress='auto', ignoreheader=None): 
    """Copy a TSV file from S3 into redshift. 

    Note the CSV option is not used, so quotes and escapes are ignored. Empty fields are loaded as null. 
    Does not commit a transaction. 
    :param Connection conn: SQLAlchemy Connection 
    :param str uncompress: None, 'gzip', 'lzop', or 'auto' to autodetect from `s3path` extension. 
    :param int ignoreheader: Ignore this many initial rows. 
    :return: Whatever a copy command returns. 
    """ 
    if uncompress == 'auto': 
     uncompress = 'gzip' if s3path.endswith('.gz') else 'lzop' if s3path.endswith('.lzo') else None 

    copy = text(""" 
     copy "{table}" 
     from :s3path 
     credentials 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}' 
     delimiter :delim 
     emptyasnull 
     ignoreheader :ignoreheader 
     compupdate on 
     comprows 1000000 
     {uncompress}; 
     """.format(uncompress=uncompress or '', table=text(table), aws_access_key=aws_access_key, aws_secret_key=aws_secret_key)) # copy command doesn't like table name or keys single-quoted 
    return conn.execute(copy, s3path=s3path, delim=delim, ignoreheader=ignoreheader or 0)