2016-03-08 16 views
7

Sto usando RxJava 1.1 a comporre una sequenza osservabile da all'interno di un'applicazione primavera che è simile al seguente:transazione rollback in un'applicazione reattivo

@Transaction 
public Observable<Event> create(Event event) { 
    return Observable.just(event) 
      .flatMap(event -> { 
       //save event to db (blocking JPA operation) 
       Event event = eventRepository.save(event); 
       return Observable.just(event); 
      }) 
      //async REST call to service A 
      .flatMap(this::sendEventToServiceA) <---- may execute on different thread 
      //async REST call to service B 
      .flatMap(this::sendEventToServiceB) <---- may execute on different thread 
      .doOnError(throwable -> { 
       // ? rollback initally created transaction? 
      }) 
} 

Un evento raggiunge il livello di servizio della mia applicazione da qualche classe controller e questo si propaga attraverso una catena di operazioni costruite con la funzione flatMap() di RxJava. L'evento viene prima archiviato nel database (Spring Data) e le successive due richieste HTTP asincrone vengono eseguite una dopo l'altra utilizzando la libreria Spring's AsyncRestTemplate dietro le quinte.

In caso di errore/eccezione in qualsiasi punto della pipeline, vorrei poter eseguire il rollback della transazione del database in modo che l'evento NON sia memorizzato nel database. Ho trovato che non è facile da fare poiché in primavera il contesto della transazione è associato al particolare thread di esecuzione. Pertanto, se il codice raggiunge il callback doOnError su un thread diverso (AsyncRestTemplate utilizza il proprio AsyncTaskExecutor), non è possibile eseguire il rollback della transazione inizialmente creata.

Si può consigliare qualsiasi meccanismo per realizzare transazioni attraverso un'applicazione multi-thread composta da diverse operazioni asincrone scritte in questo modo?

Ho anche cercato di creare una transazione a livello di codice con:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); 

e poi inviare l'oggetto transactionStatus insieme con l'evento attraverso il gasdotto, ma ancora una volta, quando si verifica un errore e invoco "platformTransactionManager.rollback (stato); ", ottengo" la sincronizzazione delle transazioni non è attiva "poiché questo è in esecuzione su un thread diverso, immagino.

p.s. I metodi sendEventToServiceA/sendEventToServiceB simile al seguente:

public Observable<Event> sendEventToServiceA(event) { 
    .......... 
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
       "/serviceA/create?event_id=" + event.id, 
       HttpMethod.POST, requestEntity, String.class); 

    return ObservableUtil.toRxObservable(listenableFuture); 
} 

risposta

3

Un modo per farlo è quello di garantire che l'errore si osserva sullo stesso thread come il db salvare:

@Transaction 
public Observable<Event> create(Event event) { 

    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 
    return Observable.just(event) 
      .flatMap(event -> { 
       //save event to db (blocking JPA operation) 
       Event event = eventRepository.save(event); 
       return Observable.just(event); 
      }) 
      .subscribeOn(scheduler) 
      //async REST call to service A 
      .flatMap(this::sendEventToServiceA) <---- may execute on different thread 
      //async REST call to service B 
      .flatMap(this::sendEventToServiceB) <---- may execute on different thread 
      .observeOn(scheduler) 
      .doOnError(throwable -> { 
       // ? rollback initally created transaction? 
      }) 
} 
+0

Grazie Dave! La tua soluzione con lo schedulatore sembra funzionare correttamente. – odybour

+0

Un problema minore che ho riscontrato è che il metodo flatMap che esegue il salvataggio nel database verrà eseguito su un thread diverso rispetto a quello che ha creato la transazione in primo luogo a causa dell'annotazione. Per ovviare a ciò ho creato la transazione a livello di codice all'interno del metodo flatMap appena prima dell'operazione di salvataggio, quindi ho memorizzato la transazione su un oggetto di contesto che ho passato alla pipeline osservabile e all'interno di doOnError faccio qualcosa del tipo: 'transactionManager.rollback (context. getTransaction()); '. – odybour

Problemi correlati