a528c3c81506_unique_generic_sensor_ids.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. """unique generic sensor ids
  2. Revision ID: a528c3c81506
  3. Revises: 22ce09690d23
  4. Create Date: 2021-03-19 23:21:22.992700
  5. This should be regarded as a non-reversible migration for production servers!
  6. Downgraded ids for markets and weather sensors are not guaranteed to be the same as before upgrading,
  7. because their ids have been shifted by the max_id of assets, and by the max_id of assets and markets, respectively.
  8. If new assets and markets have been created between upgrading and downgrading,
  9. the downgraded ids are not the same as before upgrading.
  10. Mitigating circumstances are that neither market ids nor weather sensor ids had been presented to users before,
  11. so the shift in ids shouldn't burden users.
  12. Asset ids (and their derived entity addresses) remain the same with this revision.
  13. This migration prepares the use of market ids and weather sensors ids in their entity addresses.
  14. Upgrade:
  15. - (schema for new sensors)
  16. - Creates new table for generic sensors, using timely_beliefs.SensorDBMixin
  17. - (data, with temporary schema change)
  18. - Updates non-user-exposed ids of markets and weather sensors to ensure unique ids across assets, markets and weather sensors
  19. - Creates new generic sensors for all assets, markets and weather sensors, specifically setting their id to correspond to the ids of the old sensors
  20. - (schema for old sensors)
  21. - Makes the id of old sensors a foreign key of the new generic sensor table
  22. Downgrade:
  23. - (schema for old sensors)
  24. - Lets old sensors store their own id again
  25. - (data, with temporary schema change)
  26. - Drops new generic sensors corresponding to old sensors
  27. - Reverts ids of markets and weather sensors to their old ids (migration fails if old sensors have no old id backed up)
  28. - (schema for new sensors)
  29. - Drop table for generic sensors
  30. The logic for shifting ids of markets and weather stations, by example:
  31. asset ids market ids weather station ids
  32. (+max_asset_id = +6) (+ max_asset_id + max_market_id = +6 + 8 = +14)
  33. (upgrade) a 1,2,6 -> 1,2,6 m 3,4,8 -> 9,10,14 w 1,6,7 -> 15,20,21
  34. (-max_asset_id = -6) (-max_market_id = -14)
  35. (downgrade) a 1,2,6 <- 1,2,6 m 3,4,8 <- 9,10,14 w 1,6,7 <- 15,20,21 (- max_market_id)
  36. """
  37. from alembic import op
  38. import sqlalchemy as sa
  39. from sqlalchemy import orm
  40. from flexmeasures.data.models.time_series import Sensor
  41. # revision identifiers, used by Alembic.
  42. revision = "a528c3c81506"
  43. down_revision = "22ce09690d23"
  44. branch_labels = None
  45. depends_on = None
def upgrade():
    """Run the upgrade in three ordered phases.

    1. Create the new generic ``sensor`` table.
    2. Shift market/weather-sensor ids and insert matching sensor rows.
    3. Add foreign keys from the old tables' ids to ``sensor.id``.

    The order matters: the data phase needs the new table to exist, but
    must run before the foreign keys constrain the old tables' ids.
    """
    upgrade_schema_new_sensors()
    upgrade_data()
    upgrade_schema_old_sensors()
  50. def upgrade_schema_new_sensors():
  51. """Schema migration to create a new sensor table."""
  52. op.create_table(
  53. "sensor",
  54. sa.Column("id", sa.Integer(), nullable=False),
  55. sa.Column("name", sa.String(length=120), nullable=False),
  56. sa.Column("unit", sa.String(length=80), nullable=False),
  57. sa.Column("timezone", sa.String(length=80), nullable=False),
  58. sa.Column("event_resolution", sa.Interval(), nullable=False),
  59. sa.Column("knowledge_horizon_fnc", sa.String(length=80), nullable=False),
  60. sa.Column("knowledge_horizon_par", sa.JSON(), nullable=False),
  61. sa.PrimaryKeyConstraint("id", name=op.f("sensor_pkey")),
  62. )
