2009-06-24 10 views
8

Volevo scrivere un server a cui un client potesse connettersi e ricevere aggiornamenti periodici senza dover eseguire il polling. Il problema che ho riscontrato con asyncore è che se non si restituisce true quando viene chiamato dispatcher.writable(), si deve attendere fino a quando il tempo di asyncore.loop non è scaduto (il valore predefinito è 30s).Asyncore di Python per inviare periodicamente i dati utilizzando un timeout variabile. C'è un modo migliore?

I due modi in cui ho tentato di aggirare questo problema sono 1) ridurre il timeout a un valore basso o 2) connessioni di query per quando verranno aggiornati e generare un valore di timeout adeguato. Tuttavia, se si fa riferimento a "Select Law" in "man 2 select_tut", si afferma, "Si dovrebbe sempre provare a utilizzare select() senza un timeout."

C'è un modo migliore per farlo? Twisted forse? Volevo provare ed evitare discussioni extra. Io includo l'esempio timeout variabile qui:

#!/usr/bin/python 

import time 
import socket 
import asyncore 


# in seconds 
UPDATE_PERIOD = 4.0 

class Channel(asyncore.dispatcher): 

    def __init__(self, sock, sck_map): 
     asyncore.dispatcher.__init__(self, sock=sock, map=sck_map) 
     self.last_update = 0.0 # should update immediately 
     self.send_buf = '' 
     self.recv_buf = '' 

    def writable(self): 
     return len(self.send_buf) > 0 

    def handle_write(self): 
     nbytes = self.send(self.send_buf) 
     self.send_buf = self.send_buf[nbytes:] 

    def handle_read(self): 
     print 'read' 
     print 'recv:', self.recv(4096) 

    def handle_close(self): 
     print 'close' 
     self.close() 

    # added for variable timeout 
    def update(self): 
     if time.time() >= self.next_update(): 
      self.send_buf += 'hello %f\n'%(time.time()) 
      self.last_update = time.time() 

    def next_update(self): 
     return self.last_update + UPDATE_PERIOD 


class Server(asyncore.dispatcher): 

    def __init__(self, port, sck_map): 
     asyncore.dispatcher.__init__(self, map=sck_map) 
     self.port = port 
     self.sck_map = sck_map 
     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.bind(("", port)) 
     self.listen(16) 
     print "listening on port", self.port 

    def handle_accept(self): 
     (conn, addr) = self.accept() 
     Channel(sock=conn, sck_map=self.sck_map) 

    # added for variable timeout 
    def update(self): 
     pass 

    def next_update(self): 
     return None 


sck_map = {} 

server = Server(9090, sck_map) 
while True: 
    next_update = time.time() + 30.0 
    for c in sck_map.values(): 
     c.update() # <-- fill write buffers 
     n = c.next_update() 
     #print 'n:',n 
     if n is not None: 
      next_update = min(next_update, n) 
    _timeout = max(0.1, next_update - time.time()) 

    asyncore.loop(timeout=_timeout, count=1, map=sck_map) 
+0

Nick: Qual è la piccola modifica per farlo funzionare? Potresti inserire il codice? Grazie –

risposta

4

La "legge di selezione" non si applica al vostro caso, come si deve non solo le attività client-triggered (server puro), ma anche le attività time-triggered - Questo è esattamente ciò che il timeout di selezione è per. Quello che la legge dovrebbe veramente dire è "se specifichi un timeout, assicurati di dover effettivamente fare qualcosa di utile quando arriva il timeout". La legge ha lo scopo di proteggere dall'attesa; il tuo codice non è occupato-aspetta.

Non impostare _timeout sul valore massimo di 0,1 e il tempo di aggiornamento successivo, ma sul massimo di 0,0 e sul timeout successivo. IOW, se un periodo di aggiornamento è scaduto mentre stavi facendo degli aggiornamenti, dovresti fare subito quell'aggiornamento specifico.

Invece di chiedere ad ogni canale ogni volta se desidera aggiornare, è possibile memorizzare tutti i canali in una coda di priorità (ordinati per il prossimo aggiornamento), quindi eseguire solo l'aggiornamento per i primi canali (finché non ne trovi uno il il tempo di aggiornamento non è arrivato). Puoi usare il modulo heapq per quello.

È anche possibile salvare alcune chiamate di sistema non avendo ciascun canale chiedere l'ora corrente, ma solo eseguire il polling dell'ora corrente una volta e passarlo a .update.

1

userei Contorto, molto tempo da quando ho usato asyncore, ma credo che questo dovrebbe essere l'equivalente contorta (non testato, scritto a memoria):

from twisted.internet import reactor, protocol 
import time 

UPDATE_PERIOD = 4.0 

class MyClient(protocol.Protocol): 

    def connectionMade(self): 
     self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update) 

    def connectionLost(self, reason): 
     self.updateCall.cancel() 

    def update(self): 
     self.transport.write("hello %f\n" % (time.time(),)) 

    def dataReceived(self, data): 
     print "recv:", data 


f = protocol.ServerFactory() 
f.protocol = MyClient 

reactor.listenTCP(9090, f) 
reactor.run() 
+0

