"""Utilities for views"""
from __future__ import annotations
from functools import wraps
import json
import os
import subprocess
from sqlalchemy import select
from flask import render_template, request, session, current_app
from flask_security.core import current_user
from flexmeasures.data import db
from flexmeasures import __version__ as flexmeasures_version
from flexmeasures.auth.policy import user_has_admin_access
from flexmeasures.ui.utils.breadcrumb_utils import get_breadcrumb_info
from flexmeasures.utils import time_utils
from flexmeasures.ui import flexmeasures_ui
from flexmeasures.data.models.user import User, Account
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.ui.utils.chart_defaults import chart_options
from flexmeasures.ui.utils.color_defaults import get_color_settings
def fall_back_to_flask_template(render_function):
    """Decorate a render function so that failures fall back to flask.render_template.

    The decorated function is called with the template name and all given
    arguments. If it raises any Exception, a warning is logged on the app
    logger and the template is rendered with flask.render_template instead,
    using only the template name and the keyword arguments.
    """

    @wraps(render_function)
    def render_with_fallback(template_name, *args, **kwargs):
        try:
            rendered = render_function(template_name, *args, **kwargs)
        except Exception as exc:
            # Deliberately broad: any rendering problem should still yield a page.
            message = (
                f"""Rendering via Flask's render_template("{template_name}"). """
                f"""Failed to render via {render_function.__name__}("{template_name}") due to {exc}."""
            )
            current_app.logger.warning(message)
            # Positional arguments are not forwarded to the fallback renderer.
            return render_template(template_name, **kwargs)
        return rendered

    return render_with_fallback
@fall_back_to_flask_template
def render_flexmeasures_template(html_filename: str, **variables):
    """Render template and add all expected template variables, plus the ones given as **variables.

    On top of the given variables, this adds settings read from the app config
    (secure content policy, JS versions, menu logo, extra CSS), values kept in
    the session (time range, chart type, resolution), version and git
    information, loaded plugins, information about the current user, chart
    options for vega-embed, breadcrumb info (if an asset is given) and the
    color settings for the current user's account.

    :param html_filename: name of the template to render. Its last path
                          component, without ".html", is passed on as "page".
    :param variables:     template variables. Special treatment:
                          - "event_starts_after" / "event_ends_before": if missing
                            or falsy, the session values are used.
                          - "sensor_id" or "asset": determine the chart's download
                            file name ("sensor_id" takes precedence).
                          - "asset" (and optionally "current_page"): used to
                            compute "breadcrumb_info".
    :returns: the output of flask.render_template.
    """
    config = current_app.config

    variables["FLEXMEASURES_ENFORCE_SECURE_CONTENT_POLICY"] = config.get(
        "FLEXMEASURES_ENFORCE_SECURE_CONTENT_POLICY"
    )
    variables["documentation_exists"] = os.path.exists(
        "%s/static/documentation/html/index.html" % flexmeasures_ui.root_path
    )

    # use event_starts_after and event_ends_before from session if not given
    variables["event_starts_after"] = variables.get(
        "event_starts_after"
    ) or session.get("event_starts_after")
    variables["event_ends_before"] = variables.get("event_ends_before") or session.get(
        "event_ends_before"
    )
    variables["chart_type"] = session.get("chart_type", "bar_chart")

    variables["page"] = html_filename.split("/")[-1].replace(".html", "")

    variables["resolution"] = session.get("resolution", "")

    variables["flexmeasures_version"] = flexmeasures_version

    (
        variables["git_version"],
        variables["git_commits_since"],
        variables["git_hash"],
    ) = get_git_description()
    app_start_time = config.get("START_TIME")
    variables["app_running_since"] = time_utils.naturalized_datetime_str(app_start_time)
    variables["loaded_plugins"] = ", ".join(
        f"{p_name} (v{p_version})"
        for p_name, p_version in config.get("LOADED_PLUGINS", {}).items()
    )

    # Information about the current user. Attributes like email and username
    # are only read if the user is authenticated.
    is_authenticated = current_user.is_authenticated
    variables["user_is_logged_in"] = is_authenticated
    variables["user_is_admin"] = user_has_admin_access(current_user, "update")
    variables["user_has_admin_reader_rights"] = user_has_admin_access(
        current_user, "read"
    )
    variables["user_is_anonymous"] = is_authenticated and current_user.has_role(
        "anonymous"
    )
    variables["user_email"] = (current_user.email or "") if is_authenticated else ""
    variables["user_name"] = (current_user.username or "") if is_authenticated else ""
    variables["js_versions"] = config.get("FLEXMEASURES_JS_VERSIONS")

    # Chart options passed to vega-embed
    has_asset = "asset" in variables
    options = chart_options.copy()
    if "sensor_id" in variables:
        options["downloadFileName"] = f"sensor-{variables['sensor_id']}"
    elif has_asset:
        asset = variables["asset"]
        options["downloadFileName"] = f"asset-{asset.id}-{asset.name}"
    variables["chart_options"] = json.dumps(options)

    account: Account | None = current_user.account if is_authenticated else None

    # check if user/consultant has logo_url set
    default_menu_logo = config.get("FLEXMEASURES_MENU_LOGO_PATH")
    if account:
        variables["menu_logo"] = (
            account.logo_url
            or (account.consultancy_account and account.consultancy_account.logo_url)
            or default_menu_logo
        )
    else:
        variables["menu_logo"] = default_menu_logo

    variables["extra_css"] = config.get("FLEXMEASURES_EXTRA_CSS_PATH")

    if has_asset:
        # Read the asset from the variables here, rather than relying on a local
        # bound in the chart options branch above: that branch is skipped when
        # "sensor_id" is given as well, which used to leave the name unbound.
        variables["breadcrumb_info"] = get_breadcrumb_info(
            variables["asset"], current_page=variables.get("current_page")
        )

    variables.update(get_color_settings(account))  # add color settings to variables

    return render_template(html_filename, **variables)
