2016-02-08 20 views
11

Sto tentando di utilizzare redux-saga per collegare eventi da PouchDB all'applicazione React.js, ma non riesco a capire come collegare gli eventi emessi da PouchDB al mio Saga. Poiché l'evento utilizza una funzione di callback (e non riesco a trasmetterlo a un generatore), non posso usare yield put() all'interno del callback, esso genera strani errori dopo la compilazione ES2015 (usando Webpack).Come legare eventi eventi emessi in redux-saga?

Quindi ecco cosa sto cercando di realizzare, la parte che non funziona è all'interno di replication.on('change' (info) => {}).

function * startReplication (wrapper) { 
    while (yield take(DATABASE_SET_CONFIGURATION)) { 
    yield call(wrapper.connect.bind(wrapper)) 

    // Returns a promise, or false. 
    let replication = wrapper.replicate() 

    if (replication) { 
     replication.on('change', (info) => { 
     yield put(replicationChange(info)) 
     }) 
    } 
    } 
} 

export default [ startReplication ] 

risposta

21

Come Nirrek spiegato, quando si ha bisogno di connettersi a spingere le fonti di dati, si dovrà costruire un iteratore evento per quella fonte.

Vorrei aggiungere che il meccanismo di cui sopra potrebbe essere reso riutilizzabile. Quindi non dobbiamo ricreare un iteratore di eventi per ogni diversa fonte.

La soluzione è creare un canale generico con i metodi put e take. È possibile chiamare il metodo take dall'interno del generatore e collegare il metodo put all'interfaccia listener dell'origine dati.

