Can I extend the default one?
Non proprio. L'impostazione predefinita Tokenizer
è una sottoclasse di pyspark.ml.wrapper.JavaTransformer
e, come altri transfromeri e stimatori da pyspark.ml.feature
, delega l'elaborazione effettiva alla sua controparte Scala. Poiché si desidera utilizzare Python, è necessario estendere direttamente pyspark.ml.pipeline.Transformer
.
import nltk
from pyspark import keyword_only ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
class NLTKWordPunctTokenizer(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, inputCol=None, outputCol=None, stopwords=None):
super(NLTKWordPunctTokenizer, self).__init__()
self.stopwords = Param(self, "stopwords", "")
self._setDefault(stopwords=set())
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None, stopwords=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def setStopwords(self, value):
self._paramMap[self.stopwords] = value
return self
def getStopwords(self):
return self.getOrDefault(self.stopwords)
def _transform(self, dataset):
stopwords = self.getStopwords()
def f(s):
tokens = nltk.tokenize.wordpunct_tokenize(s)
return [t for t in tokens if t.lower() not in stopwords]
t = ArrayType(StringType())
out_col = self.getOutputCol()
in_col = dataset[self.getInputCol()]
return dataset.withColumn(out_col, udf(f, t)(in_col))
Esempio di utilizzo (dati ML - Features):
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = NLTKWordPunctTokenizer(
inputCol="sentence", outputCol="words",
stopwords=set(nltk.corpus.stopwords.words('english')))
tokenizer.transform(sentenceDataFrame).show()
Per Python personalizzato Estimator
vedere How to Roll a Custom Estimator in PySpark mllib
⚠ Questa risposta dipende API interna ed è compatibile con Spark 2.0.3, 2.1. 1, 2.2.0 o successivo (SPARK-19348). Per il codice compatibile con le versioni precedenti di Spark, vedere revision 8.
Provato a implementare questo come un passaggio in una pipeline e ottenuto il seguente errore 'AttributeError: 'L'oggetto' NLTKWordPunctTokenizer 'non ha attributo' _to_java''. Ciò si verifica quando provo a scrivere la Pipeline su disco (ho funzionato bene prima di aggiungere questo passaggio). Suppongo che ci siano altri metodi di classe che devono essere definiti? –
@EvanZamir Sì, sia 'Pipeline' che' PipelineModel' si aspettano che ogni fase implementa '_to_java' e possa essere caricato utilizzando l'oggetto Java corrispondente. Sfortunatamente questo funziona presupponendo che tu abbia effettivamente 'JavaWrapper'. Ho questo vago ricordo che ho visto alcuni JIRA legati a questo, ma potrei sbagliarmi. – zero323
È questo @ zero323? https://issues.apache.org/jira/browse/SPARK-17025 A quanto pare il problema è stato letteralmente creato ieri. –