123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- from __future__ import annotations
- from flask_security import current_user
- from flask_security.core import AnonymousUser
- from sqlalchemy import DateTime, Column, Integer, String, ForeignKey
- from flexmeasures.auth.policy import AuthModelMixin, CONSULTANT_ROLE, ACCOUNT_ADMIN_ROLE
- from flexmeasures.data import db
- from flexmeasures.data.models.generic_assets import GenericAsset
- from flexmeasures.data.models.time_series import Sensor
- from flexmeasures.data.models.user import User, Account
- from flexmeasures.utils.time_utils import server_now
- def get_current_user_id_name():
- current_user_id, current_user_name = None, None
- if current_user and not isinstance(current_user, AnonymousUser):
- current_user_id, current_user_name = current_user.id, current_user.username
- return current_user_id, current_user_name
- class AuditLog(db.Model, AuthModelMixin):
- """
- Model for storing actions that happen to user and tenant accounts
- E.g user creation, password reset etc.
- """
- __tablename__ = "audit_log"
- id = Column(Integer, primary_key=True)
- event_datetime = Column(DateTime())
- event = Column(String(255))
- active_user_name = Column(String(255))
- active_user_id = Column(
- "active_user_id", Integer(), ForeignKey("fm_user.id", ondelete="SET NULL")
- )
- affected_user_id = Column(
- "affected_user_id", Integer(), ForeignKey("fm_user.id", ondelete="SET NULL")
- )
- affected_account_id = Column(
- "affected_account_id", Integer(), ForeignKey("account.id", ondelete="SET NULL")
- )
- @classmethod
- def user_table_acl(cls, user: User):
- """
- Table-level access rules for user-affecting audit logs. Use directly in check_access or in @permission_required_for_context with pass_ctx_to_loader, ctx_loader=AuditLog.user_acl.
- Permissions:
- User can see his own audit logs.
- Account-admin users can see audit logs for all users of their account.
- Admins / admin-readers can see audit logs for all users.
- Consultant users can see the audit log of all users in the client accounts.
- """
- class AuditLogAccess(AuthModelMixin):
- def __init__(self, user: User):
- if user:
- self.user_id = user.id
- self.account_id = user.account_id
- self.consultancy_account_id = user.account.consultancy_account_id
- def __acl__(self):
- if not self.user_id:
- return {}
- return {
- "read": [
- f"user:{self.user_id}",
- (f"account:{self.account_id}", f"role:{ACCOUNT_ADMIN_ROLE}"),
- (
- f"account:{self.consultancy_account_id}",
- f"role:{CONSULTANT_ROLE}",
- ),
- ],
- }
- return AuditLogAccess(user)
- @classmethod
- def account_table_acl(cls, account: Account):
- """
- Table-level access rules for account-affecting audit logs. Use directly in check_access or in @permission_required_for_context with pass_ctx_to_loader, ctx_loader=AuditLog.user_acl.
- Permissions:
- Account-admin users can see audit logs for their account.
- Admins / admin-readers can see audit logs for all accounts.
- Consultant users can see the audit log of all client accounts.
- """
- class AuditLogAccess(AuthModelMixin):
- def __init__(self, account: Account):
- if account:
- self.account_id = account.id
- self.consultancy_account_id = account.consultancy_account_id
- def __acl__(self):
- if not self.account_id:
- return {}
- return {
- "read": [
- (f"account:{self.account_id}", f"role:{ACCOUNT_ADMIN_ROLE}"),
- (
- f"account:{self.consultancy_account_id}",
- f"role:{CONSULTANT_ROLE}",
- ),
- ],
- }
- return AuditLogAccess(account)
- class AssetAuditLog(db.Model, AuthModelMixin):
- """
- Model for storing actions that happen to an asset.
- E.g asset creation, editing etc.
- """
- __tablename__ = "asset_audit_log"
- id = Column(Integer, primary_key=True)
- event_datetime = Column(DateTime())
- event = Column(String(255))
- active_user_name = Column(String(255))
- active_user_id = Column(
- "active_user_id", Integer(), ForeignKey("fm_user.id", ondelete="SET NULL")
- )
- affected_asset_id = Column(
- "affected_asset_id",
- Integer(),
- ForeignKey("generic_asset.id", ondelete="SET NULL"),
- )
- @classmethod
- def add_record_for_attribute_update(
- cls,
- attribute_key: str,
- attribute_value: float | int | bool | str | list | dict | None,
- entity_type: str,
- asset_or_sensor: GenericAsset | Sensor,
- ) -> None:
- """Add audit log record about asset or sensor attribute update.
- :param attribute_key: attribute key to update
- :param attribute_value: new attribute value
- :param entity_type: 'asset' or 'sensor'
- :param asset_or_sensor: asset or sensor object
- """
- current_user_id, current_user_name = get_current_user_id_name()
- old_value = asset_or_sensor.attributes.get(attribute_key)
- if entity_type == "sensor":
- event = f"Updated sensor '{asset_or_sensor.name}': {asset_or_sensor.id}; "
- affected_asset_id = (asset_or_sensor.generic_asset_id,)
- else:
- event = f"Updated asset '{asset_or_sensor.name}': {asset_or_sensor.id}; "
- affected_asset_id = asset_or_sensor.id
- event += f"Attr '{attribute_key}' To {attribute_value} From {old_value}"
- audit_log = cls(
- event_datetime=server_now(),
- event=truncate_string(
- event, 255
- ), # we truncate the event string if it 255 characters by adding ellipses in the middle
- active_user_id=current_user_id,
- active_user_name=current_user_name,
- affected_asset_id=affected_asset_id,
- )
- db.session.add(audit_log)
- @classmethod
- def add_record(
- cls,
- asset: GenericAsset | Sensor,
- event: str,
- ) -> None:
- """Add audit log record about asset related crud actions.
- :param asset: asset or sensor object
- :param event: event to log
- """
- current_user_id, current_user_name = get_current_user_id_name()
- audit_log = AssetAuditLog(
- event_datetime=server_now(),
- event=truncate_string(
- event, 255
- ), # we truncate the event string if it exceeds 255 characters by adding ellipses in the middle
- active_user_id=current_user_id,
- active_user_name=current_user_name,
- affected_asset_id=asset.id,
- )
- db.session.add(audit_log)
- def truncate_string(value: str, max_length: int) -> str:
- """Truncate a string and add ellipses in the middle if it exceeds max_length."""
- if len(value) <= max_length:
- return value
- half_length = (max_length - 5) // 2
- return f"{value[:half_length]} ... {value[-half_length:]}"
|