2015-07-26 14 views

risposta

32

Prima di procedere: Questa attività è ancora un altro un'altra groupByKey. Sebbene abbia più applicazioni legittime, è relativamente costoso, quindi assicurati di usarlo solo quando richiesto.


Non esattamente concisa o efficiente soluzione, ma è possibile utilizzare UserDefinedAggregateFunction introdotto nel Spark 1.5.0:

object GroupConcat extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType().add("buff", ArrayType(StringType)) 
    def dataType = StringType 
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
     buffer.update(0, ArrayBuffer.empty[String]) 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) = { 
     if (!input.isNullAt(0)) 
     buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0)) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
     buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)) 
    } 

    def evaluate(buffer: Row) = UTF8String.fromString(
     buffer.getSeq[String](0).mkString(",")) 
} 

Esempio di utilizzo:

val df = sc.parallelize(Seq(
    ("username1", "friend1"), 
    ("username1", "friend2"), 
    ("username2", "friend1"), 
    ("username2", "friend3") 
)).toDF("username", "friend") 

df.groupBy($"username").agg(GroupConcat($"friend")).show 

## +---------+---------------+ 
## | username|  friends| 
## +---------+---------------+ 
## |username1|friend1,friend2| 
## |username2|friend1,friend3| 
## +---------+---------------+ 

È inoltre possibile creare un wrapper Python come mostrato in Spark: How to map Python with Scala or Java User Defined Functions?

In pratica può essere fas per estrarre RDD, groupByKey, mkString e ricostruire DataFrame.

È possibile ottenere un effetto simile combinando collect_list funzione (Spark> = 1.6.0) con concat_ws:

import org.apache.spark.sql.functions.{collect_list, udf, lit} 

df.groupBy($"username") 
    .agg(concat_ws(",", collect_list($"friend")).alias("friends")) 
+0

What If Voglio usarlo in SQL Come posso registrare questa UDF in Spark SQL? –

+0

@MurtazaKanchwala [Esiste il metodo 'register' che accetta UDAFS] (https://github.com/apache/spark/blob/37c617e4f580482b59e1abbe3c0c27c7125cf605/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration .scala # L63-L69) quindi dovrebbe funzionare come UDF standard. – zero323

+0

@ zero323 qualsiasi approccio per fare lo stesso in spark sql 1.4.1 –

2

Un modo per farlo con pyspark < 1.6, che purtroppo non supporta dall'utente definita funzione di aggregazione:

byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y) 

e se si vuole fare un nuovo dataframe:

sqlContext.createDataFrame(byUsername, ["username", "friends"]) 

A partire dal 1.6, è possibile utilizzare collect_list e poi unirsi alla lista creata:

from pyspark.sql import functions as F 
from pyspark.sql.types import StringType 
join_ = F.udf(lambda x: ", ".join(x), StringType()) 
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends")) 
10

si può provare la funzione collect_list

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A 

Oppure si può regieter un qualcosa di UDF come

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b)) 

ed è possibile utilizzare questa funzione nella query

sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A") 
+1

L'ho provato ma funziona solo con HiveContext –

2

Lingua: Scala Spark versione: 1.5.2

Ho avuto lo stesso problema e ha anche cercato di risolverlo utilizzando udfs ma, purtroppo, questo ha portato a problemi più avanti nel codice a causa di incoerenze di tipo.Sono stato in grado di lavorare il mio modo per aggirare questo convertendo prima il DF a un RDD poi raggruppamento per e manipolare i dati nel modo desiderato e poi la conversione del RDD di nuovo ad un DF come segue:

val df = sc 
    .parallelize(Seq(
     ("username1", "friend1"), 
     ("username1", "friend2"), 
     ("username2", "friend1"), 
     ("username2", "friend3"))) 
    .toDF("username", "friend") 

+---------+-------+ 
| username| friend| 
+---------+-------+ 
|username1|friend1| 
|username1|friend2| 
|username2|friend1| 
|username2|friend3| 
+---------+-------+ 

val dfGRPD = df.map(Row => (Row(0), Row(1))) 
    .groupByKey() 
    .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))} 
    .toDF("username", "groupOfFriends") 

+---------+---------------+ 
| username| groupOfFriends| 
+---------+---------------+ 
|username1|friend2,friend1| 
|username2|friend3,friend1| 
+---------+---------------+