2015-02-19 9 views
10

Qual è il modo corretto per trasformare il famoso modulo ws in una API reattiva in Node.js? Capisco che i soggetti possano aiutare a superare eventi non reattivi a eventi reattivi, ma il loro problema è che hanno un tempo molto più difficile per smaltire i loro oggetti dipendenti.Come trasformare correttamente WS Server in RXJS Api senza soggetti nei NodeJ

var WebSocketServer = require('ws').Server; 
 
var wss = new WebSocketServer({ port: 8080 }); 
 
var Rx = require('rx'); 
 

 

 
var connectionMessageSubject = new Rx.Subject(); 
 

 
wss.on('connection', function connection(client) { 
 
    ws.on('message', function incoming(message) { 
 
    connectionMessageSubject.onNext({ 
 
     client: client, 
 
     message: message 
 
    }); 
 
    }); 
 
});

non posso usare il loro costruito nel metodo fromEvent perché, registra tanti eventi diversi che NodeJS getta un avviso quando 30 o più client si connettono.

Per esempio ...

var WebSocketServer = require('ws').Server; 
 
var wss = new WebSocketServer({port:8080}); 
 

 
var connectionMessageObservable; 
 

 
//this uses a tremendous amount of memory and throws warnings that the event emitter has a maximum of 30 listeners 
 
wss.on('connection', function connection(client){ 
 
    connnectionMessageObservable = Rx.Observable.fromEvent(client, 'message'); 
 
});

+2

Date un'occhiata a [Rx.DOM.fromWebSocket()] (https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/src/websocket.js). Dovresti essere in grado di clonare sostanzialmente quel metodo e cambiarlo per funzionare con il modulo ws nodeJs. Il modello è un attrezzo da usare. – Brandon

+2

FWIW, ho riscritto un sacco di daWebSocket poco fa. Questo pattern è un buon pattern per il wrapping di qualcosa come WS in Node. –

+1

Inoltre, ho un PR per un operatore 'singleInstance' che * potrebbe * aiutare per questo scenario, se trovi che devi pubblicare(). RefCount() il tuo ws wrapper e cancellarlo completamente a volte. –

risposta

0

Il seguente codice simula il comportamento subject.

var WebSocketServer = require('ws').Server; 
var wss = new WebSocketServer({port:8080}); 

var connectionMessage$ = new Rx.Observable(function (observer) { 
    wss.on('connection', function connection(client){ 
     client.on('message', function (message){ 
      observer.next({ 
       client: client, 
       message: message, 
      }) 
     }); 
    });  
}); 

connectionMessage$.subscribe(function (cm) { 
    // cm.client for client 
    // cm.message for message 
}); 
Problemi correlati