2012-03-08 13 views
21

mam problemy ze zrozumieniem, jak prawidłowo otwierać i zamykać sesje bazodanowe sprawnie, jak zrozumiałem w dokumentacji sqlalchemy, jeśli mogę użyć scoped_session skonstruować mojego obiektu Session, a następnie użyj wracającą Session obiekt do tworzenia sesji, jest bezpieczny dla wątków, więc w zasadzie każdy wątek dostanie własną sesję i nie będzie z tym problemów. Teraz poniższy przykład działa, umieszczam go w nieskończonej pętli, aby zobaczyć, czy poprawnie zamyka sesje i jeśli monitorowałem je poprawnie (w mysql, wykonując "POKAŻ LICENCJĘ PROCESOWĄ;), połączenia właśnie rosną, nie zamyka ich , mimo że użyłem session.close(), a nawet usunę obiekt scoped_session pod koniec każdego uruchomienia. Co ja robię źle? Moim celem w większej aplikacji jest użycie minimalnej liczby wymaganych połączeń z bazą danych, ponieważ moja obecna działająca implementacja tworzy nową sesję w każdej metodzie, w której jest wymagana i zamyka ją przed powrotem, co wydaje się nieefektywne.SQLAlchemy Prawidłowa obsługa sesji w aplikacjach wielowątkowych

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

Odpowiedz

36

Należy dzwonić tylko create_engine i scoped_session raz na procesu (na bazie). Każdy dostanie swój własny basen połączeń lub sesji (odpowiednio), więc chcesz, aby upewnić się, że tworzysz tylko jeden basen. Po prostu uczyń go globalnym modułem. jeśli trzeba więcej niż preciesly zarządzać sesje, prawdopodobnie nie powinien być używany scoped_session

Kolejna zmiana zrobić jest użycie DBSession bezpośrednio jakby to była sesja . Wywoływanie metod sesji na scoped_session będzie przejrzysty utworzyć sesji wątek lokalny, jeśli to konieczne, i przekazuje wywołanie metody do sesji .

Inną rzeczą, aby mieć świadomość jest pool_size z puli połączeń, który jest 5 domyślnie. Dla wielu aplikacji, które jest w porządku, ale jeśli tworzysz wiele wątków, może być konieczne, aby dostroić że parametr

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

Dziękuję za informacje, to było rzeczywiście bardzo pomocne. Pozdrawiam! – andrean

Powiązane problemy