2016-02-01 21 views
6

SituazioneClojure: Group-by troppo lento (file 13 milioni di linee)

Ho un 13 milioni di linee CSV su cui desidera eseguire la regressione logistica (incanter) per ogni gruppo. Il mio file è come quella (i valori sono solo di esempio)

ID Max Probability 
1 1 0.5 
1 5 0.6 
1 10 0.99 
2 1 0.1 
2 7 0.95 

Così ho letto con un CSV-reader, everithing va bene.

devo poi qualcosa di simile:

({"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} etc. 

Voglio gruppo-da questi valori per Id, se non ricordo male, ci sono circa 1,2 milioni di Ids. (L'ho fatto in Python con i panda ed è super veloce)

Questa è la mia funzione per leggere e formattare il file (funziona benissimo sul set di dati più piccoli):

(defn read-file 
    [] 
    (let [path (:path-file @config) 
      content-csv (take-csv path \,)] 
     (->> (group-by :Id content-csv) 
      (map (fn [[k v]] 
       [k {:x (mapv :Max v) :y (mapv :Probability v)}])) 
      (into {})))) 

voglio finalmente avere qualcosa come quella per eseguire la regressione logistica (sono flessibile a tale proposito, non ha bisogno di vettori di: x e: y, seguenti del regolamento provvisorio sono ok)

{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} etc. 

Problema

Ho problemi con l'operazione group-by. L'ho provato separatamente sull'output da CSV e questo sta prendendo per sempre quando non si esaurisce a causa della memoria Java Heap Space. Pensavo che il problema fosse la mia cosa mapv ma questo è il gruppo.

Ho pensato di utilizzare ridurre o ridurre-kv ma non so come utilizzare queste funzioni per questo tipo di scopi.

Non mi interessa l'ordine di ": x" e ": y" (non appena sono uguali tra loro, voglio dire che x e y hanno lo stesso indice ... non è un problema perché loro sono sulla stessa linea) e di Id sul risultato finale e leggo quel gruppo, mantenendo l'ordine. Forse è quello che costa per l'operazione?

Vi do i dati di esempio, se una persona ha riscontrato che:

(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95})) 

Altre alternative

ho altre idee, ma non sono sicuro che siano "Clojure" -friendly.

  • In Python, a causa della natura della funzione e perché il file è già ordinato, invece di utilizzare gruppo-by, ho scritto in un dataframe inizio e gli indici finali per ciascun gruppo in modo che ho dovuto selezionare direttamente il sub-datatab.

  • Posso anche caricare un elenco di ID anziché calcolarlo da Clojure. Come

    (def ids ("1" "2" ecc.

Così forse è possibile cominciare con:

{"1" {:x [] :y []} "2" {:x [] :y []} etc. 

dalla ss precedente e quindi abbinare il file di grandi dimensioni su ogni ID.

Non so se è più efficiente in effetti.

Ho tutte le altre funzioni per la regressione logistica, mi manca solo questa parte! Grazie!

EDIT

Grazie per le risposte, ho finalmente questa soluzione.

Nel mio file project.clj

:jvm-opts ["-Xmx13g"]) 

Codice:

(defn data-group->map [group] 
    {(:Id (first group)) 
    {:x (map :Max group) 
    :y (map :Probability group)}}) 


(defn prob-cumsum [data] 
    (cag/fmap 
    (fn [x] 
     (assoc x :y (reductions + (x :y)))) 
    data)) 


(defn process-data-splitter [data] 
    (->> (partition-by :Id data) 
     (map data-group->map) 
     (into {}) 
     (prob-cumsum))) 

ho avvolto tutto il mio codice e funziona. La divisione richiede circa 5 minuti, ma non ho bisogno di megapixel. L'utilizzo della memoria può arrivare a tutta la memoria per la lettura dei file, quindi meno per sigmoid.

+1

È la cardinalità del ids alta o bassa? Gli ID in csv sono ordinati? Se è così, potresti essere in grado di fare il raggruppamento mentre stai leggendo sopra il CSV in un unico passaggio. –

+0

Ciao, grazie per la risposta. Ho circa 1,2-1,3 milioni di ID (10 volte meno dei dati effettivi). Il file è ordinato come il mio esempio, vale a dire: primo livello = ID, secondo livello = Max (Probabilità e Max sono ordinati allo stesso modo perché sono collegati da una curva crescente). Quindi forse la tua idea è buona, ancora non so come farlo. Un loop è una buona idea? Non mi piace trarre vantaggio dal multielaborazione. Proverò qualcosa con l'unione con la prima riformattazione dei dati. –

risposta

6

se il file è ordinato per ID, è possibile utilizzare partition-by anziché group-by.

allora il codice sarebbe simile a questa:

(defn data-group->map [group] 
    [(:Id (first group)) 
    {:x (mapv :Max group) 
    :y (mapv :Probability group)}]) 

(defn read-file [] 
    (let [path (:path-file @config) 
     content-csv (take-csv path \,)] 
    (->> content-csv 
     (partition-by :Id) 
     (map data-group->map) 
     (into {})))) 

che dovrebbe accelerarlo. Poi si può probabilmente renderlo più veloce utilizzando trasduttori

(defn read-file [] 
    (let [path (:path-file @config) 
     content-csv (take-csv path \,)] 
    (into {} (comp (partition-by :Id) 
        (map data-group->map)) 
      content-csv))) 

facciamo alcuni test:

prima generare una enorme di dati come la vostra:

(def huge-data 
    (doall (mapcat #(repeat 
        1000000 
        {:Id % :Max 1 :Probability 10}) 
      (range 10)))) 

abbiamo dieci milioni di pezzi di dati, con milioni di {:Id 0 :Max 1 :Probability 10}, milioni di {:Id 1 :Max 1 :Probability 10} e così via.

ora funzioni da testare:

(defn process-data-group-by [data] 
    (->> (group-by :Id data) 
     (map (fn [[k v]] 
       [k {:x (mapv :Max v) :y (mapv :Probability v)}])) 
     (into {}))) 

(defn process-data-partition-by [data] 
    (->> data 
     (partition-by :Id) 
     (map data-group->map) 
     (into {}))) 

(defn process-data-transducer [data] 
    (into {} (comp (partition-by :Id) (map data-group->map)) data)) 

e ora test in tempo:

(do (time (dorun (process-data-group-by huge-data))) 
    (time (dorun (process-data-partition-by huge-data))) 
    (time (dorun (process-data-transducer huge-data)))) 

"Elapsed time: 3377.167645 msecs" 
"Elapsed time: 3707.03448 msecs" 
"Elapsed time: 1462.955152 msecs" 

comunicazione, che produce partition-by sequenza artificiale, mentre il gruppo-by dovrebbe realizzare l'intera raccolta. Quindi, se avete bisogno del vostro gruppo di dati dal gruppo, non l'intera mappa, è possibile rimuovere (into {}) e accedere ad ognuno più veloce:

(defn process-data-partition-by [data] 
    (->> data 
     (partition-by :Id) 
     (map data-group->map))) 

controllo:

user> (time (def processed-data (process-data-partition-by huge-data))) 
"Elapsed time: 0.06079 msecs" 
#'user/processed-data 
user> (time (let [f (first processed-data)])) 
"Elapsed time: 302.200571 msecs" 
nil 
user> (time (let [f (second processed-data)])) 
"Elapsed time: 500.597153 msecs" 
nil 
user> (time (let [f (last processed-data)])) 
"Elapsed time: 2924.588625 msecs" 
nil 
user.core> (time (let [f (last processed-data)])) 
"Elapsed time: 0.037646 msecs" 
nil 
+0

Ciao, grazie per la risposta. Ho provato la tua soluzione con i tuoi dati di esempio ed è molto più veloce. Con il mio CSV, è molto lento. Quindi forse la causa è la lettura del file con slurp. Non so come risolverlo ma sembra che il gruppo non sia il vero problema (anche se ho imparato una soluzione migliore con il tuo post). Ma il problema è che ho un problema di Java Heap Space quando uso def, beacuase strano ho 16 go ram. –

+0

Ciao! Come si carica e analizza il file css? potresti aggiornare la tua domanda? – leetwinski

+0

Il problema con lo spazio dell'heap java potrebbe probabilmente essere risolto con jvm tuning, impostando il valore 'Xmx'. http://stackoverflow.com/questions/14763079/what-are-the-xms-and-xmx-parameters-when-starting-jvms. Ma il vero problema, può essere collegato al fatto che si mantengono tutti i dati caricati (anche non necessari). – leetwinski