2014-07-25 9 views
6

Ho un database MongoDB da 0,7 GB contenente tweet che sto tentando di caricare in un dataframe. Tuttavia, ottengo un errore.Un modo migliore per caricare dati MongoDB su un DataFrame usando Pandas e PyMongo?

MemoryError:  

Il mio codice è simile al seguente:

cursor = tweets.find() #Where tweets is my collection 
tweet_fields = ['id'] 
result = DataFrame(list(cursor), columns = tweet_fields) 

Ho provato i metodi nei seguenti risposte, che ad un certo punto di creare un elenco di tutti gli elementi del database prima di caricarla.

Tuttavia, in un'altra risposta che parla di lista(), la persona che ha detto che è bene per piccoli insiemi di dati, perché tutto viene caricato in memoria.

Nel mio caso, penso che sia l'origine dell'errore. Sono troppi i dati da caricare nella memoria. Quale altro metodo posso usare?

risposta

7

Ho modificato il mio codice al seguente:

cursor = tweets.find(fields=['id']) 
tweet_fields = ['id'] 
result = DataFrame(list(cursor), columns = tweet_fields) 

Aggiungendo la campi parametro nella funzione find() ho limitato l'uscita. Il che significa che non sto caricando tutti i campi ma solo i campi selezionati in DataFrame. Adesso funziona tutto bene.

4

Il modo più veloce, e probabilmente più efficiente in termini di memoria, per creare un DataFrame da una query di mongodb, come nel tuo caso, sarebbe utilizzare monary.

This post ha una spiegazione semplice e concisa.

0

un modo elegante di farlo sarebbe come segue:

import pandas as pd 
def my_transform_logic(x): 
    if x : 
     do_something 
     return result 

def process(cursor): 
    df = pd.DataFrame(list(cursor)) 
    df['result_col'] = df['col_to_be_processed'].apply(lambda value: my_transform_logic(value)) 

    #making list off dictionaries 
    db.collection_name.insert_many(final_df.to_dict('records')) 

    # or update 
    db.collection_name.update_many(final_df.to_dict('records'),upsert=True) 


#make a list of cursors.. you can read the parallel_scan api of pymongo 

cursors = mongo_collection.parallel_scan(6) 
for cursor in cursors: 
    process(cursor) 

ho provato il processo di cui sopra su una collezione di MongoDB con 2,6 milioni di record utilizzando Joblib sul codice di cui sopra. Il mio codice non ha generato alcun errore di memoria e l'elaborazione è terminata in 2 ore.

Problemi correlati