2016-07-13 30 views
13

Devo eliminare gli accenti dai caratteri in spagnolo e altre lingue da diversi set di dati.Qual è il modo migliore per rimuovere gli accenti con i datafram di scintilla di apache in PySpark?

Ho già eseguito una funzione basata sul codice fornito in questo post che rimuove gli accenti speciali. Il problema è che la funzione è lenta perché usa uno UDF. Mi chiedo se posso migliorare le prestazioni della mia funzione per ottenere risultati in meno tempo, perché ciò è positivo per i piccoli dataframes ma non per quelli grandi.

Grazie in anticipo.

Ecco il codice, si sarà in grado di farlo funzionare come si presenta:

# Importing sql types 
from pyspark.sql.types import StringType, IntegerType, StructType, StructField 
from pyspark.sql.functions import udf, col 
import unicodedata 

# Building a simple dataframe: 
schema = StructType([StructField("city", StringType(), True), 
        StructField("country", StringType(), True), 
        StructField("population", IntegerType(), True)]) 

countries = ['Venezuela', '[email protected]', 'Brazil', 'Spain'] 
cities = ['Maracaibó', 'New York', ' São Paulo ', '~Madrid'] 
population = [37800000,19795791,12341418,6489162] 

# Dataframe: 
df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema) 

df.show() 

class Test(): 
    def __init__(self, df): 
     self.df = df 

    def clearAccents(self, columns): 
     """This function deletes accents in strings column dataFrames, 
     it does not eliminate main characters, but only deletes special tildes. 

     :param columns String or a list of column names. 
     """ 
     # Filters all string columns in dataFrame 
     validCols = [c for (c, t) in filter(lambda t: t[1] == 'string', self.df.dtypes)] 

     # If None or [] is provided with column parameter: 
     if (columns == "*"): columns = validCols[:] 

     # Receives a string as an argument 
     def remove_accents(inputStr): 
      # first, normalize strings: 
      nfkdStr = unicodedata.normalize('NFKD', inputStr) 
      # Keep chars that has no other char combined (i.e. accents chars) 
      withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)]) 
      return withOutAccents 

     function = udf(lambda x: remove_accents(x) if x != None else x, StringType()) 
     exprs = [function(col(c)).alias(c) if (c in columns) and (c in validCols) else c for c in self.df.columns] 
     self.df = self.df.select(*exprs) 

foo = Test(df) 
foo.clearAccents(columns="*") 
foo.df.show() 
+0

Dopo aver lavorato in questa funzione, il resto del codice può essere trovato in [libreria di trasformazione] (https://github.com/mood-agency/optimus) –

risposta

4

Un possibile miglioramento consiste nella creazione di una Transformer personalizzata, che gestirà la normalizzazione Unicode e il corrispondente wrapper Python. Dovrebbe ridurre il sovraccarico generale dei dati trasmessi tra JVM e Python e non richiede alcuna modifica in Spark stessa o accesso all'API privata.

Sul lato JVM avrete bisogno di un trasformatore di simile a questo:

package net.zero323.spark.ml.feature 

import java.text.Normalizer 
import org.apache.spark.ml.UnaryTransformer 
import org.apache.spark.ml.param._ 
import org.apache.spark.ml.util._ 
import org.apache.spark.sql.types.{DataType, StringType} 

class UnicodeNormalizer (override val uid: String) 
    extends UnaryTransformer[String, String, UnicodeNormalizer] { 

    def this() = this(Identifiable.randomUID("unicode_normalizer")) 

    private val forms = Map(
    "NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD, 
    "NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD 
) 

    val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)", 
    ParamValidators.inArray(forms.keys.toArray)) 

    def setN(value: String): this.type = set(form, value) 

    def getForm: String = $(form) 

    setDefault(form -> "NFKD") 

    override protected def createTransformFunc: String => String = { 
    val normalizerForm = forms($(form)) 
    (s: String) => Normalizer.normalize(s, normalizerForm) 
    } 

    override protected def validateInputType(inputType: DataType): Unit = { 
    require(inputType == StringType, s"Input type must be string type but got $inputType.") 
    } 

    override protected def outputDataType: DataType = StringType 
} 

corrispondente definizione di compilazione:

name := "unicode-normalization" 

version := "1.0" 

crossScalaVersions := Seq("2.10.6", "2.11.8") 

organization := "net.zero323" 

val sparkVersion = "1.6.2" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion, 
    "org.apache.spark" %% "spark-sql" % sparkVersion, 
    "org.apache.spark" %% "spark-mllib" % sparkVersion 
) 

Sul lato Python avrete bisogno di un involucro simile a questo. Se si utilizza 2.0+ keyword_only è stato spostato nel primo modulo pyspark. pacchetto Scala

from pyspark.ml.param.shared import * 
from pyspark.ml.util import keyword_only 
from pyspark.ml.wrapper import JavaTransformer 

