views.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. from __future__ import annotations
  2. from datetime import datetime
  3. from flask import request, url_for
  4. from flask_classful import FlaskView
  5. from flask_security.core import current_user
  6. from flask_security import login_required
  7. from werkzeug.exceptions import Forbidden, Unauthorized
  8. from sqlalchemy import select
  9. from flexmeasures.auth.policy import ADMIN_READER_ROLE, ADMIN_ROLE, check_access
  10. from flexmeasures.auth.decorators import roles_required, roles_accepted
  11. from flexmeasures.data import db
  12. from flexmeasures.data.models.audit_log import AuditLog
  13. from flexmeasures.data.models.user import User, Role, Account
  14. from flexmeasures.data.services.users import (
  15. get_user,
  16. )
  17. from flexmeasures.ui.utils.view_utils import render_flexmeasures_template
  18. from flexmeasures.ui.views.api_wrapper import InternalApi
  19. """
  20. User CRUD views for admins.
  21. """
  22. def render_user(user: User | None, msg: str | None = None):
  23. user_view_user_auditlog = True
  24. try:
  25. check_access(AuditLog.user_table_acl(current_user), "read")
  26. except (Forbidden, Unauthorized):
  27. user_view_user_auditlog = False
  28. can_edit_user_details = True
  29. try:
  30. check_access(user, "update")
  31. except (Forbidden, Unauthorized):
  32. can_edit_user_details = False
  33. roles = {}
  34. for role in db.session.scalars(select(Role)).all():
  35. roles[role.name] = role.id
  36. user_roles = []
  37. if user is not None:
  38. user_roles = [role.name for role in user.flexmeasures_roles]
  39. return render_flexmeasures_template(
  40. "users/user.html",
  41. can_view_user_auditlog=user_view_user_auditlog,
  42. can_edit_user_details=can_edit_user_details,
  43. user=user,
  44. user_roles=user_roles,
  45. roles=roles,
  46. asset_count=user.account.number_of_assets,
  47. msg=msg,
  48. )
  49. def process_internal_api_response(
  50. user_data: dict, user_id: int | None = None, make_obj=False
  51. ) -> User | dict:
  52. """
  53. Turn data from the internal API into something we can use to further populate the UI.
  54. Either as a user object or a dict for form filling.
  55. """
  56. with db.session.no_autoflush:
  57. role_ids = tuple(user_data.get("flexmeasures_roles", []))
  58. user_data["flexmeasures_roles"] = db.session.scalars(
  59. select(Role).filter(Role.id.in_(role_ids))
  60. ).all()
  61. user_data.pop("status", None) # might have come from requests.response
  62. for date_field in ("last_login_at", "last_seen_at"):
  63. if date_field in user_data and user_data[date_field] is not None:
  64. user_data[date_field] = datetime.fromisoformat(user_data[date_field])
  65. if user_id:
  66. user_data["id"] = user_id
  67. if make_obj:
  68. user = User(**user_data)
  69. user.account = db.session.get(Account, user_data.get("account_id", -1))
  70. if user in db.session:
  71. db.session.expunge(user)
  72. return user
  73. return user_data
  74. def get_all_users(include_inactive: bool = False) -> list[User]:
  75. get_users_response = InternalApi().get(
  76. url_for(
  77. "UserAPI:index",
  78. include_inactive=include_inactive,
  79. )
  80. )
  81. users = [user for user in get_users_response.json()]
  82. return users
  83. class UserCrudUI(FlaskView):
  84. route_base = "/users"
  85. trailing_slash = False
  86. @login_required
  87. def index(self):
  88. """/users"""
  89. include_inactive = request.args.get("include_inactive", "0") != "0"
  90. return render_flexmeasures_template(
  91. "users/users.html", include_inactive=include_inactive
  92. )
  93. @login_required
  94. @roles_accepted(ADMIN_ROLE, ADMIN_READER_ROLE)
  95. def get(self, id: str):
  96. """GET from /users/<id>"""
  97. get_user_response = InternalApi().get(url_for("UserAPI:get", id=id))
  98. user: User = process_internal_api_response(
  99. get_user_response.json(), make_obj=True
  100. )
  101. return render_user(user)
  102. @roles_required(ADMIN_ROLE)
  103. def toggle_active(self, id: str):
  104. """Toggle activation status via /users/toggle_active/<id>"""
  105. user: User = get_user(id)
  106. user_response = InternalApi().patch(
  107. url_for("UserAPI:patch", id=id),
  108. args={"active": not user.active},
  109. )
  110. patched_user: User = process_internal_api_response(
  111. user_response.json(), make_obj=True
  112. )
  113. return render_user(
  114. patched_user,
  115. msg="User %s's new activation status is now %s."
  116. % (patched_user.username, patched_user.active),
  117. )
  118. @login_required
  119. def reset_password_for(self, id: str):
  120. """/users/reset_password_for/<id>
  121. Set the password to something random (in case of worries the password might be compromised)
  122. and send instructions on how to reset."""
  123. user: User = get_user(id)
  124. InternalApi().patch(
  125. url_for("UserAPI:reset_user_password", id=id),
  126. )
  127. return render_user(
  128. user,
  129. msg="The user's password has been changed to a random password"
  130. " and password reset instructions have been sent to the user."
  131. " Cookies and the API access token have also been invalidated.",
  132. )
  133. @login_required
  134. def auditlog(self, id: str):
  135. """/users/auditlog/<id>
  136. View all user actions.
  137. """
  138. user: User = get_user(id)
  139. return render_flexmeasures_template(
  140. "users/user_audit_log.html",
  141. user=user,
  142. )