2011-08-24 28 views
6

Ho un numero di raccolte MongoDB che prendono un numero di documenti JSON da varie fonti di streaming. In altre parole c'è un numero di processi che inseriscono continuamente dati in un insieme di raccolte MongoDB.MongoDb Streaming dei dati inseriti in tempo reale (o quasi in tempo reale)

Ho bisogno di un modo per trasmettere i dati da MongoDB alle applicazioni downstream. Quindi voglio un sistema che sembra concettualmente simile a questo:

App Stream1 --> 
App Stream2 -->  MONGODB  ---> Aggregated Stream 
App Stream3 --> 

O questo:

App Stream1 -->     ---> MongoD Stream1 
App Stream2 -->  MONGODB  ---> MongoD Stream2 
App Stream3 -->     ---> MongoD Stream3 

La domanda è: come faccio a flusso di dati di Mongo, senza dover interrogare continuamente/interrogare il database?

La risposta ovvia domanda sarebbe "Perché non si cambia quei processi di app in streaming per inviare messaggi a una coda, come coniglio, Zero o ActiveMQ, che ha poi inviarle ai vostri processi Mongo streaming e Mongo in una volta come questo":

    MONGODB 
        /|\ 
        | 
App Stream1 -->  |   ---> MongoD Stream1 
App Stream2 --> SomeMQqueue ---> MongoD Stream2 
App Stream3 -->    ---> MongoD Stream3 

in un mondo ideale sì che sarebbe bene, ma abbiamo bisogno di Mongo per garantire che i messaggi sono salvati in primo luogo, al fine di evitare doppioni e garantire che gli ID sono tutti generati ecc Mongo deve sedersi in mezzo come la persistenza strato.

Così come faccio a trasmettere in streaming i messaggi da una raccolta Mongo (non usando GridFS, ecc.) In queste app downstream. La scuola di pensiero di base è stata semplicemente il polling di nuovi documenti e ogni documento raccolto viene aggiornato aggiungendo un altro campo ai documenti JSON archiviati nel database, proprio come un flag di processo in una tabella SQL che memorizza un timestamp elaborato. Cioè ogni 1 secondo sondaggio per documenti elaborati == null .... aggiungi elaborato = now() .... aggiorna il documento.

Esiste un metodo più efficiente/più efficiente dal punto di vista computazionale?

FYI - Questi sono tutti processi Java.

Cheers!

risposta

3

Se si sta scrivendo su una raccolta con limite (o raccolte), è possibile utilizzare uno tailablecursor per inviare nuovi dati nello stream o su una coda di messaggi da cui è possibile eseguire lo streaming. Tuttavia, questo non funzionerà per una collezione non limitata.

+0

Grazie per il collegamento. Purtroppo non utilizzo le collezioni con limite massimo, non una cattiva funzione per un servizio di messaggistica. Sembra un indice sul flag elaborato e il polling è l'unica opzione ... Se un elemento dell'indice è nullo è ancora referenziato nell'indice o l'interrogazione per null significa ancora scansioni di raccolta? – NightWolf

+1

Oppure mi spiace che potremmo avere una collezione limitata con dimensioni fisse come una cache, quindi estrarre gli elementi da un buy 1 e reinserirli in una raccolta ordinaria. La domanda diventa quindi come salviamo il cursore di posizione tra le esecuzioni di app? Suppongo che usiamo solo un campo _id generato automaticamente da Mongo e selezioniamo tutto più grande di quel campo ID ... Sono tutti gli _ID generati da mongo in ordine crescente? – NightWolf

+1

Gli indici memorizzano le voci per 'null'. Se stai pedinando una collezione con un limite, devi memorizzare l'ultima voce che hai visto (puoi memorizzarla come vuoi, usando un'altra raccolta di mongo funzionerà bene), e quindi iniziare il tuo cursore tailable su quell'elemento usando '$ min 'e' skip (1) 'per riprendere. Vedi http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max – dcrosta