policy.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. """
  2. Tooling & docs for implementing our auth policy
  3. """
  4. from __future__ import annotations
  5. from typing import List, Tuple, Union
  6. from flask import current_app
  7. from flask_security import current_user
  8. from werkzeug.exceptions import Unauthorized, Forbidden
  9. PERMISSIONS = ["create-children", "read", "update", "delete"]
  10. ADMIN_ROLE = "admin"
  11. ADMIN_READER_ROLE = "admin-reader"
  12. ACCOUNT_ADMIN_ROLE = "account-admin"
  13. CONSULTANT_ROLE = "consultant"
  14. # constants to allow access to certain groups
  15. EVERY_LOGGED_IN_USER = "every-logged-in-user"
  16. # todo: Use | instead of Union, list instead of List and tuple instead of Tuple when FM stops supporting Python 3.9 (because of https://github.com/python/cpython/issues/86399)
  17. PRINCIPALS_TYPE = Union[str, Tuple[str], List[Union[str, Tuple[str]]]]
  18. class AuthModelMixin(object):
  19. def __acl__(self) -> dict[str, PRINCIPALS_TYPE]:
  20. """
  21. This function returns an access control list (ACL) for an instance of a model which is relevant for authorization.
  22. ACLs in FlexMeasures are inspired by Pyramid's resource ACLs.
  23. In an ACL, we list which principal (security contexts, see below) allow certain kinds of actions
  24. ― by mapping supported permissions to the required principals.
  25. # What is a principal / security context?
  26. In computer security, a "principal" is the security context of the authenticated user [1].
  27. For example, within FlexMeasures, an accepted principal is "user:2", which denotes that the user should have ID 2
  28. (more technical specifications follow below).
  29. # Example
  30. Here are some examples of principals mapped to permissions in a fictional ACL:
  31. {
  32. "create-children": "account:3", # Everyone in Account 3 can create child items (e.g. beliefs for a sensor)
  33. "read": EVERYONE, # Reading is available to every logged-in user
  34. "update": ["user:14", # This user can update, ...
  35. user:15"], # and also this user, ...
  36. "update": "account-role:MDC", # also people in such accounts can update
  37. "delete": ("account:3", "role:CEO"), # Only CEOs of Account 3 can delete
  38. }
  39. Such a list of principals can be checked with match_principals, see below.
  40. # Specifications of principals
  41. Within FlexMeasures, a principal is handled as a string, usually defining context and identification, like so:
  42. <context>:<identification>.
  43. Supported contexts are user and account IDs, as well as user and account roles. All of them feature in the example above.
  44. Iterable principal descriptors should be treated as follows:
  45. - a list contains OR-connected items, which can be principal or tuples of principals (one of the items in the list is sufficient to grant the permission)
  46. - a tuple contains AND-connected strings (you need all of the items in the list to grant the permission).
  47. # Row-level authorization
  48. This ACL approach to authorization is usually called "row-level authorization" ― it always requires an instance, from which to get the ACL.
  49. Unlike pyramid, we don't have a general solution for table-level auth (as we haven't needed a general implementation so far), but there is a nice custom approach to it.
  50. A class method on the model can be added which returns an AuthModelMixin. That would have an __acl__() function with your rules, which the auth policy will then go on and use. The permission_required_for_context decorator can make sure this AuthModelMixin object is used by the policy via ctx_loader. It can even pass in the context if that is helpful for your logic.
  51. See the AuditLog model class for an example, where we required authorization logic which governs if a subset of a table (e.g. all audit logs that relate to an account) are availabe to the current user."
  52. Row level access policy works because we make use of the hierarchy in our model.
  53. The highest level (e.g. an account) is created by site-admins and usually not in the API, but CLI. For everything else, we can ask the ACL
  54. on an instance, if we can handle it like we intend to. For creation of instances (where there is no instance to ask), it makes sense to use the instance one level up to look up the correct permission ("create-children"). E.g. to create belief data for a sensor, we can check the "create-children" - permission on the sensor.
  55. [1] https://docs.microsoft.com/en-us/windows/security/identity-protection/access-control/security-principals#a-href-idw2k3tr-princ-whatawhat-are-security-principals
  56. """
  57. return {}
  58. def check_access(context: AuthModelMixin, permission: str):
  59. """
  60. Check if current user can access this auth context if this permission
  61. is required, either with admin rights or principal(s).
  62. Raises 401 or 403 otherwise.
  63. """
  64. # check permission and current user before taking context into account
  65. if permission not in PERMISSIONS:
  66. raise Forbidden(f"Permission '{permission}' cannot be handled.")
  67. if current_user.is_anonymous:
  68. raise Unauthorized()
  69. # check context
  70. if context is None:
  71. raise Forbidden(
  72. f"Context needs {permission}-permission, but no context was passed."
  73. )
  74. if not isinstance(context, AuthModelMixin):
  75. raise Forbidden(
  76. f"Context {context} needs {permission}-permission, but is no AuthModelMixin."
  77. )
  78. # look up principals
  79. acl = context.__acl__()
  80. principals: PRINCIPALS_TYPE = acl.get(permission, [])
  81. current_app.logger.debug(
  82. f"Looking for {permission}-permission on {context} ... Principals: {principals}"
  83. )
  84. # check access
  85. if not user_has_admin_access(
  86. current_user, permission
  87. ) and not user_matches_principals(current_user, principals):
  88. raise Forbidden(
  89. f"Authorization failure (accessing {context} to {permission}) ― cannot match {current_user} against {principals}!"
  90. )
  91. def user_has_admin_access(user, permission: str) -> bool:
  92. if user.has_role(ADMIN_ROLE) or (
  93. user.has_role(ADMIN_READER_ROLE) and permission == "read"
  94. ):
  95. return True
  96. return False
  97. def user_matches_principals(user, principals: PRINCIPALS_TYPE) -> bool:
  98. """
  99. Tests if the user matches all passed principals.
  100. Returns False if no principals are passed.
  101. """
  102. if not isinstance(principals, list):
  103. principals = [principals] # now we handle a list of str or Tuple[str]
  104. for matchable_principals in principals:
  105. if isinstance(matchable_principals, str):
  106. matchable_principals = (
  107. matchable_principals,
  108. ) # now we handle only Tuple[str]
  109. if EVERY_LOGGED_IN_USER in matchable_principals:
  110. return True
  111. if user is not None and all(
  112. [
  113. (
  114. check_user_identity(user, principal)
  115. or check_user_role(user, principal)
  116. or check_account_membership(user, principal)
  117. or check_account_role(user, principal)
  118. )
  119. for principal in matchable_principals
  120. ]
  121. ):
  122. return True
  123. return False
  124. def check_user_identity(user, principal: str) -> bool:
  125. if principal.startswith("user:"):
  126. user_id = principal.split("user:")[1]
  127. if not user_id.isdigit():
  128. current_app.logger.warning(
  129. f"Cannot match principal for user ID {user_id} ― no digit."
  130. )
  131. elif user.id == int(user_id):
  132. return True
  133. return False
  134. def check_user_role(user, principal: str) -> bool:
  135. if principal.startswith("role:"):
  136. user_role = principal.split("role:")[1]
  137. if user.has_role(user_role):
  138. return True
  139. return False
  140. def check_account_membership(user, principal: str) -> bool:
  141. if principal.startswith("account:"):
  142. account_id = principal.split("account:")[1]
  143. if not account_id.isdigit():
  144. current_app.logger.warning(
  145. f"Cannot match principal for account ID {account_id} ― no digit."
  146. )
  147. elif user.account.id == int(account_id):
  148. return True
  149. return False
  150. def check_account_role(user, principal: str) -> bool:
  151. if principal.startswith("account-role:"):
  152. account_role = principal.split("account-role:")[1]
  153. if user.account.has_role(account_role):
  154. return True
  155. return False
  156. def can_modify_role(user, roles_to_modify) -> bool:
  157. """For a set of supported roles, check if the current user can modify the roles.
  158. :param user: The current user.
  159. :param roles_to_modify: A list of roles to modify.
  160. :return: True if the user can modify the roles, False otherwise.
  161. The roles are:
  162. - admin: can only be changed in CLI / directly in the DB
  163. - admin-reader: can be added and removed by admins
  164. - account-admin: can be added and removed by admins and consultants
  165. - consultant: can be added and removed by admins and account-admins
  166. """
  167. for role in roles_to_modify:
  168. if isinstance(role, int):
  169. from flexmeasures.data.models.user import Role
  170. role = current_app.db.session.get(Role, role)
  171. if not role:
  172. return False
  173. if role.name == ADMIN_ROLE:
  174. return False
  175. if role.name == ADMIN_READER_ROLE and not user.has_role(ADMIN_ROLE):
  176. return False
  177. if role.name == ACCOUNT_ADMIN_ROLE and not (
  178. user.has_role(ADMIN_ROLE) or user.has_role(CONSULTANT_ROLE)
  179. ):
  180. return False
  181. if role.name == CONSULTANT_ROLE and not (
  182. user.has_role(ADMIN_ROLE) or user.has_role(ACCOUNT_ADMIN_ROLE)
  183. ):
  184. return False
  185. return True