2015-03-23 45 views
5

RabbitMQ 3.5 ora supports message priority; Tuttavia, non sono in grado di creare un esempio funzionante. Ho inserito il mio codice qui sotto. Include l'output che mi aspetto e l'output in realtà. Sarei interessato a più documentazione e/o un esempio funzionante.RabbitMQ 3.5 e priorità messaggio

Quindi la mia domanda in breve: come faccio a ricevere la priorità del messaggio per funzionare su Rabbit 3.5.0.0?

Editore:

using System; 
using RabbitMQ.Client; 
using System.Text; 
using System.Collections.Generic; 

class Publisher 
{ 

    public static void Main() 
    { 
     var factory = new ConnectionFactory() { HostName = "localhost" }; 
     using (var connection = factory.CreateConnection()) 
     { 
      using (var channel = connection.CreateModel()) 
      { 
       IDictionary <String , Object> args = new Dictionary<String,Object>() ; 
       args.Add(" x-max-priority ", 10); 
       channel.QueueDeclare("task_queue1", true, false, true, args); 

       for (int i = 1 ; i<=10; i++) 
       { 
        var message = "Message"; 
        var body = Encoding.UTF8.GetBytes(message + " " + i); 
        var properties = channel.CreateBasicProperties(); 
        properties.SetPersistent(true); 
        properties.Priority = Convert.ToByte(i); 
        channel.BasicPublish("", "task_queue1", properties, body); 
       } 
      } 
     } 
    } 
} 

dei consumatori:

using System; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 
using System.Text; 
using System.Threading; 
using System.Collections.Generic; 

namespace Consumer 
{ 
    class Worker 
    { 
     public static void Main() 
     { 
      var factory = new ConnectionFactory() { HostName = "localhost" }; 
      using (var connection = factory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        IDictionary<String, Object> args = new Dictionary<String, Object>();      
        channel.BasicQos(0, 1, false); 
        var consumer = new QueueingBasicConsumer(channel); 
        IDictionary<string, object> consumerArgs = new Dictionary<string, object>(); 
        channel.BasicConsume("task_queue1", false, "", args, consumer); 
        Console.WriteLine(" [*] Waiting for messages. " + 
             "To exit press CTRL+C"); 
        while (true) 
        { 
         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
         var body = ea.Body; 
         var message = Encoding.UTF8.GetString(body); 
         Console.WriteLine(" [x] Received {0}", message); 
         channel.BasicAck(ea.DeliveryTag, false); 
        } 
       } 
      } 
     } 
    } 
} 

uscita effettiva:

[*] Waiting for messages. To exit press CTRL+C 
[x] Received Message 1 
[x] Received Message 2 
[x] Received Message 3 
[x] Received Message 4 
[x] Received Message 5 
[x] Received Message 6 
[x] Received Message 7 
[x] Received Message 8 
[x] Received Message 9 
[x] Received Message 10 

uscita prevista:

[*] Waiting for messages. To exit press CTRL+C 
[x] Received Message 10 
[x] Received Message 9 
[x] Received Message 8 
[x] Received Message 7 
[x] Received Message 6 
[x] Received Message 5 
[x] Received Message 4 
[x] Received Message 3 
[x] Received Message 2 
[x] Received Message 1 

AGGIORNAMENTO # 1. Ho trovato un esempio in Java here. Tuttavia è il coniglio 3.4.x.x. addin che è stato incorporato in 3.5. L'unica differenza che posso vedere è che esprimono la priorità come int e la mia è un byte. Ma sento che è un'aringa rossa. Sono un po 'in perdita qui.

risposta

6

Bene, l'ho risolto. È stato un errore stupido. ho scritto:

args.Add(" x-max-priority ", 10); 

Avrebbe dovuto essere

args.Add("x-max-priority", 10); 

mi lasciare questo in modo che altre persone possano avere un esempio funzionante di RabbitMQ 3,5 di code di priorità in C#.

2

Un simile Attuazione Coda RabbitMQ priorità nel Nodo JS

Installare amqplib

Al fine di verificare, siamo tenuti ad avere amqplib installato

npm install amqplib 

Editore (invio. js)

#!/usr/bin/env node 

