2011-02-06 14 views
7
 var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     var zip = a.Zip(b, (x, y) => x + "-" + y); 
     zip.Subscribe(Console.WriteLine); 

stampe
0 - 5.
: 1 - 6
2 - 7
...Come partecipare a più sequenze IObservable?

Invece, desidero aderire valori identici
5 - 5
6 - 6
7 - 7
8 - 8
...

Questo è un esempio semplificato del problema dell'unione di 100 di sequenze asincrone ordinate. È molto facile unire due IEnumerable, ma non ho trovato un modo per fare qualcosa di simile in Rx. Qualche idea?

Ulteriori informazioni sugli input e su ciò che sto cercando di ottenere. Fondamentalmente, l'intero sistema è una pipeline in tempo reale con più macchine di stato (aggregatori, buffer, filtri di livellamento, ecc.) Collegati da un modello di fork-join. RX è adatto per implementare queste cose? Ogni ingresso può essere rappresentata come

public struct DataPoint 
{ 
    public double Value; 
    public DateTimeOffset Timestamp; 
} 

Ogni bit di ingresso dei dati viene timestamped all'arrivo, quindi tutti gli eventi sono naturalmente ordinate in loro chiave giunzione (timestamp). Man mano che gli eventi viaggiano attraverso il gasdotto, vengono biforcuti e uniti. I join devono essere correlati da timestamp e applicati in ordine predefinito. Ad esempio, join (a, b, c, d) => join (join (join, a, b), c), d).

Modifica Di seguito è quello che potrei trovare in fretta. Speriamo che ci sia una soluzione più semplice basata sugli operatori Rx esistenti.

static void Test() 
    { 
     var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     //var zip = a.Zip(b, (x, y) => x + "-" + y); 
     //zip.Subscribe(Console.WriteLine); 

     var joined = MergeJoin(a,b, (x,y) => x + "-" + y); 
     joined.Subscribe(Console.WriteLine); 
    } 

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
    { 
     return Observable.CreateWithDisposable<string>(o => 
      { 
       Queue<int> a = new Queue<int>(); 
       Queue<int> b = new Queue<int>(); 
       object gate = new object(); 

       left.Subscribe(x => 
        { 
         lock (gate) 
         { 
          if (a.Count == 0 || a.Peek() < x) 
           a.Enqueue(x); 

          while (a.Count != 0 && b.Count != 0) 
          { 
           if (a.Peek() == b.Peek()) 
           { 
            o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
           } 
           else if (a.Peek() < b.Peek()) 
           { 
            a.Dequeue(); 
           } 
           else 
           { 
            b.Dequeue(); 
           } 
          } 
         } 
        }); 

       right.Subscribe(x => 
       { 
        lock (gate) 
        { 
         if (b.Count == 0 || b.Peek() < x) 
          b.Enqueue(x); 

         while (a.Count != 0 && b.Count != 0) 
         { 
          if (a.Peek() == b.Peek()) 
          { 
           o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
          } 
          else if (a.Peek() < b.Peek()) 
          { 
           a.Dequeue(); 
          } 
          else 
          { 
           b.Dequeue(); 
          } 
         } 
        } 
       }); 

       return Disposable.Empty; 
      }); 
+0

Alla domanda stessa domanda sul [forum rx] (http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –

risposta

1

Onestamente non riesco a pensare di una soluzione basata su operatori esistenti che funzionano per sorgenti calde di ordine sconosciuto (ovvero, xs before ys rispetto a ys before xs). La soluzione sembra bene (hey, se funziona), ma mi piacerebbe fare alcuni cambiamenti se fosse il mio codice:

  • cancellazione supportare correttamente utilizzando MutableDisposable e CompositeDisposable
  • chiamata OnError per eccezioni generate dal selettore (rendendolo più coerente con altri operatori)
  • consideri sostenere il completamento se è possibile per una sorgente per completare prima che l'altro

il codice qui sotto è stato testato con il doppio ingresso-Range, il s ingressi AME capovolte, così come con Empty<int> + Never<int>:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
{ 
    return Observable.CreateWithDisposable<string>(o => 
    { 
     Queue<int> a = new Queue<int>(); 
     Queue<int> b = new Queue<int>(); 
     object gate = new object(); 

     bool leftComplete = false; 
     bool rightComplete = false; 

     MutableDisposable leftSubscription = new MutableDisposable(); 
     MutableDisposable rightSubscription = new MutableDisposable(); 

     Action tryDequeue =() => 
     { 
      lock (gate) 
      { 
       while (a.Count != 0 && b.Count != 0) 
       { 
        if (a.Peek() == b.Peek()) 
        { 
         string value = null; 

         try 
         { 
          value = selector(a.Dequeue(), b.Dequeue()); 
         } 
         catch (Exception ex) 
         { 
          o.OnError(ex); 
          return; 
         } 

         o.OnNext(value); 
        } 
        else if (a.Peek() < b.Peek()) 
        { 
         a.Dequeue(); 
        } 
        else 
        { 
         b.Dequeue(); 
        } 
       } 
      } 
     }; 

     leftSubscription.Disposable = left.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (a.Count == 0 || a.Peek() < x) 
        a.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      leftComplete = true; 

      if (a.Count == 0 || rightComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     rightSubscription.Disposable = right.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (b.Count == 0 || b.Peek() < x) 
        b.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      rightComplete = true; 

      if (b.Count == 0 || leftComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     return new CompositeDisposable(leftSubscription, rightSubscription); 
    }); 
} 
3

GroupBy può fare quello che ti serve. Sembra che non ci siano limiti di tempo quando gli oggetti vengono "uniti", hai solo bisogno di elementi simili per stare insieme in qualche modo.

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15)) 
.GroupBy(k => k) 
.Subscribe(go => go.Count().Where(cnt => cnt > 1) 
          .Subscribe(cnt => 
        Console.WriteLine("Key {0} has {1} matches", go.Key, cnt))); 

Due cose da notare su quanto sopra, Unire ha le seguenti sovraccarichi, in modo che il vostro req di avere centinaia di corsi d'acqua unite non presenta un problema:

Merge<TSource>(params IObservable<TSource>[] sources); 
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources); 
Merge<TSource>(this IObservable<IObservable<TSource>> source); 

Inoltre, GroupBy rendimenti IObservable<IGroupedObservable<TKey, TSource>> che significa che puoi reagire a ciascun gruppo e ad ogni nuovo membro di ciascun gruppo appena entrano - non c'è bisogno di aspettare che tutto sia completo.

+0

L'unico problema è che devo essere in grado di unire i valori in ordine. Tuttavia può essere risolto se invece di int togli le tuple del valore dell'indice. –

+0

Cosa intendi con "in ordine"? –

+0

Ricorda che usando 'Merge' +' Count', non otterrai alcuna corrispondenza fino a quando non terminano entrambe le sequenze sorgente. Questo va bene per l'esempio "Range", ma se le tue fonti sono calde/senza fine l'output potrebbe non essere quello che ti aspetti. –

1

Come utilizzare il nuovo operatore di join in v.2838.

var a = Observable.Range(1, 10); 
var b = Observable.Range(5, 10); 

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), 
    (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput)) 
    .Where(tupple => tupple.Item1 == tupple.Item2); 

