sqlalchemy-continuum: Duplicate key value on many-to-many relationship versioning

I have a many-to-many relationship. Here are the models:

class Membership(BaseModel):
    tenant_id = Column(Integer, ForeignKey('tenant.tenant_id', ondelete='CASCADE'), primary_key=True)
    user_id = Column(Integer, ForeignKey('user.user_id', ondelete='CASCADE'), primary_key=True)

class User(BaseModel):
    user_id = Column(Integer, primary_key=True)

class Tenant(BaseModel):
    tenant_id = Column(Integer, primary_key=True)

When I tried to create an instance of the association model, I got a psycopg2.IntegrityError on the versioned membership table.

>>> m = Membership(tenant_id=tenant_id, user_id=user_id)
>>> db.session.add(m)
>>> db.session.commit()
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: duplicate key value violates unique constraint "version_membership_pkey"
DETAIL:  Key (tenant_id, user_id, transaction_id)=(1, 1, 2) already exists.

However, it seems to work fine if I use an association table instead of an association object to create this relationship:

membership = db.Table('membership',
                      Column('tenant_id', Integer, ForeignKey('tenant.tenant_id', ondelete='CASCADE'), primary_key=True),
                      Column('user_id', Integer, ForeignKey('user.user_id', ondelete='CASCADE'), primary_key=True))

But I do need an association object in my case, to support extra columns. Does Continuum actually support versioning an association object for a many-to-many relationship? Or is there some configuration I’m missing to get this to work?

About this issue

  • State: open
  • Created 6 years ago
  • Reactions: 3
  • Comments: 19 (4 by maintainers)

Most upvoted comments

I didn’t put the kludge I came up with into this thread originally, because it does seem to be a horrible kludge. But as this issue is still ongoing, maybe it’ll be helpful to someone. It removes the duplicate statements by the hacky method of putting them into a dictionary, using the stringified version of each statement as the key.

Add the replacement UnitOfWork class to your codebase somewhere:

import sqlalchemy_continuum


class UnitOfWork(sqlalchemy_continuum.UnitOfWork):
    """Override only the method we need to patch; the superclass still does everything else."""

    def create_association_versions(self, session):
        """
        Creates association table version records for given session.
        :param session: SQLAlchemy session object

        """
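        # Dedupe statements, keyed by their string form. Note that str() on a
        # SQLAlchemy statement renders bind placeholders rather than bound
        # values, so statements differing only in their values share a key.
        unique = {str(st): st for st in self.pending_statements}
        statements = list(unique.values())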

        for stmt in statements:
            stmt = stmt.values(
                **{
                    self.manager.options['transaction_column_name']:
                    self.current_transaction.id
                }
            )
            session.execute(stmt)
        self.pending_statements = []

Also, right after your call to make_versioned, insert the code to replace the supplied UnitOfWork class with the patched one:

sqlalchemy_continuum.versioning_manager.uow_class = horrible_kludges.UnitOfWork

I am 100% certain that a better solution than this exists and I am by no means advocating for adding this to Continuum. But it does get rid of the issue, at the cost of additional execution time.
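For anyone who wants to see the dedupe step on its own, here is a minimal sketch of the dictionary trick used in create_association_versions above, with no database involved. FakeStatement is a hypothetical stand-in, not a SQLAlchemy or Continuum class, and its text inlines the values purely for readability; the table name is only guessed from the constraint name in the traceback.

```python
def dedupe_by_text(statements):
    """Drop statements whose str() form has already been seen.

    Same idea as the dict comprehension in the patched UnitOfWork: the
    dictionary keeps one entry per distinct string, in first-seen key order.
    """
    unique = {str(stmt): stmt for stmt in statements}
    return list(unique.values())


class FakeStatement:
    """Hypothetical stand-in for a statement object; only __str__ matters here."""

    def __init__(self, text):
        self.text = text

    def __str__(self):
        return self.text


# Two identical inserts (the duplicate that triggers the IntegrityError)
# followed by one genuinely different insert.
pending = [
    FakeStatement("INSERT INTO version_membership (tenant_id, user_id) VALUES (1, 1)"),
    FakeStatement("INSERT INTO version_membership (tenant_id, user_id) VALUES (1, 1)"),
    FakeStatement("INSERT INTO version_membership (tenant_id, user_id) VALUES (1, 2)"),
]

deduped = dedupe_by_text(pending)
for stmt in deduped:
    print(stmt)
```

Whether this is safe in a real session depends on what the string form of each statement actually contains, so treat it as an illustration of the trick rather than a recommendation.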

I’ve made a small Alembic migration script that fixes this issue, for anyone in need:

from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils

from sqlalchemy_continuum.version import VersionClassBase
from my_package import db
import inspect


def alter_version_model(klass, conn, metadata):
    table_name = klass.__table__.name
    pk_cols = [c.name for c in klass.__table__.primary_key.columns]

    existing_table = sa.Table(table_name, metadata, autoload_with=conn)
    existing_pk_cols = [c.name for c in existing_table.primary_key.columns]
    pkey = existing_table.primary_key.name

    if sorted(pk_cols) == sorted(existing_pk_cols):
        print('Nothing to do for {}'.format(table_name))
        return

    msg = 'Updating the PK of {} from {} to {}'
    print(msg.format(table_name, existing_pk_cols, pk_cols))

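    # Swap the existing primary key for one covering the columns the
    # Continuum version model declares, reusing the constraint name.
    op.drop_constraint(pkey, table_name)
    op.create_primary_key(pkey, table_name, pk_cols)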


def upgrade():
    conn = op.get_bind()
    metadata = sa.schema.MetaData()

    for klass in db.Model._decl_class_registry.values():
        if inspect.isclass(klass) and issubclass(klass, VersionClassBase):
            alter_version_model(klass, conn, metadata)


def downgrade():
    pass

I also confirm that running this migration script fixes my issues.

I just had another look, and I understand better now.

The versioning table was indeed added by an alembic migration.

The Version model created by Continuum correctly flags the primary keys, but as you said, alembic doesn’t pick them up when generating the migration script.

So in short, this issue is indeed caused by Alembic (at least on my end). Thanks for your help, @killthekitten.

edit: as requested, the migration code in question:

    op.create_table(
        'model1_model2_version',
        sa.Column('model1_id', sa.Integer(), nullable=False),
        sa.Column('model2_id', sa.Integer(), nullable=False),
        sa.Column(
            'transaction_id',
            sa.BigInteger(),
            autoincrement=False,
            nullable=False,
        ),
        sa.Column('end_transaction_id', sa.BigInteger(), nullable=True),
        sa.Column('operation_type', sa.SmallInteger(), nullable=False),
        sa.PrimaryKeyConstraint('transaction_id'))

I guess I’m using an out-of-date version of Alembic 😃
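To make it concrete why a primary key on transaction_id alone breaks as soon as one transaction touches two association rows, here is a small sketch. It uses the standard-library sqlite3 module instead of PostgreSQL, the table names are made up, and the composite key (model1_id, model2_id, transaction_id) is my assumption about what the version model declares, based on the key shown in the original traceback.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Primary key as in the generated migration: transaction_id only.
conn.execute("""
    CREATE TABLE narrow_pk_version (
        model1_id INTEGER NOT NULL,
        model2_id INTEGER NOT NULL,
        transaction_id INTEGER NOT NULL,
        PRIMARY KEY (transaction_id)
    )
""")

# Assumed intended primary key: both foreign keys plus transaction_id.
conn.execute("""
    CREATE TABLE composite_pk_version (
        model1_id INTEGER NOT NULL,
        model2_id INTEGER NOT NULL,
        transaction_id INTEGER NOT NULL,
        PRIMARY KEY (model1_id, model2_id, transaction_id)
    )
""")

# Two different association rows written in the same transaction (id 2).
rows = [(1, 1, 2), (1, 2, 2)]


def try_insert(table):
    """Insert both rows; report success or the integrity error raised."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", rows)
        return "ok"
    except sqlite3.IntegrityError as exc:
        return f"IntegrityError: {exc}"


narrow_result = try_insert("narrow_pk_version")
composite_result = try_insert("composite_pk_version")
print(narrow_result)
print(composite_result)
```

The narrow key rejects the second row because both rows share a transaction_id, while the composite key accepts them since the (model1_id, model2_id) pairs differ.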