Ho implementato l'esempio di ZMQ di ultima cache (LVV) (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching), ma non è possibile ottenere un 2 ° sottoscrittore da registrare nel back-end.ZMQ: nessun messaggio di abbonamento sul socket XPUB per più abbonati (modello di cache dell'ultimo valore)
La prima volta che un abbonato sale a bordo, la condizione event[0] == b'\x01'
è soddisfatta e il valore memorizzato nella cache viene inviato, ma il secondo abbonato (stesso argomento) non ha nemmeno registro (if backend in events:
non è mai vero). Tutto il resto funziona bene. I dati vengono passati dall'editore agli abbonati (tutti).
Quale potrebbe essere la ragione di questo? Il modo in cui il back-end è collegato è corretto? Questo schema dovrebbe funzionare solo con il primo abbonato?
Aggiornamento
Quando mi iscrivo al secondo abbonato a un altro argomento, ottengo il giusto comportamento (vale a dire \x01
all'atto della sottoscrizione). Sembra davvero funzionare per il primo iscritto onlt. C'è un bug in ZeroMQ?
Aggiorna 2
Ecco un esempio di lavoro minima che mostra che il modello LVC è non lavoro (almeno non il modo in cui è implementato qui).
# subscriber.py
import zmq
def main():
ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5558")
# Subscribe to every single topic from publisher
print 'subscribing (sub side)'
sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")
poller = zmq.Poller()
poller.register(sub, zmq.POLLIN)
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
print("interrupted")
break
# Any new topic data we cache and then forward
if sub in events:
msg = sub.recv_multipart()
topic, current = msg
print 'received %s on topic %s' % (current, topic)
if __name__ == '__main__':
main()
E qui c'è il broker (come nell'esempio, ma con un po 'più di verbosità e un editore integrato).
# broker.py
# from http://zguide.zeromq.org/py:lvcache
import zmq
import threading
import time
class Publisher(threading.Thread):
def __init__(self):
super(Publisher, self).__init__()
def run(self):
time.sleep(10)
ctx = zmq.Context.instance()
pub = ctx.socket(zmq.PUB)
pub.connect("tcp://127.0.0.1:5557")
cnt = 0
while True:
msg = 'hello %d' % cnt
print 'publisher is publishing %s' % msg
pub.send_multipart(['my-topic', msg])
cnt += 1
time.sleep(5)
def main():
ctx = zmq.Context.instance()
frontend = ctx.socket(zmq.SUB)
frontend.bind("tcp://*:5557")
backend = ctx.socket(zmq.XPUB)
backend.bind("tcp://*:5558")
# Subscribe to every single topic from publisher
frontend.setsockopt(zmq.SUBSCRIBE, b"")
# Store last instance of each topic in a cache
cache = {}
# We route topic updates from frontend to backend, and
# we handle subscriptions by sending whatever we cached,
# if anything:
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
# launch a publisher
p = Publisher()
p.daemon = True
p.start()
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
print("interrupted")
break
# Any new topic data we cache and then forward
if frontend in events:
msg = frontend.recv_multipart()
topic, current = msg
cache[topic] = current
backend.send_multipart(msg)
### this is where it fails for the 2nd subscriber.
### There's never even an event from the backend
### in events when the 2nd subscriber is subscribing.
# When we get a new subscription we pull data from the cache:
if backend in events:
print 'message from subscriber'
event = backend.recv()
# Event is one byte 0=unsub or 1=sub, followed by topic
if event[0] == b'\x01':
topic = event[1:]
print ' => subscribe to %s' % topic
if topic in cache:
print ("Sending cached topic %s" % topic)
backend.send_multipart([ topic, cache[topic] ])
elif event[0] == b'\x00':
topic = event[1:]
print ' => unsubscribe from %s' % topic
if __name__ == '__main__':
main()
L'esecuzione di questo codice (1 x broker.py
, 2 x subscriber.py
) mostra che il primo abbonato registra presso il broker come previsto (\x01
e ricerca nella cache), ma il secondo abbonato non viene registrata nello stesso modo. È interessante notare che il 2 ° abbonato è collegato al pub/canale secondario, poiché dopo un po '(10 secondi) entrambi gli abbonati ricevono i dati dall'editore.
Questo è molto strano. Forse alcune delle mie librerie sono obsolete. Ecco quello che ho ottenuto:
Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> zmq.__version__
'14.1.1'
$ brew info zeromq
zeromq: stable 4.0.5 (bottled), HEAD
High-performance, asynchronous messaging library
http://www.zeromq.org/
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
Poured from bottle
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
==> Dependencies
Build: pkg-config ✔
Optional: libpgm ✘, libsodium ✘
Update 3
Questo comportamento può anche essere osservata in zeromq 4.1.2
e pyzmq-14.7.0
(con o senza libpgm e libsodium installato).
Update 4
Un'altra osservazione suggerisce che il primo abbonato è in qualche modo gestito in modo diverso: il primo abbonato è l'unico annullamento dell'iscrizione nel modo previsto dalla presa XPUB
(backend
) precedendo il suo argomento abbonamento con \x00
. Gli altri abbonati (ho provato più di 2) sono rimasti silenziati sul canale di backend (sebbene ricevessero messaggi).
Update 5
spero non mi va giù un buco di coniglio, ma ho guardato negli czmq
attacchi e corsi mio esempio Python in C.I risultati sono gli stessi, quindi suppongo che non sia un problema con i binding, ma con libzmq
.
ho anche verificato che il 2 ° abbonato sta inviando un messaggio Iscriviti e anzi posso vedere questo sul filo:
sottoscrivere:
0000 02 00 00 00 45 00 00 3f 98 be 40 00 40 06 00 00 ....E..? [email protected]@...
0010 7f 00 00 01 7f 00 00 01 fa e5 15 b6 34 f0 51 c3 ........ ....4.Q.
0020 05 e4 8b 77 80 18 31 d4 fe 33 00 00 01 01 08 0a ...w..1. .3......
0030 2a aa d1 d2 2a aa cd e9 00 09 01 6d 79 2d 74 6f *...*... ...my-to
0040 70 69 63 pic
2 ° sottoscrivere un messaggio con la differenza (sopra) ha segnato e spiegato. Gli stessi dati vengono inviati nel frame di sottoscrizione.
identification
v
0000 02 00 00 00 45 00 00 3f ed be 40 00 40 06 00 00 ....E..? [email protected]@...
src port sequence number
v v v v v
0010 7f 00 00 01 7f 00 00 01 fa e6 15 b6 17 da 02 e7 ........ ........
Acknowledgement number window scaling factor
v v v v v
0020 71 4b 33 e6 80 18 31 d5 fe 33 00 00 01 01 08 0a qK3...1. .3......
timestamp value timestamp echo reply
v v v |<-------- data -------
0030 2a aa f8 2c 2a aa f4 45 00 09 01 6d 79 2d 74 6f *..,*..E ...my-to
------>|
0040 70 69 63 pic