2014-09-06 10 views
20

Ho un grande dataframe (diverse milioni di righe).Come eseguire iterate su blocchi consecutivi di dataframe Pandas in modo efficiente

Desidero essere in grado di eseguire un'operazione di groupby su di esso, ma semplicemente raggruppando sottoinsiemi di righe consecutivi (preferibilmente di dimensioni uguali) anziché utilizzare una particolare proprietà delle singole righe per decidere a quale gruppo vanno .

Il caso d'uso: voglio applicare una funzione a ciascuna riga tramite una mappa parallela in IPython. Non importa quali file vanno a quale motore di back-end, poiché la funzione calcola un risultato basato su una riga alla volta. (Concettualmente, almeno, in realtà è vettorializzare.)

mi è venuta in mente qualcosa di simile:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to 
max_idx = dataframe.index.max() 
tenths = ((10 * dataframe.index)/(1 + max_idx)).astype(np.uint32) 

# Use this value to perform a groupby, yielding 10 consecutive chunks 
groups = [g[1] for g in dataframe.groupby(tenths)] 

# Process chunks in parallel 
results = dview.map_sync(my_function, groups) 

Ma questo sembra molto prolisso, e non garantisce pezzi uguali dimensioni. Soprattutto se l'indice è scarso o non intero o altro.

Qualche suggerimento per un modo migliore?

Grazie!

risposta

20

In pratica, non è possibile garantire pezzi di uguale dimensione: il numero di righe potrebbe essere primo, dopotutto, nel qual caso le uniche opzioni di chunking sarebbero pezzi di dimensione 1 o un grosso pezzo. Tendo a passare un array a groupby. A partire da:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15) 
>>> df[0] = range(15) 
>>> df 
    0   1   2   3   4 
0 0 0.746300 0.346277 0.220362 0.172680 
0 1 0.657324 0.687169 0.384196 0.214118 
0 2 0.016062 0.858784 0.236364 0.963389 
[...] 
0 13 0.510273 0.051608 0.230402 0.756921 
0 14 0.950544 0.576539 0.642602 0.907850 

[15 rows x 5 columns] 

dove ho volutamente fatto l'indice uninformative impostando a 0, abbiamo semplicemente decidere sulla nostra dimensione (qui 10) e interi dividiamo una matrice da esso:

>>> df.groupby(np.arange(len(df))//10) 
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c> 
>>> for k,g in df.groupby(np.arange(len(df))//10): 
...  print(k,g) 
...  
0 0   1   2   3   4 
0 0 0.746300 0.346277 0.220362 0.172680 
0 1 0.657324 0.687169 0.384196 0.214118 
0 2 0.016062 0.858784 0.236364 0.963389 
[...] 
0 8 0.241049 0.246149 0.241935 0.563428 
0 9 0.493819 0.918858 0.193236 0.266257 

[10 rows x 5 columns] 
1  0   1   2   3   4 
0 10 0.037693 0.370789 0.369117 0.401041 
0 11 0.721843 0.862295 0.671733 0.605006 
[...] 
0 14 0.950544 0.576539 0.642602 0.907850 

[5 rows x 5 columns] 

I metodi basati sull'affinatura del DataFrame possono fallire quando l'indice non è compatibile con quello, sebbene sia sempre possibile usare .iloc[a:b] per ignorare i valori dell'indice e accedere ai dati per posizione.

+0

Questo era quello che avevo in mente! Bene tecnicamente "df.groupby (np.arange (len (df)) // (len (df)/10))" per ottenere un numero fisso di gruppi (1 per core) invece di dimensione fissa. Per qualche ragione non mi era mai venuto in mente che la chiave di raggruppamento non fosse effettivamente correlata all'indice ... –

+1

Vale la pena ricordare che per efficienza è probabilmente meglio leggere il file originale usando un "iteratore" (https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html) e un "chunksize" in modo che la funzione read_csv faccia la lettura e che ciascun frammento possa essere passato a un processo separato come descritto da @ Ryan –

19

Non sono sicuro se questo è esattamente ciò che si desidera, ma ho trovato queste funzioni di cernia su another SO thread abbastanza utili per fare un pool multiprocessore.

Ecco un breve esempio da quel filo, che potrebbe fare qualcosa di simile a ciò che si vuole:

import numpy as np 
import pandas as pds 

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd']) 

def chunker(seq, size): 
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size)) 

for i in chunker(df,5): 
    print i 

che ti dà qualcosa di simile:

  a   b   c   d 
0 0.860574 0.059326 0.339192 0.786399 
1 0.029196 0.395613 0.524240 0.380265 
2 0.235759 0.164282 0.350042 0.877004 
3 0.545394 0.881960 0.994079 0.721279 
4 0.584504 0.648308 0.655147 0.511390 
      a   b   c   d 
5 0.276160 0.982803 0.451825 0.845363 
6 0.728453 0.246870 0.515770 0.343479 
7 0.971947 0.278430 0.006910 0.888512 
8 0.044888 0.875791 0.842361 0.890675 
9 0.200563 0.246080 0.333202 0.574488 
      a   b   c   d 
10 0.971125 0.106790 0.274001 0.960579 
11 0.722224 0.575325 0.465267 0.258976 
12 0.574039 0.258625 0.469209 0.886768 
13 0.915423 0.713076 0.073338 0.622967 

Mi auguro che aiuta.

EDIT

In questo caso, ho usato questa funzione con pool of processors in (circa) questo modo:

from multiprocessing import Pool 

nprocs = 4 

pool = Pool(nprocs) 

for chunk in chunker(df, nprocs): 
    data = pool.map(myfunction, chunk) 
    data.domorestuff() 

assumo questo dovrebbe essere molto simile all'utilizzo IPython macchine distribuito, ma rifugio lo ho provato

+0

Questo sicuramente farebbe il trucco. Sono ancora un po 'in attesa di qualche bel gruppo di one-liner, ma se qualcosa del genere si materializza, ottieni il premio :-) –

7

Un segno di un buon ambiente è molte scelte, quindi mi aggiungere questo da Anaconda Blaze, molto usare Odo

import blaze as bz 
import pandas as pd 

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]}) 

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2): 
    # Do stuff with chunked dataframe 
6

Usa NumPy ha questo costruito in: np.array_split()

import numpy as np 
import pandas as pd 

data = pd.DataFrame(np.random.rand(10, 3)) 
for chunk in np.array_split(data, 5): 
    assert len(chunk) == len(data)/5 
Problemi correlati