2010-11-11 13 views
72

Voglio creare un efficiente circular buffer in python (con l'obiettivo di calcolare la media dei valori interi nel buffer).buffer circolare efficiente?

È un modo efficace di utilizzare un elenco per raccogliere valori?

def add_to_buffer(self, num): 
    self.mylist.pop(0) 
    self.mylist.append(num) 

Cosa sarebbe più efficiente (e perché)?

risposta

137

vorrei usare collections.deque con un maxlen ARG

>>> import collections 
>>> d = collections.deque(maxlen=10) 
>>> d 
deque([], maxlen=10) 
>>> for i in xrange(20): 
...  d.append(i) 
... 
>>> d 
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10) 

C'è un recipe nella documentazione per deque che è simile a ciò che si desidera. La mia affermazione che è il più efficiente riposa interamente sul fatto che è implementato in C da un equipaggio incredibilmente abile che ha l'abitudine di tirare fuori il codice di prima qualità.

+5

+1 Sì, è il modo piacevole batterie incluse. Le operazioni per il buffer circolare sono O (1) e, come dici tu, l'overhead extra è in C, quindi dovrebbe essere abbastanza veloce –

+1

Non mi piace questa soluzione perché i documenti non garantiscono l'accesso casuale a O (1) quando ' maxlen' è definito. O (n) è comprensibile quando il 'deque' può crescere all'infinito, ma se viene dato' maxlen', l'indicizzazione di un elemento dovrebbe essere un tempo costante. – lvella

+0

La mia ipotesi è che sia implementato come elenco collegato e non come array. –

9

popping dalla testa di una lista provoca l'intera lista da copiare, quindi è inefficiente

si dovrebbe invece usare un elenco/array di dimensione fissa e un indice che si muove attraverso il buffer come Aggiungi/Rimuovi articoli

+2

