2011-07-04 5 views
5

Sto sviluppando un'applicazione multiprocesso utilizzando Node.js. In questa applicazione, un processo padre genera un processo secondario e comunica con esso utilizzando un protocollo di messaggistica basato su JSON su una pipe. Ho scoperto che i grandi messaggi JSON possono essere "tagliati", in modo tale che un singolo "blocco" emesso nel listener dei dati sulla pipe non contenga il messaggio JSON completo. Inoltre, piccoli messaggi JSON possono essere raggruppati nello stesso blocco. Ogni messaggio JSON sarà delimitato da un carattere di nuova riga, quindi mi chiedo se esiste già un'utilità che bufferizzerà il flusso di lettura della pipe in modo che emetta una riga alla volta (e quindi, per la mia applicazione, un documento JSON Al tempo). Sembra che sarebbe un caso d'uso abbastanza comune, quindi mi chiedo se sia già stato fatto.Streaming orientato alla linea in Node.js

Apprezzerei qualsiasi consiglio chiunque potesse offrire. Grazie.

+0

Avete pensato solo usando il buon vecchio HTTP ? Perché stai inventando un nuovo protocollo IPC? L'invio di messaggi JSON su HTTP è un problema risolto e il nodo è ottimo su HTTP. –

+0

Puoi descrivere come lo faresti con HTTP?In questo momento, non vedo come questo cambierebbe la natura del problema, poiché credo che tu stia ancora leggendo pezzi da un flusso. – jbeard4

+0

Si codifica il processo figlio come un normale server HTTP node.js. La capacità di accettare e inviare messaggi JSON è fornita dal middleware express.js/connect.js bodyParser. https://github.com/senchalabs/connect/blob/master/lib/middleware/bodyParser.js. Questo gestisce i blocchi nel modo standard guidato da evento node.js. Non c'è bisogno di reinventare questo meccanismo. –

risposta

4

Forse Pedro's carrier può aiutarti?

Carrier consente di implementare i nuovi protocolli terminati su node.js.

Il client può inviare blocchi di linee e il gestore comunicherà l'utente solo a su ciascuna linea completata.

1

soluzione più semplice è quella di inviare lunghezza dei dati JSON prima di ciascun messaggio come prefisso di lunghezza fissa (4 byte?) E hanno un parser semplice un-inquadratura che tampona piccoli pezzi o divide più grandi.

È possibile provare node-binary per evitare di scrivere manualmente il parser. Guarda l'esempio di documentazione scan(key, buffer) - fa esattamente la lettura riga per riga.

2

La mia soluzione a questo problema è di inviare messaggi JSON terminati con qualche carattere unicode speciale. Un personaggio che non si otterrebbe mai normalmente nella stringa JSON. Chiamalo TERM.

Quindi il mittente esegue semplicemente "JSON.stringify (message) + TERM;" e lo scrive Il ricevitore quindi divide i dati in entrata sul TERM e analizza le parti con JSON.parse() che è piuttosto veloce. Il trucco è che l'ultimo messaggio potrebbe non essere analizzato, quindi salviamo semplicemente quel frammento e lo aggiungiamo all'inizio del prossimo messaggio quando arriva. Che riceve il codice va in questo modo:

 s.on("data", function (data) { 
     var info = data.toString().split(TERM); 
     info[0] = fragment + info[0]; 
     fragment = ''; 

     for (var index = 0; index < info.length; index++) { 
      if (info[index]) { 
       try { 
        var message = JSON.parse(info[index]); 
        self.emit('message', message); 
       } catch (error) { 
        fragment = info[index]; 
        continue; 
       } 
      } 
     } 
    }); 

Dove "frammento" è definito somwhere dove persisterà tra i blocchi di dati.

Ma che cos'è TERM? Ho usato il carattere di sostituzione unicode '\ uFFFD'. Si potrebbe anche usare la tecnica usata da Twitter dove i messaggi sono separati da '\ r \ n' e i tweet usano '\ n' per le nuove linee e non contengono mai '\ r \ n'

Trovo che sia molto più semplice di fare confusione con lunghezze e simili.

0

Finché nuove righe (o qualsiasi delimitatore si utilizza) si delimitano solo i messaggi JSON e non essere incorporati in essi, è possibile utilizzare il seguente schema:

const buf = '' 
s.on('data', data => { 
    buf += data.toString() 
    const idx = buf.indexOf('\n') 
    if (idx < 0) { return } // No '\n', no full message 
    let lines = buf.split('\n') 
    buf = lines.pop() // if ends in '\n' then buf will be empty 
    for (let line of lines) { 
    // Handle the line 
    } 
}) 
Problemi correlati