decorators.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. """
  2. Auth decorators for endpoints
  3. """
  4. from __future__ import annotations
  5. from typing import Callable
  6. from functools import wraps
  7. import inspect
  8. from flask import current_app
  9. from flask_json import as_json
  10. from flask_security import (
  11. current_user,
  12. roles_accepted as roles_accepted_fs,
  13. roles_required as roles_required_fs,
  14. )
  15. from werkzeug.local import LocalProxy
  16. from werkzeug.exceptions import Forbidden
  17. from flexmeasures.data import db
  18. from flexmeasures.auth.policy import ADMIN_ROLE, AuthModelMixin, check_access
# Lazy handle on the current Flask app's "security" extension: the lambda (and
# with it the lookup in current_app.extensions) only runs when the proxy is used.
_security = LocalProxy(lambda: current_app.extensions["security"])
  20. def roles_accepted(*roles):
  21. """As in Flask-Security, but also accept admin"""
  22. if ADMIN_ROLE not in roles:
  23. roles = roles + (ADMIN_ROLE,)
  24. return roles_accepted_fs(*roles)
  25. def roles_required(*roles):
  26. """As in Flask-Security, but wave through if user is admin"""
  27. if current_user and current_user.has_role(ADMIN_ROLE):
  28. roles = []
  29. return roles_required_fs(*roles)
  30. def account_roles_accepted(*account_roles):
  31. """Decorator which specifies that a user's account must have at least one of the
  32. specified roles (or must be an admin). Example:
  33. @app.route('/postMeterData')
  34. @account_roles_accepted('Prosumer', 'MDC')
  35. def post_meter_data():
  36. return 'Meter data posted'
  37. The current user's account must have either the `Prosumer` role or `MDC` role in
  38. order to use the service.
  39. :param account_roles: The possible roles.
  40. """
  41. def wrapper(fn):
  42. @wraps(fn)
  43. @as_json
  44. def decorated_service(*args, **kwargs):
  45. if current_user and (
  46. current_user.has_role(ADMIN_ROLE)
  47. or any([current_user.account.has_role(role) for role in account_roles])
  48. ):
  49. return fn(*args, **kwargs)
  50. raise Forbidden(
  51. f"User {current_user}'s account does not have any of the following roles: {','.join(account_roles)}."
  52. )
  53. return decorated_service
  54. return wrapper
  55. def account_roles_required(*account_roles):
  56. """Decorator which specifies that a user's account must have all the specified roles.
  57. Example::
  58. @app.route('/dashboard')
  59. @account_roles_required('Prosumer', 'App-subscriber')
  60. def dashboard():
  61. return 'Dashboard'
  62. The current user's account must have both the `Prosumer` role and
  63. `App-subscriber` role in order to view the page.
  64. :param roles: The required roles.
  65. """
  66. def wrapper(fn):
  67. @wraps(fn)
  68. def decorated_view(*args, **kwargs):
  69. if not current_user or (
  70. not current_user.has_role(ADMIN_ROLE)
  71. and not all(
  72. [current_user.account.has_role(role) for role in account_roles]
  73. )
  74. ):
  75. raise Forbidden(
  76. f"User {current_user}'s account does not have all of the following roles: {','.join(account_roles)}."
  77. )
  78. return fn(*args, **kwargs)
  79. return decorated_view
  80. return wrapper
  81. def permission_required_for_context(
  82. permission: str,
  83. ctx_arg_pos: int | None = None,
  84. ctx_arg_name: str | None = None,
  85. ctx_loader: Callable | None = None,
  86. pass_ctx_to_loader: bool = False,
  87. ):
  88. """
  89. This decorator can be used to make sure that the current user has the necessary permission to access the context.
  90. The permission needs to be a known permission and is checked with principal descriptions from the context's access control list (see AuthModelMixin.__acl__).
  91. This decorator will first load the context (see below for details) and then call check_access to make sure the current user has the permission.
  92. A 403 response is raised if there is no principal for the required permission.
  93. A 401 response is raised if the user is not authenticated at all.
  94. We will now explain how to load a context, and give an example:
  95. The context needs to be an AuthModelMixin and is found ...
  96. - by loading it via the ctx_loader callable;
  97. - otherwise:
  98. * by the keyword argument ctx_arg_name;
  99. * and/or by a position in the non-keyword arguments (ctx_arg_pos).
  100. If nothing is passed, the context lookup defaults to ctx_arg_pos=0.
  101. Let's look at an example. Usually, you'd place a marshmallow field further up in the decorator chain, e.g.:
  102. @app.route("/resource/<resource_id>", methods=["GET"])
  103. @use_kwargs(
  104. {"the_resource": ResourceIdField(data_key="resource_id")},
  105. location="path",
  106. )
  107. @permission_required_for_context("read", ctx_arg_name="the_resource")
  108. @as_json
  109. def view(resource_id: int, the_resource: Resource):
  110. return dict(name=the_resource.name)
  111. Note that in this example, `ResourceIdField._deserialize()` turns the id parameter into a Resource context (if possible).
  112. The ctx_loader:
  113. The ctx_loader can be a function without arguments or it takes the context loaded from the arguments as input (using pass_ctx_to_loader=True).
  114. A special case is useful when the arguments contain the context ID (not the instance).
  115. Then, the loader can be a subclass of AuthModelMixin, and this decorator will look up the instance.
  116. Using both arg name and position:
  117. Using both ctx_arg_name and ctx_arg_pos arguments is useful when Marshmallow de-serializes to a dict and you are using use_args. In this case, the context lookup applies first ctx_arg_pos, then ctx_arg_name.
  118. Let's look at a slightly more complex example where we combine both special cases from above.
  119. We parse a dictionary from the input with a Marshmallow schema, in which a context ID can be found which we need to instantiate:
  120. @app.route("/resource", methods=["POST"])
  121. @use_args(resource_schema)
  122. @permission_required_for_context(
  123. "create-children", ctx_arg_pos=1, ctx_arg_name="resource_id", ctx_loader=Resource, pass_ctx_to_loader=True
  124. )
  125. def post(self, resource_data: dict):
  126. Note that in this example, resource_data is the input parsed by resource_schema, "resource_id" is one of the parameters in this schema, and Resource is a subclass of AuthModelMixin.
  127. """
  128. def wrapper(fn):
  129. @wraps(fn)
  130. def decorated_view(*args, **kwargs):
  131. # load & check context
  132. context: AuthModelMixin = None
  133. # first set context_from_args, if possible
  134. context_from_args: AuthModelMixin = None
  135. if ctx_arg_pos is not None and ctx_arg_name is not None:
  136. context_from_args = args[ctx_arg_pos][ctx_arg_name]
  137. elif ctx_arg_pos is not None:
  138. context_from_args = args[ctx_arg_pos]
  139. elif ctx_arg_name is not None:
  140. context_from_args = kwargs[ctx_arg_name]
  141. elif len(args) > 0:
  142. context_from_args = args[0]
  143. # if a loader is given, use that, otherwise fall back to context_from_args
  144. if ctx_loader is not None:
  145. if pass_ctx_to_loader:
  146. if inspect.isclass(ctx_loader) and issubclass(
  147. ctx_loader, AuthModelMixin
  148. ):
  149. context = db.session.get(ctx_loader, context_from_args)
  150. else:
  151. context = ctx_loader(context_from_args)
  152. else:
  153. context = ctx_loader()
  154. else:
  155. context = context_from_args
  156. check_access(context, permission)
  157. return fn(*args, **kwargs)
  158. return decorated_view
  159. return wrapper