Accetto.Non importa quanto elegante o inelegante possa sembrare o qualunque altra lingua venga utilizzata. In realtà, meno preoccupa il garbage collector (o il gestore dell'heap oi meccanismi di paging/mapping o qualsiasi altra cosa sia la magia della memoria reale), meglio è. –

+0

@RocketSurgeon Non è magico, è solo che è un array il cui primo elemento è cancellato. Quindi per un array di dimensioni n questo significa operazioni di copia n-1. Nessun garbage collector o dispositivo simile è coinvolto qui. – Christian

+2

Sono d'accordo. Fare ciò è anche molto più facile di quanto pensino alcune persone. Basta usare un contatore sempre crescente e utilizzare l'operatore modulo (% arraylen) quando si accede all'elemento. –

5

ok con l'uso di classe deque, ma per i requeriments della questione (media) questa è la mia soluzione:

>>> from collections import deque 
>>> class CircularBuffer(deque): 
...  def __init__(self, size=0): 
...    super(CircularBuffer, self).__init__(maxlen=size) 
...  @property 
...  def average(self): # TODO: Make type check for integer or floats 
...    return sum(self)/len(self) 
... 
>>> 
>>> cb = CircularBuffer(size=10) 
>>> for i in range(20): 
...  cb.append(i) 
...  print "@%s, Average: %s" % (cb, cb.average) 
... 
@deque([0], maxlen=10), Average: 0 
@deque([0, 1], maxlen=10), Average: 0 
@deque([0, 1, 2], maxlen=10), Average: 1 
@deque([0, 1, 2, 3], maxlen=10), Average: 1 
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2 
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2 
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3 
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3 
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4 
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4 
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5 
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6 
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7 
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8 
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9 
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10 
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11 
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12 
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13 
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14 
+0

Ottiene 'TypeError: 'oggetto numpy.float64' non è richiamabile quando si tenta di chiamare il metodo' average' – scls

+4

È per il mio post ?, il codice non usa numpy da nessuna parte. ¿? – SmartElectron

+0

Sì ... infatti suppongo che deque usi internamente gli array numpy (dopo aver rimosso @property funziona bene) – scls

3

Si può anche vedere questo abbastanza vecchio Python recipe.

Ecco la mia versione con array di NumPy:

#!/usr/bin/env python 

import numpy as np 

class RingBuffer(object): 
    def __init__(self, size_max, default_value=0.0, dtype=float): 
     """initialization""" 
     self.size_max = size_max 

     self._data = np.empty(size_max, dtype=dtype) 
     self._data.fill(default_value) 

     self.size = 0 

    def append(self, value): 
     """append an element""" 
     self._data = np.roll(self._data, 1) 
     self._data[0] = value 

     self.size += 1 

     if self.size == self.size_max: 
      self.__class__ = RingBufferFull 

    def get_all(self): 
     """return a list of elements from the oldest to the newest""" 
     return(self._data) 

    def get_partial(self): 
     return(self.get_all()[0:self.size]) 

    def __getitem__(self, key): 
     """get element""" 
     return(self._data[key]) 

    def __repr__(self): 
     """return string representation""" 
     s = self._data.__repr__() 
     s = s + '\t' + str(self.size) 
     s = s + '\t' + self.get_all()[::-1].__repr__() 
     s = s + '\t' + self.get_partial()[::-1].__repr__() 
     return(s) 

class RingBufferFull(RingBuffer): 
    def append(self, value): 
     """append an element when buffer is full""" 
     self._data = np.roll(self._data, 1) 
     self._data[0] = value 
+0

+1 per usare numpy – shx2

+0

+1 per usare numpy, ma -1 per non implementare un buffer circolare. Il modo in cui l'hai implementato, stai spostando tutti i dati ogni volta che aggiungi un singolo elemento, questo costa 'O (n)' tempo. Per implementare un appropriato [buffer circolare] (https://en.wikipedia.org/wiki/Circular_buffer), dovresti avere sia un indice che una variabile di dimensione e devi gestire correttamente il caso quando i dati "si avvolgono" la fine del buffer. Durante il recupero dei dati, potrebbe essere necessario concatenare due sezioni all'inizio e alla fine del buffer. –

1

Questo non richiede alcuna libreria. Cresce una lista e poi cicla dentro per indice.

Il footprint è molto piccolo (nessuna libreria) e funziona due volte più veloce del dequeue almeno. Questo è buono per calcolare le medie mobili, ma bisogna essere consapevoli che gli oggetti non vengono mantenuti ordinati per età come sopra.

class CircularBuffer(object): 
    def __init__(self, size): 
     """initialization""" 
     self.index= 0 
     self.size= size 
     self._data = [] 

    def record(self, value): 
     """append an element""" 
     if len(self._data) == self.size: 
      self._data[self.index]= value 
     else: 
      self._data.append(value) 
     self.index= (self.index + 1) % self.size 

    def __getitem__(self, key): 
     """get element by index like a regular array""" 
     return(self._data[key]) 

    def __repr__(self): 
     """return string representation""" 
     return self._data.__repr__() + ' (' + str(len(self._data))+' items)' 

    def get_all(self): 
     """return a list of all the elements""" 
     return(self._data) 

Per ottenere il valore medio, per es .:

q= CircularBuffer(1000000); 
for i in range(40000): 
    q.record(i); 
print "capacity=", q.size 
print "stored=", len(q.get_all()) 
print "average=", sum(q.get_all())/len(q.get_all()) 

Risultati in:

capacity= 1000000 
stored= 40000 
average= 19999 

real 0m0.024s 
user 0m0.020s 
sys 0m0.000s 

Si tratta di circa 1/3 il tempo della equivalente con dequeue.

0

La domanda originale era: "efficiente" buffer circolare. Secondo questa efficienza richiesta, la risposta di aaronasterling sembra essere definitivamente corretta. Utilizzando una classe dedicata programmata in Python e confrontando l'elaborazione del tempo con collections.deque mostra un'accelerazione x5.2 volte con deque! Ecco codice molto semplice per testare questo:

class cb: 
    def __init__(self, size): 
     self.b = [0]*size 
     self.i = 0 
     self.sz = size 
    def append(self, v): 
     self.b[self.i] = v 
     self.i = (self.i + 1) % self.sz 

b = cb(1000) 
for i in range(10000): 
    b.append(i) 
# called 200 times, this lasts 1.097 second on my laptop 

from collections import deque 
b = deque([], 1000) 
for i in range(10000): 
    b.append(i) 
# called 200 times, this lasts 0.211 second on my laptop 

per trasformare una coda doppia in un elenco, basta usare:

my_list = [v for v in my_deque] 

Sarà quindi ottenere O (1) accesso casuale agli elementi deque. Naturalmente, questo è utile solo se è necessario eseguire molti accessi casuali al deque dopo averlo impostato una volta.

4

Basato su MoonCactus's answer, qui è una classe circularlist. La differenza con la sua versione è che qui c[0] fornisce sempre l'elemento più vecchio, c[-1] l'ultimo elemento aggiunto, c[-2] penultimo ... Questo è più naturale per le applicazioni.

c = circularlist(4) 
c.append(1); print c, c[0], c[-1] #[1] (1 items)    first 1, last 1 
c.append(2); print c, c[0], c[-1] #[1, 2] (2 items)   first 1, last 2 
c.append(3); print c, c[0], c[-1] #[1, 2, 3] (3 items)  first 1, last 3 
c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] (4 items)  first 1, last 8 
c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] (4 items) first 2, last 10 
c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] (4 items) first 3, last 11 

