6

Sono un principiante su Spark (La mia versione è 1.6.0) e ora sto cercando di risolvere il problema indicato di seguito:Come per eseguire l'operazione "Ricerca" sulla dataframes Spark dato più condizioni

Supponiamo che ci sono due file di origine:

  • Il primo (A in breve) è uno grande che contiene colonne denominate A1, B1, C1 e altre 80 colonne. All'interno ci sono record di 230K.
  • Il secondo (B in breve) è una piccola tabella di ricerca che contiene le colonne denominate A2, B2, C2 e D2. Ci sono 250 record all'interno.

Ora abbiamo bisogno di inserire una nuova colonna in A, data la logica di seguito:

  • Prima ricerca A1, B1 e C1 in B (colonne corrispondenti sono A2, B2 e C2), se successo , restituisce D2 come valore della nuova colonna aggiunta. Se non è stato trovato nulla ...
  • Quindi cercare A1, B1 in B. In caso di esito positivo, restituire D2. Se non abbiamo trovato nulla ...
  • impostare il valore predefinito "NA"

ho già letto nei file e convertito in frame di dati. Per la prima situazione, ho ottenuto il risultato per esterno sinistro unendoli insieme. Ma non riesco a trovare una buona strada nel prossimo passo.

Il mio attuale tentativo è quello di creare un nuovo frame dati unendo A e B utilizzando una condizione meno rigorosa. Tuttavia non ho idea di come aggiornare il frame dei dati corrente dall'altro. O c'è un altro modo più intuitivo ed efficace per affrontare l'intero problema?

Grazie per tutte le risposte.

----------------------------- Aggiornamento su 20160309 -------------- ------------------

Finalmente accettata la risposta di @mlk. Ancora fantastico grazie a @ zero323 per i suoi fantastici commenti su UDF contro join, la generazione del codice Tungsteno è davvero un altro problema che stiamo affrontando ora. Ma dal momento che abbiamo bisogno di fare decine di ricerca e in media 4 condizioni per ogni ricerca, la prima soluzione è più adatto ...

La soluzione finale è in qualche modo assomiglia a seguito frammento:

``` 
import sqlContext.implicits._ 
import com.github.marklister.collections.io._ 

case class TableType(A: String, B: String, C: String, D: String) 
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("...")) 
val lkupD = udf { 
    (aStr: String, bStr: String, cStr: String) => 
    tableBroadcast.value.find { 
     case TableType(a, b, c, _) => 
     (a == aStr && b == bStr && c == cStr) || 
     (a == aStr && b == bStr) 
    }.getOrElse(TableType("", "", "", "NA")).D 
} 
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C")) 
``` 

risposta

4

Come è B piccolo penso che il modo migliore per farlo sarebbe una variabile di trasmissione e una funzione definita dall'utente.

// However you get the data... 
case class BType(A2: Int, B2: Int, C2 : Int, D2 : String) 
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200")) 

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER") 


// Broadcast B so all nodes have a copy of it. 
val Bbradcast = sc.broadcast(B) 

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {(a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 } 

// Use the UDF in a select 
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show 
+1

Questa probabilmente è la strada da percorrere. Ho fornito anche una soluzione alternativa con 'joins'. – zero323

+0

Grazie mlk. Se la tabella di ricerca è grande (500K * 50), è ancora buono trasmetterlo? –

+0

E la mia altra domanda è, supponiamo di dover eseguire 30 ricerche su colonne diverse e scrivere 50 UDF, la prestazione ne risentirebbe? –

2

Solo per riferimento una soluzione senza FSU:

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1")) 
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2")) 

// Match A, B and C 
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1") 
// Match A and B mismatch C 
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2") 

val toDrop = b1.columns ++ b2.columns 

toDrop.foldLeft(a 
    .join(b1, expr1, "leftouter") 
    .join(b2, expr2, "leftouter") 
    // If there is match on A, B, C then D_1 should be not NULL 
    // otherwise we fall-back to D_2 
    .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c)) 

Questo presuppone che è al massimo una corrispondenza di ciascuna categoria (tutti e tre colonne, o le prime due) o duplicare righe nell'output sono desiderato.

UDF vs JOIN:

Ci sono diversi fattori da considerare e non c'è una risposta semplice qui:

Contro:

  • trasmissione joins richiedere il passaggio di dati due volte al nodi operai. Per ora le tabelle broadcasted non vengono memorizzate nella cache (SPARK-3863) ed è improbabile che cambino nel prossimo futuro (Risoluzione: Più tardi).
  • join l'operazione viene applicata due volte anche se è presente una corrispondenza completa.

Pro:

  • join e coalesce sono trasparenti alla ottimizzatore mentre UDF non sono.
  • che opera direttamente con le espressioni SQL può trarre vantaggio da tutte le ottimizzazioni di tungsteno inclusa la generazione del codice mentre l'UDF non può.
Problemi correlati