Ho un modello basato su agente, in cui diversi agenti sono avviati da un processo centrale e comunicano tramite un altro processo centrale. Ogni agente e il processo di comunicazione comunicano tramite zmq. Tuttavia, quando inizio a più di 100 agenti standard_out invia:multiprocessing di zeromq e python, troppi file aperti
Invalid argument (src/stream_engine.cpp: 143) Troppi file aperti (src/ipc_listener.cpp: 292)
e Mac Os richiede una segnalazione del problema:
Python si è chiuso improvvisamente mentre utilizzava il plug-in libzmq.5.dylib.
Il problema mi sembra che siano aperti troppi contesti. Ma come posso evitarlo con il multiprocessing?
I allegare una parte del codice qui sotto:
class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
def __init__(self, idn, group, _addresses, trade_logging):
multiprocessing.Process.__init__(self)
....
def run(self):
self.context = zmq.Context()
self.commands = self.context.socket(zmq.SUB)
self.commands.connect(self._addresses['command_addresse'])
self.commands.setsockopt(zmq.SUBSCRIBE, "all")
self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
self.out = self.context.socket(zmq.PUSH)
self.out.connect(self._addresses['frontend'])
time.sleep(0.1)
self.database_connection = self.context.socket(zmq.PUSH)
self.database_connection.connect(self._addresses['database'])
time.sleep(0.1)
self.logger_connection = self.context.socket(zmq.PUSH)
self.logger_connection.connect(self._addresses['logger'])
self.messages_in = self.context.socket(zmq.DEALER)
self.messages_in.setsockopt(zmq.IDENTITY, self.name)
self.messages_in.connect(self._addresses['backend'])
self.shout = self.context.socket(zmq.SUB)
self.shout.connect(self._addresses['group_backend'])
self.shout.setsockopt(zmq.SUBSCRIBE, "all")
self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
self.out.send_multipart(['!', '!', 'register_agent', self.name])
while True:
try:
self.commands.recv() # catches the group adress.
except KeyboardInterrupt:
print('KeyboardInterrupt: %s,self.commands.recv() to catch own adress ~1888' % (self.name))
break
command = self.commands.recv()
if command == "!":
subcommand = self.commands.recv()
if subcommand == 'die':
self.__signal_finished()
break
try:
self._methods[command]()
except KeyError:
if command not in self._methods:
raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
else:
raise
except KeyboardInterrupt:
print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
break
if command[0] != '_':
self.__reject_polled_but_not_accepted_offers()
self.__signal_finished()
#self.context.destroy()
l'intero codice è sotto http://www.github.com/DavoudTaghawiNejad/abce
[Ecco alcune informazioni sul controllo e la modifica del limite FD] (http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/). – Jason