def clear_session(keys_to_clear: list[str] | None = None):
    """
    Clear out session variables.

    If keys_to_clear is provided (and not empty), only clear out those specific
    session variables. Keys which are not in the session are skipped.
    Otherwise, clear out all session variables except for some special ones
    (e.g. Flask-Security's, CSRF token, and our own session variables).

    Each removal is logged (key and value) at info level on the app logger.

    :param keys_to_clear: names of the session variables to remove, or None
                          (or an empty list) to remove all but the special ones.
    """
    if keys_to_clear:
        candidates = keys_to_clear
    else:
        protected_keys = ("_fresh", "_id", "_user_id", "csrf_token", "fs_cc", "fs_paa")
        # Materialize the keys first, as we delete from the session while looping.
        candidates = [k for k in session.keys() if k not in protected_keys]

    for skey in candidates:
        if skey not in session:
            # Unknown key, or a key that was listed twice and is already removed.
            continue
        current_app.logger.info(
            "Removing %s:%s from session ... ", skey, session[skey]
        )
        del session[skey]
def set_session_variables(*var_names: str):
    """Copy request values into the session, for a consistent UX across UI page loads.

    For each given name, the value is looked up in request.values.
    Names for which the request holds no value leave the session untouched.

    >>> set_session_variables("event_starts_after", "event_ends_before", "chart_type")
    """
    for name in var_names:
        requested_value = request.values.get(name)
        if requested_value is None:
            continue
        session[name] = requested_value
def get_git_description() -> tuple[str, int, str]:
    """
    Get information about the SCM (git) state if possible (if a .git directory exists).

    Runs `git describe --always --long` in the directory three levels above
    this file and parses its output, which is expected to look like
    `<tag>-<commits since tag>-<hash>`, or just `<hash>` if no tag is found.

    Returns the latest git version (tag) as a string, the number of commits since then as an int and the
    current commit hash as string.
    Parts which cannot be determined are returned as "Unknown" (version and hash) or 0 (number of commits).
    """

    def _minimal_ext_cmd(cmd: list, cwd: str) -> bytes:
        """Run cmd in cwd with a minimal, C-locale environment and return its stdout."""
        # construct minimal environment
        env = {k: os.environ[k] for k in ("SYSTEMROOT", "PATH") if k in os.environ}
        # LANGUAGE is used on win32
        env["LANGUAGE"] = "C"
        env["LANG"] = "C"
        env["LC_ALL"] = "C"
        return subprocess.run(cmd, stdout=subprocess.PIPE, env=env, cwd=cwd).stdout

    version = "Unknown"
    commits_since = 0
    sha = "Unknown"

    path_to_flexmeasures_root = os.path.join(
        os.path.dirname(__file__), "..", "..", ".."
    )
    if os.path.exists(os.path.join(path_to_flexmeasures_root, ".git")):
        commands = ["git", "describe", "--always", "--long"]
        try:
            # Run git in the directory we just checked, not in the current working
            # directory of the process (which might be some other repository).
            git_output = _minimal_ext_cmd(commands, cwd=path_to_flexmeasures_root)
            description = git_output.strip().decode("ascii")
            if description:
                components = description.split("-")
                sha = components.pop()
                if components:
                    commits_since = int(components.pop())
                    # The tag itself may contain dashes
                    version = "-".join(components)
        except (OSError, ValueError) as err:
            # OSError: git could not be run. ValueError: unexpected output
            # (not ASCII-decodable, or the commit count is not a number).
            current_app.logger.warning("Problem when reading git describe: %s" % err)
    return version, commits_since, sha
