2014-10-06 13 views
12

Desidero conservare solo i dipendenti che hanno un ID del reparto referenziato nella seconda tabella.Filtro basato su un altro RDD in Spark

Employee table 
LastName DepartmentID 
Rafferty 31 
Jones 33 
Heisenberg 33 
Robinson 34 
Smith 34 

Department table 
DepartmentID 
31 
33 

Ho provato il seguente codice che non funziona:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
employee.filter(lambda e: e[1] in department).collect() 

Py4JError: An error occurred while calling o344.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 

Tutte le idee? Sto usando Spark 1.1.0 con Python. Tuttavia, accetterei una risposta Scala o Python.

+0

Avete bisogno la vostra lista reparto di essere un RDD? – maasg

+0

Non proprio. L'elenco del dipartimento viene caricato da HDFS ma non è molto grande. – poiuytrez

risposta

19

In questo caso, ciò che si vorrebbe raggiungere è quello di filtrare in ogni partizione con i dati contenuti nella tabella dipartimento: questa sarebbe la soluzione di base:

val dept = deptRdd.collect.toSet 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)} 

Se i dati reparto è di grandi dimensioni, una variabile trasmissione migliorerà le prestazioni, fornendo i dati una volta per tutti i nodi, invece di dover serializzare con ogni compito

val deptBC = sc.broadcast(deptRdd.collect.toSet) 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)} 

Anche se l'utilizzo di join potrebbe funzionare, è una soluzione molto costosa in quanto richiede un shuffle distribuito dei dati (byKey) per raggiungere il join. Dato che il requisito è un semplice filtro, l'invio dei dati a ciascuna partizione (come mostrato sopra) fornirà prestazioni molto migliori.

+0

perdonami se ho torto qui, ma non partizionareBy() risolvere il shuffle distribuito per chiave? Non dire che risolverebbe il problema di unire più costosi, perché non credo che lo farei, stavo solo affermando che l'unione non richiede uno shuffle il 100% delle volte. – TurnipEntropy

10

Ho finalmente implementato una soluzione utilizzando un join. Ho dovuto aggiungere un valore 0 al reparto per evitare un'eccezione da Spark:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
# invert id and name to get id as the key 
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0])) 
# add a 0 value to avoid an exception 
department = sc.parallelize(department).map(lambda d: (d,0)) 

employee.join(department).map(lambda e: (e[1][0], e[0])).collect() 

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)] 
0

Filtering più valori in più colonne:

Nel caso in cui si sta tirando i dati da un database (alveare o di tipo SQL db per questo esempio) e la necessità di filtrare su più colonne, potrebbe essere solo più facile per caricare la tabella con il primo filtro, poi iterare i filtri attraverso i RDD (più piccole iterazioni è il modo incoraggiato di programmazione Spark):

{ 
    import org.apache.spark.sql.hive.HiveContext 
    val hc = new HiveContext(sc) 

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)") 
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20") 
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500") 

} 

Naturalmente è necessario conoscere i dati un po 'per filtrare i valori giusti, ma fa parte del processo di analisi.

0

per lo stesso exm di cui sopra, vorrei mantenere solo i dipendenti che contenevano o in un ID del reparto di riferimento nella seconda tabella. Ma deve non essere unirsi operazione, vorrei vederlo in "contenuto" o "in", voglio dire 33 è "in" 334 e 335

employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
Problemi correlati