audit_log.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. from __future__ import annotations
  2. from flask_security import current_user
  3. from flask_security.core import AnonymousUser
  4. from sqlalchemy import DateTime, Column, Integer, String, ForeignKey
  5. from flexmeasures.auth.policy import AuthModelMixin, CONSULTANT_ROLE, ACCOUNT_ADMIN_ROLE
  6. from flexmeasures.data import db
  7. from flexmeasures.data.models.generic_assets import GenericAsset
  8. from flexmeasures.data.models.time_series import Sensor
  9. from flexmeasures.data.models.user import User, Account
  10. from flexmeasures.utils.time_utils import server_now
  11. def get_current_user_id_name():
  12. current_user_id, current_user_name = None, None
  13. if current_user and not isinstance(current_user, AnonymousUser):
  14. current_user_id, current_user_name = current_user.id, current_user.username
  15. return current_user_id, current_user_name
  16. class AuditLog(db.Model, AuthModelMixin):
  17. """
  18. Model for storing actions that happen to user and tenant accounts
  19. E.g user creation, password reset etc.
  20. """
  21. __tablename__ = "audit_log"
  22. id = Column(Integer, primary_key=True)
  23. event_datetime = Column(DateTime())
  24. event = Column(String(255))
  25. active_user_name = Column(String(255))
  26. active_user_id = Column(
  27. "active_user_id", Integer(), ForeignKey("fm_user.id", ondelete="SET NULL")
  28. )
  29. affected_user_id = Column(
  30. "affected_user_id", Integer(), ForeignKey("fm_user.id", ondelete="SET NULL")
  31. )
  32. affected_account_id = Column(
  33. "affected_account_id", Integer(), ForeignKey("account.id", ondelete="SET NULL")
  34. )
  35. @classmethod
  36. def user_table_acl(cls, user: User):
  37. """
  38. Table-level access rules for user-affecting audit logs. Use directly in check_access or in @permission_required_for_context with pass_ctx_to_loader, ctx_loader=AuditLog.user_acl.
  39. Permissions:
  40. User can see his own audit logs.
  41. Account-admin users can see audit logs for all users of their account.
  42. Admins / admin-readers can see audit logs for all users.
  43. Consultant users can see the audit log of all users in the client accounts.
  44. """
  45. class AuditLogAccess(AuthModelMixin):
  46. def __init__(self, user: User):
  47. if user:
  48. self.user_id = user.id
  49. self.account_id = user.account_id
  50. self.consultancy_account_id = user.account.consultancy_account_id
  51. def __acl__(self):
  52. if not self.user_id:
  53. return {}
  54. return {
  55. "read": [
  56. f"user:{self.user_id}",
  57. (f"account:{self.account_id}", f"role:{ACCOUNT_ADMIN_ROLE}"),
  58. (
  59. f"account:{self.consultancy_account_id}",
  60. f"role:{CONSULTANT_ROLE}",
  61. ),
  62. ],
  63. }
  64. return AuditLogAccess(user)
  65. @classmethod
  66. def account_table_acl(cls, account: Account):
  67. """
  68. Table-level access rules for account-affecting audit logs. Use directly in check_access or in @permission_required_for_context with pass_ctx_to_loader, ctx_loader=AuditLog.user_acl.
  69. Permissions:
  70. Account-admin users can see audit logs for their account.
  71. Admins / admin-readers can see audit logs for all accounts.
  72. Consultant users can see the audit log of all client accounts.
  73. """
  74. class AuditLogAccess(AuthModelMixin):
  75. def __init__(self, account: Account):
  76. if account:
  77. self.account_id = account.id
  78. self.consultancy_account_id = account.consultancy_account_id
  79. def __acl__(self):
  80. if not self.account_id:
  81. return {}
  82. return {
  83. "read": [
  84. (f"account:{self.account_id}", f"role:{ACCOUNT_ADMIN_ROLE}"),
  85. (
  86. f"account:{self.consultancy_account_id}",
  87. f"role:{CONSULTANT_ROLE}",
  88. ),
  89. ],
  90. }
  91. return AuditLogAccess(account)
  92. class AssetAuditLog(db.Model, AuthModelMixin):
  93. """
  94. Model for storing actions that happen to an asset.
  95. E.g asset creation, editing etc.
  96. """
  97. __tablename__ = "asset_audit_log"
  98. id = Column(Integer, primary_key=True)
  99. event_datetime = Column(DateTime())
  100. event = Column(String(255))
  101. active_user_name = Column(String(255))
  102. active_user_id = Column(
  103. "active_user_id", Integer(), ForeignKey("fm_user.id", ondelete="SET NULL")
  104. )
  105. affected_asset_id = Column(
  106. "affected_asset_id",
  107. Integer(),
  108. ForeignKey("generic_asset.id", ondelete="SET NULL"),
  109. )
  110. @classmethod
  111. def add_record_for_attribute_update(
  112. cls,
  113. attribute_key: str,
  114. attribute_value: float | int | bool | str | list | dict | None,
  115. entity_type: str,
  116. asset_or_sensor: GenericAsset | Sensor,
  117. ) -> None:
  118. """Add audit log record about asset or sensor attribute update.
  119. :param attribute_key: attribute key to update
  120. :param attribute_value: new attribute value
  121. :param entity_type: 'asset' or 'sensor'
  122. :param asset_or_sensor: asset or sensor object
  123. """
  124. current_user_id, current_user_name = get_current_user_id_name()
  125. old_value = asset_or_sensor.attributes.get(attribute_key)
  126. if entity_type == "sensor":
  127. event = f"Updated sensor '{asset_or_sensor.name}': {asset_or_sensor.id}; "
  128. affected_asset_id = (asset_or_sensor.generic_asset_id,)
  129. else:
  130. event = f"Updated asset '{asset_or_sensor.name}': {asset_or_sensor.id}; "
  131. affected_asset_id = asset_or_sensor.id
  132. event += f"Attr '{attribute_key}' To {attribute_value} From {old_value}"
  133. audit_log = cls(
  134. event_datetime=server_now(),
  135. event=truncate_string(
  136. event, 255
  137. ), # we truncate the event string if it 255 characters by adding ellipses in the middle
  138. active_user_id=current_user_id,
  139. active_user_name=current_user_name,
  140. affected_asset_id=affected_asset_id,
  141. )
  142. db.session.add(audit_log)
  143. @classmethod
  144. def add_record(
  145. cls,
  146. asset: GenericAsset | Sensor,
  147. event: str,
  148. ) -> None:
  149. """Add audit log record about asset related crud actions.
  150. :param asset: asset or sensor object
  151. :param event: event to log
  152. """
  153. current_user_id, current_user_name = get_current_user_id_name()
  154. audit_log = AssetAuditLog(
  155. event_datetime=server_now(),
  156. event=truncate_string(
  157. event, 255
  158. ), # we truncate the event string if it exceeds 255 characters by adding ellipses in the middle
  159. active_user_id=current_user_id,
  160. active_user_name=current_user_name,
  161. affected_asset_id=asset.id,
  162. )
  163. db.session.add(audit_log)
  164. def truncate_string(value: str, max_length: int) -> str:
  165. """Truncate a string and add ellipses in the middle if it exceeds max_length."""
  166. if len(value) <= max_length:
  167. return value
  168. half_length = (max_length - 5) // 2
  169. return f"{value[:half_length]} ... {value[-half_length:]}"