2014-12-17 12 views
11

Sto usando la scintilla e il mongo. Sono in grado di connettermi a mongo usando il seguente codice:Come eseguire una query su mongo usando la scintilla?

val sc = new SparkContext("local", "Hello from scala") 

val config = new Configuration() 
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName") 
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject]) 

sopra il codice mi dà tutti i documenti dalla collezione.

Ora voglio applicare alcune condizioni sulla query.

Per questo ho usato

config.set("mongo.input.query","{customerId: 'some mongo id'}") 

Questo ha avuto una sola condizione alla volta. Voglio aggiungere una condizione se 'uso'> 30

1) Come posso aggiungere più condizioni alla query mongo (incluso maggiore di e minore di) usando spark e mongo ??

Anche io voglio scorrere su ogni documento di risultato di query usando scala ??

2) Come faccio a scorrere il risultato usando scala ??

+0

qualche lato-bandiera qui: il formato di Hadoop per Mongo ha problema di gestione delle risorse che mantiene le connessioni aperte. È stata una combinazione esplosiva quando l'abbiamo mescolata con Spark. * Evita * – maasg

+0

@maasg C'è qualche altra opzione per la connessione di mongo con scintilla ?? – Vishwas

risposta

10

Hi si può provare questo:

c'è un progetto che si integra MongoDB con Spark

https://github.com/Stratio/deep-spark/tree/develop

1) fare un git clone

2) andare dentro profonda scintilla, quindi al genitore profondo

3) mvn install

4) aprire la shell con questa opzione:

./spark-shell --jars YOUR_PATH/deep-core-0.7.0-SNAPSHOT.jar, YOUR_PATH/deep-commons-0.7.0-SNAPSHOT. vaso, YOUR_PATH/deep-mongodb-0.7.0-SNAPSHOT.jar, YOUR_PATH/mongo-java-driver-2.12.4-sources.jar

ricordiamo di sovrascrivere "YOUR_PATH" con il reale percorso

5) Eseguire un semplice esempio nella shell scintilla:

import com.stratio.deep.mongodb.config.MongoDeepJobConfig 
import com.stratio.deep.mongodb.extractor.MongoNativeDBObjectExtractor 
import com.stratio.deep.core.context.DeepSparkContext 
import com.mongodb.DBObject 
import org.apache.spark.rdd.RDD 
import com.mongodb.QueryBuilder 
import com.mongodb.BasicDBObject 

val host = "localhost:27017" 


val database = "test" 

val inputCollection = "input"; 

val deepContext: DeepSparkContext = new DeepSparkContext(sc) 

val inputConfigEntity: MongoDeepJobConfig[DBObject] = new MongoDeepJobConfig[DBObject](classOf[DBObject]) 


val query: QueryBuilder = QueryBuilder.start(); 

query.and("number").greaterThan(27).lessThan(30); 


inputConfigEntity.host(host).database(database).collection(inputCollection).filterQuery(query).setExtractorImplClass(classOf[MongoNativeDBObjectExtractor]) 


val inputRDDEntity: RDD[DBObject] = deepContext.createRDD(inputConfigEntity) 

la cosa migliore di questo è che è possibile utilizzare un QueryBui lder oggetto per rendere le vostre domande

Inoltre è possibile passare un DBOBJECT come questo:

{ "number" : { "$gt" : 27 , "$lt" : 30}} 

Se si desidera iterare è possibile utilizzare il metodo yourRDD.collect(). Inoltre puoi usare yourRDD.foreach, ma devi fornire una funzione.

C'è un altro modo per aggiungere barattoli alla scintilla. È possibile modificare spark-env.sh e mettere questa riga alla fine:

CONFDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 
for jar in $(ls $CONFDIR/../lib/*.jar); do 
    SPARK_CLASSPATH=$SPARK_CLASSPATH:${jar} 
done 

All'interno della cartella lib si mettono le librerie e questo è tutto.

Disclaimer: Attualmente sto lavorando su Stratio

+0

Questo progetto è stato dichiarato obsoleto e non funziona più. Questa risposta dovrebbe essere rimossa. – rjurney

2

1) Al fine di aggiungere condizioni alla query semplicemente aggiungere nel dizionario fornito con 'mongo.input.query':

config.set("mongo.input.query","{customerId: 'some mongo id', usage: {'$gt': 30}") 

Per capire meglio come funzionano le query si riferiscono a:

http://docs.mongodb.org/manual/tutorial/query-documents/

http://docs.mongodb.org/getting-started/python/query/

2) Per scorrere il risultato si consiglia di dare un'occhiata a suscitare metodo RDD 'raccogliere', maggiori informazioni a questo link, basta guardare per il metodo di raccolta:

http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD

Problemi correlati