2012-07-27 14 views
8

Sono nuovo di MassTransit e mi manca qualcosa nella mia comprensione.MassTransit e pubblicazione di eventi e comandi

Diciamo che ho una server farm in cui tutti i nodi possono fare lo stesso lavoro. Il framework dell'applicazione è in stile CQRS. Questo significa che ho due tipi base del messaggio di pubblicare:

  • Comandi: deve essere gestito da esattamente uno dei server, uno di essi (il primo con slot per lavoro gratuito)
  • Eventi: devono essere gestiti dal tutti i server

Ho creato un prototipo di MassTransit estremamente semplice (un'applicazione console che invia un messaggio ogni X secondi).

Nell'API, vedo che esiste un metodo di "pubblicazione". Come posso specificare che tipo di messaggio è (uno rispetto a tutti i server)?

Se guardo una configurazione "gestore", posso specificare la coda uri. Se si specifica la stessa coda per tutti gli host, tutti gli host riceveranno il messaggio, ma non posso limitare l'esecuzione a un solo server.

Se ascolto da una coda dedicata host, solo un server gestirà i messaggi, ma non so come trasmettere l'altro tipo di messaggio.

Per favore aiutami a capire cosa mi manca.

PS: se ci tiene, il mio sistema di messaggistica è rabbitmq.

Al fine di testare, ho creare una libreria di classi comune con questo classi:

public static class ActualProgram 
{ 
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource(); 

    private static readonly Random g_Random = new Random(); 

    public static void ActualMain(int delay, int instanceName) 
    { 
     Thread.Sleep(delay); 
     SetupBus(instanceName); 

     Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 

     Console.WriteLine("Press enter at any time to exit"); 
     Console.ReadLine(); 
     g_Shutdown.Cancel(); 

     Bus.Shutdown(); 
    } 

    private static void PublishRandomMessage() 
    { 
     Bus.Instance.Publish(new Message 
     { 
      Id = g_Random.Next(), 
      Body = "Some message", 
      Sender = Assembly.GetEntryAssembly().GetName().Name 
     }); 

     if (!g_Shutdown.IsCancellationRequested) 
     { 
      Thread.Sleep(g_Random.Next(500, 10000)); 
      Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 
     } 
    } 

    private static void SetupBus(int instanceName) 
    { 
     Bus.Initialize(sbc => 
     { 
      sbc.UseRabbitMqRouting(); 
      sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName); 
      sbc.Subscribe(subs => 
      { 
       subs.Handler<Message>(MessageHandled); 
      }); 
     }); 
    } 

    private static void MessageHandled(Message msg) 
    { 
     ConsoleColor color = ConsoleColor.Red; 
     switch (msg.Sender) 
     { 
      case "test_app1": 
       color = ConsoleColor.Green; 
       break; 

      case "test_app2": 
       color = ConsoleColor.Blue; 
       break; 

      case "test_app3": 
       color = ConsoleColor.Yellow; 
       break; 
     } 
     Console.ForegroundColor = color; 
     Console.WriteLine(msg.ToString()); 
     Console.ResetColor(); 
    } 

    private static void MessageConsumed(Message msg) 
    { 
     Console.WriteLine(msg.ToString()); 
    } 
} 

public class Message 
{ 
    public long Id { get; set; } 

    public string Sender { get; set; } 

    public string Body { get; set; } 

    public override string ToString() 
    { 
     return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body); 
    } 
} 

Ho anche applicazioni 3 console che basta eseguire il metodo ActualMain:

internal class Program 
{ 
    private static void Main(string[] args) 
    { 
     ActualProgram.ActualMain(0, 1); 
    } 
} 

risposta

9

ciò che si vuole è noto come Competing Consumers (cerca SO per questo troverai maggiori informazioni) Uso di RabbitMQ semplifica la vita, tutto ciò che devi fare è specificare lo stesso nome di coda per ogni utente che avvii, il messaggio verrà elaborato solo da uno loro. Invece di generare una coda unica ogni volta che si sta facendo.

private static void SetupBus(int instanceName) 
{ 
    Bus.Initialize(sbc => 
    { 
     sbc.UseRabbitMqRouting(); 
     sbc.ReceiveFrom("rabbitmq://localhost/Commands); 
     sbc.Subscribe(subs => 
     { 
      subs.Handler<Message>(MessageHandled); 
     }); 
    }); 
} 

per quanto ne so, è necessario avere un processo separato per i gestori di comandi al contrario di gestori di eventi. Tutti i gestori di comandi riceveranno la stessa coda, tutti i gestori di eventi riceveranno dalla propria coda univoca.

L'altro pezzo del puzzle è come si ricevono i messaggi sul bus. Puoi ancora utilizzare publish per i comandi, ma se hai configurato utenti erroneamente potresti ottenere più esecuzioni poiché il messaggio andrà a tutti i consumatori, se vuoi garantire che il messaggio finisca su una singola coda puoi usare Send piuttosto che Publish.

Bus.Instance 
    .GetEndpoint(new Uri("rabbitmq://localhost/Commands")) 
    .Send(new Message 
    { 
     Id = g_Random.Next(), 
     Body = "Some message", 
     Sender = Assembly.GetEntryAssembly().GetName().Name 
    }); 
+0

grazie. Questo mi aiuta molto a capire la differenza. L'unico pensiero che mi spaventa è "avrai bisogno di un processo separato per i gestori di comandi rispetto ai gestori di eventi". Ciò avrà un impatto sull'architettura globale, ma vivrò con quello se non ci fosse altra scelta. –

+0

Per i consumatori in competizione, avrete bisogno di istanze di bus separate per ogni consumatore.Il motivo per cui i comandi sono in genere su un'istanza bus separata è in genere che vengono elaborati in modo sincrono dove i consumatori vengono elaborati su più thread. MT non consente di specificare la concorrenza in base al tipo di messaggio. Penso che sia possibile ospitare più bus in un unico processo ma non l'ho provato. Uso topshelf (degli stessi ragazzi) per ospitare ciascuno dei miei "servizi". Permette di scegliere in processo (appdomains) o processi separati/macchine separate al momento dell'implementazione molto facilmente –

+0

thx. Darò un'occhiata –

Problemi correlati