2012-01-19 13 views
15

Ho un programma jar autoeseguibile che fa molto affidamento su Spring Integration. Il problema che sto avendo è che il programma termina prima che gli altri bean Spring abbiano terminato completamente.In attesa di terminare tutti i thread in Spring Integration

Di seguito è una versione ridotta del codice che sto utilizzando, posso fornire più codice/configurazione, se necessario. Il punto di ingresso è un metodo main(), che bootstrap primavera e avvia il processo di importazione:

public static void main(String[] args) { 
    ctx = new ClassPathXmlApplicationContext("flow.xml"); 
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean"); 
    try { 
     importer.startImport(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } finally { 
     ctx.close(); 
    } 
} 

Il DataImporter contiene un semplice ciclo che spara i messaggi a un gateway di integrazione di primavera. Ciò fornisce un approccio "push" attivo al flusso, piuttosto che l'approccio comune del polling per i dati. Questo è dove il mio problema è disponibile in:

public void startImport() throws Exception { 
    for (Item item : items) { 
     gatewayBean.publish(item); 
     Thread.sleep(200); // Yield period 
    } 
} 

Per completezza, l'XML flusso simile a questa:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" /> 

<splitter input-channel="inChannel" output-channel="splitChannel" /> 

<payload-type-router input-channel="splitChannel"> 
    <mapping type="Item" channel="itemChannel" /> 
    <mapping type="SomeOtherItem" channel="anotherChannel" /> 
</payload-type-router> 

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" /> 

Le partenze di flusso ed elabora in modo efficace gli elementi, ma una volta che il ciclo startImport() termina la il thread principale termina e strappa immediatamente tutti i thread di Spring Integration. Ciò si traduce in una condizione di competizione, gli ultimi (n) elementi non vengono completamente elaborati al termine del programma.

Ho un'idea di mantenere un conteggio dei riferimenti degli articoli che sto elaborando, ma si sta dimostrando piuttosto complicato, poiché il flusso spesso divide/indirizza i messaggi a più attivatori di servizi - il che significa che è difficile determinare se ogni oggetto ha "finito".

Quello che penso di cui ho bisogno è un modo per controllare che nessun bean Spring sia ancora in esecuzione, o per segnalare che tutti gli elementi inviati al gateway sono stati completamente elaborati prima di terminare.

La mia domanda è: come potrei andare a fare una di queste cose o c'è un approccio migliore al mio problema a cui non ho pensato?

risposta

9

Non stai utilizzando uno schema di richiesta-risposta qui.

in uscita-channel-adapter è un'azione di attivazione e dimenticanza, se si desidera attendere la risposta è necessario utilizzare un gateway in uscita che attenderà la risposta e connettere la risposta al gateway originale, quindi in java sendAndReceive non solo pubblicare.

+1

Questo era davvero il mio problema. Ho modificato il flusso per utilizzare anziché e ho collegato il risultato a un gateway asincrono. Ora posso attendere il completamento usando Future.isDone(). Grazie per l'aiuto! – seanhodges

3

Se è possibile ottenere un Item per determinare, se è ancora necessario o no (processingFinished() o qualcosa di simile eseguite in back-end stadi), è possibile registrare tutti i Item s ad un'autorità centrale, che tiene traccia del numero di Item s non rifiniti e determina in modo efficace una condizione di terminazione.

Se questo approccio è fattibile, si potrebbe anche pensare di imballare gli articoli in FutureTask -oggetti o fare uso di concetti simili da java.util.concurrent.

Edit: seconda idea:

Avete pensato di fare i canali più intelligenti? Un mittente chiude il canale quando non invia più dati. In questo scenario, i bean worker non devono essere thread Deamon ma possono determinare il loro criterio di terminazione basato su un canale di input chiuso e vuoto.

+0

Alcune buone idee lì dentro.Tuttavia, nel mio progetto la classe "Articolo" è troppo generica (parte di un livello del modello) per aggiungere la logica di importazione. Potrei cercare modi per fare qualcosa di simile con le intestazioni dei messaggi. FutureTask et al sarebbe bello, ma sto cercando di capire come utilizzare il framework concomitante per il mio problema specifico ... – seanhodges

Problemi correlati