test_api_v3_0_users.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. from flask import url_for
  2. from flask_login import current_user, logout_user
  3. from sqlalchemy import select
  4. import pytest
  5. from flexmeasures.data.models.audit_log import AuditLog
  6. from flexmeasures.data.services.users import find_user_by_email
  7. from flexmeasures.api.tests.utils import UserContext
  8. @pytest.mark.parametrize(
  9. "requesting_user", ["test_prosumer_user_2@seita.nl", None], indirect=True
  10. )
  11. def test_get_users_bad_auth(requesting_user, client, setup_api_test_data):
  12. """
  13. Attempt to get users with insufficient or missing auth.
  14. """
  15. # the case without auth: authentication will fail
  16. query = {}
  17. if requesting_user:
  18. # in this case, we successfully authenticate,
  19. # but fail authorization (non-admin accessing another account)
  20. query = {"account_id": 2}
  21. get_users_response = client.get(url_for("UserAPI:index"), query_string=query)
  22. print("Server responded with:\n%s" % get_users_response.data)
  23. if requesting_user:
  24. assert get_users_response.status_code == 403
  25. else:
  26. assert get_users_response.status_code == 401
  27. @pytest.mark.parametrize(
  28. "requesting_user, include_inactive, sort_by, sort_dir, expected_name_of_first_user",
  29. [
  30. ("test_prosumer_user_2@seita.nl", False, None, None, None),
  31. (
  32. "test_prosumer_user_2@seita.nl",
  33. True,
  34. "username",
  35. "asc",
  36. "inactive test admin",
  37. ),
  38. (
  39. "test_prosumer_user_2@seita.nl",
  40. True,
  41. "username",
  42. "desc",
  43. "Test Prosumer User 2",
  44. ),
  45. ],
  46. indirect=["requesting_user"],
  47. )
  48. def test_get_users_inactive(
  49. requesting_user,
  50. client,
  51. setup_api_test_data,
  52. setup_inactive_user,
  53. include_inactive,
  54. sort_by,
  55. sort_dir,
  56. expected_name_of_first_user,
  57. ):
  58. query = {}
  59. if include_inactive in (True, False):
  60. query["include_inactive"] = include_inactive
  61. if sort_by:
  62. query["sort_by"] = sort_by
  63. if sort_dir:
  64. query["sort_dir"] = sort_dir
  65. get_users_response = client.get(
  66. url_for("UserAPI:index"),
  67. query_string=query,
  68. )
  69. print("Server responded with:\n%s" % get_users_response.json)
  70. assert get_users_response.status_code == 200
  71. assert isinstance(get_users_response.json, list)
  72. if include_inactive is False:
  73. assert len(get_users_response.json) == 3
  74. else:
  75. users = get_users_response.json
  76. assert len(users) == 5
  77. if sort_by:
  78. assert users[0]["username"] == expected_name_of_first_user
  79. @pytest.mark.parametrize(
  80. "requesting_user, status_code",
  81. [
  82. (None, 401), # no auth is not allowed
  83. ("test_prosumer_user_2@seita.nl", 200), # gets themselves
  84. ("test_prosumer_user@seita.nl", 200), # gets from same account
  85. ("test_dummy_user_3@seita.nl", 403), # gets from other account
  86. ("test_admin_user@seita.nl", 200), # admin can do this from another account
  87. ],
  88. indirect=["requesting_user"],
  89. )
  90. def test_get_one_user(client, setup_api_test_data, requesting_user, status_code):
  91. test_user2_id = find_user_by_email("test_prosumer_user_2@seita.nl").id
  92. get_user_response = client.get(url_for("UserAPI:get", id=test_user2_id))
  93. print("Server responded with:\n%s" % get_user_response.data)
  94. assert get_user_response.status_code == status_code
  95. if status_code == 200:
  96. assert get_user_response.json["username"] == "Test Prosumer User 2"
  97. @pytest.mark.parametrize(
  98. "requesting_user, status_code",
  99. [
  100. (None, 401), # no auth is not allowed
  101. ("test_prosumer_user@seita.nl", 200), # gets themselves
  102. (
  103. "test_prosumer_user_2@seita.nl",
  104. 200,
  105. ), # account-admin can view his account users
  106. (
  107. "test_dummy_account_admin@seita.nl",
  108. 403,
  109. ), # account-admin cannot view other account users
  110. (
  111. "test_prosumer_user_3@seita.nl",
  112. 403,
  113. ), # plain user cant view his account users
  114. ("test_dummy_user_3@seita.nl", 403), # plain user cant view other account users
  115. ("test_admin_user@seita.nl", 200), # admin can do this from another account
  116. (
  117. "test_admin_reader_user@seita.nl",
  118. 200,
  119. ), # admin reader can do this from another account
  120. ],
  121. indirect=["requesting_user"],
  122. )
  123. def test_get_one_user_audit_log(
  124. client, setup_api_test_data, requesting_user, status_code
  125. ):
  126. requesting_user_id = find_user_by_email("test_prosumer_user@seita.nl").id
  127. get_user_response = client.get(url_for("UserAPI:auditlog", id=requesting_user_id))
  128. print("Server responded with:\n%s" % get_user_response.data)
  129. assert get_user_response.status_code == status_code
  130. if status_code == 200:
  131. assert get_user_response.json[0] is not None
  132. @pytest.mark.parametrize(
  133. "requesting_user, status_code",
  134. [
  135. # Consultant users can see the audit log of all users in the client accounts.
  136. ("test_consultant@seita.nl", 200),
  137. # Has no consultant role.
  138. ("test_consultancy_user_without_consultant_access@seita.nl", 403),
  139. ],
  140. indirect=["requesting_user"],
  141. )
  142. def test_get_one_user_audit_log_consultant(
  143. client, setup_api_test_data, requesting_user, status_code
  144. ):
  145. """Check correctness of consultant account audit log access rules"""
  146. requesting_user_id = find_user_by_email("test_consultant_client@seita.nl").id
  147. get_user_response = client.get(url_for("UserAPI:auditlog", id=requesting_user_id))
  148. print("Server responded with:\n%s" % get_user_response.data)
  149. assert get_user_response.status_code == status_code
  150. if status_code == 200:
  151. assert get_user_response.json[0] is not None
  152. @pytest.mark.parametrize(
  153. "requesting_user, requested_user, status_code",
  154. [
  155. (
  156. "test_prosumer_user_2@seita.nl",
  157. "test_admin_user@seita.nl",
  158. 403,
  159. ), # without being the user themselves or an admin, the user cannot be edited
  160. (None, "test_prosumer_user_2@seita.nl", 401), # anonymous user cannot edit
  161. (
  162. "test_admin_user@seita.nl",
  163. "test_prosumer_user_2@seita.nl",
  164. 200,
  165. ), # admin can deactivate user2
  166. (
  167. "test_admin_user@seita.nl",
  168. "test_admin_user@seita.nl",
  169. 403,
  170. ), # admin can edit themselves but not sensitive fields
  171. ],
  172. indirect=["requesting_user"],
  173. )
  174. def test_edit_user(
  175. db, requesting_user, requested_user, status_code, client, setup_api_test_data
  176. ):
  177. with UserContext(requested_user) as u:
  178. requested_user_id = u.id
  179. user_edit_response = client.patch(
  180. url_for("UserAPI:patch", id=requested_user_id),
  181. json={"active": False},
  182. )
  183. assert user_edit_response.status_code == status_code
  184. if status_code == 200:
  185. assert user_edit_response.json["active"] is False
  186. user = find_user_by_email(requested_user)
  187. assert user.active is False
  188. assert user.id == requested_user_id
  189. assert db.session.execute(
  190. select(AuditLog).filter_by(
  191. affected_user_id=user.id,
  192. event="Active status set to 'False'.",
  193. active_user_id=requesting_user.id,
  194. affected_account_id=user.account_id,
  195. )
  196. ).scalar_one_or_none()
  197. @pytest.mark.parametrize(
  198. "unexpected_fields",
  199. [
  200. dict(password="I-should-not-be-sending-this"), # not part of the schema
  201. dict(id=10), # id is a dump_only field
  202. dict(account_id=10), # account_id is a dump_only field
  203. ],
  204. )
  205. @pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
  206. def test_edit_user_with_unexpected_fields(
  207. requesting_user, client, setup_api_test_data, unexpected_fields: dict
  208. ):
  209. """Sending unexpected fields (not in Schema) is an Unprocessable Entity error."""
  210. with UserContext("test_prosumer_user_2@seita.nl") as user2:
  211. user2_id = user2.id
  212. user_edit_response = client.patch(
  213. url_for("UserAPI:patch", id=user2_id),
  214. json={**{"active": False}, **unexpected_fields},
  215. )
  216. print("Server responded with:\n%s" % user_edit_response.json)
  217. assert user_edit_response.status_code == 422
  218. @pytest.mark.parametrize(
  219. "email, status_code",
  220. [
  221. ("test_admin_user@seita.nl", 200),
  222. ("inactive_admin@seita.nl", 400),
  223. ],
  224. )
  225. def test_login(client, setup_api_test_data, email, status_code):
  226. """Tries to log in."""
  227. assert current_user.is_anonymous
  228. # log in
  229. login_response = client.post(
  230. url_for("security.login"),
  231. json={
  232. "email": email,
  233. "password": "testtest",
  234. },
  235. )
  236. print(login_response.json)
  237. assert login_response.status_code == status_code
  238. if status_code == 200:
  239. assert not current_user.is_anonymous
  240. logout_user()
  241. @pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
  242. def test_logout(client, setup_api_test_data, requesting_user):
  243. """Tries to log out, which should succeed as a url direction."""
  244. assert not current_user.is_anonymous
  245. # log out
  246. logout_response = client.get(url_for("security.logout"))
  247. assert logout_response.status_code == 302
  248. assert current_user.is_anonymous
  249. @pytest.mark.parametrize(
  250. "requesting_user, expected_status_code, user_to_update, expected_role",
  251. [
  252. # Admin-reader tries to update user 5 (initially an account admin) to become admin-reader
  253. ("test_admin_reader_user@seita.nl", 403, 5, [3]),
  254. # Consultant tries to updates user 8 (initially consultant) to become admin-reader
  255. ("test_consultant@seita.nl", 403, 8, [3]),
  256. # Admin-reader tries to remove all roles of user 5 (initially an account admin)
  257. ("test_admin_reader_user@seita.nl", 403, 5, []),
  258. ],
  259. indirect=["requesting_user"],
  260. )
  261. def test_user_role_failed_modification_permission(
  262. client,
  263. setup_api_test_data,
  264. requesting_user,
  265. expected_status_code,
  266. user_to_update,
  267. expected_role,
  268. ):
  269. patch_user_response = client.patch(
  270. url_for("UserAPI:patch", id=user_to_update),
  271. json={"flexmeasures_roles": expected_role},
  272. )
  273. print("Server responded with:\n%s" % patch_user_response.data)
  274. assert patch_user_response.status_code == expected_status_code