2016-07-08 32 views
5

Considerare il seguente problema. Dato:Algebra dinamica dinamica su Spark

  1. Una collezione di set
  2. un'espressione booleana su di loro che si riceve in modo dinamico

Ritorna l'insieme risultante.

Spark dispone di algoritmi o librerie efficienti per risolvere questo problema generale?

Ecco un giocattolo esempio per illustrare il problema concettualmente:

val X = Set("A1", "A2", "A3", "A4") 
val Y = Set("A2", "A4", "A5") 

val collection = Set(X, Y) 
val expression = "X and Y" 

Sto cercando un modo per implementare un generale solve_expression in modo tale che, nell'esempio di cui sopra:

output = solve_expression(expression, collection) 

Risultati:

Set("A2", "A5") 

Sto lavorando con insiemi con milioni di elementi e espressioni booleane che vengono come stringhe. Ciò che è importante è che ogni atomo nell'espressione (ad esempio "X" e "Y" sopra) sono insiemi. Le espressioni e i set sono dinamici (le operazioni non possono essere hardcoded, poiché li riceviamo come input e non sappiamo cosa siano in anticipo).

Sono flessibile con la rappresentazione del problema. I set effettivi possono essere di tipo Set, ad es. tenendo stringhe (ad esempio "A1", "A2"), codificate come vettori binari, o qualsiasi altra cosa che renda questo compatibile con Spark.

fa Spark ha nessun librerie a analizzare e risolvere espressioni booleane generali sul set?

+0

Qual è il problema con l'uso di 'X.union (Y)'? O vuoi soluzioni out-of-heap? – ipoteka

+0

Perché il downvote? Ti dispiacerebbe elaborare? –

+0

Grazie @ipoteka Le espressioni sono dinamiche (non possono essere codificate in anticipo). –

risposta

2

OK. Supponiamo che tu voglia farlo in Spark. Inoltre, poiché si tratta di set giganti, supponiamo che non siano ancora in memoria, sono ciascuno in un file - con ogni riga in un file che denota una voce nel set.

Rappresenteremo i set con RDD s - Il modo standard di Spark di memorizzare i dati.

Utilizzando questo parser (adattato e fissato da here)

import scala.util.parsing.combinator.JavaTokenParsers 
import org.apache.spark.rdd.RDD 

case class Query[T](setMap: Map[String, RDD[T]]) extends JavaTokenParsers { 
    private lazy val expr: Parser[RDD[T]] 
    = term ~ rep("union" ~ term) ^^ { case f1 ~ fs => (f1 /: fs)(_ union _._2) } 
    private lazy val term: Parser[RDD[T]] 
    = fact ~ rep("inter" ~ fact) ^^ { case f1 ~ fs => (f1 /: fs)(_ intersection _._2) } 
    private lazy val fact: Parser[RDD[T]] 
    = vari | ("(" ~ expr ~ ")" ^^ { case "(" ~ exp ~ ")" => exp }) 
    private lazy val vari: Parser[RDD[T]] 
    = setMap.keysIterator.map(Parser(_)).reduceLeft(_ | _) ^^ setMap 

    def apply(expression: String) = this.parseAll(expr, expression).get.distinct 
} 

osservare quanto segue spark-shell interazione dopo aver incollato sopra nel guscio (ho omesso alcune delle risposte per brevità):

> val x = sc.textFile("X.txt").cache \\ contains "1\n2\n3\n4\n5" 
> val y = sc.textFile("Y.txt").cache \\ contains "3\n4\n5\n6\n7" 
> val z = sc.textFile("Z.txt").cache \\ contains "3\n9\n\10" 
> val sets = Map("x" -> x, "y" -> y, "z" -> z) 
> val query = Query[Int](sets) 

Ora, posso chiamare query con espressioni diverse. Notare che qui sto usando collect per attivare la valutazione (quindi vediamo cosa c'è dentro il set), ma se i set sono veramente grandi, normalmente manterrai il RDD così com'è (e lo salverai su disco).

> query("a union b").collect 
res: Array[Int] = Array("1", "2", "3", "4", "5", "6", "7") 
> query("a inter b").collect 
res: Array[Int] = Array("3", "4", "5") 
> query("a inter b union ((a inter b) union a)").collect 
res: Array[Int] = Array("1", "2", "3", "4", "5") 
> query("c union a inter b").collect 
res: Array[Int] = Array("3", "4", "5", "9", "10") 
> query("(c union a) inter b").collect 
res: Array[Int] = Array("3", "4", "5") 

Sebbene non ho disturbato per attuarlo, impostare differenza dovrebbe essere una sola linea aggiunta (molto simile a union e inter). Penso che i complementi di set siano una cattiva idea ... non hanno sempre senso (quale è il complemento del set vuoto, come lo si rappresenta?).

+0

Anche se sono d'accordo che si dovrebbe usare Parser Combinator, questo sembra strano. Genera trilioni di rdds, giusto? Ma a un certo livello dovresti passare al set di raccolta scala. – ipoteka

+1

@ipoteka Non sono sicuro di capire cosa intendi per "trilioni di rdds" - farà tanti RDD quanti hai gli operandi nella tua espressione, quindi nell'ordine di solo una manciata. Certo, OP dovrebbe usare scala collezioni se i suoi set sono abbastanza piccoli - ma la domanda chiede esplicitamente a Spark ... – Alec