2015-01-05 6 views
10

Desidero utilizzare la libreria Spark mllib.recommendation per creare un sistema di raccomandazione per prototipi. Tuttavia, il formato dei dati utili che ho è qualche cosa del seguente formato:Come utilizzare mllib.recommendation se gli ID utente sono una stringa anziché interi contigui?

AB123XY45678 
CD234WZ12345 
EF345OOO1234 
GH456XY98765 
.... 

Se voglio utilizzare la libreria mllib.recommendation, secondo l'API della classe Rating, gli ID utente devono essere interi (deve anche essere contiguo?)

Sembra che debba essere eseguita una sorta di conversione tra gli ID utente reali e quelli numerici utilizzati da Spark. Ma come dovrei farlo?

risposta

10

Spark non richiede realmente un ID numerico, deve solo essere un valore univoco, ma per l'implementazione hanno scelto Int.

Si può fare di nuovo semplice e indietro di trasformazione per userId:

case class MyRating(userId: String, product: Int, rating: Double) 

    val data: RDD[MyRating] = ??? 

    // Assign unique Long id for each userId 
    val userIdToInt: RDD[(String, Long)] = 
    data.map(_.userId).distinct().zipWithUniqueId() 

    // Reverse mapping from generated id to original 
    val reverseMapping: RDD[(Long, String)] 
    userIdToInt map { case (l, r) => (r, l) } 

    // Depends on data size, maybe too big to keep 
    // on single machine 
    val map: Map[String, Int] = 
    userIdToInt.collect().toMap.mapValues(_.toInt) 

    // Transform to MLLib rating 
    val rating: RDD[Rating] = data.map { r => 
    Rating(userIdToInt.lookup(r.userId).head.toInt, r.product, r.rating) 
    // -- or 
    Rating(map(r.userId), r.product, r.rating) 
    } 

    // ... train model 

    // ... get back to MyRating userId from Int 

    val someUserId: String = reverseMapping.lookup(123).head 

Si può anche provare 'data.zipWithUniqueId()', ma non sono sicuro che in questo caso sarà .toInt trasformazione sicuro anche se la dimensione del set di dati è piccola.

+1

Questo non assegna un indice univoco a ciascuno dei rating, non a ciascuno degli utenti? Non penso che funzionerà se un utente ha più valutazioni. – PBJ

+0

@PBJ, sì, hai ragione, ho aggiornato il codice in risposta –

+1

L'approccio 'lookup' non è un codice Spark valido. Compilerà ma soffierà sul runtime. Puoi sistemarlo (rimuoverlo)? – zero323

1

La soluzione di cui sopra potrebbe non funzionare sempre come ho scoperto. Spark non è in grado di eseguire trasformazioni RDD da altri RDD. Uscita dell'errore:

org.apache.spark.SparkException: trasformazioni e azioni RDD può solo essere inserite codice hereinvoked dal conducente, non all'interno di altre trasformazioni; ad esempio, rdd1.map (x => rdd2.values.count() * x) non è valido perché la trasformazione dei valori e l'azione di conteggio non possono essere eseguite all'interno della trasformazione rdd1.map. Per ulteriori informazioni su , vedere SPARK-5063.

Come soluzione si potrebbe aderire al userIdToInt RDD con i dati originali RDD per memorizzare il rapporto tra userid e l'UniqueId. Successivamente, è possibile unire nuovamente i risultati RDD con questo RDD.

// Create RDD with the unique id included 
val dataWithUniqueUserId: RDD[(String, Int, Int, Double)] = 
    data.keyBy(_.userId).join(userIdToInt).map(r => 
     (r._2._1.userId, r._2._2.toInt, r._2._1.productId, 1)) 
3

è necessario eseguire StringIndexer attraverso i vostri ID utente per convertire la stringa di indice intero univoco. Non devono essere continui.

usiamo questo per il nostro motore di raccomandazione in oggetto https://www.aihello.com

df è (utente: String, prodotti, rating)

val stringindexer = new StringIndexer() 
     .setInputCol("user") 
     .setOutputCol("userNumber") 
    val modelc = stringindexer.fit(df) 
    val df = modelc.transform(df) 
1

@Ganesh Krishnan è giusto, StringIndexer risolvere questo problema.

from pyspark.ml.feature import OneHotEncoder, StringIndexer 
from pyspark.sql import SQLContext 
>>> spark = SQLContext(sc)                    
>>> df = spark.createDataFrame(
...  [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")], 
...  ["id", "category"]) 

| id|category| 
+---+--------+ 
| 0|  a| 
| 1|  b| 
| 2|  c| 
| 3|  a| 
| 4|  a| 
| 5|  c| 
+---+--------+ 
>>> stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex") 
>>> model = stringIndexer.fit(df) 
>>> indexed = model.transform(df) 
>>> indexed.show() 
+---+--------+-------------+ 
| id|category|categoryIndex| 
+---+--------+-------------+ 
| 0|  a|   0.0| 
| 1|  b|   2.0| 
| 2|  c|   1.0| 
| 3|  a|   0.0| 
| 4|  a|   0.0| 
| 5|  c|   1.0| 
+---+--------+-------------+ 

>>> converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory") 
>>> converted = converter.transform(indexed) 
>>> converted.show() 
+---+--------+-------------+----------------+ 
| id|category|categoryIndex|originalCategory| 
+---+--------+-------------+----------------+ 
| 0|  a|   0.0|    a| 
| 1|  b|   2.0|    b| 
| 2|  c|   1.0|    c| 
| 3|  a|   0.0|    a| 
| 4|  a|   0.0|    a| 
| 5|  c|   1.0|    c| 
+---+--------+-------------+----------------+ 

>>> converted.select("id", "originalCategory").show() 
+---+----------------+ 
| id|originalCategory| 
+---+----------------+ 
| 0|    a| 
| 1|    b| 
| 2|    c| 
| 3|    a| 
| 4|    a| 
| 5|    c| 
+---+----------------+ 
Problemi correlati