2012-03-08 13 views
21

Ho difficoltà a capire come aprire e chiudere correttamente le sessioni del database in modo efficiente, come ho capito dalla documentazione di sqlalchemy, se uso scoped_session per costruire il mio oggetto Session, e quindi uso la Sessione restituita oggetto per creare sessioni, è thread-safe, quindi in pratica ogni thread otterrà la propria sessione e non ci saranno problemi con esso. Ora l'esempio seguente funziona, l'ho messo in un ciclo infinito per vedere se chiude correttamente le sessioni e se l'ho monitorato correttamente (in mysql eseguendo "SHOW PROCESSLIST;"), le connessioni continuano a crescere, non le chiude , anche se ho usato session.close() e persino rimosso l'oggetto scoped_session alla fine di ogni esecuzione. Che cosa sto facendo di sbagliato? Il mio obiettivo in un'applicazione più ampia è utilizzare il numero minimo di connessioni al database richieste, perché la mia attuale implementazione funzionante crea una nuova sessione in ogni metodo in cui è richiesta e la chiude prima di tornare, il che sembra inefficiente.SQLAlchemy gestione della sessione corretta nelle applicazioni multi-thread

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

risposta

36

Si dovrebbe essere chiamata solo create_engine e scoped_session una volta per processo (per database). Ognuno avrà il proprio pool di connessioni o sessioni (rispettivamente), quindi si vuole assicurarsi che si stia creando solo un pool. Basta renderlo globale a livello di modulo. se avete bisogno di gestire le sessioni più preciesly di questo, probabilmente non dovrebbe utilizzare scoped_session

Un altro cambiamento di fare è quello di utilizzare DBSession direttamente come se si trattasse di una sessione . chiamando i metodi di sessione su scoped_session in modo trasparente crea una sessione thread-local, se necessario, e inoltra la chiamata del metodo alla sessione .

Un'altra cosa da considerare è il pool_size del pool di connessioni, che è 5 per impostazione predefinita. Per molte applicazioni che va bene, ma se si sta creando un sacco di discussioni, potrebbe essere necessario sintonizzare quel parametro

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

Grazie per le informazioni, è stato davvero molto disponibile. Cordiali saluti! – andrean

Problemi correlati