2015-05-29 9 views
5

C'è un tavolo con due colonne books e readers di questi libri, dove books e readers sono libri e lettori ID, rispettivamente:Spark: Conteggio co-occorrenza - Algoritmo per un efficiente filtraggio multi-pass delle collezioni enormi

books readers 
1:  1  30 
2:  2  10 
3:  3  20 
4:  1  20 
5:  1  10 
6:  2  30 

Il record book = 1, reader = 30 indica che il libro con id = 1 è stato letto dall'utente con id = 30. Per ogni coppia libro ho bisogno di contare il numero di lettori che leggono sia di questi libri, con questo algoritmo:

for each book 
    for each reader of the book 
    for each other_book in books of the reader 
     increment common_reader_count ((book, other_book), cnt) 

Il vantaggio di utilizzare questo algoritmo è che richiede un piccolo numero di operazioni rispetto al conteggio di tutte le combinazioni di libri per due.

Per implementare l'algoritmo di cui sopra organizzo questi dati in due gruppi: 1) con chiave per libro, un RDD contenente lettori di ciascun libro e 2) con chiave per lettore, un RDD contenente libri letti da ciascun lettore, come nel seguente programma:

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 

object Small { 

    case class Book(book: Int, reader: Int) 
    case class BookPair(book1: Int, book2: Int, cnt:Int) 

    val recs = Array(
    Book(book = 1, reader = 30), 
    Book(book = 2, reader = 10), 
    Book(book = 3, reader = 20), 
    Book(book = 1, reader = 20), 
    Book(book = 1, reader = 10), 
    Book(book = 2, reader = 30)) 

    def main(args: Array[String]) { 
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) 
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 
    // set up environment 
    val conf = new SparkConf() 
     .setAppName("Test") 
     .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 
    val data = sc.parallelize(recs) 

    val bookMap = data.map(r => (r.book, r)) 
    val bookGrps = bookMap.groupByKey 

    val readerMap = data.map(r => (r.reader, r)) 
    val readerGrps = readerMap.groupByKey 

    // *** Calculate book pairs 
    // Iterate book groups 
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match { 
     case (book, recIter) => 
     // Iterate user groups 
     recIter.toList.map(rec => { 
      // Find readers for this book 
      val aReader = rec.reader 
      // Find all books (including this one) that this reader read 
      val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match { 
      case (reader2, recIter2) => reader2 == aReader 
      }) 
      val bookPairs = allReaderBooks.map(readerTuple => readerTuple match { 
      case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1)) 
      }) 
      bookPairs 
     }) 

    }) 
    val x = allBookPairs.flatMap(identity) 
    val y = x.map(rdd => rdd.first) 
    val z = y.flatMap(identity) 
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2) 
    val result = p.map(bookPair => bookPair match { 
     case((book1, book2),cnt) => BookPair(book1, book2, cnt) 
    }) 

    val resultCsv = result.map(pair => resultToStr(pair)) 
    resultCsv.saveAsTextFile("./result.csv") 
    } 

    def resultToStr(pair: BookPair): String = { 
    val sep = "|" 
    pair.book1 + sep + pair.book2 + sep + pair.cnt 
    } 
} 

Questo implemntation in fatto deriva il diverso, l'algoritmo inefficiente!:

for each book 
    find each reader of the book scanning all readers every time! 
    for each other_book in books of the reader 
     increment common_reader_count ((book, other_book), cnt) 

che contraddice l'obiettivo principale della discusso sopra algoritmo perché invece di ridurre, aumenta il numero di operazioni. Per trovare libri degli utenti è necessario filtrare tutti gli utenti per ogni libro. Quindi numero di operazioni ~ N * M dove N - numero di utenti e M - numero di libri.

Domande:

  1. C'è un modo per implementare l'algoritmo originale in Spark senza filtrare collezione completa reader per ogni libro?
  2. Eventuali altri algoritmi per calcolare i conteggi delle coppie di libri in modo efficiente?
  3. Inoltre, quando effettivamente eseguo questo codice ottengo filter exception quale motivo non riesco a capire. Qualche idea?

Si prega, si veda log eccezioni di seguito:

15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0) 
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started 
15/05/29 18:24:10 INFO Remoting: Starting remoting 
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:38910] 
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:38910] 
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4) 
java.lang.NullPointerException 
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282) 
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58) 
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at Small$$anonfun$4.apply(Small.scala:54) 
    at Small$$anonfun$4.apply(Small.scala:51) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) 
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) 
    at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:744) 

Aggiornamento:

Questo codice:

val df = sc.parallelize(Array((1,30),(2,10),(3,20),(1,10)(2,30))).toDF("books","readers") 
val results = df.join(
df.select($"books" as "r_books", $"readers" as "r_readers"), 
$"readers" === $"r_readers" and $"books" < $"r_books" 
) 
.groupBy($"books", $"r_books") 
.agg($"books", $"r_books", count($"readers")) 

dà il seguente risultato:

books r_books COUNT(readers) 
1  2  2  

Quindi COUNT qui è un numero di volte in cui due libri (qui 1 e 2) sono stati letti insieme (numero di coppie).

risposta

7

Questo genere di cose è molto più facile se si converte l'originale RDD ad un dataframe:

val df = sc.parallelize(
    Array((1,30),(2,10),(3,20),(1,10), (2,30)) 
).toDF("books","readers") 

Una volta fatto questo, basta fare un self-join sulla dataframe di creare delle coppie del libro, quindi contano quanti lettori hanno letto ogni coppia libro:

val results = df.join(
    df.select($"books" as "r_books", $"readers" as "r_readers"), 
    $"readers" === $"r_readers" and $"books" < $"r_books" 
).groupBy(
    $"books", $"r_books" 
).agg(
    $"books", $"r_books", count($"readers") 
) 

quanto per ulteriori spiegazioni su che unire, notare che io sto unendo df su se stesso - un self-join: df.join(df.select(...), ...). Quello che stai cercando di fare è cucire insieme il libro n. 1 - $"books" - con un secondo libro - $"r_books", dallo stesso lettore - $"reader" === $"r_reader". Ma se ti unissi solo con $"reader" === $"r_reader", avresti riavuto lo stesso libro su se stesso. Invece, io uso $"books" < $"r_books" per garantire che l'ordine nelle coppie di libri sia sempre (<lower_id>,<higher_id>).

Una volta eseguito il join, si ottiene un DataFrame con una riga per ogni lettore di ogni coppia di libri. Le funzioni groupBy e agg eseguono il conteggio effettivo del numero di lettori per associazione di libri.

Per inciso, se un lettore legge lo stesso libro due volte, credo che si finirebbe con un doppio conteggio, che può o non può essere quello che vuoi. Se questo non è quello che vuoi, basta cambiare count($"readers") a countDistinct($"readers").

Se volete saperne di più sulle funzioni aggcount() e countDistinct() e un mucchio di altre cose divertenti, controlla la scaladoc per org.apache.spark.sql.functions

+0

Molte grazie! Sono nuovo di dataframes e udfs. Potresti spiegare i criteri di adesione qui? Verbale .. – zork

+1

Dimenticare l'UDF - Sono un idiota. Ho appena trovato un modo per sbarazzarsi dell'UDF e del distinto –

+0

Grande! ... Cercando di raggiungere i frame di dati ed eseguire questo codice, non li abbiamo mai usati prima. – zork

Problemi correlati