view_utils.py 10 KB


  1. """Utilities for views"""
  2. from __future__ import annotations
  3. from functools import wraps
  4. import json
  5. import os
  6. import subprocess
  7. from sqlalchemy import select
  8. from flask import render_template, request, session, current_app
  9. from flask_security.core import current_user
  10. from flexmeasures.data import db
  11. from flexmeasures import __version__ as flexmeasures_version
  12. from flexmeasures.auth.policy import user_has_admin_access
  13. from flexmeasures.ui.utils.breadcrumb_utils import get_breadcrumb_info
  14. from flexmeasures.utils import time_utils
  15. from flexmeasures.ui import flexmeasures_ui
  16. from flexmeasures.data.models.user import User, Account
  17. from flexmeasures.data.models.time_series import Sensor
  18. from flexmeasures.ui.utils.chart_defaults import chart_options
  19. from flexmeasures.ui.utils.color_defaults import get_color_settings
  20. def fall_back_to_flask_template(render_function):
  21. """In case the render_function is raising an error, fall back to using flask.render_template."""
  22. @wraps(render_function)
  23. def wrapper(template_name, *args, **kwargs):
  24. try:
  25. return render_function(template_name, *args, **kwargs)
  26. except Exception as e:
  27. current_app.logger.warning(
  28. f"""Rendering via Flask's render_template("{template_name}"). """
  29. f"""Failed to render via {render_function.__name__}("{template_name}") due to {e}."""
  30. )
  31. return render_template(template_name, **kwargs)
  32. return wrapper
  33. @fall_back_to_flask_template
  34. def render_flexmeasures_template(html_filename: str, **variables):
  35. """Render template and add all expected template variables, plus the ones given as **variables."""
  36. variables["FLEXMEASURES_ENFORCE_SECURE_CONTENT_POLICY"] = current_app.config.get(
  37. "FLEXMEASURES_ENFORCE_SECURE_CONTENT_POLICY"
  38. )
  39. variables["documentation_exists"] = False
  40. if os.path.exists(
  41. "%s/static/documentation/html/index.html" % flexmeasures_ui.root_path
  42. ):
  43. variables["documentation_exists"] = True
  44. # use event_starts_after and event_ends_before from session if not given
  45. variables["event_starts_after"] = variables.get(
  46. "event_starts_after"
  47. ) or session.get("event_starts_after")
  48. variables["event_ends_before"] = variables.get("event_ends_before") or session.get(
  49. "event_ends_before"
  50. )
  51. variables["chart_type"] = session.get("chart_type", "bar_chart")
  52. variables["page"] = html_filename.split("/")[-1].replace(".html", "")
  53. variables["resolution"] = session.get("resolution", "")
  54. variables["flexmeasures_version"] = flexmeasures_version
  55. (
  56. variables["git_version"],
  57. variables["git_commits_since"],
  58. variables["git_hash"],
  59. ) = get_git_description()
  60. app_start_time = current_app.config.get("START_TIME")
  61. variables["app_running_since"] = time_utils.naturalized_datetime_str(app_start_time)
  62. variables["loaded_plugins"] = ", ".join(
  63. f"{p_name} (v{p_version})"
  64. for p_name, p_version in current_app.config.get("LOADED_PLUGINS", {}).items()
  65. )
  66. variables["user_is_logged_in"] = current_user.is_authenticated
  67. variables["user_is_admin"] = user_has_admin_access(current_user, "update")
  68. variables["user_has_admin_reader_rights"] = user_has_admin_access(
  69. current_user, "read"
  70. )
  71. variables["user_is_anonymous"] = (
  72. current_user.is_authenticated and current_user.has_role("anonymous")
  73. )
  74. variables["user_email"] = current_user.is_authenticated and current_user.email or ""
  75. variables["user_name"] = (
  76. current_user.is_authenticated and current_user.username or ""
  77. )
  78. variables["js_versions"] = current_app.config.get("FLEXMEASURES_JS_VERSIONS")
  79. # Chart options passed to vega-embed
  80. options = chart_options.copy()
  81. if "sensor_id" in variables:
  82. options["downloadFileName"] = f"sensor-{variables['sensor_id']}"
  83. elif "asset" in variables:
  84. asset = variables["asset"]
  85. options["downloadFileName"] = f"asset-{asset.id}-{asset.name}"
  86. variables["chart_options"] = json.dumps(options)
  87. account: Account | None = (
  88. current_user.account if current_user.is_authenticated else None
  89. )
  90. # check if user/consultant has logo_url set
  91. if account:
  92. variables["menu_logo"] = (
  93. account.logo_url
  94. or (account.consultancy_account and account.consultancy_account.logo_url)
  95. or current_app.config.get("FLEXMEASURES_MENU_LOGO_PATH")
  96. )
  97. else:
  98. variables["menu_logo"] = current_app.config.get("FLEXMEASURES_MENU_LOGO_PATH")
  99. variables["extra_css"] = current_app.config.get("FLEXMEASURES_EXTRA_CSS_PATH")
  100. if "asset" in variables:
  101. current_page = variables.get("current_page")
  102. variables["breadcrumb_info"] = get_breadcrumb_info(
  103. asset, current_page=current_page
  104. )
  105. variables.update(get_color_settings(account)) # add color settings to variables
  106. return render_template(html_filename, **variables)
  107. def clear_session(keys_to_clear: list[str] = None):
  108. """
  109. Clear out session variables.
  110. If keys_to_clear is provided, only clear out those specific session variables.
  111. Otherwise, clear out all session variables except for some special ones
  112. (e.g. Flask-Security's, CSRF token, and our own session variables).
  113. """
  114. if keys_to_clear:
  115. for skey in keys_to_clear:
  116. if skey not in session:
  117. continue
  118. current_app.logger.info(
  119. "Removing %s:%s from session ... " % (skey, session[skey])
  120. )
  121. del session[skey]
  122. else:
  123. for skey in [
  124. k
  125. for k in session.keys()
  126. if k not in ("_fresh", "_id", "_user_id", "csrf_token", "fs_cc", "fs_paa")
  127. ]:
  128. current_app.logger.info(
  129. "Removing %s:%s from session ... " % (skey, session[skey])
  130. )
  131. del session[skey]
  132. def set_session_variables(*var_names: str):
  133. """Store request values as session variables, for a consistent UX across UI page loads.
  134. >>> set_session_variables("event_starts_after", "event_ends_before", "chart_type")
  135. """
  136. for var_name in var_names:
  137. var = request.values.get(var_name)
  138. if var is not None:
  139. session[var_name] = var
  140. def get_git_description() -> tuple[str, int, str]:
  141. """
  142. Get information about the SCM (git) state if possible (if a .git directory exists).
  143. Returns the latest git version (tag) as a string, the number of commits since then as an int and the
  144. current commit hash as string.
  145. """
  146. def _minimal_ext_cmd(cmd: list):
  147. # construct minimal environment
  148. env = {}
  149. for k in ["SYSTEMROOT", "PATH"]:
  150. v = os.environ.get(k)
  151. if v is not None:
  152. env[k] = v
  153. # LANGUAGE is used on win32
  154. env["LANGUAGE"] = "C"
  155. env["LANG"] = "C"
  156. env["LC_ALL"] = "C"
  157. return subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env).communicate()[0]
  158. version = "Unknown"
  159. commits_since = 0
  160. sha = "Unknown"
  161. path_to_flexmeasures_root = os.path.join(
  162. os.path.dirname(__file__), "..", "..", ".."
  163. )
  164. if os.path.exists(os.path.join(path_to_flexmeasures_root, ".git")):
  165. commands = ["git", "describe", "--always", "--long"]
  166. try:
  167. git_output = _minimal_ext_cmd(commands)
  168. components = git_output.strip().decode("ascii").split("-")
  169. if not (len(components) == 1 and components[0] == ""):
  170. sha = components.pop()
  171. if len(components) > 0:
  172. commits_since = int(components.pop())
  173. version = "-".join(components)
  174. except OSError as ose:
  175. current_app.logger.warning("Problem when reading git describe: %s" % ose)
  176. return version, commits_since, sha
  177. ICON_MAPPING = {
  178. # site structure
  179. "evse": "icon-charging_station",
  180. "charge point": "icon-charging_station",
  181. "project": "icon-calculator",
  182. "tariff": "icon-time",
  183. "renewables": "icon-wind",
  184. "site": "icon-empty-marker",
  185. "scenario": "icon-binoculars",
  186. # weather
  187. "irradiance": "wi wi-horizon-alt",
  188. "temperature": "wi wi-thermometer",
  189. "wind direction": "wi wi-wind-direction",
  190. "wind speed": "wi wi-strong-wind",
  191. }
  192. SVG_ICON_MAPPING = {
  193. # site structure
  194. "building": "https://api.iconify.design/mdi/home-city.svg",
  195. "battery": "https://api.iconify.design/mdi/battery.svg",
  196. "simulation": "https://api.iconify.design/mdi/home-city.svg",
  197. "site": "https://api.iconify.design/mdi/map-marker-outline.svg",
  198. "scenario": "https://api.iconify.design/mdi/binoculars.svg",
  199. "pv": "https://api.iconify.design/wi/day-sunny.svg",
  200. "solar": "https://api.iconify.design/wi/day-sunny.svg",
  201. "chargepoint": "https://api.iconify.design/material-symbols/ev-station-outline.svg",
  202. "ev": "https://api.iconify.design/material-symbols/ev-station-outline.svg",
  203. "add_asset": "https://api.iconify.design/material-symbols/add-rounded.svg?color=white", # Plus Icon for Add Asset
  204. }
  205. def asset_icon_name(asset_type_name: str) -> str:
  206. """Icon name for this asset type.
  207. This can be used for UI html templates made with Jinja.
  208. ui.__init__ makes this function available as the filter "asset_icon".
  209. For example:
  210. <i class={{ asset_type.name | asset_icon }}></i>
  211. becomes (for a battery):
  212. <i class="icon-battery"></i>
  213. """
  214. if asset_type_name:
  215. asset_type_name = asset_type_name.lower()
  216. return ICON_MAPPING.get(asset_type_name, f"icon-{asset_type_name}")
  217. def svg_asset_icon_name(asset_type_name: str) -> str:
  218. if asset_type_name:
  219. asset_type_name = asset_type_name.split(".")[-1].lower()
  220. return SVG_ICON_MAPPING.get(
  221. asset_type_name, "https://api.iconify.design/fa-solid/question-circle.svg"
  222. )
  223. def username(user_id) -> str:
  224. user = db.session.get(User, user_id)
  225. if user is None:
  226. current_app.logger.warning(f"Could not find user with id {user_id}")
  227. return ""
  228. else:
  229. return user.username
  230. def accountname(account_id) -> str:
  231. account = db.session.get(Account, account_id)
  232. if account is None:
  233. current_app.logger.warning(f"Could not find account with id {account_id}")
  234. return ""
  235. else:
  236. return account.name
  237. def available_units() -> list[str]:
  238. """
  239. Return a list of all available units from sensors currently in the database.
  240. """
  241. units = db.session.execute(select(Sensor.unit).distinct()).all()
  242. return [unit[0] for unit in units]