var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
stampe
0 - 5.
: 1 - 6
2 - 7
...Come partecipare a più sequenze IObservable?
Invece, desidero aderire valori identici
5 - 5
6 - 6
7 - 7
8 - 8
...
Questo è un esempio semplificato del problema dell'unione di 100 di sequenze asincrone ordinate. È molto facile unire due IEnumerable, ma non ho trovato un modo per fare qualcosa di simile in Rx. Qualche idea?
Ulteriori informazioni sugli input e su ciò che sto cercando di ottenere. Fondamentalmente, l'intero sistema è una pipeline in tempo reale con più macchine di stato (aggregatori, buffer, filtri di livellamento, ecc.) Collegati da un modello di fork-join. RX è adatto per implementare queste cose? Ogni ingresso può essere rappresentata come
public struct DataPoint
{
public double Value;
public DateTimeOffset Timestamp;
}
Ogni bit di ingresso dei dati viene timestamped all'arrivo, quindi tutti gli eventi sono naturalmente ordinate in loro chiave giunzione (timestamp). Man mano che gli eventi viaggiano attraverso il gasdotto, vengono biforcuti e uniti. I join devono essere correlati da timestamp e applicati in ordine predefinito. Ad esempio, join (a, b, c, d) => join (join (join, a, b), c), d).
Modifica Di seguito è quello che potrei trovare in fretta. Speriamo che ci sia una soluzione più semplice basata sugli operatori Rx esistenti.
static void Test()
{
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
//var zip = a.Zip(b, (x, y) => x + "-" + y);
//zip.Subscribe(Console.WriteLine);
var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
joined.Subscribe(Console.WriteLine);
}
static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
return Observable.CreateWithDisposable<string>(o =>
{
Queue<int> a = new Queue<int>();
Queue<int> b = new Queue<int>();
object gate = new object();
left.Subscribe(x =>
{
lock (gate)
{
if (a.Count == 0 || a.Peek() < x)
a.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
right.Subscribe(x =>
{
lock (gate)
{
if (b.Count == 0 || b.Peek() < x)
b.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
return Disposable.Empty;
});
Alla domanda stessa domanda sul [forum rx] (http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –