2016-04-20 12 views
5

Ho un flusso di integrazione molto semplice, in cui una richiesta RESTful viene inoltrata a due provider utilizzando un canale di sottoscrizione di pubblicazione. Il risultato di entrambi i servizi RESTful viene quindi aggregato in un singolo array. Lo schizzo del flusso di integrazione è la seguente:Integrazione molla Java DSL - Configurazione dell'aggregatore

@Bean 
IntegrationFlow flow() throws Exception { 
    return IntegrationFlows.from("inputChannel") 
      .publishSubscribeChannel(s -> s.applySequence(true) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider1.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
       ).subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider2.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class) 
         ) 
       ) 
      ) 
      .aggregate() 
      .get(); 
} 

Tuttavia, quando si esegue il mio codice, l'array risultante contiene gli oggetti restituiti da uno solo dei servizi RESTful. C'è qualche passaggio di configurazione che mi manca?

UPDATE

La seguente versione corrisponde alla soluzione completa, tenendo conto dei commenti di Artem.

@Bean 
IntegrationFlow flow() throws Exception { 
    return IntegrationFlows.from("inputChannel-scatter") 
      .publishSubscribeChannel(s -> s.applySequence(true) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider1.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
         .channel("inputChannel-gather")) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider2.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
         .channel("inputChannel-gather"))) 
      .get(); 
} 

@Bean 
IntegrationFlow gatherFlow() { 
    return IntegrationFlows.from("inputChannel-gather") 
      .aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
         g.getMessages().stream() 
           .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload())) 
           .collect(Collectors.toList()).toArray(new ItemDTO[0])))) 
      .get(); 
} 

risposta

3

In realtà non funziona in questo modo.

Il .aggregate() è un terzo abbonato a quello publishSubscribeChannel.

È necessario interrompere il flusso su due di essi. Come questo:

@Bean 
    public IntegrationFlow publishSubscribeFlow() { 
     return flow -> flow 
       .publishSubscribeChannel(s -> s 
         .applySequence(true) 
         .subscribe(f -> f 
           .handle((p, h) -> "Hello") 
           .channel("publishSubscribeAggregateFlow.input")) 
         .subscribe(f -> f 
           .handle((p, h) -> "World!") 
           .channel("publishSubscribeAggregateFlow.input")) 
       ); 
    } 

    @Bean 
    public IntegrationFlow publishSubscribeAggregateFlow() { 
     return flow -> flow 
       .aggregate(a -> a.outputProcessor(g -> g.getMessages() 
         .stream() 
         .<String>map(m -> (String) m.getPayload()) 
         .collect(Collectors.joining(" ")))) 
       .channel(c -> c.queue("subscriberAggregateResult")); 
    } 

Prestare attenzione, per favore, per l'utilizzo .channel("publishSubscribeAggregateFlow.input") da entrambi gli utenti.

Per essere onesti questo è un punto di qualsiasi publish-subscribe. Dobbiamo sapere dove inviare il risultato di tutti gli abbonati se stiamo per aggregarli.

Il tuo caso di utilizzo mi ricorda il modello EIP Scatter-Gather.

Non abbiamo ancora la sua implementazione nella DSL. Sentiti libero di sollevare un GH issue sull'argomento e cercheremo di gestirlo nella prossima versione 1.2.

UPDATE

problema Il GH sulla questione: https://github.com/spring-projects/spring-integration-java-dsl/issues/75

+0

Grazie mille per aiutare Artem. In effetti, ho provato a utilizzare i canali e il flusso separato in precedenza, senza successo perché avevo anche il problema con l'aggregatore. La tua risposta mi ha dato anche suggerimenti su come scrivere l'aggregatore. – user3329862

+0

Ottimo! Ma Scatter-Gather sarebbe una buona aggiunta al DSL. Quindi, non essere timido a sollevare un problema di GH in ogni caso! –

Problemi correlati