2011-11-09 15 views
5

Sto provando a creare un IObservable<bool> che restituisce true se un messaggio UDP è stato ricevuto negli ultimi 5 secondi e se si verifica un timeout, viene restituito un falso.Timeout di estensioni reattive che non interrompe la sequenza?

Finora ho questo:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    var udp = BaseComms.UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Select(s => true); 

    return Observable 
     .Timeout(udp, TimeSpan.FromSeconds(5)) 
     .Catch(Observable.Return(false)); 
} 

I problemi con questo sono: -

  • Una volta che un falso viene restituito, la sequenza si ferma
  • Ho solo davvero bisogno true o false su cambiamenti di stato.

ho potuto utilizzare un Subject<T> ma ho bisogno di stare attenti a smaltire l'UDPBaseStringListener osservabile quando non ci sono più abbonati.

Aggiornamento

Ogni volta che ricevo un messaggio UDP vorrei che per restituire un true. Se non ho ricevuto un messaggio UDP negli ultimi 5 secondi, mi piacerebbe che restituisse un false.

+0

FYI, 'Timeout' ha un overload che accetta un supplente osservabile per quando il tempo il timeout si verifica anziché "lanciare" e richiede 'Catch'. –

+0

Anche i lettori potrebbero essere interessati a [1] (http://stackoverflow.com/q/23394441/1267663), [2] (http://stackoverflow.com/q/12786901/1267663) e [3] (http://stackoverflow.com/q/35873244/1267663). – Whymarrh

risposta

3

Come indicato da Bj0, la soluzione con BufferWithTime non restituirà il punto dati non appena viene ricevuto e il timeout del buffer non viene ripristinato dopo la ricezione di un punto dati.

Con Rx Extensions 2.0, il tuo grado di risolvere entrambi i problemi con un nuovo buffer di sovraffaticamento accettare sia un timeout e un taglia:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return BaseComms 
     .UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Buffer(TimeSpan.FromSeconds(5), 1) 
     .Select(s => s.Count > 0) 
     .DistinctUntilChanged(); 
} 
1

Vorrei suggerire di evitare l'uso di Timeout - causa eccezioni e la codifica con eccezioni è errata.

Inoltre, sembra avere senso solo che le vostre fermate osservabili dopo un valore. Potrebbe essere necessario spiegare di più su ciò che si desidera che il comportamento sia.

mia soluzione attuale al vostro problema è:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return Observable.Create<bool>(o => 
    { 
     var subject = new AsyncSubject<bool>(); 
     return new CompositeDisposable(
      Observable.Amb(
       BaseComms 
        .UDPBaseStringListener(localEP) 
        .Where(msg => msg.Data.Contains("running")) 
        .Select(s => true), 
       Observable 
        .Timer(TimeSpan.FromMilliseconds(10.0)) 
        .Select(_ => false) 
      ).Take(1).Subscribe(subject), subject.Subscribe(o)); 
    }); 
} 

fa questo aiuto?

+0

Questo non viene compilato. Observable.Create si aspetta un'azione da restituire ... – Tim

+0

'Timeout' causa solo eccezioni per impostazione predefinita. Ha un sovraccarico che richiede un osservabile alternativo da utilizzare se viene raggiunto il timeout, che il sovraccarico predefinito chiama con "Observable.Throw" come alternativa. –

+0

@Jim - È necessario utilizzare una versione diversa di Rx, compilata correttamente con la versione 1.1.12121. – Enigmativity

1

Se non si desidera la sequenza di fermarsi, basta avvolgerlo in Defer + Repeat:

Observable.Defer(() => GettingUDPMessages(endpoint) 
    .Repeat(); 
2

Il problema di buffer è che l'intervallo di "timeout" non viene reimpostato quando si ottiene un nuovo valore, le finestre buffer sono solo intervalli di tempo (5 in questo caso) che si susseguono. Ciò significa che, a seconda di quando si riceve l'ultimo valore, potrebbe essere necessario attendere quasi il doppio del valore di timeout. Questo può anche mancare timeout:

   should timeout here 
         v 
0s   5s   10s  15s 
|x - x - x | x - - - - | - - - x -| ... 
      true  true  true 

IObservable.Throttle, tuttavia, si resetta ogni volta che un nuovo valore entra e produce solo un valore dopo il periodo è trascorso (l'ultimo valore in ingresso). Questo può essere usato come un timeout e unito al IObservable per inserire i valori "timeout" nel flusso:

var obs = BaseComms.UDPBaseStringListener(localEP) 
      .Where(msg => msg.Data.Contains("running")); 

return obs.Merge(obs.Throttle(TimeSpan.FromSeconds(5)) 
         .Select(x => false)) 
      .DistinctUntilChanged(); 

Un lavoro LINQPad esempio:

var sub = new Subject<int>(); 

var script = sub.Timestamp() 
    .Merge(sub.Throttle(TimeSpan.FromSeconds(2)).Select(i => -1).Timestamp()) 
    .Subscribe(x => 
{ 
    x.Dump("val"); 
}); 


Thread.Sleep(1000); 

sub.OnNext(1); 
sub.OnNext(2); 

Thread.Sleep(10000); 

sub.OnNext(5); 

A -1 viene inserito nel flusso dopo un timeout di 2 secondi.

Problemi correlati