2015-10-20 9 views
8

Ho il seguente codice:Spark perdendo println() su stdout

val blueCount = sc.accumulator[Long](0) 
val output = input.map { data => 
    for (value <- data.getValues()) { 
    if (record.getEnum() == DataEnum.BLUE) { 
     blueCount += 1 
     println("Enum = BLUE : " + value.toString() 
    } 
    } 
    data 
}.persist(StorageLevel.MEMORY_ONLY_SER) 

output.saveAsTextFile("myOutput") 

Poi il blueCount non è pari a zero, ma non ho ottenuto println) uscita (! Mi sto perdendo qualcosa qui? Grazie!

risposta

3

sono stato in grado di lavorare intorno di fare UtilityFunction:

object PrintUtiltity { 
    def print(data:String) = { 
     println(data) 
    } 
} 
+5

Perché funziona? – angelcervera

+0

Poiché Spark pensa di chiamare una funzione di utilità anziché chiamare la funzione di stampa. Apparentemente, Spark non ha controllato (e non poteva praticamente) controllare ogni riga nella sua funzione di utilità. – Edamame

+1

Quello che stai facendo è creare un'istanza di un oggetto nel tuo programma driver. Non conterei su questo comportamento senza un chiaro modello di cosa sta succedendo esattamente. Aspettatevi che il comportamento cambi in modo imprevedibile con qualsiasi modifica al vostro programma o come invocate l'oggetto PrintUtility. Se vuoi raccogliere registri, usa metodi standard per farlo, non inventare meccanismi casuali che non capisci. La tua spiegazione del perché funziona è pericolosamente sbagliata: non c'è il divieto di fare ciò che hai fatto; non esiste un controllore del codice per essere sicuro di non imbrogliare: tutto il comportamento segue la progettazione del sistema – David

13

Questa è una domanda concettuale ...

Immaginate di avere un grande gruppo, composto da molti lavoratori diciamo n lavoratori e quei lavoratori memorizzare una partizione di un RDD o DataFrame, immaginate di avviare un'attività tutta map che i dati, e dentro quella map avete una dichiarazione print, prima di tutto:

  • Dove sarà che i dati da stampare fuori?
  • Quale nodo ha priorità e quale partizione?
  • Se tutti i nodi sono in esecuzione in parallelo, chi verrà stampato per primo?
  • Come verrà creata questa coda di stampa?

Queste sono troppe domande, così i progettisti/manutentori di apache-spark deciso logicamente a cadere alcun supporto per print istruzioni all'interno qualsiasi map-reduce operazione (questo includono accumulators e anche broadcast variabili).

Questo ha anche senso perché Spark è una lingua progettata per per dataset molto grandi. Mentre la stampa può essere utile per test e debug, non si vorrebbe stampare ogni riga di un DataFrame o RDD perché sono costruiti per avere milioni o miliardi di righe! Quindi, perché affrontare queste domande complicate quando non si vorrebbe nemmeno stampare in primo luogo?

Per dimostrare questo si può eseguire questo codice Scala, ad esempio:

// Let's create a simple RDD 
val rdd = sc.parallelize(1 to 10000) 

def printStuff(x:Int):Int = { 
    println(x) 
    x + 1 
} 

// It doesn't print anything! because of a logic design limitation! 
rdd.map(printStuff) 

// But you can print the RDD by doing the following: 
rdd.take(10).foreach(println) 
+6

Credo opere println bene: va semplicemente allo stdout/stderr del computer su cui è in esecuzione un programma di esecuzione di scintille. Quindi, a meno che tu non abbia un modo per catturare ciò che è in quei log, non lo vedrai mai. Se stai usando il filato c'è comunque un comando per stamparlo tutto per te. – David

+0

Mentre l'argomentazione è valida, Spark non esegue alcun tipo di analisi statica per rilasciare il codice. L'output non va al driver 'STDOUT' come spiegato da @David –