var amqp = require('amqplib/callback_api'); 

function bail(err, conn) { 
    console.error(err); 
    if (conn) conn.close(function() { process.exit(1); }); 
} 

function on_connect(err, conn) { 
    if (err !== null) return bail(err); 

    // name of queue 
    var q = 'hello'; 
    var msg = 'Hello World!'; 
    var priorityValue = 0; 

    function on_channel_open(err, ch) { 
    if (err !== null) return bail(err, conn); 
    // maxPriority : max priority value supported by queue 
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) { 
     if (err !== null) return bail(err, conn); 

     for(var index=1; index<=100; index++) { 
      priorityValue = Math.floor((Math.random() * 10)); 
      msg = 'Hello World!' + ' ' + index + ' ' + priorityValue; 
      ch.publish('', q, new Buffer(msg), {priority: priorityValue}); 
      console.log(" [x] Sent '%s'", msg); 
     } 

     ch.close(function() { conn.close(); }); 
    }); 
    } 

    conn.createChannel(on_channel_open); 
} 

amqp.connect(on_connect); 

Subscriber (receive.js)

#!/usr/bin/env node 

var amqp = require('amqplib/callback_api'); 

function bail(err, conn) { 
    console.error(err); 
    if (conn) conn.close(function() { process.exit(1); }); 
} 

function on_connect(err, conn) { 
    if (err !== null) return bail(err); 
    process.once('SIGINT', function() { conn.close(); }); 

    var q = 'hello'; 

    function on_channel_open(err, ch) { 
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) { 
     if (err !== null) return bail(err, conn); 
     ch.consume(q, function(msg) { // message callback 
     console.log(" [x] Received '%s'", msg.content.toString()); 
     }, {noAck: true}, function(_consumeOk) { // consume callback 
     console.log(' [*] Waiting for messages. To exit press CTRL+C'); 
     }); 
    }); 
    } 

    conn.createChannel(on_channel_open); 
} 

amqp.connect(on_connect); 

Run:

node send.js 

Si creerà una coda denominata 'ciao' e inonderà con '1000' messaggi di esempio utilizzando lo scambio di default AMQP.

node receive.js 

Si comporterà da utente per abbonarsi ai messaggi in attesa in coda.

0

Un'altra possibilità (per i ricercatori futuri)

Il metodo "Push" della consegna del messaggio non sembra rispettare la priorità.

http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html

Il sotto è una citazione dal URL sopra. Ho sfidato la parte importante.

Recupero messaggi di sottoscrizione ("push API")

Un altro modo per ricevere i messaggi è di istituire un abbonamento utilizzando l'interfaccia IBasicConsumer. I messaggi verranno quindi consegnati automaticamente al loro arrivo, piuttosto che dover essere richiesti in modo proattivo. Un modo per implementare un consumatore è quello di utilizzare l'EventingBasicConsumer classe di convenienza, che invia le consegne ed altri eventi del ciclo di vita dei consumatori, come C# eventi:

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (ch, ea) => 
       { 
        var body = ea.Body; 
        // ... process the message 
        ch.BasicAck(ea.DeliveryTag, false); 
       }; 
String consumerTag = channel.BasicConsume(queueName, false, consumer); 

Cambiando il metodo "pull", la priorità sembra essere rispettato. Tuttavia, nella citazione sotto (dallo stesso URL sopra), sembra che ci sia un trade-off (che ho grassetto)

Fetching singoli messaggi ("pull API") Per recuperare singoli messaggi, utilizzare IModel.BasicGet. Il valore restituito è un'istanza di BasicGetResult, da cui le informazioni di intestazione (proprietà) e corpo del messaggio può essere estratto:

Dal Noack = false sopra, è necessario chiamare IModel.BasicAck a riconoscere che si hanno ricevuto con successo e elaborato il messaggio:

... 
    // acknowledge receipt of the message 
    channel.BasicAck(result.DeliveryTag, false); 
} 

si noti che il recupero messaggi utilizzando questa API è relativamente inefficiente. Se si preferisce RabbitMQ per inviare messaggi al client, vedere la sezione successiva.

(La sezione "prossimo" in questo caso vi porta al metodo "push" nella parte superiore di questo post)