主页 > 其他  > 

sqlalchemy事务自动控制(类javaaop)


最近使用它交互数据库,想实现类似java aop那种自动事务控制,不用手动commit或者rollback。我是用的是flask+denpendency-injecter

 这是我的db的配置类,里面会初始化一些session配置,里面比较重要的是把autocommit和autoflush关闭了,因为我们的代码会来处理这个,还有就是把expire_on_commit设置为flase,否则你commit之后,再取用某个entity就会报错了,例如你新建了一个entity,这个时候会更新他的id,返回给前端的时候就会报错了(Error Messages — SQLAlchemy 2.0 Documentation)。

"""Database module.""" from contextlib import contextmanager, AbstractContextManager from typing import Callable from sqlalchemy import create_engine, orm from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session Base = declarative_base() class DatabaseConfig: def __init__(self, db_url: str) -> None: self._engine = create_engine(db_url, echo=True) self._session_factory = orm.scoped_session( orm.sessionmaker( autocommit=False, autoflush=False, expire_on_commit=False, bind=self._engine, ), ) def create_database(self) -> None: Base.metadata.create_all(self._engine) @contextmanager def session(self) -> Callable[..., AbstractContextManager[Session]]: session: Session = self._session_factory() try: yield session except Exception: session.rollback() raise else: if session._transaction.is_active: session.commit() session.close()

然后comtextmanger里面就是我们的处理代码了,我们主要依靠with代码块来控制,在yield之前的属于__init__,在yield之后属于__exit__,也就是当with代码块结束之前,如果发生任何报错,我们都会进行rollback操作,并且raise(这部分需要error handler来做了,这里就不赘述了),然后如果什么错误都没有发生,就检测transaction是否还是active,如果是就commit,然后关闭session。

 然后在Container中注入session contextmanager。

class Container(containers.DeclarativeContainer): wiring_config = containers.WiringConfiguration(packages=[ "main" ]) config = providers.Configuration(yaml_files=["config.yml"]) db=providers.Singleton(DatabaseConfig,db_url=config.db.url) user_repository = providers.Factory( UserRepositoryImpl ) user_service = providers.Factory( UserService, user_repository=user_repository, session_factory=db.provided.session )

然后再service层使用with代码块控制transation ,整个逻辑包含在同一个with中就行了。

class UserService: @inject def __init__(self, user_repository: UserRepository, session_factory: Callable[..., AbstractContextManager[Session]]) -> None: self._repository: UserRepository = user_repository self.session_factory=session_factory def create_user(self,user) -> User: with self.session_factory() as session: return self._repository.add( session=session, user=user )

然后在repo里面写具体代码就行了

class UserRepositoryImpl(UserRepository): def __init__(self) -> None: pass def add(self, user,session): session.add(user) return user

关于测试怎么写,可以看我篇blog,里面也会讲到测试类的事务控制:Flask+ Dependency-injecter+pytest 写测试类-CSDN博客

标签:

sqlalchemy事务自动控制(类javaaop)由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“sqlalchemy事务自动控制(类javaaop)