2014-12-12 18 views
20

Ho un piccolo programma Scala che gira bene su un nodo singolo. Tuttavia, lo sto ridimensionando in modo che venga eseguito su più nodi. Questo è il mio primo tentativo del genere. Sto solo cercando di capire come funzionano gli RDD in Spark, quindi questa domanda si basa sulla teoria e potrebbe non essere corretta al 100%.Spark RDD's - come funzionano

Diciamo che creare un RDD: val rdd = sc.textFile(file)

Ora, una volta l'ho fatto, vuol dire che il file in file è ora diviso tra i nodi (assumendo tutti i nodi hanno accesso al percorso del file) ?

In secondo luogo, voglio contare il numero di oggetti nel RDD (abbastanza semplice), però, ho bisogno di usare quel numero in un calcolo che deve essere applicata agli oggetti nel RDD - un esempio di pseudocodice:

rdd.map(x => x/rdd.size) 

Diciamo ci sono 100 oggetti rdd, e dicono che ci sono 10 nodi, quindi un conteggio di 10 oggetti per nodo (supponendo che questa sia il concetto funziona RDD), ora quando chiamo il metodo è ciascun nodo andando eseguire il calcolo con rdd.size come 10 o 100? Perché, nel complesso, l'RDD è la dimensione 100 ma localmente su ciascun nodo è solo 10. Devo eseguire una variabile di trasmissione prima di eseguire il calcolo? Questa domanda è collegata alla domanda seguente.

Infine, se eseguo una trasformazione su RDD, ad es. rdd.map(_.split("-")), e quindi volevo il nuovo size del RDD, è necessario eseguire un'azione sull'RDD, ad esempio count(), in modo che tutte le informazioni vengano inviate al nodo del driver?

+1

'Questa domanda è collegata alla domanda seguente.' -> ?? – gsamaras

+0

Penso che intendessi 'rdd.flatMap (_. Split (" - "))' – lovasoa

risposta

6

In genere, il file (o parti del file, se è troppo grande) viene replicato in N nodi nel cluster (per impostazione predefinita N = 3 su HDFS). Non è intenzione dividere ogni file tra tutti i nodi disponibili.

Tuttavia, per te (vale a dire il client) che lavora con il file utilizzando Spark dovrebbe essere trasparente - non dovresti notare alcuna differenza in rdd.size, indipendentemente dal numero di nodi suddivisi e/o replicati. Ci sono metodi (almeno in Hadoop) per scoprire su quali nodi (parti del) file possono essere localizzati al momento. Tuttavia, in casi semplici probabilmente non avrai bisogno di usare questa funzionalità.

UPDATE: un articolo che descrive RDD interni: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

+0

Grazie per la risposta. Quindi, per un calcolo come: 'rdd.filter (...). Map (x => x * rdd.count)' è il passo 'filter' eseguito su ciascun nodo prima che qualsiasi nodo possa eseguire il passo' map'? Perché, chiaramente, il passo 'map' dipende dal passo' filtro' già eseguito su ciascun nodo dato che 'map' contiene' rdd.count'. Grazie ancora. – monster

+0

Naturalmente, perché 'map' è costruito su' filter' (leggi sul concetto di "lineage" nell'articolo). – Ashalynd

+0

Grazie per l'informazione, è una buona lettura, tuttavia, ora mi sto chiedendo qual è lo scopo di una variabile Broadcast? Grazie ancora, apprezzato! – monster

18
val rdd = sc.textFile(file) 

vuol dire che il file è ora diviso tra i nodi?

Il file rimane dovunque fosse. Gli elementi della risultante RDD[String] sono le linee del file. Il RDD è partizionato per abbinare il partizionamento naturale del file system sottostante. Il numero di partizioni non dipende dal numero di nodi che hai.

È importante capire che quando viene eseguita questa riga, non legge i file. Il RDD è un oggetto pigro e farà qualcosa solo quando è necessario. Questo è ottimo perché evita l'utilizzo di memoria non necessario.

Ad esempio, se si scrive val errors = rdd.filter(line => line.startsWith("error")), non accade ancora nulla.Se si scrive quindi val errorCount = errors.count ora sarà necessario eseguire la sequenza di operazioni perché il risultato di count è un numero intero. Quello che ciascun core del worker (thread dell'esecutore) farà in parallelo, viene letto un file (o un pezzo di file), lo itera attraverso le sue linee e conta le righe che iniziano con "error". Buffering e GC a parte, solo una singola riga per core sarà in memoria alla volta. Ciò rende possibile lavorare con dati molto grandi senza utilizzare molta memoria.

voglio contare il numero di oggetti nel RDD, però, ho bisogno di usare quel numero in un calcolo che deve essere applicata agli oggetti nel RDD - un esempio di pseudocodice:

rdd.map(x => x/rdd.size) 

Non esiste il metodo rdd.size. C'è rdd.count, che conta il numero di elementi nel RDD. rdd.map(x => x/rdd.count) non funzionerà. Il codice tenterà di inviare la variabile rdd a tutti i lavoratori e non riuscirà con uno NotSerializableException. Che cosa si può fare è:

val count = rdd.count 
val normalized = rdd.map(x => x/count) 

Questo funziona, perché è un countInt e può essere serializzato.

Se eseguo una trasformazione su RDD, ad es. rdd.map(_.split("-")), e quindi volevo la nuova dimensione dell'RDD, devo eseguire un'azione sull'RDD, ad esempio count(), in modo che tutte le informazioni vengano inviate al nodo del driver?

map non modifica il numero di elementi. Non so cosa intendi per "taglia". Ma sì, è necessario eseguire un'azione, ad esempio count per ottenere qualcosa dall'RDD. Vedete, nessun lavoro viene eseguito fino a quando non si esegue un'azione. (Quando esegui count, solo il conteggio delle partizioni verrà reinviato al driver, ovviamente non "tutte le informazioni".)

+0

Ho creato un esempio [tag: Python] in base alla risposta in [la documentazione] (http://stackoverflow.com/documentation/apache-spark/833/introduction-to-apache-spark#t=20160817171702245426), se ti piace, puoi includerlo nella tua risposta! – gsamaras

+0

Questa dovrebbe essere la risposta accettata. Risponde a tutte le parti completamente e correttamente. – tejaskhot