def upgrade_data():
    """Data migration to update the ids of old sensors.

    Shifts market ids up by the max asset id, and weather sensor ids up by
    the max asset id plus the max market id, so ids become unique across
    assets, markets and weather sensors. A new generic Sensor row is created
    for each old sensor, explicitly reusing the (shifted) id, and the sensor
    id sequence is finally advanced past the highest id handed out.
    """
    # To support data upgrade, cascade upon updating ids
    # (so referencing rows follow along with the shifted ids).
    recreate_sensor_fks(recreate_with_cascade_on_update=True)
    # Declare ORM table views
    t_assets = sa.Table(
        "asset",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    t_markets = sa.Table(
        "market",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    t_weather_sensors = sa.Table(
        "weather_sensor",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    # Use SQLAlchemy's connection and transaction to go through the data
    connection = op.get_bind()
    # Get the max id used by assets and markets.
    # NOTE: these must be read before any ids are shifted below.
    max_asset_id = get_max_id(connection, "asset")
    max_market_id = get_max_id(connection, "market")
    max_weather_sensor_id = get_max_id(connection, "weather_sensor")
    # Select all existing ids that need migrating, while keeping names intact
    asset_results = connection.execute(
        sa.select(
            *[
                t_assets.c.id,
                t_assets.c.name,
            ]
        )
    ).fetchall()
    market_results = connection.execute(
        sa.select(
            *[
                t_markets.c.id,
                t_markets.c.name,
            ]
        )
    ).fetchall()
    weather_sensor_results = connection.execute(
        sa.select(
            *[
                t_weather_sensors.c.id,
                t_weather_sensors.c.name,
            ]
        )
    ).fetchall()
    # Prepare to build a list of new sensors
    new_sensors = []
    # Iterate over all assets
    for id_, name in asset_results:
        # Determine the new id
        new_id = id_  # assets keep their original ids
        # Create new Sensors with matching ids
        new_sensor = Sensor(name=name)
        new_sensor.id = new_id
        new_sensors.append(new_sensor)
    # Iterate over all markets
    for id_, name in market_results:
        # Determine the new id
        new_id = id_ + max_asset_id
        # Update the id.
        # NOTE(review): rows are matched by name, which assumes market names
        # are unique — confirm against the model's constraints.
        connection.execute(
            t_markets.update().where(t_markets.c.name == name).values(id=new_id)
        )
        # Create new Sensors with matching ids
        new_sensor = Sensor(name=name)
        new_sensor.id = new_id
        new_sensors.append(new_sensor)
    # Iterate over all weather sensors
    for id_, name in weather_sensor_results:
        # Determine the new id
        new_id = id_ + max_asset_id + max_market_id
        # Update the id
        connection.execute(
            t_weather_sensors.update()
            .where(t_weather_sensors.c.name == name)
            .values(id=new_id)
        )
        # Create new Sensors with matching ids
        new_sensor = Sensor(name=name)
        new_sensor.id = new_id
        new_sensors.append(new_sensor)
    # Add the new sensors
    session = orm.Session(bind=connection)
    session.add_all(new_sensors)
    session.commit()
    # After supporting data upgrade, stop cascading upon updating ids
    recreate_sensor_fks(recreate_with_cascade_on_update=False)
    # Finally, help out the autoincrement of the Sensor table
    t_sensors = sa.Table(
        "sensor",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
    )
    sequence_name = "%s_id_seq" % t_sensors.name
    # Set next id for table seq to just after max id of all old sensors combined
    connection.execute(
        sa.sql.expression.text("SELECT setval(:seq_name, :next_id, false);").bindparams(
            seq_name=sequence_name,
            next_id=max_asset_id + max_market_id + max_weather_sensor_id + 1,
        )
    )
  173. def upgrade_schema_old_sensors():
  174. """Schema migration to let old sensor tables get their id from the new sensor table."""
  175. op.create_foreign_key(
  176. "asset_id_sensor_fkey",
  177. "asset",
  178. "sensor",
  179. ["id"],
  180. ["id"],
  181. )
  182. op.create_foreign_key(
  183. "market_id_sensor_fkey",
  184. "market",
  185. "sensor",
  186. ["id"],
  187. ["id"],
  188. )
  189. op.create_foreign_key(
  190. "weather_sensor_id_sensor_fkey",
  191. "weather_sensor",
  192. "sensor",
  193. ["id"],
  194. ["id"],
  195. )
def downgrade():
    """Run the downgrade in three ordered phases, mirroring upgrade() in reverse.

    1. Decouple the old tables' ids from the ``sensor`` table.
    2. Revert the shifted ids of markets and weather sensors.
    3. Drop the ``sensor`` table (removing the generic sensor rows).
    """
    downgrade_schema_old_sensors()
    downgrade_data()
    downgrade_schema_new_sensors()
  200. def downgrade_schema_old_sensors():
  201. """Schema migration to decouple the id of old sensor tables from the new sensor table."""
  202. op.drop_constraint("asset_id_sensor_fkey", "asset", type_="foreignkey")
  203. op.drop_constraint("market_id_sensor_fkey", "market", type_="foreignkey")
  204. op.drop_constraint(
  205. "weather_sensor_id_sensor_fkey", "weather_sensor", type_="foreignkey"
  206. )
def downgrade_data():
    """Data migration to retrieve the ids of old sensors.

    Note that downgraded ids are not guaranteed to be the same as during upgrade.

    Reverses the id shifts of upgrade_data(): market ids are lowered by the
    max asset id, and weather sensor ids by the max market id as read while
    markets still carry their upgraded ids (which already include the asset
    offset — see the worked example in the module docstring). Both max ids
    are therefore read before any rows are updated.
    """
    # To support data downgrade, cascade upon updating ids
    recreate_sensor_fks(recreate_with_cascade_on_update=True)
    # Declare ORM table views
    t_markets = sa.Table(
        "market",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    # Use Alchemy's connection and transaction to go through the data
    connection = op.get_bind()
    # Get the max id used by assets and markets
    max_asset_id = get_max_id(
        connection, "asset"
    )  # may be different than during upgrade!
    max_market_id = get_max_id(
        connection, "market"
    )  # may be different than during upgrade!
    # Select all existing ids that need migrating
    market_results = connection.execute(
        sa.select(
            *[
                t_markets.c.id,
                t_markets.c.name,
            ]
        )
    ).fetchall()
    # Iterate over all selected data tuples
    for id_, name in market_results:
        # Determine the new id
        new_id = id_ - max_asset_id
        # Update the id
        connection.execute(
            t_markets.update().where(t_markets.c.name == name).values(id=new_id)
        )
    # Repeat steps for weather sensors
    t_weather_sensors = sa.Table(
        "weather_sensor",
        sa.MetaData(),
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String(80)),
    )
    weather_sensor_results = connection.execute(
        sa.select(
            *[
                t_weather_sensors.c.id,
                t_weather_sensors.c.name,
            ]
        )
    ).fetchall()
    for id_, name in weather_sensor_results:
        # Determine the new id
        # (max_market_id was captured before markets were downgraded above)
        new_id = id_ - max_market_id
        # Update the id
        connection.execute(
            t_weather_sensors.update()
            .where(t_weather_sensors.c.name == name)
            .values(id=new_id)
        )
    # After supporting data downgrade, stop cascading upon updating ids
    recreate_sensor_fks(recreate_with_cascade_on_update=False)
def downgrade_schema_new_sensors():
    """Schema migration to drop the new sensor table.

    Must run after downgrade_schema_old_sensors(), so no foreign keys
    reference sensor.id anymore.
    """
    op.drop_table("sensor")
  274. def recreate_sensor_fks(recreate_with_cascade_on_update: bool):
  275. """Schema migration to make foreign id keys cascade on update."""
  276. op.drop_constraint("asset_market_id_market_fkey", "asset", type_="foreignkey")
  277. op.create_foreign_key(
  278. "asset_market_id_market_fkey",
  279. "asset",
  280. "market",
  281. ["market_id"],
  282. ["id"],
  283. onupdate="CASCADE" if recreate_with_cascade_on_update else None,
  284. )
  285. op.drop_constraint("price_market_id_market_fkey", "price", type_="foreignkey")
  286. op.create_foreign_key(
  287. "price_market_id_market_fkey",
  288. "price",
  289. "market",
  290. ["market_id"],
  291. ["id"],
  292. onupdate="CASCADE" if recreate_with_cascade_on_update else None,
  293. )
  294. op.drop_constraint(
  295. "weather_sensor_id_weather_sensor_fkey", "weather", type_="foreignkey"
  296. )
  297. op.create_foreign_key(
  298. "weather_sensor_id_weather_sensor_fkey",
  299. "weather",
  300. "weather_sensor",
  301. ["sensor_id"],
  302. ["id"],
  303. onupdate="CASCADE" if recreate_with_cascade_on_update else None,
  304. )
  305. def get_max_id(connection, generic_sensor_type: str) -> int:
  306. """
  307. Get the max id of a given generic sensor type.
  308. :param generic_sensor_type: "asset", "market", or "weather_sensor"
  309. """
  310. t_generic_sensor = sa.Table(
  311. generic_sensor_type,
  312. sa.MetaData(),
  313. sa.Column("id", sa.Integer),
  314. )
  315. max_id = connection.execute(
  316. sa.select(
  317. *[
  318. sa.sql.expression.func.max(
  319. t_generic_sensor.c.id,
  320. )
  321. ]
  322. )
  323. ).scalar() # None if there are none
  324. max_id = 0 if max_id is None else max_id
  325. return max_id