2015-07-10 9 views
7

Ho una classe in questo modo:Come posso controllare il flusso del programma usando eventi e promesse?

import net from 'net'; 
import {EventEmitter} from 'events'; 
import Promise from 'bluebird'; 

class MyClass extends EventEmitter { 
    constructor(host = 'localhost', port = 10011) { 
     super(EventEmitter); 
     this.host = host; 
     this.port = port; 
     this.socket = null; 
     this.connect(); 
    } 
    connect() { 
     this.socket = net.connect(this.port, this.host); 
     this.socket.on('connect', this.handle.bind(this)); 
    } 
    handle(data) { 
     this.socket.on('data', data => { 

     }); 
    } 
    send(data) { 
     this.socket.write(data); 
    } 
} 

Come dovrei girare il metodo send in una promessa, che restituisce un valore dall'evento del socket data? Il server invia dati solo quando i dati vengono inviati ad esso, a parte un messaggio di connessione che può essere facilmente soppresso.

Ho provato qualcosa di simile:

handle(data) { 
    this.socket.on('data', data => { 
     return this.socket.resolve(data); 
    }); 
    this.socket.on('error', this.socket.reject.bind(this)); 
} 
send(data) { 
    return new Promise((resolve, reject) => { 
     this.socket.resolve = resolve; 
     this.socket.reject = reject; 
     this.socket.write(data); 
    }); 
} 

Ovviamente questo non funzionerà perché resolve/reject sovrascriverà l'altro quando concatenamento e/o chiamando send più volte in parallelo.

C'è anche il problema di chiamare lo send due volte in parallelo e risolve prima la risposta.

Attualmente ho un'implementazione che utilizza una coda e un file defers, ma ci si sente in disordine poiché la coda viene costantemente controllata.

mi piacerebbe essere in grado di effettuare le seguenti operazioni:

let c = new MyClass('localhost', 10011); 
c.send('foo').then(response => { 
    return c.send('bar', response.param); 
    //`response` should be the data returned from `this.socket.on('data')`. 
}).then(response => { 
    console.log(response); 
}).catch(error => console.log(error)); 

Giusto per aggiungere, non ho alcun controllo sui dati che si riceve, il che significa che non può essere modificato al di fuori di il flusso.

Modifica: Quindi sembra che questo sia piuttosto impossibile, a causa del fatto che TCP non ha un flusso di richiesta-risposta. Come può essere implementato usando ancora promesse, ma usando una catena di promesse a esecuzione singola (una richiesta alla volta) o una coda.

+0

Vuoi dire come una chat a due vie? Invia un messaggio e aspetta di ricevere un messaggio, in questo modo? – thefourtheye

+0

@thefourtheye Quasi, tranne che potrebbe essere necessario chiamare 'send' in parallelo e la promessa dovrebbe restituire la risposta corretta a seconda di ciò che è stato inviato. Sebbene tutti i dati ricevuti provengano da un flusso, quindi non è esattamente tracciabile. –

+0

Sto indovinando qui ... potresti impostare una specie di oggetto observer in 'socket' con un metodo' .add() ', quindi chiamare' this.socket.observer.add ({ \t \t \t rifiuto: rifiutare, \t \t \t determinazione: risolvere \t \t}; 'da' send() ' –

risposta

3

ho distillata il problema al minimo indispensabile e ne ha fatto il browser eseguibile:

  1. classe Socket è deriso.
  2. Informazioni rimosse su porta, host ed ereditarietà da EventEmitter.

La soluzione funziona aggiungendo nuove richieste alla catena di promessa, ma consentendo al massimo una richiesta aperta/non risposta in qualsiasi dato momento. .send restituisce una nuova promessa ogni volta che viene chiamato e la classe si occupa di tutta la sincronizzazione interna. Pertanto, .send può essere chiamato più volte e l'elaborazione delle richieste corrette (FIFO) è garantita. Una funzionalità aggiuntiva che ho aggiunto è il taglio della catena di promesse, se non ci sono richieste in sospeso.


Caveat ho omesso la gestione degli errori del tutto, butit dovrebbe essere adattata per il vostro particolare caso d'uso in ogni caso.


DEMO

class SocketMock { 

    constructor(){ 
    this.connected = new Promise((resolve, reject) => setTimeout(resolve,200)); 
    this.listeners = { 
    // 'error' : [], 
    'data' : [] 
    } 
    } 

    send(data){ 

    console.log(`SENDING DATA: ${data}`); 
    var response = `SERVER RESPONSE TO: ${data}`; 
    setTimeout(() => this.listeners['data'].forEach(cb => cb(response)),    
       Math.random()*2000 + 250); 
    } 

