2009-08-03 10 views
12

SQLAlchemy ha qualcosa di simile al concetto di segnale di Django? Fondamentalmente, mi piacerebbe attivare alcune funzioni quando pre-salva o post-save alcuni oggetti entità. Grazie.in SQLAlchemy

Modifica: Voglio solo l'equivalente di segnali django in SQLAlchemy.

+0

Cosa c'è di sbagliato con semplicemente sovrascrivendo 'save'? –

+0

@ S.Lott Mi dispiace far risorgere un thread antico, ma il modulo dichiarativo SQLAlchemy non ha lo stesso concetto di un metodo 'save' che può essere sovrascritto come l'ORM Django. Per salvare oggetti in SA, li aggiungi a una sessione e quindi svuota la sessione. È possibile * implementare la propria classe base dichiarativa e ottenere lo stesso effetto, comunque. –

+1

@JoeHolloway: Sebbene utile, non spiega perché il tag dice Django e il titolo dice SQLAlchemy. –

risposta

5

Non hai chiarito se stai integrando SQLAlchemy e Django o se vuoi solo l'equivalente dei segnali di django in SQLAlchemy.

Se volete equivalente di Django segnala come post_save, pre_save, pre_delete ecc, vorrei fare riferimento alla pagina,

sqlalchemy.orm.interfaces.MapperExtension

+1

Vedere la risposta di S.C. per "Eventi ORM" per il modo corrente per farlo. Dai documenti SQLAlchemy: "Novità nella versione 0.7: l'evento sostituisce il precedente sistema di classi di" estensione "." –

2

si può prendere in considerazione la sqlalchemy.orm.SessionExtension così

Ecco alcuni codice che ho gettato insieme per impostare un id proprietario su un'istanza e impostare un update_date che ottiene il lavoro svolto in un'app di piloni. la classe OrmExt è dove avviene tutta la magia. E init_model è dove lo colleghi.

import logging 
import sqlalchemy as sa 
from sqlalchemy import orm 

from pylons import session 

import datetime 

log = logging.getLogger(__name__) 

class ORMSecurityException(Exception): 
    ''' 
    thrown for security violations in orm layer 
    ''' 
    pass 

def _get_current_user(): 
    log.debug('getting current user from session...') 
    log.debug(session) 
    return session['user'] 

def _is_admin(user): 
    return False 


def set_update_date(instance): 

    if hasattr(instance,'update_date'): 
    instance.update_date = datetime.datetime.now() 

def set_owner(instance): 
    ''' 
    if owner_id, run it through the rules 
    ''' 
    log.info('set_owner') 
    if hasattr(instance, 'owner_id'): 
    log.info('instance.owner_id=%s' % instance.owner_id) 
    u = _get_current_user() 
    log.debug('user: %s' % u.email) 
    if not u: 
     #anonymous users can't save owned objects 
     raise ORMSecurityException() 
    if instance.owner_id==None: 
     #must be new object thus, owned by current user 
     log.info('setting owner on object %s for user: %s' % (instance.__class__.__name__,u.email)) 
     instance.owner_id = u.id 
    elif instance.owner_id!=u.id and not _is_admin(u): 
     #if owner_id does not match user_id and user is not admin VIOLATION 
     raise ORMSecurityException() 
    else: 
     log.info('object is already owned by this user') 
     return #good to go 
else: 
    log.info('%s is not an owned object' % instance.__class__.__name__) 
    return 

def instance_policy(instance): 
    log.info('setting owner for %s' % instance.__class__.__name__) 
    set_owner(instance) 
    log.info('setting update_date for %s' % instance.__class__.__name__) 
    set_update_date(instance) 


class ORMExt(orm.SessionExtension): 
    ''' 
    attempt at managing ownership logic on objects 
    ''' 
    def __init__(self,policy): 
     self._policy = policy 

    def before_flush(self,sqlsess,flush_context,instances): 
     ''' 
     check all instances for owner_id==user.id 
     ''' 
     try: 
      for instance in sqlsess.deleted: 
       try: 
        log.info('running policy for deleted %s' % instance.__class__.__name__) 
        self._policy(instance) 
       except Exception,ex: 
        log.error(ex) 
        raise ex 

      for instance in sqlsess.new: 
       try: 
        log.info('running policy for new %s' % instance.__class__.__name__) 
        self._policy(instance) 
       except Exception,ex: 
        log.error(ex) 
        raise ex 

      for instance in sqlsess.dirty: 
       try: 
        if sqlsess.is_modified(instance,include_collections=False,passive=True): 
         log.info('running policy for updated %s' % instance.__class__.__name__) 
         self._policy(instance) 
       except Exception, ex: 
        log.error(ex) 
        raise ex 

     except Exception,ex: 
      sqlsess.expunge_all() 
      raise ex 

