2015-01-12 12 views
7

Devo memorizzare circa 250 valori numerici al secondo, per client, ovvero circa 900k di numeri all'ora. Probabilmente non sarà una registrazione di un'intera giornata (probabilmente tra 5-10 ore al giorno), ma dividerò i miei dati in base all'id del client e al giorno in cui viene effettuata la lettura. La lunghezza massima della fila arriva a circa 22-23 M, che è ancora gestibile. Neverteless, il mio schema assomiglia a questo:Cluster Cassandra con prestazioni errate nell'inserto e stabilità dell'inserto

CREATE TABLE measurement (
    clientid text, 
    date text, 
    event_time timestamp, 
    value int, 
    PRIMARY KEY ((clientid,date), event_time) 
); 

lo spazio delle chiavi ha un fattore di replica di 2, solo per il test, il boccino è GossipingPropertyFileSnitch e NetworkTopologyStrategy. So che il fattore di replicazione 3 è più standard di produzione.

Successivamente ho creato un piccolo cluster sui server delle società, tre macchine virtualizzate bare metal con 2 CPU x 2 core e 16 GB di RAM e molto spazio. Sono in gigabit LAN con loro. Il cluster è operativo, basato sul nodetool.

Ecco il codice che sto usando per testare la mia messa a punto:

 Cluster cluster = Cluster.builder() 
       .addContactPoint("192.168.1.100") 
       .addContactPoint("192.168.1.102") 
       .build(); 
     Session session = cluster.connect(); 
     DateTime time = DateTime.now(); 
     BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true); 

    try { 

     ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts 

     String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)"; 
     PreparedStatement preparedStatement = session.prepare(insertQuery); 
     BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also 

     //generating the entries 
     for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements 
      time = time.plus(4); //4ms between each entry 
      BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important 
      batch.add(bound); 

      //The batch statement must have 65535 statements at most 
      if (batch.size() >= 65534) { 
       queryQueue.put(batch); 
       batch = new BatchStatement(); 
      } 
     } 
     queryQueue.put(batch); //the last batch, perhaps shorter than 65535 

     //storing the data 
     System.out.println("Starting storing"); 
     while (!queryQueue.isEmpty()) { 
      pool.execute(() -> { 
       try { 

        long threadId = Thread.currentThread().getId(); 
        System.out.println("Started: " + threadId); 
        BatchStatement statement = queryQueue.take(); 
        long start2 = System.currentTimeMillis(); 
        session.execute(statement); 
        System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2)); 
       } catch (Exception ex) { 
        System.out.println(ex.toString()); 
       } 
      }); 

     } 
     pool.shutdown(); 
     pool.awaitTermination(120,TimeUnit.SECONDS); 


    } catch (Exception ex) { 
     System.out.println(ex.toString()); 
    } finally { 
     session.close(); 
     cluster.close(); 
    } 

mi si avvicinò con il codice leggendo i messaggi qui e su altri blog e siti web. Come ho capito, è importante che il client utilizzi più thread, ecco perché l'ho fatto. Ho anche provato a utilizzare le operazioni asincrone.

Il risultato finale è questo, indipendentemente dall'approccio che utilizzo, un batch viene eseguito in 5-6 secondi, anche se potrebbe richiedere fino a 10. Prende lo stesso se inserisco solo un batch (quindi, solo ~ 65k colonne) o se uso un'applicazione a thread singolo stupida. Onestamente, mi aspettavo un po 'di più. Tanto più che ottengo prestazioni più o meno simili sul mio portatile con un'istanza locale.

Il secondo problema, forse più importante, sono le eccezioni che sto affrontando in modo imprevedibile. Questi due:

com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout durante richiesta Write a consistenza UN (1 repliche erano tenuti ma solo 0 riconosciuto la scrittura)

e

com.datastax.driver.core.exceptions.NoHostAvailableException: Tutti host (s) provato per la query non riuscita (provato: /192.168.1.102:9042 (com.datastax.dri ver.core.TransportException: [/192.168.1.102:9042] La connessione è stata chiusa), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Connessione ha stato chiuso), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] connessione è stata chiusa))

In linea di fondo, sono io fare qualcosa di sbagliato? Devo riorganizzare il modo in cui carico i dati o modificare lo schema. Ho provato a ridurre la lunghezza della riga (quindi ho 12 ore di righe) ma ciò non ha fatto una grande differenza.

============================== Aggiornamento:

ero rude e dimenticato di incollare un esempio del codice che ho usato dopo la risposta alla domanda. Funziona abbastanza bene, tuttavia sto continuando la mia ricerca con KairosDB e il trasferimento binario con Astyanax.Sembra che io possa ottenere prestazioni molto migliori con loro rispetto a CQL, sebbene KairosDB possa avere alcuni problemi quando è in sovraccarico (ma ci sto lavorando) e Astyanax è un po 'prolisso da usare per i miei gusti. Tuttavia, ecco il codice, forse sto sbagliando da qualche parte.

Il numero di slot del semaforo non ha alcun effetto sulle prestazioni quando supera i 5000, è quasi costante.

String insertQuery = "insert into keyspace.measurement  (userid,time_by_hour,time,value) values (?, ?, ?, ?)"; 
     PreparedStatement preparedStatement =  session.prepare(insertQuery); 
     Semaphore semaphore = new Semaphore(15000); 

    System.out.println("Starting " + Thread.currentThread().getId()); 
    DateTime time = DateTime.parse("2015-01-05T12:00:00"); 
    //generating the entries 
    long start = System.currentTimeMillis(); 

    for (int i = 0; i < 900000; i++) { 

     BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important 
     semaphore.acquire(); 
     ResultSetFuture resultSetFuture = session.executeAsync(statement); 
     Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() { 
      @Override 
      public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) { 

       semaphore.release(); 
      } 

      @Override 
      public void onFailure(Throwable throwable) { 
       System.out.println("Error: " + throwable.toString()); 
       semaphore.release(); 
      } 
     }); 
     time = time.plus(4); //4ms between each entry 
    } 

risposta

4

Quali sono i risultati utilizzando il batch non loggato? Sei sicuro di voler utilizzare le dichiarazioni batch? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e

+0

Non drasticamente diverso. Sono abbastanza sicuro di voler usare batch, perché ho già lavorato su cose simili in altri progetti, e le dichiarazioni uno a uno di solito erano più lente. Non ha senso che sia più veloce, comunque. –

+1

Spod ha ragione. I batch in Cassandra non sono un'ottimizzazione delle prestazioni. I batch registrati devono essere utilizzati solo se è necessaria l'atomicità e c'è una penalizzazione delle prestazioni per ottenere le scritture atomiche. Persino i batch non loggati sono spesso più lenti delle semplici query asincrone, in pratica impongono un coordinamento non necessario (a meno che non si stia eseguendo il batching in base alla chiave e si utilizzi il token consapevole, forse sei qui). Tendo a raccomandare comunque le scritture dirette asincrone. Ecco un altro articolo per confermare questa visione: http: //lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/ – phact

+1

Per quanto riguarda i timeout, questo accadrà una volta che inizi a travolgere i tuoi c * nodi con troppe scritture. È facile farlo con query asincrone mentre il programma sta generando le scritture alla velocità massima possibile. Dopo aver rimosso i tuoi lotti (in particolare la registrazione) dovresti vedere un miglioramento, ma potresti dover limitare le tue scritture o addirittura aumentare i tuoi timeout se le tue SLA lo consentono. – phact

Problemi correlati