class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol): 

    @keyword_only 
    def __init__(self, form="NFKD", inputCol=None, outputCol=None): 
     super(UnicodeNormalizer, self).__init__() 
     self._java_obj = self._new_java_obj(
      "net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid) 
     self.form = Param(self, "form", 
      "unicode form (one of NFC, NFD, NFKC, NFKD)") 
     kwargs = self.__init__._input_kwargs 
     self.setParams(**kwargs) 

    @keyword_only 
    def setParams(self, form="NFKD", inputCol=None, outputCol=None): 
     kwargs = self.setParams._input_kwargs 
     return self._set(**kwargs) 

    def setForm(self, value): 
     return self._set(form=value) 

    def getForm(self): 
     return self.getOrDefault(self.form) 

Corporatura:

sbt +package 

comprendono quando si avvia shell o inviare. Ad esempio per Spark build con Scala 2.10:

bin/pyspark --jars path-to/target/scala-2.10/unicode-normalization_2.10-1.0.jar \ 
--driver-class-path path-to/target/scala-2.10/unicode-normalization_2.10-1.0.jar 

e si dovrebbe essere pronti per andare. Tutto ciò che resta è un po 'di magia regexp:

from pyspark.sql.functions import regexp_replace 

normalizer = UnicodeNormalizer(form="NFKD", 
    inputCol="text", outputCol="text_normalized") 

df = sc.parallelize([ 
    (1, "Maracaibó"), (2, "New York"), 
    (3, " São Paulo "), (4, "~Madrid") 
]).toDF(["id", "text"]) 

(normalizer 
    .transform(df) 
    .select(regexp_replace("text_normalized", "\p{M}", "")) 
    .show()) 

## +--------------------------------------+ 
## |regexp_replace(text_normalized,\p{M},)| 
## +--------------------------------------+ 
## |        Maracaibo| 
## |        New York| 
## |       Sao Paulo | 
## |        ~Madrid| 
## +--------------------------------------+ 

Si prega di notare che questo segue le stesse convenzioni costruito nei trasformatori di testo e non è nullo sicura.Puoi facilmente correggerlo controllando per null in createTransformFunc.

1

Questa soluzione è solo Python, ma è utile solo se il numero di possibili accenti è basso (ad esempio, una sola lingua come lo spagnolo) e le sostituzioni dei caratteri sono specificate manualmente.

Sembra che non ci sia alcun modo incorporato per fare ciò che è stato richiesto direttamente senza UDF, tuttavia è possibile concatenare molte chiamate regexp_replace per sostituire ogni carattere accentato possibile. Ho testato le prestazioni di questa soluzione e si scopre che funziona solo più velocemente se si dispone di un set di accenti molto limitato da sostituire. In tal caso può essere più veloce di UDF perché è ottimizzato al di fuori di Python.

from pyspark.sql.functions import col, regexp_replace 

accent_replacements_spanish = [ 
    (u'á', 'a'), (u'Á', 'A'), 
    (u'é', 'e'), (u'É', 'E'), 
    (u'í', 'i'), (u'Í', 'I'), 
    (u'ò', 'o'), (u'Ó', 'O'), 
    (u'ú|ü', 'u'), (u'Ú|Ű', 'U'), 
    (u'ñ', 'n') 
    # see http://stackoverflow.com/a/18123985/3810493 for other characters 

    # this will convert other non ASCII characters to a question mark: 
    ('[^\x00-\x7F]', '?') 
] 

def remove_accents(column): 
    r = col(column) 
    for a, b in accent_replacements_spanish: 
     r = regexp_replace(r, a, b) 
    return r.alias('remove_accents(' + column + ')') 

df = sqlContext.createDataFrame([['Olà'], ['Olé']], ['str']) 
df.select(remove_accents('str')).show() 

non ho confrontato le prestazioni con le altre risposte e questa funzione non è così generale, ma è almeno opportuno prendere in considerazione, perché non c'è bisogno di aggiungere Scala o Java per il processo di compilazione.

+0

Ho rimosso i miei commenti ma potresti aggiungere una nota che non è equivalente al codice in que bustione? – zero323

+0

Ho aggiornato la mia risposta. Dovrebbe essere chiaro ora che non è esattamente equivalente e che ha solo un uso limitato. Grazie per il feedback. –

4

Un altro modo per fare con python Unicode Database:

import unicodedata 
import sys 

from pyspark.sql.functions import translate, regexp_replace 

def make_trans(): 
    matching_string = "" 
    replace_string = "" 

    for i in range(ord(" "), sys.maxunicode): 
     name = unicodedata.name(chr(i), "") 
     if "WITH" in name: 
      try: 
       base = unicodedata.lookup(name.split(" WITH")[0]) 
       matching_string += chr(i) 
       replace_string += base 
      except KeyError: 
       pass 

    return matching_string, replace_string 

def clean_text(c): 
    matching_string, replace_string = make_trans() 
    return translate(
     regexp_replace(c, "\p{M}", ""), 
     matching_string, replace_string 
    ).alias(c) 

Così ora cerchiamo di testarlo:

df = sc.parallelize([ 
(1, "Maracaibó"), (2, "New York"), 
(3, " São Paulo "), (4, "~Madrid"), 
(5, "São Paulo"), (6, "Maracaibó") 
]).toDF(["id", "text"]) 

df.select(clean_text("text")).show() 
## +---------------+ 
## |   text| 
## +---------------+ 
## |  Maracaibo| 
## |  New York| 
## | Sao Paulo | 
## |  ~Madrid| 
## |  Sao Paulo| 
## |  Maracaibo| 
## +---------------+ 

riconoscono @ zero323

Problemi correlati