2014-07-22 13 views
8

Ecco il mio esempio di codice:Apache Spark: distinto non funziona?

case class Person(name:String,tel:String){ 
     def equals(that:Person):Boolean = that.name == this.name && this.tel == that.tel} 

val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111")) 
sc.parallelize(persons).distinct.collect 

Esso restituisce

res34: Array[Person] = Array(Person(john,111), Person(peter,139), Person(peter,139)) 

Perché distinta non funziona voglio il risultato come Persona ("John", 111), Person ("Peter"? , 139)

+1

Mi chiedo se ha qualcosa a che fare con "Peter" non essere lo stesso di " Perter "? – kviiri

+0

Quanto tempo hai dedicato a esaminare il problema prima di pubblicarlo? Cosa ti aspetti da questo test? – maasg

+3

Segnalato per chiusura in quanto sembra essere causato da un semplice errore tipografico. – kviiri

risposta

0

Come altri hanno sottolineato, questo è un bug nella scintilla 1.0.0. La mia teoria da dove è venuta da è che se si guarda il diff di 1.0.0 per 9,0 vedete

- def repartition(numPartitions: Int): RDD[T] = { 
+ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = { 

E se si esegue

case class A(i:Int) 
implicitly[Ordering[A]] 

si ottiene un errore

<console>:13: error: No implicit Ordering defined for A. 
       implicitly[Ordering[A]] 

Quindi penso che la soluzione è definire un ordinamento implicito per una classe il caso, purtroppo io non sono un esperto di scala, ma questo answer seems to do it correctly

+0

In spark-shell 1.0.1, la persona estesa non ha funzionato ... – edwardsbean

+0

@MrQuestion onestamente questa è solo un'ipotesi, non sono completamente sicuro di come la risoluzione implicita funzioni in scala – aaronman

2

Estendendosi ulteriormente dall'osservazione di @aaronman, è possibile ovviare a questo problema. Sul RDD, Ci sono due definizioni per distinct:

/** 
    * Return a new RDD containing the distinct elements in this RDD. 
    */ 
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = 
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) 

    /** 
    * Return a new RDD containing the distinct elements in this RDD. 
    */ 
    def distinct(): RDD[T] = distinct(partitions.size) 

È evidente dalla firma del primo distinct che ci deve essere un ordinamento implicito degli elementi e si presume nullo se assente, che è ciò che il la versione breve .distinct() fa.

Non c'è Ordinamento predefinito implicito per classi case, ma è facile da implementare uno:

case class Person(name:String,tel:String) extends Ordered[Person] { 
    def compare(that: Person): Int = this.name compare that.name 
} 

Ora, provando lo stesso esempio fornisce i risultati attesi (si noti che sto paragonando i nomi):

val ps5 = Array(Person("peter","138"),Person("peter","55"),Person("john","138")) 
sc.parallelize(ps5).distinct.collect 

res: Array[P5] = Array(P5(john,111), P5(peter,139)) 

Si noti che le classi di casi implementano già equals e hashCode, quindi l'impl sull'esempio fornito non è necessario e non è corretto. La firma corretta per equals è: equals(arg0: Any): Boolean - BTW, per prima cosa ho pensato che il problema riguardava la firma equals errata, che mi ha mandato a cercare nel percorso sbagliato.

+0

In spark-shell 1.0, seguo ogni passo della tua guida, ma restituisco 'res2: Array [Person] = Array (Person (john, 138), Person (peter, 138), Person (peter, 55)) ' – edwardsbean

+0

@MrQuestion quale versione esatta di Spark usi? Proverò di nuovo su qualunque cosa tu stia usando. – maasg

+0

Ho provato con la scintilla 1.0.0 e 1.0.1, nessuno di loro funziona – edwardsbean

1

Per me il problema era relativo all'uguaglianza degli oggetti, come menzionato da Martin Odersky in Programmazione in Scala (capitolo 30), anche se ho una classe normale (non una classe del caso). Per un test di uguaglianza corretto, è necessario ridefinire (sovrascrivere) hashCode() se si dispone di un oggetto personalizzato equals(). Inoltre è necessario disporre di un metodo canEqual() per il 100% di correttezza. Non ho esaminato i dettagli di implementazione di un RDD, ma poiché si tratta di una raccolta, probabilmente utilizza alcune variazioni complesse/parallele di un HashSet o altra struttura di dati basata su hash per confrontare gli oggetti e garantire la distinzione.

Dichiarare HashSet(), equals(), canEqual() e confronta() metodi risolto il problema:

override def hashCode(): Int = { 
    41 * (41 + name.hashCode) + tel.hashCode 
} 

override def equals(other: Any) = other match { 
    case other: Person => 
    (other canEqual this) && 
    (this.name == other.name) && (this.tel == other.tel) 
    case _ => 
    false 
} 

def canEqual(other: Any) = other.isInstanceOf[Person] 

def compare(that: Person): Int = { 
    this.name compare that.name 
}