2015-10-16 10 views
6

io sto cercando di fare un programma per fare un sacco di connessioni web socket al server che ho creato:creare attività asincroni e attendere che tutti loro di completare

class WebSocketClient(): 

    @asyncio.coroutine 
    def run(self): 
     print(self.client_id, 'Connecting') 
     ws = yield from aiohttp.ws_connect(self.url) 
     print(self.client_id, 'Connected') 
     print(self.client_id, 'Sending the message') 
     ws.send_str(self.make_new_message()) 

     while not ws.closed: 
      msg = yield from ws.receive() 

      if msg.tp == aiohttp.MsgType.text: 
       print(self.client_id, 'Received the echo') 
       yield from ws.close() 
       break 

     print(self.client_id, 'Closed') 


@asyncio.coroutine 
def make_clients(): 

    for client_id in range(args.clients): 
     yield from WebSocketClient(client_id, WS_CHANNEL_URL.format(client_id=client_id)).run() 


event_loop.run_until_complete(make_clients()) 

Il problema è che tutti i clienti fanno il loro lavoro uno dopo l'altro:

0 Connecting 
0 Connected 
0 Sending the message 
0 Received the echo 
0 Closed 
1 Connecting 
1 Connected 
1 Sending the message 
1 Received the echo 
1 Closed 
... 

ho cercato di usare asyncio.wait, ma tutti i clienti iniziano insieme. Voglio che vengano creati gradualmente e connessi al server immediatamente dopo aver creato ciascuno di essi. Allo stesso tempo continua a creare nuovi clienti.

Quale approccio devo applicare per realizzare questo?

+0

si potrebbe [utilizzare un semaforo per limitare il numero di connessioni simultanee] (http://stackoverflow.com/a/20722204/4279) – jfs

+0

Si prega di decorare WebSocketClient come ' @ coroutine' –

+0

@AndrewSvetlov si, è decorato - un errore di copia/incolla – warvariuc

risposta

7

Utilizzare asyncio.wait è un buon approccio. È possibile combinare con asyncio.ensure_future e asyncio.sleep per creare attività gradualmente:

@asyncio.coroutine 
def make_clients(nb_clients, delay): 
    futures = [] 
    for client_id in range(nb_clients): 
     url = WS_CHANNEL_URL.format(client_id=client_id) 
     coro = WebSocketClient(client_id, url).run() 
     futures.append(asyncio.ensure_future(coro)) 
     yield from asyncio.sleep(delay) 
    yield from asyncio.wait(futures) 

EDIT: ho implementato una classe FutureSet che dovrebbe fare quello che vuoi. Questo set può essere riempito con i future e li rimuove automaticamente quando hanno finito. È anche possibile attendere il completamento di tutti i futures.

class FutureSet: 

    def __init__(self, maxsize, *, loop=None): 
     self._set = set() 
     self._loop = loop 
     self._maxsize = maxsize 
     self._waiters = [] 

    @asyncio.coroutine 
    def add(self, item): 
     if not asyncio.iscoroutine(item) and \ 
      not isinstance(item, asyncio.Future): 
      raise ValueError('Expecting a coroutine or a Future') 
     if item in self._set: 
      return 
     while len(self._set) >= self._maxsize: 
      waiter = asyncio.Future(loop=self._loop) 
      self._waiters.append(waiter) 
      yield from waiter 
     item = asyncio.async(item, loop=self._loop)  
     self._set.add(item) 
     item.add_done_callback(self._remove) 

    def _remove(self, item): 
     if not item.done(): 
      raise ValueError('Cannot remove a pending Future') 
     self._set.remove(item) 
     if self._waiters: 
      waiter = self._waiters.pop(0) 
      waiter.set_result(None) 

    @asyncio.coroutine 
    def wait(self): 
     return asyncio.wait(self._set) 

Esempio:

@asyncio.coroutine 
def make_clients(nb_clients, limit=0): 
    futures = FutureSet(maxsize=limit) 
    for client_id in range(nb_clients): 
     url = WS_CHANNEL_URL.format(client_id=client_id) 
     client = WebSocketClient(client_id, url) 
     yield from futures.add(client.run()) 
    yield from futures.wait() 
+0

'asyncio.Queue' è una classe * final * non destinata all'ereditarietà. Quindi gli utenti non dovrebbero mai derivare le proprie classi da 'asyncio.Queue' anche se tecnicamente possibile. –

+0

@AndrewSvetlov Immagino che gli utenti potrebbero voler ereditare da 'asyncio.Queue' per creare diversi tipi di code (come' asyncio.PriorityQueue' o 'asyncio.LifoQueue') ma in questo caso sono semplicemente stato pigro: p I liberarsene comunque. – Vincent

+0

No, l'utente non può (almeno non dovrebbe). 'LifoQueue' e' PriorityQueue' sono classi 'asyncio', non destinate all'ereditarietà. Le sole classi 'asyncio' destinate all'ereditarietà sono' Protocol' e famiglia. Lo stato fu pronunciato più volte da Guido van Rossum quando progettammo la biblioteca. –

Problemi correlati