2014-07-18 8 views
5

Sto provando ad elaborare> 10000 oggetti xts salvati su disco ognuno di circa 0,2 GB quando caricato in R. Vorrei usare foreach per elaborarli in parallelo. Il mio codice funziona per qualcosa come 100 oggetti xts che ho pre-caricato in memoria, esportato ecc. Ma dopo> 100 oggetti xts colpisco i limiti di memoria sulla mia macchina.Come superare i vincoli di memoria usando foreach

esempio di quello che sto cercando di fare:

require(TTR) 
require(doMPI) 
require(foreach) 

test.data <- runif(n=250*10*60*24) 

xts.1 <- xts(test.data, order.by=as.Date(1:length(test.data))) 
xts.1 <- cbind(xts.1, xts.1, xts.1, xts.1, xts.1, xts.1) 

colnames(xts.1) <- c("Open", "High", "Low", "Close", "Volume", "Adjusted") 

print(object.size(xts.1), units="Gb") 

xts.2 <- xts.1 
xts.3 <- xts.1 
xts.4 <- xts.1 

save(xts.1, file="xts.1.rda") 
save(xts.2, file="xts.2.rda") 
save(xts.3, file="xts.3.rda") 
save(xts.4, file="xts.4.rda") 

names <- c("xts.1", "xts.2", "xts.3", "xts.4") 

rm(xts.1) 
rm(xts.2) 
rm(xts.3) 
rm(xts.4) 

cl <- startMPIcluster(count=2) # Use 2 cores 
registerDoMPI(cl) 

result <- foreach(name=names, 
        .combine=cbind, 
        .multicombine=TRUE, 
        .inorder=FALSE, 
        .packages=c("TTR")) %dopar% { 
    # TODO: Move following line out of worker. One (or 5, 10, 
    # 20, ... but not all) object at a time should be loaded 
    # by master and exported to worker "just in time" 
    load(file=paste0(name, ".rda")) 

    return(last(SMA(get(name)[, 1], 10))) 
} 

closeCluster(cl) 

print(result) 

Quindi mi chiedo come sarei stato in grado di caricare ogni (o più come 5, 10, 20, 100, ... ma non tutti contemporaneamente) xts oggetto dal disco "just in time" prima di essere inviato/necessario/esportato ai worker. Non riesco a caricare l'oggetto in worker (in base al nome e alla cartella in cui è archiviato su disco) poiché i worker possono essere su macchine remote senza accesso alla cartella in cui gli oggetti sono archiviati su disco. Quindi devo essere in grado di leggerli/caricarli "just in time" nel processo principale ...

Sto usando doMPI e doRedis come back-end parallelo. doMPI sembra più efficiente in termini di memoria ma più lento di doRedis (su 100 oggetti).

Quindi mi piacerebbe capire quale sia una "strategia"/"modello" appropriata per affrontare questo problema.

+0

Sei sicuro che il tuo codice sia associato alla CPU? Sembra che potrebbe essere I/O-bound, il che significa che l'esecuzione su più CPU non sarà di aiuto (soprattutto perché i lavoratori non possono accedere al disco). –

+0

@JoshuaUlrich Beh, in realtà ho in realtà intenzione di eseguire qualcosa come 50 volte quantstrat :: applyIndicators e quantstrat :: applySignals. Quindi le cose I/O hanno sicuramente un impatto, ma mi piacerebbe davvero essere in grado di farlo in parallelo. Questo è ad esempio per test retrospettivi su 10 anni di storia. Nello scenario "tempo reale" ho bisogno di caricare solo la cronologia degli oggetti fino al periodo di ricerca più ampio da un particolare momento di runtime. Sono davvero interessato se il caricamento può essere spostato dai lavoratori in master e gli oggetti vengono quindi forniti uno alla volta (o in gruppi gestibili) ai lavoratori. – Samo

risposta

3

Oltre a utilizzare doMPI o doRedis, è necessario scrivere una funzione che restituisca un iteratore appropriato. Ci sono una serie di esempi nella mia vignetta "Scrittura Iteratori personalizzate" dal pacchetto iteratori che dovrebbero essere utili, ma ecco un rapido tentativo di una tale funzione:

ixts <- function(xtsnames) { 
    it <- iter(xtsnames) 

    nextEl <- function() { 
    xtsname <- nextElem(it) # throws "StopIteration" 
    load(file=paste0(xtsname, ".rda")) 
    get(xtsname) 
    } 

    obj <- list(nextElem=nextEl) 
    class(obj) <- c('ixts', 'abstractiter', 'iter') 
    obj 
} 

questo è davvero semplice dal momento che è fondamentalmente un wrapper un iteratore sulla variabile "nomi". La vignetta utilizza questa tecnica per molti degli esempi.

è possibile utilizzare "ixts" con foreach come segue:

result <- foreach(xts=ixts(names), 
        .combine=cbind, 
        .multicombine=TRUE, 
        .inorder=FALSE, 
        .packages=c("TTR")) %dopar% { 
    last(SMA(xts[, 1], 10)) 
} 

Anche se questo iteratore funziona con qualsiasi backend foreach, non tutti i backend lo chiameranno just-in-time. doMPI e doRedis lo faranno, ma doParallel e doMC ottengono tutti i valori dall'iteratore in primo piano perché clusterApplyLB e mclapply richiedono che i valori siano tutti in un elenco. doMPI e doRedis sono stati progettati per funzionare con gli iteratori al fine di aumentare la memoria.

Problemi correlati