  1. """introduce the GenericAsset table
  2. Revision ID: b6d49ed7cceb
  3. Revises: 565e092a6c5e
  4. Create Date: 2021-07-20 20:15:28.019102
  5. """
  6. import json
  7. from alembic import context, op
  8. from sqlalchemy import orm, insert, update
  9. import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "b6d49ed7cceb"  # this migration
down_revision = "565e092a6c5e"  # the migration this one builds on
branch_labels = None  # no branch labels for this revision
depends_on = None  # no cross-branch dependencies
# Declare ORM table views we are using (only the columns this migration touches)
t_sensors = sa.Table(
    "sensor",
    sa.MetaData(),
    sa.Column("id", sa.Integer),
    sa.Column("name", sa.String(80)),
    sa.Column("generic_asset_id", sa.Integer),  # added by upgrade_schema() below
)
# The new table this migration introduces
t_generic_assets = sa.Table(
    "generic_asset",
    sa.MetaData(),
    sa.Column("id", sa.Integer),
    sa.Column("name", sa.String(80)),
    sa.Column("generic_asset_type_id", sa.Integer),
    sa.Column("owner_id", sa.Integer),
)
# Lookup table mapping asset type names to ids
t_generic_asset_types = sa.Table(
    "generic_asset_type",
    sa.MetaData(),
    sa.Column("id"),
    sa.Column("name"),
)
# Old-style domain tables, consulted to find each sensor's asset type name
t_assets = sa.Table(
    "asset",
    sa.MetaData(),
    sa.Column("id"),
    sa.Column(
        "asset_type_name",
    ),
)
t_markets = sa.Table(
    "market",
    sa.MetaData(),
    sa.Column("id"),
    sa.Column(
        "market_type_name",
    ),
)
t_weather_sensors = sa.Table(
    "weather_sensor",
    sa.MetaData(),
    sa.Column("id"),
    sa.Column(
        "weather_sensor_type_name",
    ),
)
def upgrade():
    """Add GenericAsset table and link with Sensor table.

    For Sensors with a corresponding Asset, Market or WeatherSensor, a GenericAsset
    is created with matching name. For Sensors without one, a GenericAsset is
    likewise created with matching name.
    Optionally, sensors that do not correspond to an existing Asset, Market or
    WeatherSensor can be grouped using

        flexmeasures db upgrade +1 -x '{"asset_type_name": "waste power plant", "sensor_ids": [2, 4], "asset_name": "Afval Energie Centrale", "owner_id": 2}' -x '{"asset_type_name": "EVSE", "sensor_ids": [7, 8], "asset_name": "Laadstation Rijksmuseum - charger 2", "owner_id": 2}'

    The +1 makes sure we only upgrade by 1 revision, as these arguments are only
    meant to be used by this upgrade function.
    """
    # Order matters: create schema first, backfill data, then tighten constraints.
    upgrade_schema()
    upgrade_data()
    # Every row has been backfilled above, so NOT NULL can now be enforced.
    op.alter_column("generic_asset", "generic_asset_type_id", nullable=False)
    op.alter_column("sensor", "generic_asset_id", nullable=False)
  73. def downgrade():
  74. op.drop_constraint(
  75. op.f("sensor_generic_asset_id_generic_asset_fkey"), "sensor", type_="foreignkey"
  76. )
  77. op.drop_column("sensor", "generic_asset_id")
  78. op.drop_table("generic_asset")
  79. def upgrade_data():
  80. """Data migration adding 1 generic asset for each user defined group of sensors,
  81. plus 1 generic asset for each remaining sensor (i.e. those not part of a user defined group).
  82. """
  83. # Get user defined sensor groups
  84. sensor_groups = context.get_x_argument()
  85. # Use SQLAlchemy's connection and transaction to go through the data
  86. connection = op.get_bind()
  87. session = orm.Session(bind=connection)
  88. # Select all existing ids that need migrating, while keeping names intact
  89. sensor_results = connection.execute(
  90. sa.select(
  91. *[
  92. t_sensors.c.id,
  93. t_sensors.c.name,
  94. ]
  95. )
  96. ).fetchall()
  97. sensor_ids = [sensor_result[0] for sensor_result in sensor_results]
  98. # Construct generic asset for each user defined sensor group
  99. sensor_results_dict = {k: v for k, v in sensor_results}
  100. for i, sensor_group in enumerate(sensor_groups):
  101. sensor_group_dict = json.loads(sensor_group)
  102. print(f"Constructing one generic asset according to: {sensor_group_dict}")
  103. if not set(sensor_group_dict["sensor_ids"]).issubset(
  104. set(sensor_results_dict.keys())
  105. ):
  106. raise ValueError(
  107. f"At least some of these sensor ids {sensor_group_dict['sensor_ids']} do not exist."
  108. )
  109. generic_asset_type_results = connection.execute(
  110. sa.select(*[t_generic_asset_types.c.id]).where(
  111. t_generic_asset_types.c.name == sensor_group_dict["asset_type_name"]
  112. )
  113. ).one_or_none()
  114. if generic_asset_type_results is None:
  115. raise ValueError(
  116. f"Asset type name '{sensor_group_dict['asset_type_name']}' does not exist."
  117. )
  118. generic_asset_type_id = generic_asset_type_results[0]
  119. group_sensor_ids = [
  120. sensor_id
  121. for sensor_id in sensor_ids
  122. if sensor_id in sensor_group_dict["sensor_ids"]
  123. ]
  124. connection.execute(
  125. insert(t_generic_assets).values(
  126. name=sensor_group_dict["asset_name"],
  127. generic_asset_type_id=generic_asset_type_id,
  128. owner_id=sensor_group_dict["owner_id"],
  129. )
  130. )
  131. latest_ga_id = get_latest_generic_asset_id(connection)
  132. print(
  133. f"Created new generic asset with ID {latest_ga_id}. Now tying {len(group_sensor_ids)} sensors to it ..."
  134. )
  135. for sensor_id in group_sensor_ids:
  136. connection.execute(
  137. update(t_sensors)
  138. .where(t_sensors.c.id == sensor_id)
  139. .values(generic_asset_id=latest_ga_id)
  140. )
  141. for id_ in sensor_group_dict["sensor_ids"]:
  142. sensor_results_dict.pop(id_)
  143. # Construct generic assets for all remaining sensors
  144. if sensor_results_dict:
  145. print(
  146. f"Constructing generic assets for each of the following sensors: {sensor_results_dict}"
  147. )
  148. for id_, name in sensor_results_dict.items():
  149. _sensor_ids = [sensor_id for sensor_id in sensor_ids if sensor_id == id_]
  150. asset_results = connection.execute(
  151. sa.select(
  152. *[
  153. t_assets.c.asset_type_name,
  154. ]
  155. ).where(t_assets.c.id == id_)
  156. ).one_or_none()
  157. if asset_results is not None:
  158. asset_type_name = asset_results[0]
  159. else:
  160. market_results = connection.execute(
  161. sa.select(
  162. *[
  163. t_markets.c.market_type_name,
  164. ]
  165. ).where(t_markets.c.id == id_)
  166. ).one_or_none()
  167. if market_results is not None:
  168. asset_type_name = market_results[0]
  169. else:
  170. weather_sensor_results = connection.execute(
  171. sa.select(
  172. *[
  173. t_weather_sensors.c.weather_sensor_type_name,
  174. ]
  175. ).where(t_weather_sensors.c.id == id_)
  176. ).one_or_none()
  177. if weather_sensor_results is not None:
  178. asset_type_name = weather_sensor_results[0]
  179. else:
  180. raise ValueError(
  181. f"Cannot find an Asset, Market or WeatherSensor with id {id_}"
  182. )
  183. generic_asset_type_results = connection.execute(
  184. sa.select(
  185. *[
  186. t_generic_asset_types.c.id,
  187. ]
  188. ).where(t_generic_asset_types.c.name == asset_type_name)
  189. ).one_or_none()
  190. # Create new GenericAssets with matching names
  191. connection.execute(
  192. insert(t_generic_assets).values(
  193. name=name,
  194. generic_asset_type_id=generic_asset_type_results[0],
  195. )
  196. )
  197. latest_ga_id = get_latest_generic_asset_id(connection)
  198. print(
  199. f"Created new generic asset with ID {latest_ga_id}. Now tying {len(_sensor_ids)} sensors to it ..."
  200. )
  201. for sensor_id in _sensor_ids:
  202. connection.execute(
  203. update(t_sensors)
  204. .where(t_sensors.c.id == sensor_id)
  205. .values(generic_asset_id=latest_ga_id)
  206. )
  207. session.commit()