    on(event, callback){ 
    this.listeners[event].push(callback); 
    } 

} 

class SingleRequestCoordinator { 

    constructor() { 
     this._openRequests = 0; 
     this.socket = new SocketMock(); 
     this._promiseChain = this.socket 
      .connected.then(() => console.log('SOCKET CONNECTED')); 
     this.socket.on('data', (data) => { 
     this._openRequests -= 1; 
     console.log(this._openRequests); 
     if(this._openRequests === 0){ 
      console.log('NO PENDING REQUEST --- trimming the chain'); 
      this._promiseChain = this.socket.connected 
     } 
     this._deferred.resolve(data); 
     }); 

    } 

    send(data) { 
     this._openRequests += 1; 
     this._promiseChain = this._promiseChain 
     .then(() => { 
      this._deferred = Promise.defer(); 
      this.socket.send(data); 
      return this._deferred.promise; 
     }); 
     return this._promiseChain; 
    } 
} 

var sender = new SingleRequestCoordinator(); 

sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); 
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); 
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)); 

setTimeout(() => sender.send('data-4') 
    .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000); 
0

Se le vostre chiamate send() sono confuse, è necessario salvarle nella cache. Per essere sicuri che il messaggio ricevuto corrisponda a quello inviato, devi assegnare un numero unico id per ogni messaggio in payload.

Così il vostro mittente del messaggio sarà simile a questa

class MyClass extends EventEmitter { 
    constructor() { 
    // [redacted] 
    this.messages = new Map(); 
    } 

    handle(data) { 
    this.socket.on('data', data => { 
     this.messages.get(data.id)(data); 
     this.messages.delete(data.id); 
    }); 
    } 

    send(data) { 
    return return new Promise((resolve, reject) => { 
     this.messages.set(data.id, resolve); 
     this.socket.write(data); 
    }); 
    } 
} 

Questo codice non sarà ragionevole ordine messaggio e si otterrà API, che si desidera.

+0

Non c'è modo di assegnare un 'id' al di fuori dello script. 'Id' non verrà restituito dal flusso, il che significa che lo script non saprà quale risposta è stata ricevuta. –

+0

Quindi, vuoi risolvere la promessa con un prossimo data frame dal socket? Questa soluzione non sembra affidabile, ma puoi farlo, se crei array 'messages' invece dell'oggetto –

0

socket.write(data[, encoding][, callback]) effettua una richiamata. Puoi rifiutare o risolvere in questo callback.

class MyClass extends EventEmitter { 
    constructor(host = 'localhost', port = 10011) { 
    super(EventEmitter); 
    this.host = host; 
    this.port = port; 
    this.socket = null; 
    this.requests = null; 
    this.connect(); 
    } 
    connect() { 
    this.socket = net.connect(this.port, this.host); 
    this.socket.on('connect',() => { 
     this.requests = []; 
     this.socket.on('data', this.handle.bind(this)); 
     this.socket.on('error', this.error.bind(this)); 
    }); 
    } 
    handle(data) { 
    var [request, resolve, reject] = this.requests.pop(); 
    // I'm not sure what will happen with the destructuring if requests is empty 
    if(resolve) { 
     resolve(data); 
    } 
    } 
    error(error) { 
    var [request, resolve, reject] = this.requests.pop(); 
    if(reject) { 
     reject(error); 
    } 
    } 
    send(data) { 
    return new Promise((resolve, reject) => { 
     if(this.requests === null) { 
     return reject('Not connected'); 
     } 
     this.requests.push([data, resolve, reject]); 
     this.socket.write(data); 
    }); 
    } 
} 

Non testato e quindi non sicuro delle firme dei metodi, ma questa è l'idea di base.

Ciò presuppone che ci sarà un evento handle o error per richiesta effettuata.

Più ci penso, questo sembra impossibile senza ulteriori informazioni nei dati dell'applicazione, come i numeri di pacchetto per abbinare una risposta a una richiesta.

Il modo in cui è implementato ora (e anche il modo in cui è nella tua domanda), non è nemmeno sicuro che una risposta corrisponda esattamente a un evento handle.

+0

La richiamata serve solo a verificare se i dati sono stati inviati, voglio risolvere la promessa quando i dati sono ricevuti. Gli unici dati ricevuti provengono dall'evento 'data'. –

+0

Allora sei sfortunato con una promessa. Il punto è di eseguirlo solo una volta. La Promessa è di farlo se è già successo o se potrebbe accadere in futuro ... – amiuhle

+1

Ma la promessa sarà risolta solo una volta. Fondamentalmente voglio che il flusso sia "send -> create promise -> return promise -> receive data -> resolve promise". –

Problemi correlati