Ho difficoltà a capire come aprire e chiudere correttamente le sessioni del database in modo efficiente, come ho capito dalla documentazione di sqlalchemy, se uso scoped_session per costruire il mio oggetto Session, e quindi uso la Sessione restituita oggetto per creare sessioni, è thread-safe, quindi in pratica ogni thread otterrà la propria sessione e non ci saranno problemi con esso. Ora l'esempio seguente funziona, l'ho messo in un ciclo infinito per vedere se chiude correttamente le sessioni e se l'ho monitorato correttamente (in mysql eseguendo "SHOW PROCESSLIST;"), le connessioni continuano a crescere, non le chiude , anche se ho usato session.close() e persino rimosso l'oggetto scoped_session alla fine di ogni esecuzione. Che cosa sto facendo di sbagliato? Il mio obiettivo in un'applicazione più ampia è utilizzare il numero minimo di connessioni al database richieste, perché la mia attuale implementazione funzionante crea una nuova sessione in ogni metodo in cui è richiesta e la chiude prima di tornare, il che sembra inefficiente.SQLAlchemy gestione della sessione corretta nelle applicazioni multi-thread
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)
def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue
finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return
def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)
for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()
self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()
if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()
Grazie per le informazioni, è stato davvero molto disponibile. Cordiali saluti! – andrean