2014-11-19 20 views
5

Sto valutando il connettore di cassandra spark e sto faticando a cercare di far funzionare una query di intervallo sulla chiave di partizione.Connettore Spark Cassandra - Intervallo interrogativo sulla chiave di partizione

In base alla documentazione del connettore, sembra possibile effettuare il filtraggio sul lato server sulla chiave di partizione usando l'uguaglianza o l'operatore IN, ma sfortunatamente la mia chiave di partizione è un timestamp, quindi non posso usarla.

Così ho provato ad utilizzare Spark SQL con la seguente query ('timestamp' è la chiave di partizione):

select * from datastore.data where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z' 

Anche se il processo viene distribuito su 200 operazioni, la query non restituisce alcun dato.

Inoltre posso assicurare che ci sono dati da restituire poiché l'esecuzione della query su cqlsh (facendo la conversione appropriata utilizzando la funzione 'token') restituisce i dati.

Utilizzo la scintilla 1.1.0 in modalità standalone. Cassandra è 2.1.2 e la versione del connettore è "b1.1". Il driver Cassandra è il ramo "master" DataStax. grappolo Cassandra è sovrapposta grappolo scintilla con 3 server con fattore di replicazione del 1.

Here is the job's full log

Qualsiasi indizio chiunque?

Aggiornamento: Nel provare a fare filtro sul lato server sulla base della chiave di partizione (usando il metodo CassandraRDD.where) ottengo la seguente eccezione:

Exception in thread "main" java.lang.UnsupportedOperationException: Range predicates on partition key columns (here: timestamp) are not supported in where. Use filter instead. 

Ma purtroppo io non so che cosa " filtro "è ...

risposta

8

Hai diverse opzioni per ottenere la soluzione che stai cercando.

Il più potente sarebbe utilizzare gli indici di Lucene integrati con Cassandra di Stratio, che consente di effettuare la ricerca per qualsiasi campo indicizzato sul lato server. Il tuo tempo di scrittura sarà aumentato ma, d'altra parte, sarai in grado di interrogare qualsiasi intervallo di tempo. Potete trovare ulteriori informazioni sugli indici Lucene in Cassandra here. Questa versione estesa di Cassandra è completamente integrata nel deep-spark project in modo da poter sfruttare tutti i vantaggi degli indici Lucene in Cassandra. Ti consigliamo di utilizzare gli indici Lucene quando esegui una query con restrizioni che recupera un set di risultati medio-piccoli, se vuoi recuperare una grande parte del tuo set di dati, dovresti utilizzare la terza opzione sottostante.

Un altro approccio, a seconda di come funziona l'applicazione, potrebbe essere quello di troncare il campo di data e ora in modo da poterlo cercare utilizzando un operatore IN.Il problema è che, per quanto ne so, non è possibile utilizzare il connettore spark-cassandra per quello, si dovrebbe usare il driver diretto Cassandra che non è integrato con Spark, oppure si può dare un'occhiata al progetto deep-spark dove una nuova funzionalità che consente questo sta per essere rilasciata molto presto. La query sarebbe simile a questa:

select * from datastore.data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', ... , '2013-12-31') 

, ma, come ho detto prima, non so se si adatta alle vostre esigenze poiché potrebbe non essere in grado di troncare i dati e di gruppo che per data/tempo.

L'ultima opzione disponibile, ma la meno efficiente, consiste nel portare l'intero set di dati nel cluster di accensione e applicare un filtro sull'RDD.

Disclaimer: Lavoro per Stratio :-) Non esitate a contattarci se avete bisogno di aiuto.

Spero che aiuti!

+0

Ciao a tutti, sto attualmente sperimentando con l'operatore IN (è supportato), ma sono un po 'preoccupato di dover recuperare al massimo 1 anno di dati, il che porterà a un'espressione IN con 365 valori. Comunque, controllerò la stratio. Grazie! – tsouza

8

penso che l'errore CassandraRDD stia dicendo che la query che si sta tentando di fare non è consentita in Cassandra e si deve caricare tutta la tabella in un CassandraRDD e quindi effettuare un'operazione di filtro spark su questo CassandraRDD.

Così il vostro codice (in scala) dovrebbe qualcosa di simile:

val cassRDD= sc.cassandraTable("keyspace name", "table name").filter(row=> row.getDate("timestamp")>=DateFormat('2013-01-01T00:00:00.000Z')&&row.getDate("timestamp") < DateFormat('2013-12-31T00:00:00.000Z')) 

Se siete interessati a fare questo tipo di chiarimento si può prendere uno sguardo agli altri connettori Cassandra, come quello sviluppato da Stratio

+0

Salve, caricare tutti i dati è qualcosa che non voglio fare. Controllerò questo connettore, grazie per il suggerimento! – tsouza

Problemi correlati