2014-06-17 14 views
6

Sto utilizzando l'SDK AWS per Java e sto utilizzando il client SQL asincronizzato per le richieste batch in modo da ridurre i costi.Come posso forzare AmazonSQSBufferedAsyncClient a svuotare i messaggi?

Quando la mia applicazione si spegne, voglio assicurarmi che nessun messaggio sia in attesa nel buffer, ma non c'è il metodo .flush() che posso vedere sul client.

AmazonSQSBufferedAsyncClient.shutdown() svuota i messaggi quando viene chiamato? Ho guardato lo source code e non è chiaro. Il metodo chiama shutdown() su ogni QueueBuffer che ha, ma dentro QueueBuffer.shutdown() dice

public void shutdown() { 
    //send buffer does not require shutdown, only 
    //shut down receive buffer 
    receiveBuffer.shutdown(); 
} 

Inoltre, la documentazione per .shutdown() dice:

Chiude questo oggetto cliente, liberando tutte le risorse che potrebbe essere tenuto aperto. Questo è un metodo facoltativo e non è necessario chiamare il numero per chiamarlo, ma è possibile se si desidera rilasciare esplicitamente qualsiasi risorsa aperta . Una volta che un client è stato arrestato, non dovrebbe essere utilizzato per fare altre richieste.

Per questa applicazione, è necessario assicurarsi che nessun messaggio venga perso durante il buffering. Devo gestirlo manualmente usando il normale AmazonSQSClient invece del buffering/async?

+1

ho trovato questo, che sembra essere la stessa domanda, ed è attualmente senza risposta: https://forums.aws.amazon.com/thread.jspa?threadID=122189 – Daenyth

risposta

1

Con la versione 1.11.37 dell'SDK, esiste un parametro di configurazione solo per questo scopo in QueueBufferConfig.

AmazonSQSBufferedAsyncClient bufClient = 
    new AmazonSQSBufferedAsyncClient(
     realAsyncClient, 
     new QueueBufferConfig() 
      .withFlushOnShutdown(true) 
    ); 
1

Esiste un metodo per chiamare esplicitamente lo svuotamento ma non è accessibile e in realtà non sono riuscito a trovare alcuna chiamata a tale metodo nel codice amazzonico. Sembra che manchi qualcosa.

Quando si chiama l'arresto sul client asincrono esegue il seguente codice:

public void shutdown() { 
    for(QueueBuffer buffer : buffers.values()) { 
     buffer.shutdown(); 
    } 
    realSQS.shutdown(); 
} 

E QueueBuffer # shutdown() si presenta così:

/** 
* Shuts down the queue buffer. Once this method has been called, the 
* queue buffer is not operational and all subsequent calls to it may fail 
* */ 
public void shutdown() { 
    //send buffer does not require shutdown, only 
    //shut down receive buffer 
    receiveBuffer.shutdown(); 
} 

Così sembra che essi non sono intenzionalmente chiamando a sendBuffer.shutdown() che è il metodo che svuota ogni messaggio nel buffer che non è ancora stato inviato.

Hai trovato un caso quando hai arrestato il client SQS e hai perso i messaggi? Sembra che siano consapevoli di ciò e questo caso non dovrebbe accadere, ma se si vuole essere sicuri di poter chiamare quel metodo con la riflessione che è davvero brutto, ma soddisferà le vostre esigenze.

AmazonSQSBufferedAsyncClient asyncSqsClient = <your initialization code of the client>; 
    Field buffersField = ReflectionUtils.findField(AmazonSQSBufferedAsyncClient.class, "buffers"); 
    ReflectionUtils.makeAccessible(buffersField); 
    LinkedHashMap<String, Object> buffers = (LinkedHashMap<String, Object>) ReflectionUtils.getField(buffersField, asyncSqsClient); 
    for (Object buffer : buffers.values()) { 
     Class<?> clazz = Class.forName("com.amazonaws.services.sqs.buffered.QueueBuffer"); 
     SendQueueBuffer sendQueueBuffer = (SendQueueBuffer) ReflectionUtils.getField(ReflectionUtils.findField(clazz, "sendBuffer"), buffer); 
     sendQueueBuffer.flush();//finally 
    } 

Qualcosa del genere dovrebbe funzionare, credo. Fammi sapere!

Problemi correlati