2015-04-24 11 views
11

Primo: so che non è una buona idea eseguire una scansione completa in Cassandra, tuttavia, al momento, è ciò di cui ho bisogno.Problema nella scansione completa della tabella in cassandra

Quando ho iniziato a cercare di fare qualcosa di simile, leggo persone che dicono che non era possibile fare una scansione completa in Cassandra e lui non è stato fatto per fare questo tipo di cose.

Non soddisfatto, Continuo a guardare fino a quando ho trovato questo articolo: http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

assomigliare abbastanza ragionevole e ho dato una prova. Poiché eseguirò questa scansione completa solo una volta e l'ora e le prestazioni non sono un problema, ho scritto la query e l'ho messa in un lavoro semplice per cercare tutti i record che voglio. Da 2 miliardi di file di record, qualcosa come 1000 era il mio output atteso, tuttavia, avevo solo 100 record.

Il mio lavoro:

public void run() { 
    Cluster cluster = getConnection(); 
    Session session = cluster.connect("db"); 

    LOGGER.info("Starting ..."); 

    boolean run = true; 
    int print = 0; 

    while (run) { 
     if (maxTokenReached(actualToken)) { 
      LOGGER.info("Max Token Reached!"); 
      break; 
     } 
     ResultSet resultSet = session.execute(queryBuilder(actualToken)); 

     Iterator<Row> rows = resultSet.iterator(); 
     if (!rows.hasNext()){ 
      break; 
     } 

     List<String> rowIds = new ArrayList<String>(); 

     while (rows.hasNext()) { 
      Row row = rows.next(); 

      Long leadTime = row.getLong("my_column"); 
      if (myCondition(myCollumn)) { 
       String rowId = row.getString("key"); 
       rowIds.add(rowId); 
      } 

      if (!rows.hasNext()) { 
       Long token = row.getLong("token(rowid)"); 
       if (!rowIds.isEmpty()) { 
        LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds)); 
       } 
       actualToken = nextToken(token); 
      } 

     } 

    } 
    LOGGER.info("Done!"); 
    cluster.shutdown(); 
} 

public boolean maxTokenReached(Long actualToken){ 
    return actualToken >= maxToken; 
} 

public String queryBuilder(Long nextRange) { 
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
} 

public Long nextToken(Long token){ 
    return token + 1; 
} 

Fondamentalmente quello che faccio è cercare il minimo consentito e Token incrementale andare fino all'ultimo.

Non so, ma è come se il lavoro non avesse eseguito completamente la scansione completa o la mia query avesse avuto accesso solo a un nodo o qualcosa del genere. Non so se sto facendo qualcosa di sbagliato, o non è davvero possibile fare una scansione completa.

Oggi ho quasi 2 TB di dati, solo una tabella in un cluster di sette nodi.

Qualcuno è già stato in questa situazione o ha qualche raccomandazione?

+0

qual è lo schema delle chiavi per "mytable"? la query viene eseguita più volte (a causa del ciclo while) e l'ultima query potrebbe restituire 100 anziché 1000 – turbo

+0

Schema: http://pastebin.com/DyWAc1wa. E sì, la query è in esecuzione più volte e restituisce tutte le righe impostate sulla clausola LIMIT. – bcfurtado

risposta

5

È possibile eseguire una scansione completa della tabella in Cassandra, in effetti è abbastanza comune per cose come Spark. Tuttavia, non è in genere "veloce", quindi è scoraggiato a meno che tu non sappia perché lo stai facendo. Per le vostre domande effettive:

1) Se si utilizza CQL, si utilizza quasi certamente il partizionatore Murmur3, quindi il token minimo è -9223372036854775808 (e il token massimo è 9223372036854775808).

2) Si sta utilizzando session.execute(), che utilizzerà una consistenza predefinita di ONE, che potrebbe non restituire tutti i risultati nel cluster, specialmente se si sta scrivendo anche su ONE, che sospetto Potresti essere. Sollevare che a TUTTI, e utilizzare le istruzioni preparate per accelerare il parsing CQL:

public void run() { 
    Cluster cluster = getConnection(); 
    Session session = cluster.connect("db"); 
    LOGGER.info("Starting ..."); 
    actualToken = -9223372036854775808; 
    boolean run = true; 
    int print = 0; 

    while (run) { 
     if (maxTokenReached(actualToken)) { 
      LOGGER.info("Max Token Reached!"); 
      break; 
     } 
     SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken)); 
     stmt.setConsistencyLevel(ConsistencyLevel.ALL); 
     ResultSet resultSet = session.execute(stmt); 

     Iterator<Row> rows = resultSet.iterator(); 
     if (!rows.hasNext()){ 
      break; 
     } 

     List<String> rowIds = new ArrayList<String>(); 

     while (rows.hasNext()) { 
      Row row = rows.next(); 

      Long leadTime = row.getLong("my_column"); 
      if (myCondition(myCollumn)) { 
       String rowId = row.getString("key"); 
       rowIds.add(rowId); 
      } 

      if (!rows.hasNext()) { 
       Long token = row.getLong("token(rowid)"); 
       if (!rowIds.isEmpty()) { 
        LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds)); 
       } 
      actualToken = nextToken(token); 
      } 
     } 
     } 
    LOGGER.info("Done!"); 
    cluster.shutdown(); 
    } 

public boolean maxTokenReached(Long actualToken){ 
    return actualToken >= maxToken; 
} 

public String queryBuilder(Long nextRange) { 
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
} 

public Long nextToken(Long token) { 
    return token + 1; 
} 
+0

Ehi, Jeff, innanzitutto, grazie per il tuo aiuto! Sto usando CQL con Murmur e sono a conoscenza dei valori dei token max e min. Il lavoro oggi riceve una serie di token che cercheranno nelle file. In questo modo potrei lanciare quelle gamme in thread per accelerare questo. – bcfurtado

+0

In secondo luogo, ho implementato la tua congestione, ma non ho avuto molta differenza da ciò che avevo fatto, in realtà, il lavoro era tornato per ultimo la prima volta. Ma una volta ho notato che il carico della macchina era basso rispetto a prima, era più distribuito tra il cluster per tutto il tempo in cui il lavoro era in esecuzione. Prima che il carico diventi alto solo macchine specifiche e in tempi diversi. – bcfurtado

+0

Aumentare la consistenza dovrebbe causare più carico, perché sta interrogando più repliche per assicurarsi che non manchi alcun dato. Per essere chiari: quante righe ha restituito e quante righe ti aspetti che ritorni? –

1

E 'questo per una cosa comune che devi fare? O uno scenario di un caso? Sono d'accordo che questa non è una cosa consigliabile che si vuole fare su base regolare, ma ho anche avuto un problema in cui ho dovuto leggere tutte le righe da una ColumnFamily e mi sono basato su AllRowsReader recipe da Astyanax client. Sto vedendo che stai usando il driver Datastax CQL per connetterti al tuo cluster, ma se quello che stai cercando è qualcosa che ha dimostrato di funzionare, potresti non preoccuparti di gestire il problema usando la libreria Astyanax.

Nel mio caso ho letto tutte le chiavi di riga e poi ho avuto un altro lavoro per interagire con ColumnFamily con le chiavi che ho raccolto.

import com.netflix.astyanax.Keyspace; 
import com.netflix.astyanax.model.ColumnFamily; 
import com.netflix.astyanax.model.ConsistencyLevel; 
import com.netflix.astyanax.recipes.reader.AllRowsReader; 

import java.util.concurrent.CopyOnWriteArrayList; 

...   

private final Keyspace keyspace; 
private final ColumnFamily<String, byte[]> columnFamily; 

public List<String> getAllKeys() throws Exception { 

    final List<String> rowKeys = new CopyOnWriteArrayList<>(); 

    new AllRowsReader.Builder<>(keyspace, columnFamily).withColumnRange(null, null, false, 0) 
     .withPartitioner(null) // this will use keyspace's partitioner 
     .withConsistencyLevel(ConsistencyLevel.CL_ONE).forEachRow(row -> { 
     if (row == null) { 
      return true; 
     } 

     String key = row.getKey(); 

     rowKeys.add(key); 

     return true; 
    }).build().call(); 

    return rowKeys; 
} 

ci sono diverse opzioni di configurazione per eseguire questo in diversi thread e molte altre cose, come ho detto ho appena eseguito una volta nel mio codice e ha lavorato molto bene, sarei felice di aiutare se è stato eseguito in problemi cercando di farlo funzionare.

Spero che questo aiuti,

José Luis

1

Se avete regolarmente bisogno di fare scansione completa della tabella di una tabella di Cassandra, diciamo per analisi a Spark, quindi consiglio vivamente di prendere in considerazione la memorizzazione dei dati utilizzando una connessione dati modello che è ottimizzato per la lettura. Puoi dare un'occhiata a http://github.com/tuplejump/FiloDB per un esempio di installazione ottimizzata per la lettura su Cassandra.

Problemi correlati