Prima di procedere: Questa attività è ancora un altro un'altra groupByKey
. Sebbene abbia più applicazioni legittime, è relativamente costoso, quindi assicurati di usarlo solo quando richiesto.
Non esattamente concisa o efficiente soluzione, ma è possibile utilizzare UserDefinedAggregateFunction
introdotto nel Spark 1.5.0:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
Esempio di utilizzo:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
È inoltre possibile creare un wrapper Python come mostrato in Spark: How to map Python with Scala or Java User Defined Functions?
In pratica può essere fas per estrarre RDD, groupByKey
, mkString
e ricostruire DataFrame.
È possibile ottenere un effetto simile combinando collect_list
funzione (Spark> = 1.6.0) con concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
What If Voglio usarlo in SQL Come posso registrare questa UDF in Spark SQL? –
@MurtazaKanchwala [Esiste il metodo 'register' che accetta UDAFS] (https://github.com/apache/spark/blob/37c617e4f580482b59e1abbe3c0c27c7125cf605/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration .scala # L63-L69) quindi dovrebbe funzionare come UDF standard. – zero323
@ zero323 qualsiasi approccio per fare lo stesso in spark sql 1.4.1 –