2011-01-19 11 views
7

Desidero utilizzare le estensioni reattive per trasformare alcuni messaggi e inoltrarli dopo un breve ritardo.Ritardo e deduplicazione utilizzando le estensioni reattive (Rx)

I messaggi sembrano qualcosa di simile:

class InMsg 
{ 
    int GroupId { get; set; } 
    int Delay { get; set; } 
    string Content { get; set; } 
} 

L'output simile a questa:

class OutMsg 
{ 
    int GroupId { get; set; } 
    string Content { get; set; } 
    OutMsg(InMsg in) 
    { 
     GroupId = in.GroupId; 
     Content = Transform(in.Content); // function omitted 
    } 
} 

ci sono un paio di requisiti:

  • La lunghezza del ritardo dipende dal contenuto del messaggio.
  • Ogni messaggio ha un ID gruppo
  • Se un nuovo messaggio arriva con lo stesso GroupId di un messaggio in ritardo in attesa di trasmissione, il primo messaggio deve essere eliminato e solo il secondo trasmesso dopo un nuovo periodo di ritardo.

Dato un'osservabile <inMsg> e una funzione di invio:

IObservable<InMsg> inMsgs = ...; 

void Send(OutMsg o) 
{ 
    ... // publishes transformed messages 
} 

ho capito che posso usare Selezionare per eseguire la trasformazione.

void SetUp() 
{ 
    inMsgs.Select(i => new OutMsg(i)).Subscribe(Send); 
} 
  • Come posso applicare un messaggio specificano ritardare? (Si noti che questo potrebbe/dovrebbe comportare la consegna fuori servizio dei messaggi.)
  • Come posso deduplicare i messaggi con lo stesso GroupId?
  • Rx è in grado di risolvere questo problema?
  • C'è un altro modo per risolvere questo?

risposta

7

È possibile utilizzare GroupBy per fare un IGroupedObservable, Delay a ritardare l'uscita, e Switch per assicurarsi che i valori più recenti sostituiscono i valori precedenti nel loro gruppo:

IObservable<InMsg> inMessages; 

inMessages 
    .GroupBy(msg => msg.GroupId) 
    .Select(group => 
     { 
      return group.Select(groupMsg => 
       { 
        TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay); 
        OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here 

        return Observable.Return(outMsg).Delay(delay); 
       }) 
       .Switch(); 
     }) 
     .Subscribe(outMsg => Console.Write("OutMsg received")); 

Una nota sulla realizzazione: se un valore raggruppati arrivato dopo il messaggio viene inviato (es. dopo il ritardo), inizierà un nuovo ritardo

+0

Ho avuto un gioco con questo e non fa proprio quello che mi aspetterei. La sottoscrizione riceve un "System.Collections.Generic.AnonymousObservable'1 [OutMsg]" – chillitom

+0

Sembra che tu non stia chiamando 'Switch'.Se si esegue il "passaggio del mouse" in "Seleziona" in Visual Studio, è necessario indicare che restituisce un IObservable . Se restituisce 'IObservable >', non stai chiamando Switch –

0

il quadro Rx risolve il ritardo tramite the Delay metodo di estensione. L'accodamento con la deduplicazione potrebbe essere risolto applicando un normale ordinamento LINQ dopo il Ritardo, quindi facendo un DistinctUntilChanged.

Aggiornamento: Ammetto che l'approccio di ritardo qui non funzionerà da solo. In qualche modo devi accodare i messaggi in arrivo entro il ritardo che stai cercando. Questo è realizzato dal metodo di estensione BufferWithTime. Questo metodo restituirà elenchi di messaggi, che è quindi possibile rimuovere i duplicati prima di pubblicarli in coda al successivo osservatore.

Problemi correlati