utils.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. from __future__ import annotations
  2. from typing import Optional, Dict
  3. import json
  4. from flask import url_for
  5. from flask_security import current_user
  6. from werkzeug.exceptions import NotFound
  7. from sqlalchemy import select
  8. from sqlalchemy.sql.expression import or_
  9. from flexmeasures.auth.policy import check_access
  10. from flexmeasures.data import db
  11. from flexmeasures import Asset
  12. from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
  13. from flexmeasures.data.models.user import Account
  14. from flexmeasures.data.models.time_series import Sensor
  15. from flexmeasures.ui.views.api_wrapper import InternalApi
  16. from flexmeasures.utils.unit_utils import (
  17. is_energy_price_unit,
  18. is_energy_unit,
  19. is_power_unit,
  20. )
  21. from flexmeasures.ui.utils.view_utils import svg_asset_icon_name
  22. def get_asset_by_id_or_raise_notfound(asset_id: str) -> GenericAsset:
  23. """find an show existing asset or raise NotFound"""
  24. if not str(asset_id).isdigit():
  25. raise NotFound
  26. asset = db.session.query(GenericAsset).filter_by(id=asset_id).first()
  27. if asset is None:
  28. raise NotFound
  29. return asset
  30. def get_allowed_price_sensor_data(account_id: Optional[int]) -> Dict[int, str]:
  31. """
  32. Return a list of sensors which the user can add
  33. as consumption_price_sensor_id or production_price_sensor_id.
  34. For each sensor we get data as sensor_id: asset_name:sensor_name.
  35. # todo: this function seem obsolete
  36. """
  37. if not account_id:
  38. assets = db.session.scalars(
  39. select(GenericAsset).filter(GenericAsset.account_id.is_(None))
  40. ).all()
  41. else:
  42. assets = db.session.scalars(
  43. select(GenericAsset).filter(
  44. or_(
  45. GenericAsset.account_id == account_id,
  46. GenericAsset.account_id.is_(None),
  47. )
  48. )
  49. ).all()
  50. sensors_data = list()
  51. for asset in assets:
  52. sensors_data += [
  53. (sensor.id, asset.name, sensor.name, sensor.unit)
  54. for sensor in asset.sensors
  55. ]
  56. return {
  57. sensor_id: f"{asset_name}:{sensor_name}"
  58. for sensor_id, asset_name, sensor_name, sensor_unit in sensors_data
  59. if is_energy_price_unit(sensor_unit)
  60. }
  61. def get_allowed_inflexible_sensor_data(account_id: Optional[int]) -> Dict[int, str]:
  62. """
  63. Return a list of sensors which the user can add
  64. as inflexible device sensors.
  65. This list is built using sensors with energy or power units
  66. within the current account (or among public assets when account_id argument is not specified).
  67. For each sensor we get data as sensor_id: asset_name:sensor_name.
  68. # todo: this function seem obsolete
  69. """
  70. query = None
  71. if not account_id:
  72. query = select(GenericAsset).filter(GenericAsset.account_id.is_(None))
  73. else:
  74. query = select(GenericAsset).filter(GenericAsset.account_id == account_id)
  75. assets = db.session.scalars(query).all()
  76. sensors_data = list()
  77. for asset in assets:
  78. sensors_data += [
  79. (sensor.id, asset.name, sensor.name, sensor.unit)
  80. for sensor in asset.sensors
  81. ]
  82. return {
  83. sensor_id: f"{asset_name}:{sensor_name}"
  84. for sensor_id, asset_name, sensor_name, sensor_unit in sensors_data
  85. if is_energy_unit(sensor_unit) or is_power_unit(sensor_unit)
  86. }
  87. def process_internal_api_response(
  88. asset_data: dict, asset_id: int | None = None, make_obj=False
  89. ) -> GenericAsset | dict:
  90. """
  91. Turn data from the internal API into something we can use to further populate the UI.
  92. Either as an asset object or a dict for form filling.
  93. If we add other data by querying the database, we make sure the asset is not in the session afterwards.
  94. """
  95. def expunge_asset():
  96. # use if no insert is wanted from a previous query which flushes its results
  97. if asset in db.session:
  98. db.session.expunge(asset)
  99. asset_data.pop("status", None) # might have come from requests.response
  100. if asset_id:
  101. asset_data["id"] = asset_id
  102. if make_obj:
  103. children = asset_data.pop("child_assets", [])
  104. asset_data.pop("sensors", [])
  105. asset_data.pop("owner", None)
  106. asset_type = asset_data.pop("generic_asset_type", {})
  107. asset = GenericAsset(
  108. **{
  109. **asset_data,
  110. **{"attributes": json.loads(asset_data.get("attributes", "{}"))},
  111. **{"flex_context": json.loads(asset_data.get("flex_context", "{}"))},
  112. **{
  113. "sensors_to_show": json.loads(
  114. asset_data.get("sensors_to_show", "[]")
  115. )
  116. },
  117. }
  118. ) # TODO: use schema?
  119. if "generic_asset_type_id" in asset_data:
  120. asset.generic_asset_type = db.session.get(
  121. GenericAssetType, asset_data["generic_asset_type_id"]
  122. )
  123. else:
  124. asset.generic_asset_type = db.session.get(
  125. GenericAssetType, asset_type.get("id", None)
  126. )
  127. expunge_asset()
  128. asset.owner = db.session.get(Account, asset_data["account_id"])
  129. expunge_asset()
  130. db.session.flush()
  131. if "id" in asset_data:
  132. asset.sensors = db.session.scalars(
  133. select(Sensor).filter_by(generic_asset_id=asset_data["id"])
  134. ).all()
  135. expunge_asset()
  136. if asset_data.get("parent_asset_id", None) is not None:
  137. asset.parent_asset = db.session.execute(
  138. select(GenericAsset).filter(
  139. GenericAsset.id == asset_data["parent_asset_id"]
  140. )
  141. ).scalar_one_or_none()
  142. expunge_asset()
  143. child_assets = []
  144. for child in children:
  145. if "child_assets" in child:
  146. # not deeper than one level
  147. child.pop("child_assets")
  148. child_asset = process_internal_api_response(child, child["id"], True)
  149. child_assets.append(child_asset)
  150. asset.child_assets = child_assets
  151. expunge_asset()
  152. return asset
  153. return asset_data
  154. def user_can_create_assets() -> bool:
  155. try:
  156. check_access(current_user.account, "create-children")
  157. except Exception:
  158. return False
  159. return True
  160. def user_can_delete(asset) -> bool:
  161. try:
  162. check_access(asset, "delete")
  163. except Exception:
  164. return False
  165. return True
  166. def user_can_update(asset) -> bool:
  167. try:
  168. check_access(asset, "update")
  169. except Exception:
  170. return False
  171. return True
  172. def get_assets_by_account(account_id: int | str | None) -> list[GenericAsset]:
  173. if account_id is not None:
  174. get_assets_response = InternalApi().get(
  175. url_for("AssetAPI:index"), query={"account_id": account_id}
  176. )
  177. else:
  178. get_assets_response = InternalApi().get(url_for("AssetAPI:public"))
  179. return [
  180. process_internal_api_response(ad, make_obj=True)
  181. for ad in get_assets_response.json()
  182. ]
  183. def serialize_asset(asset: Asset, is_head=False) -> dict:
  184. serialized_asset = {
  185. "name": asset.name,
  186. "id": asset.id,
  187. "asset_type": asset.generic_asset_type.name,
  188. "link": url_for("AssetCrudUI:get", id=asset.id),
  189. "icon": svg_asset_icon_name(asset.generic_asset_type.name),
  190. "tooltip": {
  191. "name": asset.name,
  192. "ID": asset.id,
  193. "Asset Type": asset.generic_asset_type.name,
  194. },
  195. "sensors": [
  196. {
  197. "name": sensor.name,
  198. "unit": sensor.unit,
  199. "link": url_for("SensorUI:get", id=sensor.id),
  200. }
  201. for sensor in asset.sensors
  202. ],
  203. }
  204. if asset.parent_asset and not is_head:
  205. serialized_asset["parent"] = asset.parent_asset.id
  206. return serialized_asset
  207. def get_list_assets_chart(
  208. asset: Asset,
  209. base_asset: Asset,
  210. parent_depth=0,
  211. child_depth=0,
  212. look_for_child=False,
  213. is_head=False,
  214. ) -> list[dict]:
  215. """
  216. Recursively builds a tree of assets from a given asset and its parents and children up to a certain depth.
  217. Args:
  218. asset: The asset to start the recursion from
  219. base_asset: The asset that is the base of the chart
  220. parent_depth: The current depth of the parents hierarchy
  221. child_depth: The current depth of the children hierarchy
  222. look_for_child: If True, start looking for children of the current asset
  223. is_head: If True, the current asset is the head of the chart
  224. Returns:
  225. A list of dictionaries representing the assets in the tree
  226. """
  227. assets = list()
  228. asset_def = serialize_asset(asset, is_head=is_head)
  229. # Fetch parents if there is parent asset and parent_depth is less than 2
  230. if asset.parent_asset and parent_depth < 2 and not look_for_child:
  231. parent_depth += 1
  232. assets += get_list_assets_chart(
  233. asset=asset.parent_asset,
  234. base_asset=base_asset,
  235. parent_depth=parent_depth,
  236. is_head=False if parent_depth < 2 else True,
  237. )
  238. else:
  239. look_for_child = True
  240. parent_depth = (
  241. 2 # Auto increase depth in the parents hierarchy is less than two
  242. )
  243. assets.append(asset_def)
  244. if look_for_child and child_depth < 2:
  245. child_depth += 1
  246. for child in base_asset.child_assets:
  247. assets += get_list_assets_chart(
  248. child,
  249. parent_depth=parent_depth,
  250. child_depth=child_depth,
  251. base_asset=child,
  252. )
  253. return assets
  254. def add_child_asset(asset: Asset, assets: list) -> list:
  255. """
  256. Add a child asset to the current assets list.
  257. This function is used to add a child asset to the current asset in the list of assets.
  258. Args:
  259. asset: The current asset to be used as parent
  260. assets: The list of assets
  261. """
  262. # Add Extra node to the current asset
  263. new_child_asset = {
  264. "name": "Add Child Asset",
  265. "id": "new",
  266. "asset_type": asset.generic_asset_type.name,
  267. "link": url_for("AssetCrudUI:post", id="new", parent_asset_id=asset.id),
  268. "icon": svg_asset_icon_name("add_asset"),
  269. "tooltip": "Click here to add a child asset.",
  270. "sensors": [],
  271. "parent": asset.id,
  272. }
  273. assets.append(new_child_asset)
  274. return assets