2015-10-29 22 views
5

Sto cercando disperatamente di collegare Cassandra a pyspark ma non riesco a farlo funzionare. Sono abbastanza nuovo da accendere e cassandra, quindi potrei perdere qualcosa di piuttosto semplice.Collegamento/integrazione di Cassandra con Spark (pyspark)

Sono un po 'confuso da tutte le diverse spiegazioni online, tuttavia da quello che ho capito, il modo più semplice sarebbe utilizzare "pacchetti Spark"? (http://spark-packages.org/package/TargetHolding/pyspark-cassandra)

Così, con il seguente comando:

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py 

Ho ragione di capire che non ho bisogno di scaricare tutti i pacchetti se uso pacchetti scintilla come descritto sopra il mio?

nel myPysparkFile.py ho provato le due versioni seguenti, nessuno dei quali ho lavorato per me:

versione 1, che ho avuto da pagina 14 in http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra:

"SparkCassandraTest.py" 
from pyspark import SparkContext, SparkConf 
from pyspark_cassandra import CassandraSparkContext,Row 

conf = SparkConf() 
conf.setMaster("local[4]") 
conf.setAppName("Spark Cassandra") 
conf.set("spark.cassandra.connection.host","http://127.0.0.1") 

sc = CassandraSparkContext(conf=conf) 

rdd = sc.cassandraTable("test", "words") 

come un errore ottengo:

ImportError: No module named pyspark_cassandra 

versione 2 (che si ispira da: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md):

012.351.
"SparkCassandraTest.py" 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf() 
conf.setMaster("local[4]") 
conf.setAppName("Spark Cassandra") 
conf.set("spark.cassandra.connection.host","http://127.0.0.1") 

sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

sqlContext.read\ 
    .format("org.apache.spark.sql.cassandra")\ 
    .options(table="kv", keyspace="test")\ 
    .load().show() 

che mi dà il seguente errore:

py4j.protocol.Py4JJavaError: An error occurred while calling o28.load. 
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; 
    at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:138) 
    at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala) 
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

Io davvero non so cosa sto facendo male e gradirebbe qualsiasi aiuto. Inoltre, qual è la differenza tra l'utilizzo della versione 1 o della versione 2? Ci sono vantaggi o svantaggi tra le due versioni?

Inoltre, qualsiasi ulteriore riferimento su come integrare e utilizzare al meglio la scintilla con cassandra sarebbe molto apprezzato.

Btw, Cassandra è in esecuzione sul mio pc con le configurazioni di base sulla porta 7000.

Grazie.

+0

qual è la versione scintilla – Abhi

risposta

8

Pyspark_Cassandra è un pacchetto diverso rispetto al connettore Cassandra. Include una versione di SCC ma non è intercambiabile. L'installazione di SCC non installa pyspark_cassandra. Questo pacchetto è necessario se si desidera utilizzare sc.cassandraTable() da pyspark.

L'installazione di SCC ti dà la possibilità di utilizzare i Dataframes in pyspark che è il modo più efficiente per gestire C * da pyspark. Questo sarebbe lo stesso del tuo esempio V2. In caso contrario, sembra che tu non abbia lanciato V2 usando il comando --package.

Il motivo può essere fallendo è che si specifica la versione Scala 2.11 della libreria qui

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py 

e sono molto probabilmente non esegue una 2.10 versione Scala di Spark (il download di default è 2.10)

+1

avevi ragione, utilizzando lo stesso comando con 2.10 risolto il problema. Molte grazie. – Kito

+0

Posso collegarmi a Cassandra 3.3 con lo stesso driver @RussS – Abhi

+0

lo sai che è possibile creare un tavolo dal connettore Cassandra? –

Problemi correlati