Classe:

class circularlist(object): 
    def __init__(self, size): 
     """Initialization""" 
     self.index = 0 
     self.size = size 
     self._data = [] 

    def append(self, value): 
     """Append an element""" 
     if len(self._data) == self.size: 
      self._data[self.index] = value 
     else: 
      self._data.append(value) 
     self.index = (self.index + 1) % self.size 

    def __getitem__(self, key): 
     """Get element by index, relative to the current index""" 
     if len(self._data) == self.size: 
      return(self._data[(key + self.index) % self.size]) 
     else: 
      return(self._data[key]) 

    def __repr__(self): 
     """Return string representation""" 
     return self._data.__repr__() + ' (' + str(len(self._data))+' items)' 
1

Ho avuto questo problema prima di fare programmazione seriale. All'epoca, poco più di un anno fa, non sono riuscito a trovare alcuna implementazione efficiente, quindi ho finito per scrivere one as a C extension ed è anche disponibile on pypi con una licenza MIT. È di base, gestisce solo buffer di caratteri con firma a 8 bit, ma è di lunghezza flessibile, quindi puoi usare Struct o qualcosa sopra se hai bisogno di qualcosa di diverso dai caratteri. Ora vedo con una ricerca su google che ci sono diverse opzioni in questi giorni, quindi potresti voler guardare anche a quelle.

0

La risposta non è giusta. circolare principale tampone sono due priciples (https://en.wikipedia.org/wiki/Circular_buffer)

  1. Il lenth del buffer è impostato;
  2. Primo nella prima uscita;
  3. Quando si aggiunge o si elimina un elemento, gli altri oggetti non dovrebbero spostare la loro posizione

il codice qui sotto:

def add_to_buffer(self, num): 
    self.mylist.pop(0) 
    self.mylist.append(num) 

Consideriamo una situazione che la lista è piena, per utilizzare il codice :

self.mylist = [1, 2, 3, 4, 5] 

ora aggiungiamo 6, l'elenco viene modificato in

self.mylist = [2, 3, 4, 5, 6] 

le voci si aspettano 1 nella lista ha cambiato la loro posizione

il codice è una coda, non è un buffer cerchio.

La risposta di Basj, penso sia la più efficace.

A proposito, un buffer circolare può imporvi le prestazioni dell'operazione per aggiungere un elemento.

0

Questo applica lo stesso principio ad alcuni buffer destinati a contenere i messaggi di testo più recenti.

import time 
import datetime 
import sys, getopt 

class textbffr(object): 
    def __init__(self, size_max): 
     #initialization 
     self.posn_max = size_max-1 
     self._data = [""]*(size_max) 
     self.posn = self.posn_max 

    def append(self, value): 
     #append an element 
     if self.posn == self.posn_max: 
      self.posn = 0 
      self._data[self.posn] = value 
     else: 
      self.posn += 1 
      self._data[self.posn] = value 

    def __getitem__(self, key): 
     #return stored element 
     if (key + self.posn+1) > self.posn_max: 
      return(self._data[key - (self.posn_max-self.posn)]) 
     else: 
      return(self._data[key + self.posn+1]) 


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max): 
     stored = bffr[ind] 
     if stored != "": 
      print(stored) 
    print ('\n') 

def make_time_text(time_value): 
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2) 
     + str(time_value.hour).zfill(2) + str(time_value.minute).zfill(2) 
     + str(time_value.second).zfill(2)) 


def main(argv): 
    #Set things up 
    starttime = datetime.datetime.now() 
    log_max = 5 
    status_max = 7 
    log_bffr = textbffr(log_max) 
    status_bffr = textbffr(status_max) 
    scan_count = 1 

    #Main Loop 
    # every 10 secounds write a line with the time and the scan count. 
    while True: 

     time_text = make_time_text(datetime.datetime.now()) 
     #create next messages and store in buffers 
     status_bffr.append(str(scan_count).zfill(6) + " : Status is just fine at : " + time_text) 
     log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ") 

     #print whole buffers so far 
     print_bffr(log_bffr,log_max) 
     print_bffr(status_bffr,status_max) 

     time.sleep(2) 
     scan_count += 1 

if __name__ == '__main__': 
    main(sys.argv[1:]) 
Problemi correlati