SQLAlchemyのモデル定義を複数のDBに対応させる

最近PythonのORMのSQLAlchemyを使っています。SQLAlchemyはおそらくPythonの中でも最もよく使われているORMで、Wikipediaによると最初のリリースが2006年なので、かなり歴史があるライブラリです。今回、このSQLAlchemyのモデル定義を複数のDBで切り替えて使ってました。備忘録として記事にしておきます。

SQLAlchemyの接続方法

SQLAlchemyは今まで使っていたActiveRecord系のORMよりもっとDB的で、たとえばsessionを具体的に記述してDB操作する形になっています。また加えて、実際のDBをengineという形でインスタン化しそれをsessionに紐付けて利用します。なので、たとえばこんな記述をします。

まず下記がモデル定義です。

import sqlalchemy as sa
from sqlalchemy import orm as orm
from sqlalchemy import Column, ForeignKey, Integer, String, DateTime, func
from sqlalchemy.ext.declarative import declarative_base

metadata = sa.MetaData()
Base = declarative_base(metadata=metadata)


class Company(Base):
    __tablename__ = 'companies'

    # Columns
    id = Column(Integer, primary_key=True)
    name = Column(String)
    employee_count = Column(Integer)

    created_at = Column('created_at', DateTime, default=func.now())
    updated_at = Column('updated_at', DateTime, default=func.now(), onupdate=func.now())

    # Relationships
    employees = orm.relationship('Employee', back_populates='company')


class Employee(Base):
    __tablename__ = 'employees'

    # Columns
    id = Column(Integer, primary_key=True)
    company_id = Column(Integer, ForeignKey('companies.id'), nullable=True)
    name = Column(String)

    created_at = Column('created_at', DateTime, default=func.now())
    updated_at = Column('updated_at', DateTime, default=func.now(), onupdate=func.now())

    # Relationships
    company = orm.relationship('Company', back_populates='employees', uselist=False)

そして下記のコードで、上記のモデル定義を元に実際のDBのengineを作成し、Sessionの設定を行った上でレコードを作成しています。

import os
import sqlalchemy
from sqlalchemy.orm import sessionmaker

from app import db

engine1 = sqlalchemy.create_engine('sqlite:///sqlite1.db', echo=False) # engine作成
db.metadata.bind = engine1
db.Base.metadata.create_all() # テーブル作成
db.metadata.bind = None # テーブル作成後はなくても問題なさそう
Session1 = sessionmaker()
Session1.configure(bind=engine1, expire_on_commit=False, autoflush=False) # sessionとengineを設定上紐付ける
session1 = Session1() # sessionを作成

comppany1_1 = db.Company(name='company1_1')
session1.add(comppany1_1)
comppany1_1.employees = [
    db.Employee(name='employee1_1'),
    db.Employee(name='employee1_2'),
]
session1.commit()
comppany1_1 = session1.query(db.Company).filter_by(name='company1_1').first()

このようにsessionを使ってDB操作をするプログラミングモデルになっているため、sessionの接続先を切り替えてやれば同じモデル定義をいろんなDBに適用できるという形になります。

実際に複数の接続先で使ってみる

先程のコードを拡張して、複数のSQLiteでDB操作していみたいと思います。

import os
import sqlalchemy
from sqlalchemy.orm import sessionmaker

from app import db

try:
    os.unlink('sqlite1.db')
    os.unlink('sqlite2.db')
except Exception as ex:
    pass

engine1 = sqlalchemy.create_engine('sqlite:///sqlite1.db', echo=False)
db.metadata.bind = engine1
db.Base.metadata.create_all()
db.metadata.bind = None
Session1 = sessionmaker()
Session1.configure(bind=engine1, expire_on_commit=False, autoflush=False)
session1 = Session1()

engine2 = sqlalchemy.create_engine('sqlite:///sqlite2.db', echo=False)
db.metadata.bind = engine2
db.Base.metadata.create_all()
db.metadata.bind = None
Session2 = sessionmaker()
Session2.configure(bind=engine2, expire_on_commit=False, autoflush=False)
session2 = Session2()

comppany1_1 = db.Company(name='company1_1')
session1.add(comppany1_1)
comppany1_1.employees = [
    db.Employee(name='employee1_1'),
    db.Employee(name='employee1_2'),
]
session1.commit()
comppany1_1 = session1.query(db.Company).filter_by(name='company1_1').first()

comppany2_1 = db.Company(name='company2_1')
session2.add(comppany2_1)
comppany2_1.employees = [
    db.Employee(name='employee2_1'),
    db.Employee(name='employee2_2'),
]
session2.commit()
comppany2_1 = session2.query(db.Company).filter_by(name='company2_1').first()