Ecco una possibile implementazione. Si noti che il canale buffer messaggi se nessuno è in attesa per loro (per esempio il generatore è occupato a fare un po 'di chiamata remota)

function createChannel() { 
    const messageQueue = [] 
    const resolveQueue = [] 

    function put (msg) { 
    // anyone waiting for a message ? 
    if (resolveQueue.length) { 
     // deliver the message to the oldest one waiting (First In First Out) 
     const nextResolve = resolveQueue.shift() 
     nextResolve(msg) 
    } else { 
     // no one is waiting ? queue the event 
     messageQueue.push(msg) 
    } 
    } 

    // returns a Promise resolved with the next message 
    function take() { 
    // do we have queued messages ? 
    if (messageQueue.length) { 
     // deliver the oldest queued message 
     return Promise.resolve(messageQueue.shift()) 
    } else { 
     // no queued messages ? queue the taker until a message arrives 
     return new Promise((resolve) => resolveQueue.push(resolve)) 
    } 
    } 

    return { 
    take, 
    put 
    } 
} 

Poi il canale di cui sopra può essere utilizzato in qualsiasi momento si desidera ascoltare una sorgente di dati spinta esterna. Per il vostro esempio

function createChangeChannel (replication) { 
    const channel = createChannel() 

    // every change event will call put on the channel 
    replication.on('change', channel.put) 
    return channel 
} 

function * startReplication (getState) { 
    // Wait for the configuration to be set. This can happen multiple 
    // times during the life cycle, for example when the user wants to 
    // switch database/workspace. 
    while (yield take(DATABASE_SET_CONFIGURATION)) { 
    let state = getState() 
    let wrapper = state.database.wrapper 

    // Wait for a connection to work. 
    yield apply(wrapper, wrapper.connect) 

    // Trigger replication, and keep the promise. 
    let replication = wrapper.replicate() 

    if (replication) { 
     yield call(monitorChangeEvents, createChangeChannel(replication)) 
    } 
    } 
} 

function * monitorChangeEvents (channel) { 
    while (true) { 
    const info = yield call(channel.take) // Blocks until the promise resolves 
    yield put(databaseActions.replicationChange(info)) 
    } 
} 
+0

Che funziona perfettamente, grazie mille. Mi sono preso la libertà di aggiornare la tua risposta con il codice dalla mia app, quindi ora è testata, codice funzionante. – mikl

+0

@ yassine-elouafi quali sono gli svantaggi di strappare il callback e trasformarlo in action creatore '(informazioni) => ({tipo: 'ON_CHANGE", info}) ', e guardare separatamente per' ON_CHANGE 'e fare' yield put (replicationChange (informazioni) 'lì? – biosckon

5

Il problema fondamentale che dobbiamo risolvere è che gli emettitori di eventi sono "basati su push", mentre le saghe sono "pull-based".

Se si sottoscrive un evento in questo modo: replication.on('change', (info) => {}), quindi la funzione di callback viene eseguito ogni volta che l'emettitore replication evento decide di spinta un nuovo valore.

Con le saghe, dobbiamo capovolgere il controllo. È la saga che deve avere il controllo di quando decide di rispondere alle nuove informazioni sui cambiamenti disponibili. In altre parole, una saga ha bisogno di pull le nuove informazioni.

Di seguito è un esempio di un modo per raggiungere questo obiettivo:

function* startReplication(wrapper) { 
    while (yield take(DATABASE_SET_CONFIGURATION)) { 
    yield apply(wrapper, wrapper.connect); 
    let replication = wrapper.replicate() 
    if (replication) 
     yield call(monitorChangeEvents, replication); 
    } 
} 

function* monitorChangeEvents(replication) { 
    const stream = createReadableStreamOfChanges(replication); 

    while (true) { 
    const info = yield stream.read(); // Blocks until the promise resolves 
    yield put(replicationChange(info)); 
    } 
} 

// Returns a stream object that has read() method we can use to read new info. 
// The read() method returns a Promise that will be resolved when info from a 
// change event becomes available. This is what allows us to shift from working 
// with a 'push-based' model to a 'pull-based' model. 
function createReadableStreamOfChanges(replication) { 
    let deferred; 

    replication.on('change', info => { 
    if (!deferred) return; 
    deferred.resolve(info); 
    deferred = null; 
    }); 

    return { 
    read() { 
     if (deferred) 
     return deferred.promise; 

     deferred = {}; 
     deferred.promise = new Promise(resolve => deferred.resolve = resolve); 
     return deferred.promise; 
    } 
    }; 
} 

C'è un JSbin dell'esempio sopra qui: http://jsbin.com/cujudes/edit?js,console

Si dovrebbe anche dare un'occhiata a risposta di Yassine Elouafi a un simile domanda: Can I use redux-saga's es6 generators as onmessage listener for websockets or eventsource?

2

Grazie alla @Yassine Elouafi

ho creato breve licenza mit implementazione canali generalisti come estensione Redux-saga per il linguaggio tipografico sulla base di soluzione @Yassine Elouafi.

// redux-saga/channels.ts 
import { Saga } from 'redux-saga'; 
import { call, fork } from 'redux-saga/effects'; 

export interface IChannel<TMessage> { 
    take(): Promise<TMessage>; 
    put(message: TMessage): void; 
} 

export function* takeEvery<TMessage>(channel: IChannel<TMessage>, saga: Saga) { 
    while (true) { 
     const message: TMessage = yield call(channel.take); 
     yield fork(saga, message); 
    } 
} 

export function createChannel<TMessage>(): IChannel<TMessage> { 
    const messageQueue: TMessage[] = []; 
    const resolveQueue: ((message: TMessage) => void)[] = []; 

    function put(message: TMessage): void { 
     if (resolveQueue.length) { 
      const nextResolve = resolveQueue.shift(); 
      nextResolve(message); 
     } else { 
      messageQueue.push(message); 
     } 
    } 

    function take(): Promise<TMessage> { 
     if (messageQueue.length) { 
      return Promise.resolve(messageQueue.shift()); 
     } else { 
      return new Promise((resolve: (message: TMessage) => void) => resolveQueue.push(resolve)); 
     } 
    } 

    return { 
     take, 
     put 
    }; 
} 

E esempio di utilizzo simile a redux-saga * costruzione takeEvery

// example-socket-action-binding.ts 
import { put } from 'redux-saga/effects'; 
import { 
    createChannel, 
    takeEvery as takeEveryChannelMessage 
} from './redux-saga/channels'; 

export function* socketBindActions(
    socket: SocketIOClient.Socket 
) { 
    const socketChannel = createSocketChannel(socket); 
    yield* takeEveryChannelMessage(socketChannel, function* (action: IAction) { 
     yield put(action); 
    }); 
} 

function createSocketChannel(socket: SocketIOClient.Socket) { 
    const socketChannel = createChannel<IAction>(); 
    socket.on('action', (action: IAction) => socketChannel.put(action)); 
    return socketChannel; 
} 
0

Ho avuto lo stesso problema anche utilizzando PouchDB e trovato le risposte fornite estremamente utile e interessante. Tuttavia ci sono molti modi per fare la stessa cosa in PouchDB e ho scavato un po 'e ho trovato un approccio diverso che forse è più facile ragionare.

Se non si allegano i listener alla richiesta db.change, vengono restituiti i dati di modifica direttamente al chiamante e l'aggiunta di continuous: true all'opzione causerà l'emissione di una longpoll e non il ritorno fino a quando alcune modifiche non si sono verificate. Quindi lo stesso risultato può essere ottenuto con il seguente

export function * monitorDbChanges() { 
    var info = yield call([db, db.info]); // get reference to last change 
    let lastSeq = info.update_seq; 

    while(true){ 
    try{ 
     var changes = yield call([db, db.changes], { since: lastSeq, continuous: true, include_docs: true, heartbeat: 20000 }); 
     if (changes){ 
     for(let i = 0; i < changes.results.length; i++){ 
      yield put({type: 'CHANGED_DOC', doc: changes.results[i].doc}); 
     } 
     lastSeq = changes.last_seq; 
     } 
    }catch (error){ 
     yield put({type: 'monitor-changes-error', err: error}) 
    } 
    } 
} 

C'è una cosa che non ho ottenuto fino in fondo. Se sostituisco il ciclo for con change.results.forEach((change)=>{...}), viene visualizzato un errore di sintassi non valido su yield. Suppongo che abbia a che fare con lo scontro nell'uso degli iteratori.

4

Possiamo usare eventChannel di redux-saga

Ecco il mio esempio

// fetch history messages 
function* watchMessageEventChannel(client) { 
    const chan = eventChannel(emitter => { 
    client.on('message', (message) => emitter(message)); 
    return() => { 
     client.close().then(() => console.log('logout')); 
    }; 
    }); 
    while (true) { 
    const message = yield take(chan); 
    yield put(receiveMessage(message)); 
    } 
} 

function* fetchMessageHistory(action) { 
    const client = yield realtime.createIMClient('demo_uuid'); 
    // listen message event 
    yield fork(watchMessageEventChannel, client); 
} 

Nota Bene:

messaggi sul un eventChannel non vengono memorizzati per impostazione predefinita. Se si desidera elaborare message event solo uno per uno, non è possibile utilizzare il blocco delle chiamate dopo const message = yield take(chan);

o si deve fornire un buffer alla fabbrica eventChannel al fine di precisare la strategia di buffering per il canale (ad es eventChannel (abbonato, buffer)). Vedi redux-saga API docs per ulteriori informazioni