- """Update data sources
- Revision ID: c349f52c700d
- Revises: ad98460751d9
- Create Date: 2023-12-14 10:31:02.612590
- """
- from alembic import op
- import sqlalchemy as sa
- from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "c349f52c700d"
down_revision = "ad98460751d9"
branch_labels = None  # no branch labels: this is a linear revision
depends_on = None  # no cross-branch dependencies
def upgrade():
    """Widen data_source.name, enforce non-null attributes, and rename the FK.

    Three schema fixes:
    1. data_source.name grows from VARCHAR(80) to VARCHAR(120), matching the
       model (an earlier revision set it incorrectly).
    2. data_source.attributes becomes NOT NULL (the model always said so, but
       the original migration did not); NULLs are backfilled with '{}' first.
    3. The timed_belief FK is renamed to include the full `data_source` table
       name. Idempotent: skipped if the FK was already renamed.
    """
    # Backfill NULL attributes so the NOT NULL constraint below can be applied.
    op.execute(
        sa.text("UPDATE data_source SET attributes = '{}' WHERE attributes IS NULL;")
    )
    with op.batch_alter_table("data_source", schema=None) as batch_op:
        # The name of the data_source should be 120 String; this was not
        # correctly set in an earlier revision of the db.
        batch_op.alter_column(
            "name",
            existing_type=sa.VARCHAR(length=80),
            type_=sa.String(length=120),
            existing_nullable=False,
        )
        # The attributes were initially set as nullable=False in the model but
        # the migration file did not reflect that. In this migration the model
        # and db are brought in line.
        batch_op.alter_column(
            "attributes",
            existing_type=postgresql.JSON(astext_type=sa.Text()),
            nullable=False,
        )
    # This constraint is renamed to include the full name of the `data_source` table.
    if foreign_key_exists("timed_belief", "timed_belief_source_id_source_fkey"):
        with op.batch_alter_table("timed_belief", schema=None) as batch_op:
            batch_op.drop_constraint(
                "timed_belief_source_id_source_fkey", type_="foreignkey"
            )
            batch_op.create_foreign_key(
                batch_op.f("timed_belief_source_id_data_source_fkey"),
                "data_source",
                ["source_id"],
                ["id"],
            )
    elif not foreign_key_exists(
        "timed_belief", "timed_belief_source_id_data_source_fkey"
    ):
        # Neither the old nor the new constraint name exists: the schema is in
        # an unexpected state. Fail loudly rather than continue silently.
        # (The original used `assert`, which is stripped under `python -O`.)
        raise RuntimeError(
            "Foreign key on timed_belief.source_id found under neither its old "
            "nor its new name; schema is in an unexpected state."
        )
def downgrade():
    """Revert this revision: restore the old FK name and column definitions."""
    # Put back the pre-rename foreign key on timed_belief.source_id.
    with op.batch_alter_table("timed_belief", schema=None) as batch:
        batch.drop_constraint(
            batch.f("timed_belief_source_id_data_source_fkey"),
            type_="foreignkey",
        )
        batch.create_foreign_key(
            "timed_belief_source_id_source_fkey",
            "data_source",
            ["source_id"],
            ["id"],
            ondelete="CASCADE",
        )
    # Undo the column changes: attributes nullable again, name back to 80 chars.
    with op.batch_alter_table("data_source", schema=None) as batch:
        batch.alter_column(
            "attributes",
            existing_type=postgresql.JSON(astext_type=sa.Text()),
            nullable=True,
        )
        batch.alter_column(
            "name",
            existing_type=sa.String(length=120),
            type_=sa.VARCHAR(length=80),
            existing_nullable=False,
        )
def foreign_key_exists(table_name, fk_name) -> bool:
    """Return True if *table_name* has a foreign key constraint named *fk_name*."""
    # Inspect the schema over the current Alembic connection.
    inspector = sa.inspect(op.get_bind())
    existing_names = {fk["name"] for fk in inspector.get_foreign_keys(table_name)}
    return fk_name in existing_names