test_user_services.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import pytest
  2. from sqlalchemy import select, func
  3. from flexmeasures.data.models.audit_log import AuditLog
  4. from flexmeasures.data.models.user import User, Role
  5. from flexmeasures.data.services.users import (
  6. create_user,
  7. find_user_by_email,
  8. delete_user,
  9. InvalidFlexMeasuresUser,
  10. )
  11. from flexmeasures.data.models.generic_assets import GenericAsset
  12. from flexmeasures.data.models.data_sources import DataSource
  13. from flexmeasures.data.models.time_series import TimedBelief
  14. def test_create_user(
  15. fresh_db, setup_accounts_fresh_db, setup_roles_users_fresh_db, app
  16. ):
  17. """Create a user"""
  18. num_users = fresh_db.session.scalar(select(func.count()).select_from(User))
  19. prosumer_account = setup_accounts_fresh_db["Prosumer"]
  20. user = create_user(
  21. email="new_user@seita.nl",
  22. password="testtest",
  23. account_name=prosumer_account.name,
  24. user_roles=["SomeRole"],
  25. )
  26. assert (
  27. fresh_db.session.scalar(select(func.count()).select_from(User)) == num_users + 1
  28. )
  29. assert user.email == "new_user@seita.nl"
  30. assert user.username == "new_user"
  31. assert user.account.name == "Test Prosumer Account"
  32. assert user.roles == [
  33. fresh_db.session.execute(
  34. select(Role).filter_by(name="SomeRole")
  35. ).scalar_one_or_none()
  36. ]
  37. assert fresh_db.session.execute(
  38. select(DataSource).filter_by(user_id=user.id)
  39. ).scalar_one_or_none()
  40. assert fresh_db.session.execute(
  41. select(DataSource).filter_by(name=user.username)
  42. ).scalar_one_or_none()
  43. user_audit_log = (
  44. fresh_db.session.query(AuditLog)
  45. .filter_by(affected_user_id=user.id)
  46. .one_or_none()
  47. )
  48. assert user_audit_log.event == "User new_user created"
  49. assert user_audit_log.affected_account_id == prosumer_account.id
  50. assert user_audit_log.active_user_id is None
  51. def test_create_user_no_account(
  52. fresh_db, setup_accounts_fresh_db, setup_roles_users_fresh_db, app
  53. ):
  54. """Create a user where the account still needs creation, test for both audit log entries"""
  55. user = create_user(
  56. email="new_user@seita.nl",
  57. password="testtest",
  58. account_name="new_account",
  59. user_roles=["SomeRole"],
  60. )
  61. user_audit_log = (
  62. fresh_db.session.query(AuditLog)
  63. .filter_by(event="User new_user created")
  64. .one_or_none()
  65. )
  66. assert user_audit_log.affected_user_id == user.id
  67. assert user_audit_log.affected_account_id == user.account_id
  68. account_audit_log = (
  69. fresh_db.session.query(AuditLog)
  70. .filter_by(event="Account new_account created")
  71. .one_or_none()
  72. )
  73. assert account_audit_log.affected_user_id is None
  74. assert account_audit_log.affected_account_id == user.account_id
  75. assert account_audit_log.active_user_id is None
  76. def test_create_invalid_user(
  77. fresh_db, setup_accounts_fresh_db, setup_roles_users_fresh_db, app
  78. ):
  79. """A few invalid attempts to create a user"""
  80. with pytest.raises(InvalidFlexMeasuresUser) as exc_info:
  81. create_user(password="testtest")
  82. assert "No email" in str(exc_info.value)
  83. with pytest.raises(InvalidFlexMeasuresUser) as exc_info:
  84. create_user(
  85. email="test_user_AT_seita.nl",
  86. password="testtest",
  87. account_name=setup_accounts_fresh_db["Prosumer"].name,
  88. )
  89. assert "not a valid" in str(exc_info.value)
  90. """ # This check is disabled during testing, as testing should work without internet and be fast
  91. with pytest.raises(InvalidFlexMeasuresUser) as exc_info:
  92. create_user(
  93. email="test_prosumer@sdkkhflzsxlgjxhglkzxjhfglkxhzlzxcvlzxvb.nl",
  94. password="testtest",
  95. account_name=setup_account_fresh_db.name,
  96. )
  97. assert "not seem to be deliverable" in str(exc_info.value)
  98. """
  99. with pytest.raises(InvalidFlexMeasuresUser) as exc_info:
  100. create_user(
  101. email="test_prosumer_user@seita.nl",
  102. password="testtest",
  103. account_name=setup_accounts_fresh_db["Prosumer"].name,
  104. )
  105. assert "already exists" in str(exc_info.value)
  106. with pytest.raises(InvalidFlexMeasuresUser) as exc_info:
  107. create_user(
  108. email="new_user@seita.nl",
  109. username="Test Prosumer User",
  110. password="testtest",
  111. account_name=setup_accounts_fresh_db["Prosumer"].name,
  112. )
  113. assert "already exists" in str(exc_info.value)
  114. with pytest.raises(InvalidFlexMeasuresUser) as exc_info:
  115. create_user(
  116. email="new_user@seita.nl",
  117. username="New Test Prosumer User",
  118. password="testtest",
  119. )
  120. assert "without knowing the name of the account" in str(exc_info.value)
  121. def test_delete_user(fresh_db, setup_roles_users_fresh_db, setup_assets_fresh_db, app):
  122. """Check that deleting a user does not lead to deleting their organisation's (asset/sensor/beliefs) data.
  123. Also check that an audit log entry is created + old audit log entries get affected_user_id set to None.
  124. """
  125. prosumer: User = find_user_by_email("test_prosumer_user@seita.nl")
  126. prosumer_account_id = prosumer.account_id
  127. num_users_before = fresh_db.session.scalar(select(func.count(User.id)))
  128. # Find assets belonging to the user's organisation
  129. asset_query = select(GenericAsset).filter_by(account_id=prosumer_account_id)
  130. assets_before = fresh_db.session.scalars(asset_query).all()
  131. assert (
  132. len(assets_before) > 0
  133. ), "Test assets should have been set up, otherwise we'd not be testing whether they're kept."
  134. # Find all the organisation's sensors
  135. sensors_before = []
  136. for asset in assets_before:
  137. sensors_before.extend(asset.sensors)
  138. # Count all the organisation's beliefs
  139. beliefs_query = select(func.count()).filter(
  140. TimedBelief.sensor_id.in_([sensor.id for sensor in sensors_before])
  141. )
  142. num_beliefs_before = fresh_db.session.scalar(beliefs_query)
  143. assert (
  144. num_beliefs_before > 0
  145. ), "Some beliefs should have been set up, otherwise we'd not be testing whether they're kept."
  146. # Add creation audit log record
  147. user_creation_audit_log = AuditLog(
  148. event="User Test Prosumer User created test",
  149. affected_user_id=prosumer.id,
  150. affected_account_id=prosumer_account_id,
  151. )
  152. fresh_db.session.add(user_creation_audit_log)
  153. # Delete the user
  154. delete_user(prosumer)
  155. assert find_user_by_email("test_prosumer_user@seita.nl") is None
  156. assert fresh_db.session.scalar(select(func.count(User.id))) == num_users_before - 1
  157. # Check whether the organisation's assets, sensors and beliefs were kept
  158. assets_after = fresh_db.session.scalars(asset_query).all()
  159. assert assets_after == assets_before
  160. num_beliefs_after = fresh_db.session.scalar(beliefs_query)
  161. assert num_beliefs_after == num_beliefs_before
  162. user_deletion_audit_log = (
  163. fresh_db.session.query(AuditLog)
  164. .filter_by(event="User Test Prosumer User deleted")
  165. .one_or_none()
  166. )
  167. assert user_deletion_audit_log.affected_user_id is None
  168. assert user_deletion_audit_log.affected_account_id == prosumer_account_id
  169. assert user_deletion_audit_log.active_user_id is None
  170. fresh_db.session.refresh(user_creation_audit_log)
  171. assert user_creation_audit_log.affected_user_id is None