def upgrade_schema():
    """Schema migration: create the generic_asset table and link sensors to it."""
    op.create_table(
        "generic_asset",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(length=80), nullable=True),
        sa.Column("latitude", sa.Float(), nullable=True),
        sa.Column("longitude", sa.Float(), nullable=True),
        sa.Column(
            "generic_asset_type_id", sa.Integer(), nullable=True
        ),  # we set nullable=False after data migration
        sa.Column("owner_id", sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(
            ["generic_asset_type_id"],
            ["generic_asset_type.id"],
            name=op.f("generic_asset_generic_asset_type_id_generic_asset_type_fkey"),
        ),
        sa.ForeignKeyConstraint(
            ["owner_id"],
            ["fm_user.id"],
            name=op.f("generic_asset_owner_id_fm_user_fkey"),
            # deleting a user also deletes the generic assets they own
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("generic_asset_pkey")),
    )
    op.add_column(
        "sensor", sa.Column("generic_asset_id", sa.Integer(), nullable=True)
    )  # we set nullable=False after data migration
    op.create_foreign_key(
        op.f("sensor_generic_asset_id_generic_asset_fkey"),
        "sensor",
        "generic_asset",
        ["generic_asset_id"],
        ["id"],
    )
  242. def get_latest_generic_asset_id(connection) -> int:
  243. """
  244. Getting the latest (highest) GenericAsset ID.
  245. This is a useful method as somehow there was no inserted_primary_key available
  246. through the engine result (see
  247. https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.LegacyCursorResult.inserted_primary_key)
  248. """
  249. return connection.execute(
  250. sa.select(*[t_generic_assets.c.id]).order_by(t_generic_assets.c.id)
  251. ).fetchall()[-1][0]