上記で2つのSQLiteのDBで更新・参照操作が別個に可能です。

engineによってListenerの設定を変えたい時にどうするか?

ちょっとニッチな要望だと思うのですが、ぼくの場合は上記のように複数のengineに対応させるだけでなく、engineによって設定するListenerを変えたりしたくなりました。Listenerとはたとえばテーブルの特定のカラムが更新された場合のフックを起動させる仕組みで、カウンターキャッシュカラムを更新したりするのに使える仕組みです。似た仕組みではLaravelだとObserver、RailsだとRails::Observersがあります。

で、このListenerなんですが、少し困ったことにモデルのクラスに対して設定するんですよね。たとえば下記のような形になります。

event.listen(db.Employee, 'after_insert', self.__after_insert)

この場合どうなるかと言うと、このListenerの設定はクラスレベルで有効になるので、上記でせっかくengineを分けてDB操作はできてるんですが、このListenerの設定はengineを跨いで有効になります。なので、どのengineでもこのListenerが走ることになります。本来はおそらくそれで問題ないと思うんですが、少しニッチなケースとして今回さらにListener設定をどのように分けられるかを考えてみることにしました。

下記に解決策を書いておきます。ただ、あまりおすすめじゃないかも。。

(解決策)動的にクラスを作成する

Listenerがクラスに設定されるのは避けようがない事実なので、結局動的にモデルのクラスを作成するという事になりそうです。作成してみたサンプルは下記のような形でモデルを拡張の上動的に定義しています。

import os
import sqlalchemy
from sqlalchemy.orm import sessionmaker

from app import db, listener_registers

try:
    os.unlink('sqlite1.db')
    os.unlink('sqlite2.db')
except Exception as ex:
    pass


Company1 = type('Company1', (db.Company,), {})
Employee1 = type('Employee1', (db.Employee,), {})
db.setup_relationships(company_class=Company1, employee_class=Employee1)

engine1 = sqlalchemy.create_engine('sqlite:///sqlite1.db', echo=False)
db.metadata.bind = engine1
db.Base.metadata.create_all()
db.metadata.bind = None
Session1 = sessionmaker()
Session1.configure(bind=engine1, expire_on_commit=False, autoflush=False)
session1 = Session1()

Company2 = type('Company2', (db.Company,), {})
Employee2 = type('Employee2', (db.Employee,), {})
db.setup_relationships(company_class=Company2, employee_class=Employee2)

engine2 = sqlalchemy.create_engine('sqlite:///sqlite2.db', echo=False)
db.metadata.bind = engine2
db.Base.metadata.create_all()
db.metadata.bind = None
Session2 = sessionmaker()
Session2.configure(bind=engine2, expire_on_commit=False, autoflush=False)
session2 = Session2()

registerer1 = listener_registers.ListenersRegisterer(sessionmaker=Session1, employee_class=Employee1)
registerer1.up()

comppany1_1 = Company1(name='company1_1')
session1.add(comppany1_1)
comppany1_1.employees = [
    Employee1(name='employee1_1'),
    Employee1(name='employee1_2'),
]
session1.commit()
comppany1_1 = session1.query(db.Company).filter_by(name='company1_1').first()
print('company.employee_count = {}'.format(comppany1_1.employee_count))

comppany2_1 = Company2(name='company2_1')
session2.add(comppany2_1)
comppany2_1.employees = [
    Employee2(name='employee2_1'),
    Employee2(name='employee2_2'),
]
session2.commit()
comppany2_1 = session2.query(db.Company).filter_by(name='company2_1').first()
print('company.employee_count = {}'.format(comppany2_1.employee_count))

ポイントは下記の部分です。このように動的にモデル定義を作成し、

Company1 = type('Company1', (db.Company,), {})
Employee1 = type('Employee1', (db.Employee,), {})
db.setup_relationships(company_class=Company1, employee_class=Employee1)

registerer1 = listener_registers.ListenersRegisterer(sessionmaker=Session1, employee_class=Employee1)
registerer1.up()

下記のように動的に生成されたクラスを引数にとって、それにListenerを設定するようにしています。

class ListenersRegisterer:

    def __init__(self, sessionmaker, employee_class):
        self.Session = sessionmaker
        self.employee_class = employee_class

    def up(self):
        try:
            # Prevent duplicated registration
            self.down()
        except InvalidRequestError as ex:
            pass

        event.listen(self.employee_class, 'after_insert', self.__after_insert)

かなりオレオレ感が出てしまいそうなので、実際に使う場合には注意が必要そうです。ぼくもプロダクションでは使ってません。
なお、いちおうサンプルとしてリポジトリにまとめてあります。ご興味があれば動かしてみて下さい。