+1: con codice twistato diventa leggibile e di facile manutenzione. – nosklo

+0

Il codice funziona (piuttosto buono se dalla memoria!) Ma ha bisogno di una piccola modifica per chiamare reactor.callLater() nel metodo update() per inviare il prossimo aggiornamento. Altrimenti si ottiene solo un messaggio e updateCall.cancel() non riuscirà alla disconnessione. Il mio unico problema è che torsione aggiunge una dipendenza extra, ma dovrò pesare questo contro la produttività e la leggibilità reale. –

4

Forse si può fare questo con sched.scheduler, come questo (nb non testato):

import sched, asyncore, time 

# Create a scheduler with a delay function that calls asyncore.loop 
scheduler = sched.scheduler(time.time, lambda t: _poll_loop(t, time.time())) 

# Add the update timeouts with scheduler.enter 
# ... 

def _poll_loop(timeout, start_time): 
    asyncore.loop(timeout, count=1) 
    finish_time = time.time() 
    timeleft = finish_time - start_time 
    if timeleft > timeout: # there was a message and the timeout delay is not finished 
    _poll_loop(timeleft, finish_time) # so wait some more polling the socket 

def main_loop(): 
    while True: 
    if scheduler.empty(): 
     asyncore.loop(30.0, count=1) # just default timeout, use what suits you 
     # add other work that might create scheduled events here 
    else: 
     scheduler.run() 
+0

Mentre questa risposta è piacevole, si eseguirà presto un RuntimeError con profondità di ricorsione in _poll_loop. Meglio scriverlo senza ricorsione per esempi del mondo reale. ;-) –

2

Questa è fondamentalmente la soluzione di demiurgo con i bordi grezzi arrotondati. Mantiene la sua idea di base, ma impedisce RuntimeErrors e loop occupati ed è testato. [Edit: risolto problemi con la modifica del programma di pianificazione durante _delay]

class asynschedcore(sched.scheduler): 
    """Combine sched.scheduler and asyncore.loop.""" 
    # On receiving a signal asyncore kindly restarts select. However the signal 
    # handler might change the scheduler instance. This tunable determines the 
    # maximum time in seconds to spend in asycore.loop before reexamining the 
    # scheduler. 
    maxloop = 30 
    def __init__(self, map=None): 
     sched.scheduler.__init__(self, time.time, self._delay) 
     if map is None: 
      self._asynmap = asyncore.socket_map 
     else: 
      self._asynmap = map 
     self._abort_delay = False 

    def _maybe_abort_delay(self): 
     if not self._abort_delay: 
      return False 
     # Returning from this function causes the next event to be executed, so 
     # it might be executed too early. This can be avoided by modifying the 
     # head of the queue. Also note that enterabs sets _abort_delay to True. 
     self.enterabs(0, 0, lambda:None,()) 
     self._abort_delay = False 
     return True 

    def _delay(self, timeout): 
     if self._maybe_abort_delay(): 
      return 
     if 0 == timeout: 
      # Should we support this hack, too? 
      # asyncore.loop(0, map=self._asynmap, count=1) 
      return 
     now = time.time() 
     finish = now + timeout 
     while now < finish and self._asynmap: 
      asyncore.loop(min(finish - now, self.maxloop), map=self._asynmap, 
          count=1) 
      if self._maybe_abort_delay(): 
       return 
      now = time.time() 
     if now < finish: 
      time.sleep(finish - now) 

    def enterabs(self, abstime, priority, action, argument): 
     # We might insert an event before the currently next event. 
     self._abort_delay = True 
     return sched.scheduler.enterabs(self, abstime, priority, action, 
             argument) 

    # Overwriting enter is not necessary, because it is implemented using enter. 

    def cancel(self, event): 
     # We might cancel the next event. 
     self._abort_delay = True 
     return sched.scheduler.cancel(self, event) 

    def run(self): 
     """Runs as long as either an event is scheduled or there are 
     sockets in the map.""" 
     while True: 
      if not self.empty(): 
       sched.scheduler.run(self) 
      elif self._asynmap: 
       asyncore.loop(self.maxloop, map=self._asynmap, count=1) 
      else: 
       break 
+0

Ho fatto solo test limitati, ma sembra funzionare alla grande! – konrad

0

Forse non capisco quello che il PO è stato cercando di realizzare, ma ho appena risolto questo problema utilizzando 1 filo che ottiene un weakref di ogni canale (asyncore.dispatcher) oggetto. Questo thread determina il proprio timing e invierà periodicamente al canale un aggiornamento utilizzando una coda in quel canale. Ottiene la coda dall'oggetto Canale chiamando getQueue.

La ragione per cui utilizzo un weakref è perché i client sono transitori. Se il canale muore, allora weakref restituisce None. In questo modo il thread di temporizzazione non mantiene vivi i vecchi oggetti perché li fa riferimento.

So che l'OP voleva evitare i thread, ma questa soluzione è molto semplice.Crea sempre e solo un thread e parla con tutti i canali creati mentre l'oggetto Server li aggiunge all'elenco dei thread degli oggetti da monitorare.

Problemi correlati