generic_assets.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. from __future__ import annotations
  2. from itertools import groupby
  3. from flask_login import current_user
  4. from sqlalchemy import select, Select, or_, and_, union_all
  5. from sqlalchemy.orm import aliased
  6. from flexmeasures.data import db
  7. from flexmeasures.auth.policy import user_has_admin_access
  8. from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
  9. from flexmeasures.data.models.user import Account
  10. from flexmeasures.data.queries.utils import potentially_limit_assets_query_to_account
  11. from flexmeasures.utils.flexmeasures_inflection import pluralize
  12. def query_assets_by_type(
  13. type_names: list[str] | str,
  14. account_id: int | None = None,
  15. query: Select | None = None,
  16. ) -> Select:
  17. """
  18. Return a query which looks for GenericAssets by their type.
  19. :param type_names: Pass in a list of type names or only one type name.
  20. :param account_id: Pass in an account ID if you want to query an account other than your own. This only works for admins. Public assets are always queried.
  21. :param query: Pass in an existing Query object if you have one.
  22. """
  23. if not query:
  24. query = select(GenericAsset)
  25. query = query.join(GenericAssetType).filter(
  26. GenericAsset.generic_asset_type_id == GenericAssetType.id
  27. )
  28. if isinstance(type_names, str):
  29. query = query.filter(GenericAssetType.name == type_names)
  30. else:
  31. query = query.filter(GenericAssetType.name.in_(type_names))
  32. query = potentially_limit_assets_query_to_account(query, account_id)
  33. return query
  34. def get_location_queries(
  35. account_id: int | None = None,
  36. ) -> dict[str, Select[tuple[GenericAsset]]]:
  37. """
  38. Make queries for grouping assets by location.
  39. We group EVSE assets by location (if they share a location, they belong to the same Charge Point)
  40. Like get_asset_group_queries, the values in the returned dict still need an executive call, like all(), count() or first(). Note that this function will still load and inspect assets to do its job.
  41. The Charge Points are named on the basis of the first EVSE in their list,
  42. using either the whole EVSE name or that part that comes before a " -" delimiter. For example:
  43. If:
  44. evse_name = "Seoul Hilton - charger 1"
  45. Then:
  46. charge_point_name = "Seoul Hilton (Charge Point)"
  47. A Charge Point is a special case. If all assets on a location are of type EVSE,
  48. we can call the location a "Charge Point".
  49. :param account_id: Pass in an account ID if you want to query an account other than your own. This only works for admins. Public assets are always queried.
  50. """
  51. asset_queries = {}
  52. all_assets = db.session.scalars(
  53. potentially_limit_assets_query_to_account(select(GenericAsset), account_id)
  54. ).all()
  55. loc_groups = group_assets_by_location(all_assets)
  56. for loc_group in loc_groups:
  57. if len(loc_group) == 1:
  58. continue
  59. location_type = "(Location)"
  60. if all(
  61. [
  62. asset.asset_type.name in ["one-way_evse", "two-way_evse"]
  63. for asset in loc_group
  64. ]
  65. ):
  66. location_type = "(Charge Point)"
  67. location_name = f"{loc_group[0].name.split(' -')[0]} {location_type}"
  68. location_query = select(GenericAsset).filter(
  69. GenericAsset.name.in_([asset.name for asset in loc_group])
  70. )
  71. asset_queries[location_name] = potentially_limit_assets_query_to_account(
  72. location_query, account_id
  73. )
  74. return asset_queries
  75. def group_assets_by_location(
  76. asset_list: list[GenericAsset],
  77. ) -> list[list[GenericAsset]]:
  78. groups = []
  79. def key_function(x):
  80. return x.location if x.location else ()
  81. sorted_asset_list = sorted(asset_list, key=key_function)
  82. for _k, g in groupby(sorted_asset_list, key=key_function):
  83. groups.append(list(g))
  84. return groups
  85. def get_asset_group_queries(
  86. group_by_type: bool = True,
  87. group_by_account: bool = False,
  88. group_by_location: bool = False,
  89. custom_aggregate_type_groups: dict[str, list[str]] | None = None,
  90. ) -> dict[str, Select]:
  91. """
  92. An asset group is defined by Asset queries, which this function can generate.
  93. Each query has a name (for the asset group it represents).
  94. These queries still need an executive call, like all(), count() or first().
  95. This function limits the assets to be queried to the current user's account,
  96. if the user is not an admin.
  97. Note: Make sure the current user has the "read" permission on their account (on GenericAsset.__class__?? See https://github.com/FlexMeasures/flexmeasures/issues/200) or is an admin.
  98. :param group_by_type: If True, groups will be made for assets with the same type. We prefer pluralised group names here. Defaults to True.
  99. :param group_by_account: If True, groups will be made for assets within the same account. This makes sense for admins, as they can query across accounts.
  100. :param group_by_location: If True, groups will be made for assets at the same location. Naming of the location currently supports charge points (for EVSEs).
  101. :param custom_aggregate_type_groups: dict of asset type groupings (mapping group names to names of asset types). See also the setting FLEXMEASURES_ASSET_TYPE_GROUPS.
  102. """
  103. asset_queries = {}
  104. # 1. Custom asset groups by combinations of asset types
  105. asset_types_to_remove = []
  106. if custom_aggregate_type_groups:
  107. for asset_type_group_name, asset_types in custom_aggregate_type_groups.items():
  108. asset_queries[asset_type_group_name] = query_assets_by_type(asset_types)
  109. # Remember subgroups
  110. asset_types_to_remove += asset_types
  111. # 2. Include a group per asset type - using the pluralised asset type name
  112. if group_by_type:
  113. for asset_type in db.session.scalars(select(GenericAssetType)).all():
  114. # Add asset type as a group if not already covered by custom group
  115. if asset_type.name not in asset_types_to_remove:
  116. asset_queries[pluralize(asset_type.name)] = query_assets_by_type(
  117. asset_type.name
  118. )
  119. # 3. Include a group per account (admins only) # TODO: we can later adjust this for accounts who admin certain others, not all
  120. if group_by_account and user_has_admin_access(current_user, "read"):
  121. for account in db.session.scalars(select(Account)).all():
  122. asset_queries[account.name] = select(GenericAsset).filter_by(
  123. account_id=account.id
  124. )
  125. # 4. Finally, we can group assets by location
  126. if group_by_location:
  127. asset_queries.update(get_location_queries())
  128. return asset_queries
  129. def query_assets_by_search_terms(
  130. search_terms: list[str] | None,
  131. filter_statement: bool = True,
  132. sort_by: str | None = None,
  133. sort_dir: str | None = None,
  134. ) -> Select:
  135. select_statement = select(GenericAsset)
  136. valid_sort_columns = {
  137. "id": GenericAsset.id,
  138. "name": GenericAsset.name,
  139. "owner": GenericAsset.account_id,
  140. }
  141. # Initialize base query
  142. query = select_statement
  143. if search_terms is not None:
  144. private_select_statement = select_statement.join(
  145. Account, Account.id == GenericAsset.account_id
  146. )
  147. private_filter_statement = filter_statement & and_(
  148. *(
  149. or_(
  150. GenericAsset.name.ilike(f"%{term}%"),
  151. Account.name.ilike(f"%{term}%"),
  152. )
  153. for term in search_terms
  154. )
  155. )
  156. public_select_statement = select_statement
  157. public_filter_statement = (
  158. filter_statement
  159. & GenericAsset.account_id.is_(None)
  160. & and_(GenericAsset.name.ilike(f"%{term}%") for term in search_terms)
  161. )
  162. if sort_by is not None and sort_dir is not None:
  163. if sort_by in valid_sort_columns:
  164. order_by_clause = (
  165. valid_sort_columns[sort_by].asc()
  166. if sort_dir == "asc"
  167. else valid_sort_columns[sort_by].desc()
  168. )
  169. private_select_statement = private_select_statement.order_by(
  170. order_by_clause
  171. )
  172. public_select_statement = public_select_statement.order_by(
  173. order_by_clause
  174. )
  175. # Combine private and public queries
  176. subquery = union_all(
  177. private_select_statement.where(private_filter_statement),
  178. public_select_statement.where(public_filter_statement),
  179. ).subquery()
  180. asset_alias = aliased(GenericAsset, subquery)
  181. query = select(asset_alias)
  182. else:
  183. query = query.where(filter_statement)
  184. if sort_by is not None and sort_dir is not None:
  185. if sort_by in valid_sort_columns:
  186. order_by_clause = (
  187. valid_sort_columns[sort_by].asc()
  188. if sort_dir == "asc"
  189. else valid_sort_columns[sort_by].desc()
  190. )
  191. query = query.order_by(order_by_clause)
  192. return query