# Icon names for asset types, keyed by lower-case asset type name.
# Looked up by asset_icon_name(), which falls back to "icon-<asset type name>"
# for asset types not listed here.
# NOTE(review): the values look like CSS class names of icon fonts — confirm against the templates.
ICON_MAPPING = {
    # site structure
    "evse": "icon-charging_station",
    "charge point": "icon-charging_station",
    "project": "icon-calculator",
    "tariff": "icon-time",
    "renewables": "icon-wind",
    "site": "icon-empty-marker",
    "scenario": "icon-binoculars",
    # weather
    "irradiance": "wi wi-horizon-alt",
    "temperature": "wi wi-thermometer",
    "wind direction": "wi wi-wind-direction",
    "wind speed": "wi wi-strong-wind",
}
# URLs of SVG icons (served by api.iconify.design), keyed by lower-case name.
# Looked up by svg_asset_icon_name(), which uses a question-circle icon
# for names not listed here.
SVG_ICON_MAPPING = {
    # site structure
    "building": "https://api.iconify.design/mdi/home-city.svg",
    "battery": "https://api.iconify.design/mdi/battery.svg",
    "simulation": "https://api.iconify.design/mdi/home-city.svg",
    "site": "https://api.iconify.design/mdi/map-marker-outline.svg",
    "scenario": "https://api.iconify.design/mdi/binoculars.svg",
    "pv": "https://api.iconify.design/wi/day-sunny.svg",
    "solar": "https://api.iconify.design/wi/day-sunny.svg",
    "chargepoint": "https://api.iconify.design/material-symbols/ev-station-outline.svg",
    "ev": "https://api.iconify.design/material-symbols/ev-station-outline.svg",
    "add_asset": "https://api.iconify.design/material-symbols/add-rounded.svg?color=white",  # Plus Icon for Add Asset
}
def asset_icon_name(asset_type_name: str) -> str:
    """Look up the icon name for the given asset type.

    The name is lower-cased and looked up in ICON_MAPPING.
    Asset types without an entry get "icon-" followed by the lower-cased name.

    This can be used for UI html templates made with Jinja.
    ui.__init__ makes this function available as the filter "asset_icon".

    For example:
        <i class={{ asset_type.name | asset_icon }}></i>
    becomes (for a battery):
        <i class="icon-battery"></i>
    """
    # Falsy names (e.g. None or "") are looked up as they are.
    lookup_key = asset_type_name.lower() if asset_type_name else asset_type_name
    default_icon = f"icon-{lookup_key}"
    return ICON_MAPPING.get(lookup_key, default_icon)
def svg_asset_icon_name(asset_type_name: str) -> str:
    """Look up the URL of the SVG icon for the given asset type.

    Only the part of the name after its last "." is used, lower-cased,
    for the lookup in SVG_ICON_MAPPING.
    Names without an entry get the URL of a question-circle icon.
    """
    unknown_icon_url = "https://api.iconify.design/fa-solid/question-circle.svg"
    if not asset_type_name:
        # Falsy names (e.g. None or "") are looked up as they are.
        return SVG_ICON_MAPPING.get(asset_type_name, unknown_icon_url)
    short_name = asset_type_name.split(".")[-1].lower()
    return SVG_ICON_MAPPING.get(short_name, unknown_icon_url)
def username(user_id) -> str:
    """Return the username of the user with the given id.

    If no such user is found, a warning is logged and an empty string is returned.
    """
    found_user = db.session.get(User, user_id)
    if found_user is not None:
        return found_user.username
    current_app.logger.warning(f"Could not find user with id {user_id}")
    return ""
def accountname(account_id) -> str:
    """Return the name of the account with the given id.

    If no such account is found, a warning is logged and an empty string is returned.
    """
    found_account = db.session.get(Account, account_id)
    if found_account is not None:
        return found_account.name
    current_app.logger.warning(f"Could not find account with id {account_id}")
    return ""
def available_units() -> list[str]:
    """
    List the distinct units of all sensors currently in the database.
    """
    distinct_units_query = select(Sensor.unit).distinct()
    rows = db.session.execute(distinct_units_query).all()
    # Each row holds a single column: the unit.
    return [row[0] for row in rows]