Sono un principiante su Spark (La mia versione è 1.6.0) e ora sto cercando di risolvere il problema indicato di seguito:Come per eseguire l'operazione "Ricerca" sulla dataframes Spark dato più condizioni
Supponiamo che ci sono due file di origine:
- Il primo (A in breve) è uno grande che contiene colonne denominate A1, B1, C1 e altre 80 colonne. All'interno ci sono record di 230K.
- Il secondo (B in breve) è una piccola tabella di ricerca che contiene le colonne denominate A2, B2, C2 e D2. Ci sono 250 record all'interno.
Ora abbiamo bisogno di inserire una nuova colonna in A, data la logica di seguito:
- Prima ricerca A1, B1 e C1 in B (colonne corrispondenti sono A2, B2 e C2), se successo , restituisce D2 come valore della nuova colonna aggiunta. Se non è stato trovato nulla ...
- Quindi cercare A1, B1 in B. In caso di esito positivo, restituire D2. Se non abbiamo trovato nulla ...
- impostare il valore predefinito "NA"
ho già letto nei file e convertito in frame di dati. Per la prima situazione, ho ottenuto il risultato per esterno sinistro unendoli insieme. Ma non riesco a trovare una buona strada nel prossimo passo.
Il mio attuale tentativo è quello di creare un nuovo frame dati unendo A e B utilizzando una condizione meno rigorosa. Tuttavia non ho idea di come aggiornare il frame dei dati corrente dall'altro. O c'è un altro modo più intuitivo ed efficace per affrontare l'intero problema?
Grazie per tutte le risposte.
----------------------------- Aggiornamento su 20160309 -------------- ------------------
Finalmente accettata la risposta di @mlk. Ancora fantastico grazie a @ zero323 per i suoi fantastici commenti su UDF contro join, la generazione del codice Tungsteno è davvero un altro problema che stiamo affrontando ora. Ma dal momento che abbiamo bisogno di fare decine di ricerca e in media 4 condizioni per ogni ricerca, la prima soluzione è più adatto ...
La soluzione finale è in qualche modo assomiglia a seguito frammento:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
(aStr: String, bStr: String, cStr: String) =>
tableBroadcast.value.find {
case TableType(a, b, c, _) =>
(a == aStr && b == bStr && c == cStr) ||
(a == aStr && b == bStr)
}.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
Questa probabilmente è la strada da percorrere. Ho fornito anche una soluzione alternativa con 'joins'. – zero323
Grazie mlk. Se la tabella di ricerca è grande (500K * 50), è ancora buono trasmetterlo? –
E la mia altra domanda è, supponiamo di dover eseguire 30 ricerche su colonne diverse e scrivere 50 UDF, la prestazione ne risentirebbe? –