joinedStream.Subscribe(output => Trace.WriteLine(output)); 

Questo è il mio primo sguardo Join e non sono sicuro se sarebbe saggio usare l'operatore Never come questo. Quando si tratta di un gran numero di input in quanto avrebbe gernerato una quantità enorme di operazioni, più input sono stati revisuati. Penserei che si potrebbe fare del lavoro per chiudere le finestre come sono fatti e rendere la soluzione più efficiente. Detto questo l'esempio sopra funziona come da tua domanda.

Per la cronaca, penso che la risposta di Scott sia probabilmente la strada da percorrere in questo caso. Sto solo gettando questo come un'alternativa potenziale.

+0

+1 per soluzione con join. Ho passato un'ora ieri e non sono riuscito a farlo funzionare. Condivido le tue preoccupazioni per le prestazioni. Inoltre, il codice risultante è molto più criptico e difficile da seguire rispetto al semplice join LINQ. Sto iniziando a pensare che Rx non sia una buona soluzione per questo tipo di problemi. –

+0

@Serger - Sono sicuro che questo potrebbe essere reso più efficiente emettendo i valori di durata come le corrispondenze sono fatte (cioè sostituendo l'Observable.Never con qualcosa di un po 'più intelligente). Dipenderà tutto da cosa si tratta per quando è sicuro completare la durata. –

2

Questa risposta è copiato dal Rx forums, solo in modo che esso sarà archiviato qui così:

var xs = Observable.Range(1, 10); 
var ys = Observable.Range(5, 10); 

var joined = from x in xs 
    from y in ys 
    where x == y 
    select x + "-" + y; 

o senza l'utilizzo di espressioni di query:

var joined = 
    xs.SelectMany(x => ys, (x, y) => new {x, y}) 
    .Where(t => t.x == t.y) 
    .Select(t => t.x + "-" + t.y); 
+2

L'unico problema con questa soluzione è che richiede che 'ys' sia hot (o' Multicast') e non supporta lo scenario in cui il valore 'ys' viene visualizzato prima del valore' xs'. –

Problemi correlati