# accounts.py

from __future__ import annotations

from sqlalchemy import select
from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
from flask import request, url_for
from flask_classful import FlaskView
from flask_security import login_required
from flask_security.core import current_user

from flexmeasures.auth.policy import user_has_admin_access, check_access
from flexmeasures.ui.views.api_wrapper import InternalApi
from flexmeasures.ui.utils.view_utils import render_flexmeasures_template
from flexmeasures.data.models.audit_log import AuditLog
from flexmeasures.data.models.user import Account
from flexmeasures.data import db


  14. def get_accounts() -> list[dict]:
  15. """/accounts"""
  16. accounts_response = InternalApi().get(url_for("AccountAPI:index"))
  17. accounts = accounts_response.json()
  18. return accounts
  19. def get_account(account_id: str) -> dict:
  20. account_response = InternalApi().get(url_for("AccountAPI:get", id=account_id))
  21. account = account_response.json()
  22. return account
  23. class AccountCrudUI(FlaskView):
  24. route_base = "/accounts"
  25. trailing_slash = False
  26. @login_required
  27. def index(self):
  28. """/accounts"""
  29. return render_flexmeasures_template(
  30. "accounts/accounts.html",
  31. )
  32. @login_required
  33. def get(self, account_id: str):
  34. """/accounts/<account_id>"""
  35. include_inactive = request.args.get("include_inactive", "0") != "0"
  36. account = db.session.execute(select(Account).filter_by(id=account_id)).scalar()
  37. if account.consultancy_account_id:
  38. consultancy_account = db.session.execute(
  39. select(Account).filter_by(id=account.consultancy_account_id)
  40. ).scalar_one_or_none()
  41. if consultancy_account:
  42. account.consultancy_account.name = consultancy_account.name
  43. accounts = get_accounts() if user_has_admin_access(current_user, "read") else []
  44. user_can_view_account_auditlog = True
  45. try:
  46. check_access(AuditLog.account_table_acl(account), "read")
  47. except (Forbidden, Unauthorized):
  48. user_can_view_account_auditlog = False
  49. user_can_update_account = True
  50. try:
  51. check_access(account, "update")
  52. except (Forbidden, Unauthorized):
  53. user_can_update_account = False
  54. return render_flexmeasures_template(
  55. "accounts/account.html",
  56. account=account,
  57. accounts=accounts,
  58. include_inactive=include_inactive,
  59. user_can_update_account=user_can_update_account,
  60. can_view_account_auditlog=user_can_view_account_auditlog,
  61. )
  62. @login_required
  63. def auditlog(self, account_id: str):
  64. """/accounts/auditlog/<account_id>"""
  65. account = db.session.execute(select(Account).filter_by(id=account_id)).scalar()
  66. audit_log_response = InternalApi().get(
  67. url_for("AccountAPI:auditlog", id=account_id)
  68. )
  69. audit_logs_response = audit_log_response.json()
  70. return render_flexmeasures_template(
  71. "accounts/account_audit_log.html",
  72. audit_logs=audit_logs_response,
  73. account=account,
  74. )