2015-02-16 11 views
8

Ho una serie di record, che ho bisogno di:Spark: ordina i record nei gruppi?

1) Gruppo da 'Data', 'città' e 'tipo'

2) Ordina ogni gruppo da 'premio

Nella mia codice:

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

object Sort { 

    case class Record(name:String, day: String, kind: String, city: String, prize:Int) 

    val recs = Array (
     Record("n1", "d1", "k1", "c1", 10), 
     Record("n1", "d1", "k1", "c1", 9), 
     Record("n1", "d1", "k1", "c1", 8), 
     Record("n2", "d2", "k2", "c2", 1), 
     Record("n2", "d2", "k2", "c2", 2), 
     Record("n2", "d2", "k2", "c2", 3) 
    ) 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf() 
     .setAppName("Test") 
     .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 
    val rs = sc.parallelize(recs) 
    val rsGrp = rs.groupBy(r => (r.day, r.kind, r.city)).map(_._2) 
    val x = rsGrp.map{r => 
     val lst = r.toList 
     lst.map{e => (e.prize, e)} 
     } 
    x.sortByKey() 
    } 

} 

Quando cerco di ordinare gruppo ottengo un errore:

value sortByKey is not a member of org.apache.spark.rdd.RDD[List[(Int, 
Sort.Record)]] 

Cosa c'è di sbagliato? Come ordinare?

+0

Se si crea la parte relativa agli argomenti di ordinamento della chiave, sembra che si potrebbe anche essere in grado di utilizzare repartitionAndSortWithinPartitions() per ottenere la scala "tera-sort". Vedi http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions – steamer25

risposta

10

È necessario definire una chiave e quindi mapValue per ordinarli.

import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkContext._ 

    object Sort { 

    case class Record(name:String, day: String, kind: String, city: String, prize:Int) 

    // Define your data 

    def main(args: Array[String]): Unit = { 
     val conf = new SparkConf() 
     .setAppName("Test") 
     .setMaster("local") 
     .set("spark.executor.memory", "2g") 
     val sc = new SparkContext(conf) 
     val rs = sc.parallelize(recs) 

     // Generate pair RDD neccesary to call groupByKey and group it 
     val key: RDD[((String, String, String), Iterable[Record])] = rs.keyBy(r => (r.day, r.city, r.kind)).groupByKey 

     // Once grouped you need to sort values of each Key 
     val values: RDD[((String, String, String), List[Record])] = key.mapValues(iter => iter.toList.sortBy(_.prize)) 

     // Print result 
     values.collect.foreach(println) 
    } 
} 
+1

Ho letto nella documentazione di spark che groupBy è costoso.C'è qualche altro metodo attraverso il quale possiamo ottenere questo in modo più efficiente. – Sohaib

+0

Non conosco altro metodo più efficiente per l'ordinamento dei valori. Group By Key di solito non viene utilizzato da solo perché ridurrai o altre operazioni con i valori. – gasparms

0

Sostituire map con flatMap

val x = rsGrp.map{r => 
    val lst = r.toList 
    lst.map{e => (e.prize, e)} 
    } 

questo vi darà un

org.apache.spark.rdd.RDD[(Int, Record)] = FlatMappedRDD[10] 

e quindi è possibile chiamare sortby (_._ 1) sul RDD sopra.

0

In alternativa alla soluzione @gasparms, penso che si possa provare un filtro seguito dall'operazione rdd.sortyBy. Filtrate ogni record che soddisfa i criteri chiave. Pre-requisito è che è necessario tenere traccia di tutte le chiavi (combinazioni di filtri). Puoi anche costruirlo mentre attraversi i record.

7

groupByKey è costoso, ha 2 implicazioni:

  1. maggioranza dei dati rimescolata nelle rimanenti N-1 partizioni in media.
  2. Tutti i record della stessa chiave vengono caricati in memoria nel singolo executor causando potenzialmente errori di memoria.

A seconda del vostro caso d'uso si hanno diverse opzioni migliori:

  1. Se non si preoccupano l'ordinamento, utilizzare reduceByKey o aggregateByKey.
  2. Se si desidera solo raggruppare e ordinare senza alcuna trasformazione, è preferibile utilizzare repartitionAndSortWithinPartitions (Spark 1.3.0+ http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions) ma prestare molta attenzione al partizionatore specificato e testarlo perché ora si basa su effetti collaterali che possono modificare il comportamento in un ambiente diverso. Vedi anche esempi in questo repository: https://github.com/sryza/aas/blob/master/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala.
  3. Se si sta applicando una trasformazione o un'aggregazione non riducibile (piegatura o scansione) applicata al iterable dei record ordinati, controllare questa libreria: spark-sorted https://github.com/tresata/spark-sorted. Fornisce 3 API per rdds accoppiati: mapStreamByKey, foldLeftByKey e scanLeftByKey.
Problemi correlati