def init_model(engine): 
    """Call me before using any of the tables or classes in the model""" 
    sm = orm.sessionmaker(autoflush=True, autocommit=True, bind=engine,extension=ORMExt(instance_policy)) 
    meta.engine = engine 
    meta.Session = orm.scoped_session(sm) 
2

Ecco il mio prendere a questo problema, utilizza Louie di inviare segnali:

dispatch.py

""" 
Signals dispatching for SQLAlchemy mappers. 
""" 

import louie 
from sqlalchemy.orm.interfaces import MapperExtension 
import signals 


class LouieDispatcherExtension(MapperExtension): 
    """ 
    Dispatch signals using louie on insert, update and delete actions. 
    """ 

    def after_insert(self, mapper, connection, instance): 
     louie.send(signals.after_insert, instance.__class__, 
       instance=instance) 
     return super(LouieDispatcherExtension, self).after_insert(mapper, 
       connection, instance) 

    def after_delete(self, mapper, connection, instance): 
     louie.send(signals.after_delete, instance.__class__, 
       instance=instance) 
     return super(LouieDispatcherExtension, self).after_delete(mapper, 
       connection, instance) 

    def after_update(self, mapper, connection, instance): 
     louie.send(signals.after_update, instance.__class__, 
       instance=instance) 
     return super(LouieDispatcherExtension, self).after_update(mapper, 
       connection, instance) 

    def before_delete(self, mapper, connection, instance): 
     louie.send(signals.before_delete, instance.__class__, 
       instance=instance) 
     return super(LouieDispatcherExtension, self).before_delete(mapper, 
       connection, instance) 

    def before_insert(self, mapper, connection, instance): 
     louie.send(signals.before_insert, instance.__class__, 
       instance=instance) 
     return super(LouieDispatcherExtension, self).before_insert(mapper, 
       connection, instance) 

    def before_update(self, mapper, connection, instance): 
     louie.send(signals.before_update, instance.__class__, 
       instance=instance) 
     return super(LouieDispatcherExtension, self).before_update(mapper, 
       connection, instance) 

signals.py

from louie import Signal 


class after_delete(Signal): pass 
class after_insert(Signal): pass 
class after_update(Signal): pass 
class before_delete(Signal): pass 
class before_insert(Signal): pass 
class before_update(Signal): pass 

utilizzo Esempio:

class MyModel(DeclarativeBase): 

    __mapper_args__ = {"extension": LouieDispatcherExtension()} 

    ID = Column(Integer, primary_key=True) 
    name = Column(String(255)) 

def on_insert(instance): 
    print "inserted %s" % instance 

louie.connect(on_insert, signals.after_insert, MyModel) 
+0

C'è un problema con tutti gli eventi menzionati, sono già nel mezzo del processo di flush. Quindi, anche se è sufficiente modificare la colonna sull'istanza, non è sufficiente se vuoi compiere azioni più complesse come aggiungere/rimuovere istanze dalla sessione. Tutte quelle modifiche saranno scartate. – Drachenfels

9

penso che siete alla ricerca di `ORM Eventi. È possibile trovare la documentazione qui:

http://docs.sqlalchemy.org/en/latest/orm/events.html

+0

BTW, 'MapperExtension' da altre risposte è [deprecato] (http://docs.sqlalchemy.org/en/rel_0_7/orm/deprecated.html) da SQLAlchemy 0.7. Quindi 'ORM Events' sembra una soluzione adeguata. – HighCat

0

È possibile utilizzare interiore MapperExtension classe:

class YourModel(db.Model): 

    class BaseExtension(MapperExtension): 

     def before_insert(self, mapper, connection, instance): 
      # do something here 

     def before_update(self, mapper, connection, instance): 
      # do something here 

    __mapper_args__ = { 'extension': BaseExtension() } 

    # ....