cb8df44ebda5_flexcontext_field.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. """flexcontext field
  2. Revision ID: cb8df44ebda5
  3. Revises: 2ba59c7c954e
  4. Create Date: 2024-12-16 18:39:34.168732
  5. """
  6. from alembic import op
  7. import json
  8. import sqlalchemy as sa
  9. from flexmeasures.utils.unit_utils import is_power_unit, is_capacity_price_unit, ur
  10. # revision identifiers, used by Alembic.
  11. revision = "cb8df44ebda5"
  12. down_revision = "2ba59c7c954e"
  13. branch_labels = None
  14. depends_on = None
  15. def build_flex_context(
  16. attributes_data,
  17. consumption_price_sensor_id,
  18. market_id,
  19. production_price_sensor_id,
  20. inflexible_device_sensors,
  21. capacity_in_mw,
  22. consumption_capacity_in_mw,
  23. production_capacity_in_mw,
  24. ems_peak_consumption_price,
  25. ems_peak_production_price,
  26. ems_consumption_breach_price,
  27. ems_production_breach_price,
  28. ):
  29. keys_to_remove = [
  30. "market_id",
  31. "capacity_in_mw",
  32. "consumption_capacity_in_mw",
  33. "production_capacity_in_mw",
  34. "ems_peak_consumption_price",
  35. "ems_peak_production_price",
  36. "ems_consumption_breach_price",
  37. "ems_production_breach_price",
  38. # Alt keys when values are stored as a fixed value
  39. "site-power-capacity",
  40. "site-consumption-capacity",
  41. "site-production-capacity",
  42. "site-peak-consumption-price",
  43. "site-peak-production-price",
  44. "site-consumption-breach-price",
  45. "site-production-breach-price",
  46. # Adding the below since these field could have been saved as either the hyphen or underscore format
  47. "ems-peak-consumption-price",
  48. "ems-peak-production-price",
  49. "ems-consumption-breach-price",
  50. "ems-production-breach-price",
  51. ]
  52. for key in keys_to_remove:
  53. attributes_data.pop(key, None)
  54. flex_context = attributes_data.pop("flex-context", None)
  55. if flex_context is None:
  56. flex_context = {}
  57. else:
  58. flex_context = json.loads(flex_context)
  59. # Fill the flex-context's consumption-price field with:
  60. # - the value of the consumption_price_sensor_id column
  61. # - otherwise, the market_id attribute (old fallback)
  62. # - otherwise, keep the consumption-price field from the flex-context attribute
  63. if (
  64. consumption_price_sensor_id is not None
  65. or market_id is not None
  66. or "consumption-price" not in flex_context
  67. ):
  68. flex_context["consumption-price"] = {
  69. "sensor": (
  70. consumption_price_sensor_id
  71. if consumption_price_sensor_id
  72. else market_id
  73. )
  74. }
  75. # Fill the flex-context's production-price field with:
  76. # - the value of the production_price_sensor_id column
  77. # - otherwise, the market_id attribute (old fallback, also for the production sensor)
  78. # - otherwise, keep the production-price field from the flex-context attribute
  79. if (
  80. production_price_sensor_id is not None
  81. or market_id is not None
  82. or "production-price" not in flex_context
  83. ):
  84. flex_context["production-price"] = {
  85. "sensor": (
  86. production_price_sensor_id if production_price_sensor_id else market_id
  87. )
  88. }
  89. if inflexible_device_sensors or "inflexible-device-sensors" not in flex_context:
  90. flex_context["inflexible-device-sensors"] = [
  91. s[0] for s in inflexible_device_sensors
  92. ]
  93. capacity_data = {
  94. "site-power-capacity": capacity_in_mw,
  95. "site-consumption-capacity": consumption_capacity_in_mw,
  96. "site-production-capacity": production_capacity_in_mw,
  97. }
  98. for key, value in capacity_data.items():
  99. if value is not None:
  100. if isinstance(value, (int, float)):
  101. flex_context[key] = f"{int(value * 1000)} kW"
  102. else:
  103. flex_context[key] = value
  104. price_data = {
  105. "site-peak-consumption-price": ems_peak_consumption_price,
  106. "site-peak-production-price": ems_peak_production_price,
  107. "site-consumption-breach-price": ems_consumption_breach_price,
  108. "site-production-breach-price": ems_production_breach_price,
  109. }
  110. for key, value in price_data.items():
  111. if value is not None:
  112. flex_context[key] = value
  113. return flex_context
  114. def process_field(value, attributes_data, original_key, new_key, validator):
  115. if value is not None:
  116. if isinstance(value, str) and validator(value):
  117. try:
  118. attributes_data[original_key] = (
  119. ur.Quantity(value).to(ur.Quantity("MW")).magnitude
  120. )
  121. except ValueError:
  122. attributes_data[new_key] = value
  123. else:
  124. attributes_data[new_key] = value
  125. def upgrade():
  126. with op.batch_alter_table("generic_asset", schema=None) as batch_op:
  127. batch_op.add_column(
  128. sa.Column("flex_context", sa.JSON(), nullable=False, server_default="{}")
  129. )
  130. generic_asset_table = sa.Table(
  131. "generic_asset",
  132. sa.MetaData(),
  133. sa.Column("id", sa.Integer, primary_key=True),
  134. sa.Column("attributes", sa.JSON),
  135. sa.Column("flex_context", sa.JSON),
  136. sa.Column("consumption_price_sensor_id", sa.Integer),
  137. sa.Column("production_price_sensor_id", sa.Integer),
  138. )
  139. inflexible_sensors_table = sa.Table(
  140. "assets_inflexible_sensors",
  141. sa.MetaData(),
  142. sa.Column("id", sa.Integer, primary_key=True),
  143. sa.Column("generic_asset_id", sa.Integer),
  144. sa.Column("inflexible_sensor_id", sa.Integer),
  145. )
  146. # Initiate connection to execute the queries
  147. conn = op.get_bind()
  148. select_stmt = sa.select(
  149. generic_asset_table.c.id,
  150. generic_asset_table.c.attributes,
  151. generic_asset_table.c.consumption_price_sensor_id,
  152. generic_asset_table.c.production_price_sensor_id,
  153. )
  154. results = conn.execute(select_stmt)
  155. for row in results:
  156. (
  157. asset_id,
  158. attributes_data,
  159. consumption_price_sensor_id,
  160. production_price_sensor_id,
  161. ) = row
  162. # fetch inflexible sensors
  163. select_stmt = sa.select(inflexible_sensors_table.c.inflexible_sensor_id).where(
  164. inflexible_sensors_table.c.generic_asset_id == asset_id
  165. )
  166. inflexible_device_sensors = conn.execute(select_stmt).fetchall()
  167. # Get fields-to-migrate from attributes
  168. market_id = attributes_data.get("market_id")
  169. capacity_in_mw = attributes_data.get("capacity_in_mw") or attributes_data.get(
  170. "site-power-capacity"
  171. )
  172. consumption_capacity_in_mw = attributes_data.get(
  173. "consumption_capacity_in_mw"
  174. ) or attributes_data.get("site-consumption-capacity")
  175. production_capacity_in_mw = attributes_data.get(
  176. "production_capacity_in_mw"
  177. ) or attributes_data.get("site-production-capacity")
  178. ems_peak_consumption_price = attributes_data.get(
  179. "ems-peak-consumption-price"
  180. ) or attributes_data.get("site-peak-consumption-price")
  181. ems_peak_production_price = attributes_data.get(
  182. "ems-peak-production-price"
  183. ) or attributes_data.get("site-peak-production-price")
  184. ems_consumption_breach_price = attributes_data.get(
  185. "ems-consumption-breach-price"
  186. ) or attributes_data.get("site-consumption-breach-price")
  187. ems_production_breach_price = attributes_data.get(
  188. "ems-production-breach-price"
  189. ) or attributes_data.get("site-production-breach-price")
  190. # Build flex context - code off-loaded to external function as it is too long
  191. flex_context = build_flex_context(
  192. attributes_data,
  193. consumption_price_sensor_id,
  194. market_id,
  195. production_price_sensor_id,
  196. inflexible_device_sensors,
  197. capacity_in_mw,
  198. consumption_capacity_in_mw,
  199. production_capacity_in_mw,
  200. ems_peak_consumption_price,
  201. ems_peak_production_price,
  202. ems_consumption_breach_price,
  203. ems_production_breach_price,
  204. )
  205. cleaned_flex_context = {}
  206. # loop through flex_context and remove keys with null values or empty arrays
  207. for key, value in flex_context.items():
  208. if (
  209. value
  210. and (isinstance(value, dict) or isinstance(value, list))
  211. or isinstance(value, str)
  212. ):
  213. if isinstance(value, dict) and value.get("sensor") is not None:
  214. cleaned_flex_context[key] = value
  215. elif isinstance(value, list) and len(value) > 0:
  216. cleaned_flex_context[key] = value
  217. elif isinstance(value, str) and (value != "" or value is not None):
  218. cleaned_flex_context[key] = value
  219. flex_context = cleaned_flex_context
  220. update_stmt = (
  221. generic_asset_table.update()
  222. .where(generic_asset_table.c.id == asset_id)
  223. .values(flex_context=flex_context, attributes=attributes_data)
  224. )
  225. conn.execute(update_stmt)
  226. with op.batch_alter_table("generic_asset", schema=None) as batch_op:
  227. batch_op.drop_constraint(
  228. batch_op.f("generic_asset_consumption_price_sensor_id_sensor_fkey"),
  229. type_="foreignkey",
  230. )
  231. batch_op.drop_constraint(
  232. batch_op.f("generic_asset_production_price_sensor_id_sensor_fkey"),
  233. type_="foreignkey",
  234. )
  235. batch_op.drop_column("production_price_sensor_id")
  236. batch_op.drop_column("consumption_price_sensor_id")
  237. # Drop foreign key constraints first
  238. op.drop_constraint(
  239. "assets_inflexible_sensors_generic_asset_id_generic_asset_fkey",
  240. "assets_inflexible_sensors",
  241. type_="foreignkey",
  242. )
  243. op.drop_constraint(
  244. "assets_inflexible_sensors_inflexible_sensor_id_sensor_fkey",
  245. "assets_inflexible_sensors",
  246. type_="foreignkey",
  247. )
  248. # Drop the table
  249. op.drop_table("assets_inflexible_sensors")
  250. def downgrade():
  251. with op.batch_alter_table("generic_asset", schema=None) as batch_op:
  252. batch_op.add_column(
  253. sa.Column("consumption_price_sensor_id", sa.Integer(), nullable=True)
  254. )
  255. batch_op.add_column(
  256. sa.Column("production_price_sensor_id", sa.Integer(), nullable=True)
  257. )
  258. batch_op.create_foreign_key(
  259. batch_op.f("generic_asset_production_price_sensor_id_sensor_fkey"),
  260. "sensor",
  261. ["production_price_sensor_id"],
  262. ["id"],
  263. ondelete="SET NULL",
  264. )
  265. batch_op.create_foreign_key(
  266. batch_op.f("generic_asset_consumption_price_sensor_id_sensor_fkey"),
  267. "sensor",
  268. ["consumption_price_sensor_id"],
  269. ["id"],
  270. ondelete="SET NULL",
  271. )
  272. # Create assets_inflexible_sensors table
  273. op.create_table(
  274. "assets_inflexible_sensors",
  275. sa.Column("generic_asset_id", sa.Integer(), nullable=False),
  276. sa.Column("inflexible_sensor_id", sa.Integer(), nullable=False),
  277. sa.ForeignKeyConstraint(
  278. ["generic_asset_id"],
  279. ["generic_asset.id"],
  280. name="assets_inflexible_sensors_generic_asset_id_generic_asset_fkey",
  281. ),
  282. sa.ForeignKeyConstraint(
  283. ["inflexible_sensor_id"],
  284. ["sensor.id"],
  285. name="assets_inflexible_sensors_inflexible_sensor_id_sensor_fkey",
  286. ),
  287. sa.PrimaryKeyConstraint(
  288. "generic_asset_id",
  289. "inflexible_sensor_id",
  290. name="assets_inflexible_sensors_pkey",
  291. ),
  292. sa.UniqueConstraint(
  293. "inflexible_sensor_id",
  294. "generic_asset_id",
  295. name="assets_inflexible_sensors_key",
  296. ),
  297. )
  298. generic_asset_table = sa.Table(
  299. "generic_asset",
  300. sa.MetaData(),
  301. sa.Column("id", sa.Integer, primary_key=True),
  302. sa.Column("attributes", sa.JSON),
  303. sa.Column("flex_context", sa.JSON),
  304. sa.Column("consumption_price_sensor_id", sa.Integer),
  305. sa.Column("production_price_sensor_id", sa.Integer),
  306. )
  307. inflexible_sensors_table = sa.Table(
  308. "assets_inflexible_sensors",
  309. sa.MetaData(),
  310. sa.Column("generic_asset_id", sa.Integer),
  311. sa.Column("inflexible_sensor_id", sa.Integer),
  312. )
  313. conn = op.get_bind()
  314. select_stmt = sa.select(
  315. generic_asset_table.c.id,
  316. generic_asset_table.c.flex_context,
  317. generic_asset_table.c.attributes,
  318. )
  319. results = conn.execute(select_stmt)
  320. for row in results:
  321. asset_id, flex_context, attributes_data = row
  322. if flex_context is None:
  323. flex_context = {}
  324. # If possible, fill in the consumption_price_sensor_id and production_price_sensor_id columns
  325. # (don't bother reverting to the deprecated market_id attribute)
  326. consumption_price = flex_context.pop("consumption-price", None)
  327. if (
  328. isinstance(consumption_price, dict)
  329. and consumption_price.get("sensor") is not None
  330. ):
  331. consumption_price_sensor_id = consumption_price["sensor"]
  332. else:
  333. # Unexpected type, so put it back
  334. if consumption_price is not None:
  335. flex_context["consumption-price"] = consumption_price
  336. consumption_price_sensor_id = None
  337. production_price = flex_context.pop("production-price", None)
  338. if (
  339. isinstance(production_price, dict)
  340. and production_price.get("sensor") is not None
  341. ):
  342. production_price_sensor_id = production_price["sensor"]
  343. else:
  344. # Unexpected type, so put it back
  345. if production_price is not None:
  346. flex_context["production-price"] = production_price
  347. production_price_sensor_id = None
  348. site_power_capacity = flex_context.pop("site-power-capacity", None)
  349. consumption_capacity_in_mw = flex_context.pop("site-consumption-capacity", None)
  350. production_capacity_in_mw = flex_context.pop("site-production-capacity", None)
  351. ems_peak_consumption_price = flex_context.pop(
  352. "site-peak-consumption-price", None
  353. )
  354. ems_peak_production_price = flex_context.pop("site-peak-production-price", None)
  355. ems_consumption_breach_price = flex_context.pop(
  356. "site-consumption-breach-price", None
  357. )
  358. ems_production_breach_price = flex_context.pop(
  359. "site-production-breach-price", None
  360. )
  361. process_field(
  362. site_power_capacity,
  363. attributes_data,
  364. "capacity_in_mw",
  365. "site-power-capacity",
  366. is_power_unit,
  367. )
  368. process_field(
  369. consumption_capacity_in_mw,
  370. attributes_data,
  371. "consumption_capacity_in_mw",
  372. "site-consumption-capacity",
  373. is_power_unit,
  374. )
  375. process_field(
  376. production_capacity_in_mw,
  377. attributes_data,
  378. "production_capacity_in_mw",
  379. "site-production-capacity",
  380. is_power_unit,
  381. )
  382. process_field(
  383. ems_peak_consumption_price,
  384. attributes_data,
  385. "ems-peak-consumption-price",
  386. "site-peak-consumption-price",
  387. is_capacity_price_unit,
  388. )
  389. process_field(
  390. ems_peak_production_price,
  391. attributes_data,
  392. "ems-peak-production-price",
  393. "site-peak-production-price",
  394. is_capacity_price_unit,
  395. )
  396. process_field(
  397. ems_consumption_breach_price,
  398. attributes_data,
  399. "ems-consumption-breach-price",
  400. "site-consumption-breach-price",
  401. is_capacity_price_unit,
  402. )
  403. process_field(
  404. ems_production_breach_price,
  405. attributes_data,
  406. "ems-production-breach-price",
  407. "site-production-breach-price",
  408. is_capacity_price_unit,
  409. )
  410. inflexible_device_sensors = flex_context.pop("inflexible-device-sensors", [])
  411. if not isinstance(inflexible_device_sensors, list) or not all(
  412. isinstance(s, int) for s in inflexible_device_sensors
  413. ):
  414. # Unexpected type, so put it back
  415. flex_context["inflexible-device-sensors"] = inflexible_device_sensors
  416. inflexible_device_sensors = []
  417. # Retain data in any new flex-context fields that are not supported after downgrading
  418. if flex_context:
  419. attributes_data["flex-context"] = json.dumps(flex_context)
  420. update_stmt = (
  421. generic_asset_table.update()
  422. .where(generic_asset_table.c.id == asset_id)
  423. .values(
  424. consumption_price_sensor_id=consumption_price_sensor_id,
  425. production_price_sensor_id=production_price_sensor_id,
  426. attributes=attributes_data,
  427. )
  428. )
  429. conn.execute(update_stmt)
  430. for sensor_id in inflexible_device_sensors:
  431. insert_stmt = inflexible_sensors_table.insert().values(
  432. generic_asset_id=asset_id, inflexible_sensor_id=sensor_id
  433. )
  434. conn.execute(insert_stmt)
  435. with op.batch_alter_table("generic_asset", schema=None) as batch_op:
  436. batch_op.drop_column("flex_context")