Sto cercando disperatamente di collegare Cassandra a pyspark ma non riesco a farlo funzionare. Sono abbastanza nuovo da accendere e cassandra, quindi potrei perdere qualcosa di piuttosto semplice.Collegamento/integrazione di Cassandra con Spark (pyspark)
Sono un po 'confuso da tutte le diverse spiegazioni online, tuttavia da quello che ho capito, il modo più semplice sarebbe utilizzare "pacchetti Spark"? (http://spark-packages.org/package/TargetHolding/pyspark-cassandra)
Così, con il seguente comando:
./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py
Ho ragione di capire che non ho bisogno di scaricare tutti i pacchetti se uso pacchetti scintilla come descritto sopra il mio?
nel myPysparkFile.py ho provato le due versioni seguenti, nessuno dei quali ho lavorato per me:
versione 1, che ho avuto da pagina 14 in http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra:
"SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark_cassandra import CassandraSparkContext,Row
conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")
sc = CassandraSparkContext(conf=conf)
rdd = sc.cassandraTable("test", "words")
come un errore ottengo:
ImportError: No module named pyspark_cassandra
versione 2 (che si ispira da: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md):
012.351."SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="kv", keyspace="test")\
.load().show()
che mi dà il seguente errore:
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:138)
at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Io davvero non so cosa sto facendo male e gradirebbe qualsiasi aiuto. Inoltre, qual è la differenza tra l'utilizzo della versione 1 o della versione 2? Ci sono vantaggi o svantaggi tra le due versioni?
Inoltre, qualsiasi ulteriore riferimento su come integrare e utilizzare al meglio la scintilla con cassandra sarebbe molto apprezzato.
Btw, Cassandra è in esecuzione sul mio pc con le configurazioni di base sulla porta 7000.
Grazie.
qual è la versione scintilla – Abhi