Sono nuovo di MassTransit e mi manca qualcosa nella mia comprensione.MassTransit e pubblicazione di eventi e comandi
Diciamo che ho una server farm in cui tutti i nodi possono fare lo stesso lavoro. Il framework dell'applicazione è in stile CQRS. Questo significa che ho due tipi base del messaggio di pubblicare:
- Comandi: deve essere gestito da esattamente uno dei server, uno di essi (il primo con slot per lavoro gratuito)
- Eventi: devono essere gestiti dal tutti i server
Ho creato un prototipo di MassTransit estremamente semplice (un'applicazione console che invia un messaggio ogni X secondi).
Nell'API, vedo che esiste un metodo di "pubblicazione". Come posso specificare che tipo di messaggio è (uno rispetto a tutti i server)?
Se guardo una configurazione "gestore", posso specificare la coda uri. Se si specifica la stessa coda per tutti gli host, tutti gli host riceveranno il messaggio, ma non posso limitare l'esecuzione a un solo server.
Se ascolto da una coda dedicata host, solo un server gestirà i messaggi, ma non so come trasmettere l'altro tipo di messaggio.
Per favore aiutami a capire cosa mi manca.
PS: se ci tiene, il mio sistema di messaggistica è rabbitmq.
Al fine di testare, ho creare una libreria di classi comune con questo classi:
public static class ActualProgram
{
private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();
private static readonly Random g_Random = new Random();
public static void ActualMain(int delay, int instanceName)
{
Thread.Sleep(delay);
SetupBus(instanceName);
Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
Console.WriteLine("Press enter at any time to exit");
Console.ReadLine();
g_Shutdown.Cancel();
Bus.Shutdown();
}
private static void PublishRandomMessage()
{
Bus.Instance.Publish(new Message
{
Id = g_Random.Next(),
Body = "Some message",
Sender = Assembly.GetEntryAssembly().GetName().Name
});
if (!g_Shutdown.IsCancellationRequested)
{
Thread.Sleep(g_Random.Next(500, 10000));
Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
}
}
private static void SetupBus(int instanceName)
{
Bus.Initialize(sbc =>
{
sbc.UseRabbitMqRouting();
sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
sbc.Subscribe(subs =>
{
subs.Handler<Message>(MessageHandled);
});
});
}
private static void MessageHandled(Message msg)
{
ConsoleColor color = ConsoleColor.Red;
switch (msg.Sender)
{
case "test_app1":
color = ConsoleColor.Green;
break;
case "test_app2":
color = ConsoleColor.Blue;
break;
case "test_app3":
color = ConsoleColor.Yellow;
break;
}
Console.ForegroundColor = color;
Console.WriteLine(msg.ToString());
Console.ResetColor();
}
private static void MessageConsumed(Message msg)
{
Console.WriteLine(msg.ToString());
}
}
public class Message
{
public long Id { get; set; }
public string Sender { get; set; }
public string Body { get; set; }
public override string ToString()
{
return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
}
}
Ho anche applicazioni 3 console che basta eseguire il metodo ActualMain:
internal class Program
{
private static void Main(string[] args)
{
ActualProgram.ActualMain(0, 1);
}
}
grazie. Questo mi aiuta molto a capire la differenza. L'unico pensiero che mi spaventa è "avrai bisogno di un processo separato per i gestori di comandi rispetto ai gestori di eventi". Ciò avrà un impatto sull'architettura globale, ma vivrò con quello se non ci fosse altra scelta. –
Per i consumatori in competizione, avrete bisogno di istanze di bus separate per ogni consumatore.Il motivo per cui i comandi sono in genere su un'istanza bus separata è in genere che vengono elaborati in modo sincrono dove i consumatori vengono elaborati su più thread. MT non consente di specificare la concorrenza in base al tipo di messaggio. Penso che sia possibile ospitare più bus in un unico processo ma non l'ho provato. Uso topshelf (degli stessi ragazzi) per ospitare ciascuno dei miei "servizi". Permette di scegliere in processo (appdomains) o processi separati/macchine separate al momento dell'implementazione molto facilmente –
thx. Darò un'occhiata –