2014-09-02 14 views
6

Per fornire il contesto che posso/è necessario, sto provando a recuperare alcuni dati memorizzati su un server postgres remoto (heroku) in un DataFrame panda, usando psycopg2 per connettersi.Estrai grandi quantità di dati da un server remoto, in un DataFrame

Mi interessa due tabelle specifiche, utenti e eventi, e la connessione funziona bene, perché quando si tira giù i dati utente

import pandas.io.sql as sql 
# [...] 
users = sql.read_sql("SELECT * FROM users", conn) 

dopo aver atteso qualche secondo, il dataframe è restituito come previsto.

<class 'pandas.core.frame.DataFrame'> 
Int64Index: 67458 entries, 0 to 67457 
Data columns (total 35 columns): [...] 

Eppure, quando si cerca di tirare le più grandi, più pesanti eventi i dati direttamente dal ipython, dopo un lungo periodo di tempo, si blocca solo:

In [11]: events = sql.read_sql("SELECT * FROM events", conn) 
[email protected]:~$ 

e quando si cerca da un quaderno ipython ottengo il morto kernel errore

Il kernel è morto, ti piacerebbe riavviarlo? Se non si riavvia il kernel, sarà possibile salvare il notebook, ma il codice in esecuzione non funzionerà fino a quando il notebook non verrà riaperto.


Update # 1:

per avere una migliore idea delle dimensioni degli eventi tavolo che sto cercando di tirare dentro, qui ci sono il numero di record e il numero di attributi per ogni:

In [11]: sql.read_sql("SELECT count(*) FROM events", conn) 
Out[11]: 
    count 
0 2711453 

In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns) 
Out[12]: 18 

Aggiornamento # 2:

memoria è sicuramente un collo di bottiglia per l'implementazione corrente di read_sql: quando si tira giù i eventi e il tentativo di eseguire un'altra istanza di ipython il risultato è

[email protected]:~$ sudo ipython 
-bash: fork: Cannot allocate memory 

Aggiornamento n. 3:

Ho provato per la prima volta con un'implementazione read_sql_chunked che sarebbe sufficiente per eturn la matrice di DataFrames parziali:

def read_sql_chunked(query, conn, nrows, chunksize=1000): 
    start = 0 
    dfs = [] 
    while start < nrows: 
     df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn) 
     start += chunksize 
     dfs.append(df) 
     print "Events added: %s to %s of %s" % (start-chunksize, start, nrows) 
    # print "concatenating dfs" 
    return dfs 

event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000) 

e che funziona bene, ma quando si cerca di concatenare i DataFrames, il kernel muore nuovamente.
E questo dopo aver dato alla VM 2 GB di RAM.

Sulla base della spiegazione di Andy di read_sql vs.read_csv differenza di attuazione e le prestazioni, la prossima cosa che ho provato è stato quello di aggiungere i record in un file CSV e poi leggerli tutti in una dataframe:

event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8') 

for df in event_dfs[1:]: 
    df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8') 

Anche in questo caso, la scrittura in formato CSV completato con successo - un file di 657MB - ma la lettura dal CSV non termina mai.

Come si può approssimare la quantità di RAM sufficiente per leggere dire un file CSV da 657 MB, dal momento che 2 GB non sembrano essere sufficienti?


Percepita mi manca un po 'di comprensione fondamentale di una DataFrames o psycopg2, ma mi sono bloccato, non riesco nemmeno a individuare il collo di bottiglia o dove da ottimizzare.

Qual è la strategia corretta per prelevare grandi quantità di dati da un server remoto (postgres)?

+0

Come un'esperienza che schifo Speranza possiamo ottenere questo lavoro per voi in! il futuro Per curiosità di quanto è grande il tuo tavolo/quante righe? –

+0

@AndyHayden aggiornato per aggiungere il numero di record e il numero di attributi per ognuno sulla tabella _eventi_ –

+0

Hai * bisogno * di tutti i dati su una volta in memoria? O è sufficiente avere solo una parte dei dati (ad es. determinate colonne) contemporaneamente in un DataFrame? (ma a parte questo, la tua domanda su quanto è grande un dataframe ovviamente è legittimo) – joris

risposta

4

sospetto che ci sono un paio di (legati) le cose in gioco qui causando lentezza:

  1. read_sql è scritto in Python, quindi è un po 'lento (soprattutto rispetto a read_csv, che è scritto in Cython - e con attenzione implementato per la velocità!) e si basa su sqlalchemy piuttosto che su alcuni (potenzialmente molto più veloci) C-DBAPI. L'impulso a spostarsi su sqlalchmey è stato quello di rendere questa mossa più facile in futuro (così come il supporto della piattaforma cross-sql).
  2. Si può essere a corto di memoria, come troppi oggetti Python sono in memoria (questo è legato a non utilizzare un C-DBAPI), ma potenzialmente potrebbero essere affrontati ...

penso che il immediato la soluzione è un approccio basato su blocchi (e c'è un feature request per avere questo lavoro in modo nativo nei panda read_sql e read_sql_table).

MODIFICA: A partire da Pandas v0.16.2 questo approccio basato su blocchi è implementato in modo nativo in read_sql.


Dal momento che si sta utilizzando Postgres si ha accesso al l'LIMIT and OFFSET queries, che rende suddivisione in blocchi abbastanza facile. (Ho ragione di pensare queste non sono disponibili in tutte le lingue di sql?)

primo luogo, ottenere il numero di righe (o un estimate) nella tabella:

nrows = con.execute('SELECT count(*) FROM users').fetchone()[0] # also works with an sqlalchemy engine 

Utilizzare questo per scorrere l' tavolo (per il debug si potrebbe aggiungere alcune dichiarazioni di stampa per confermare che stava funzionando/non si è schiantato!) e poi unire il risultato:

def read_sql_chunked(query, con, nrows, chunksize=1000): 
    start = 1 
    dfs = [] # Note: could probably make this neater with a generator/for loop 
    while start < nrows: 
     df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con) 
     dfs.append(df) 
    return pd.concat(dfs, ignore_index=True) 

Nota: questo presuppone che il database si inserisce in memoria! Se non è così, dovrai lavorare su ogni chunk (stile mapreduce) ... o investire in più memoria!

+0

La memoria potrebbe essere il collo di bottiglia: sto eseguendo una VM che aveva solo il 512M predefinito. Arrampica rapidamente fino a 1024M e se non funziona, darò la prova Chunked. –

+0

@MariusButuc fammi sapere come questa soluzione fa fiere/se hai problemi! –

+0

aggiunto _Aggiornamento # 3_ con i miei nuovi tentativi (ancora meno riusciti). –

0

tenta di utilizzare i panda:

* mysql_cn = mysql.connector.connect (host = 'localhost', port = 123, user = 'xyz', passwd = '****', db = 'xy_db') **

data = pd.read_sql ('SELECT * FROM tabella;', con = mysql_cn

mysql_cn.close()

ha funzionato per me

Problemi correlati