- """unique generic sensor ids
- Revision ID: a528c3c81506
- Revises: 22ce09690d23
- Create Date: 2021-03-19 23:21:22.992700
- This should be regarded as a non-reversible migration for production servers!
- Downgraded ids for markets and weather sensors are not guaranteed to be the same as before upgrading,
- because their ids have been shifted by the max_id of assets, and by the max_id of assets and markets, respectively.
- If new assets and markets have been created between upgrading and downgrading,
- the downgraded ids are not the same as before upgrading.
- Mitigating circumstances are that neither market ids nor weather sensor ids had been presented to users before,
- so the shift in ids shouldn't burden users.
- Asset ids (and their derived entity addresses) remain the same with this revision.
This migration prepares the use of market ids and weather sensor ids in their entity addresses.

Upgrade:
- (schema for new sensors)
- Creates new table for generic sensors, using timely_beliefs.SensorDBMixin
- (data, with temporary schema change)
- Updates non-user-exposed ids of markets and weather sensors to ensure unique ids across assets, markets and weather sensors
- Creates new generic sensors for all assets, markets and weather sensors, specifically setting their id to correspond to the ids of the old sensors
- (schema for old sensors)
- Makes the id of old sensors a foreign key of the new generic sensor table

Downgrade:
- (schema for old sensors)
- Lets old sensors store their own id again
- (data, with temporary schema change)
- Drops new generic sensors corresponding to old sensors
- Reverts ids of markets and weather sensors by shifting them back down (the original pre-upgrade ids are not backed up, so restoring them exactly is not guaranteed)
- (schema for new sensors)
- Drops the table for generic sensors

The logic for shifting ids of markets and weather sensors, by example:

            asset ids           market ids               weather sensor ids
                                (+ max_asset_id = +6)    (+ max_asset_id + max_market_id = +6 + 8 = +14)
(upgrade)   a 1,2,6 -> 1,2,6    m 3,4,8 -> 9,10,14       w 1,6,7 -> 15,20,21
                                (- max_asset_id = -6)    (- max_market_id = -14)
(downgrade) a 1,2,6 <- 1,2,6    m 3,4,8 <- 9,10,14       w 1,6,7 <- 15,20,21
- """
from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm

from flexmeasures.data.models.time_series import Sensor

# revision identifiers, used by Alembic.
revision = "a528c3c81506"
down_revision = "22ce09690d23"
branch_labels = None
depends_on = None
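
# For illustration, the id shifts applied by upgrade_data() below boil down to
# the following, using the example from the module docstring
# (max_asset_id = 6, max_market_id = 8):
#
#     new_asset_id = old_id                                          # 1,2,6 -> 1,2,6
#     new_market_id = old_id + max_asset_id                          # 3,4,8 -> 9,10,14
#     new_weather_sensor_id = old_id + max_asset_id + max_market_id  # 1,6,7 -> 15,20,21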


def upgrade():
    upgrade_schema_new_sensors()
    upgrade_data()
    upgrade_schema_old_sensors()


def upgrade_schema_new_sensors():
    """Schema migration to create a new sensor table."""
    op.create_table(
        "sensor",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(length=120), nullable=False),
        sa.Column("unit", sa.String(length=80), nullable=False),
        sa.Column("timezone", sa.String(length=80), nullable=False),
        sa.Column("event_resolution", sa.Interval(), nullable=False),
        sa.Column("knowledge_horizon_fnc", sa.String(length=80), nullable=False),
        sa.Column("knowledge_horizon_par", sa.JSON(), nullable=False),
        sa.PrimaryKeyConstraint("id", name=op.f("sensor_pkey")),
    )
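    # The columns above follow timely_beliefs.SensorDBMixin (see the module
    # docstring), which the Sensor model imported above presumably builds on.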


def upgrade_data():
    """Data migration to update the ids of old sensors."""
    # To support the data upgrade, cascade upon updating ids
    recreate_sensor_fks(recreate_with_cascade_on_update=True)

    # Declare ORM table views
    t_assets = sa.Table(
        "asset",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    t_markets = sa.Table(
        "market",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    t_weather_sensors = sa.Table(
        "weather_sensor",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )

    # Use SQLAlchemy's connection and transaction to go through the data
    connection = op.get_bind()

    # Get the max ids used by assets, markets and weather sensors
    max_asset_id = get_max_id(connection, "asset")
    max_market_id = get_max_id(connection, "market")
    max_weather_sensor_id = get_max_id(connection, "weather_sensor")

    # Select all existing ids that need migrating, while keeping names intact
    asset_results = connection.execute(
        sa.select(
            t_assets.c.id,
            t_assets.c.name,
        )
    ).fetchall()
    market_results = connection.execute(
        sa.select(
            t_markets.c.id,
            t_markets.c.name,
        )
    ).fetchall()
    weather_sensor_results = connection.execute(
        sa.select(
            t_weather_sensors.c.id,
            t_weather_sensors.c.name,
        )
    ).fetchall()

    # Prepare to build a list of new sensors
    new_sensors = []

    # Iterate over all assets
    for id_, name in asset_results:
        # Determine the new id
        new_id = id_  # assets keep their original ids

        # Create new Sensors with matching ids
        new_sensor = Sensor(name=name)
        new_sensor.id = new_id
        new_sensors.append(new_sensor)

    # Iterate over all markets
    for id_, name in market_results:
        # Determine the new id
        new_id = id_ + max_asset_id

        # Update the id
        connection.execute(
            t_markets.update().where(t_markets.c.name == name).values(id=new_id)
        )

        # Create new Sensors with matching ids
        new_sensor = Sensor(name=name)
        new_sensor.id = new_id
        new_sensors.append(new_sensor)

    # Iterate over all weather sensors
    for id_, name in weather_sensor_results:
        # Determine the new id
        new_id = id_ + max_asset_id + max_market_id

        # Update the id
        connection.execute(
            t_weather_sensors.update()
            .where(t_weather_sensors.c.name == name)
            .values(id=new_id)
        )

        # Create new Sensors with matching ids
        new_sensor = Sensor(name=name)
        new_sensor.id = new_id
        new_sensors.append(new_sensor)

    # Add the new sensors
    session = orm.Session(bind=connection)
    session.add_all(new_sensors)
    session.commit()

    # After supporting the data upgrade, stop cascading upon updating ids
    recreate_sensor_fks(recreate_with_cascade_on_update=False)

    # Finally, help out the autoincrement of the Sensor table
    t_sensors = sa.Table(
        "sensor",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
    )
    sequence_name = "%s_id_seq" % t_sensors.name

    # Set the next id for the table's sequence to just after the max id of all old sensors combined
    connection.execute(
        sa.sql.expression.text(
            "SELECT setval(:seq_name, :next_id, false);"
        ).bindparams(
            seq_name=sequence_name,
            next_id=max_asset_id + max_market_id + max_weather_sensor_id + 1,
        )
    )
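    # For illustration (PostgreSQL semantics): with is_called=false, the next
    # nextval() on the sequence returns :next_id itself. With the example ids
    # from the module docstring (max ids 6, 8 and 7), the statement above
    # amounts to
    #
    #     SELECT setval('sensor_id_seq', 22, false);
    #
    # so the next auto-assigned sensor id would be 22.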


def upgrade_schema_old_sensors():
    """Schema migration to let old sensor tables get their id from the new sensor table."""
    op.create_foreign_key(
        "asset_id_sensor_fkey",
        "asset",
        "sensor",
        ["id"],
        ["id"],
    )
    op.create_foreign_key(
        "market_id_sensor_fkey",
        "market",
        "sensor",
        ["id"],
        ["id"],
    )
    op.create_foreign_key(
        "weather_sensor_id_sensor_fkey",
        "weather_sensor",
        "sensor",
        ["id"],
        ["id"],
    )
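    # With these foreign keys in place, every asset, market and weather sensor id
    # must exist in the sensor table, which enforces a single id space across the
    # three old sensor types (see the module docstring).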


def downgrade():
    downgrade_schema_old_sensors()
    downgrade_data()
    downgrade_schema_new_sensors()


def downgrade_schema_old_sensors():
    """Schema migration to decouple the id of old sensor tables from the new sensor table."""
    op.drop_constraint("asset_id_sensor_fkey", "asset", type_="foreignkey")
    op.drop_constraint("market_id_sensor_fkey", "market", type_="foreignkey")
    op.drop_constraint(
        "weather_sensor_id_sensor_fkey", "weather_sensor", type_="foreignkey"
    )


def downgrade_data():
    """Data migration to retrieve the ids of old sensors.

    Note that downgraded ids are not guaranteed to be the same as during upgrade.
    """
    # To support the data downgrade, cascade upon updating ids
    recreate_sensor_fks(recreate_with_cascade_on_update=True)

    # Declare ORM table views
    t_markets = sa.Table(
        "market",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )

    # Use SQLAlchemy's connection and transaction to go through the data
    connection = op.get_bind()

    # Get the max ids used by assets and markets
    max_asset_id = get_max_id(connection, "asset")  # may differ from its value during upgrade!
    max_market_id = get_max_id(connection, "market")  # may differ from its value during upgrade!

    # Select all existing ids that need migrating
    market_results = connection.execute(
        sa.select(
            t_markets.c.id,
            t_markets.c.name,
        )
    ).fetchall()

    # Iterate over all selected data tuples
    for id_, name in market_results:
        # Determine the new id
        new_id = id_ - max_asset_id

        # Update the id
        connection.execute(
            t_markets.update().where(t_markets.c.name == name).values(id=new_id)
        )

    # Repeat these steps for weather sensors
    t_weather_sensors = sa.Table(
        "weather_sensor",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    weather_sensor_results = connection.execute(
        sa.select(
            t_weather_sensors.c.id,
            t_weather_sensors.c.name,
        )
    ).fetchall()
    for id_, name in weather_sensor_results:
        # Determine the new id (max_market_id was fetched before market ids were
        # reverted, so it still includes the shift by max_asset_id; subtracting
        # it therefore undoes both shifts applied to weather sensor ids)
        new_id = id_ - max_market_id

        # Update the id
        connection.execute(
            t_weather_sensors.update()
            .where(t_weather_sensors.c.name == name)
            .values(id=new_id)
        )

    # After supporting the data downgrade, stop cascading upon updating ids
    recreate_sensor_fks(recreate_with_cascade_on_update=False)


def downgrade_schema_new_sensors():
    """Schema migration to drop the new sensor table."""
    op.drop_table("sensor")


def recreate_sensor_fks(recreate_with_cascade_on_update: bool):
    """Schema migration to recreate the foreign keys that reference old sensor ids,
    with or without cascading on update."""
    op.drop_constraint("asset_market_id_market_fkey", "asset", type_="foreignkey")
    op.create_foreign_key(
        "asset_market_id_market_fkey",
        "asset",
        "market",
        ["market_id"],
        ["id"],
        onupdate="CASCADE" if recreate_with_cascade_on_update else None,
    )
    op.drop_constraint("price_market_id_market_fkey", "price", type_="foreignkey")
    op.create_foreign_key(
        "price_market_id_market_fkey",
        "price",
        "market",
        ["market_id"],
        ["id"],
        onupdate="CASCADE" if recreate_with_cascade_on_update else None,
    )
    op.drop_constraint(
        "weather_sensor_id_weather_sensor_fkey", "weather", type_="foreignkey"
    )
    op.create_foreign_key(
        "weather_sensor_id_weather_sensor_fkey",
        "weather",
        "weather_sensor",
        ["sensor_id"],
        ["id"],
        onupdate="CASCADE" if recreate_with_cascade_on_update else None,
    )
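    # For illustration (standard SQL behavior): with ON UPDATE CASCADE in place,
    # a statement such as
    #
    #     UPDATE market SET id = 9 WHERE id = 3;
    #
    # automatically updates asset.market_id and price.market_id in rows that
    # referenced market 3, which is what the id shifts in upgrade_data() and
    # downgrade_data() rely on.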


def get_max_id(connection, generic_sensor_type: str) -> int:
    """
    Get the max id of a given generic sensor type.

    :param generic_sensor_type: "asset", "market", or "weather_sensor"
    """
    t_generic_sensor = sa.Table(
        generic_sensor_type,
        sa.MetaData(),
        sa.Column("id", sa.Integer),
    )
    max_id = connection.execute(
        sa.select(sa.func.max(t_generic_sensor.c.id))
    ).scalar()  # None if the table is empty
    return max_id if max_id is not None else 0
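

# For example, with the ids from the module docstring, before upgrading:
#
#     get_max_id(connection, "asset")           # -> 6
#     get_max_id(connection, "market")          # -> 8
#     get_max_id(connection, "weather_sensor")  # -> 7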