# c349f52c700d_update_data_sources.py
"""Update data sources

Revision ID: c349f52c700d
Revises: ad98460751d9
Create Date: 2023-12-14 10:31:02.612590
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "c349f52c700d"  # this migration's ID
down_revision = "ad98460751d9"  # the migration applied immediately before this one
branch_labels = None  # no named branch labels
depends_on = None  # no cross-branch dependencies
  14. def upgrade():
  15. # The name of the data_source should be 120 String, this was not correctly set in an earlier revision of the db.
  16. op.execute(
  17. sa.text("UPDATE data_source SET attributes = '{}' WHERE attributes IS NULL;")
  18. )
  19. with op.batch_alter_table("data_source", schema=None) as batch_op:
  20. batch_op.alter_column(
  21. "name",
  22. existing_type=sa.VARCHAR(length=80),
  23. type_=sa.String(length=120),
  24. existing_nullable=False,
  25. )
  26. # The attributes were initially set as nullable=False but the migration file did not reflect that.
  27. # In this migration the model and db are brought in line.
  28. batch_op.alter_column(
  29. "attributes",
  30. existing_type=postgresql.JSON(astext_type=sa.Text()),
  31. nullable=False,
  32. )
  33. # This constraint is renamed to include the full name of the `data_source` table.
  34. if foreign_key_exists("timed_belief", "timed_belief_source_id_source_fkey"):
  35. with op.batch_alter_table("timed_belief", schema=None) as batch_op:
  36. batch_op.drop_constraint(
  37. "timed_belief_source_id_source_fkey", type_="foreignkey"
  38. )
  39. batch_op.create_foreign_key(
  40. batch_op.f("timed_belief_source_id_data_source_fkey"),
  41. "data_source",
  42. ["source_id"],
  43. ["id"],
  44. )
  45. else:
  46. # already renamed
  47. assert foreign_key_exists(
  48. "timed_belief", "timed_belief_source_id_data_source_fkey"
  49. )
  50. def downgrade():
  51. with op.batch_alter_table("timed_belief", schema=None) as batch_op:
  52. batch_op.drop_constraint(
  53. batch_op.f("timed_belief_source_id_data_source_fkey"), type_="foreignkey"
  54. )
  55. batch_op.create_foreign_key(
  56. "timed_belief_source_id_source_fkey",
  57. "data_source",
  58. ["source_id"],
  59. ["id"],
  60. ondelete="CASCADE",
  61. )
  62. with op.batch_alter_table("data_source", schema=None) as batch_op:
  63. batch_op.alter_column(
  64. "attributes",
  65. existing_type=postgresql.JSON(astext_type=sa.Text()),
  66. nullable=True,
  67. )
  68. batch_op.alter_column(
  69. "name",
  70. existing_type=sa.String(length=120),
  71. type_=sa.VARCHAR(length=80),
  72. existing_nullable=False,
  73. )
  74. def foreign_key_exists(table_name, fk_name) -> bool:
  75. # Get the current connection
  76. connection = op.get_bind()
  77. # Create an Inspector
  78. insp = sa.inspect(connection)
  79. # Get the foreign keys for the specified table
  80. foreign_keys = insp.get_foreign_keys(table_name)
  81. # Check if the specified foreign key name exists
  82. return any(fk["name"] == fk_name for fk in foreign_keys)