123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- """
- CLI commands for removing data
- """
- from __future__ import annotations
- from datetime import datetime, timedelta
- from itertools import chain
- import click
- from flask import current_app as app
- from flask.cli import with_appcontext
- from timely_beliefs.beliefs.queries import query_unchanged_beliefs
- from sqlalchemy import delete, func, select
- from flexmeasures.data import db
- from flexmeasures.data.models.user import Account, AccountRole, RolesAccounts, User
- from flexmeasures.data.models.generic_assets import GenericAsset
- from flexmeasures.data.models.time_series import Sensor, TimedBelief
- from flexmeasures.data.schemas import AwareDateTimeField, SensorIdField, AssetIdField
- from flexmeasures.data.services.users import find_user_by_email, delete_user
- from flexmeasures.cli.utils import (
- abort,
- done,
- DeprecatedOption,
- DeprecatedOptionsCommand,
- )
- from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list
- @click.group("delete")
- def fm_delete_data():
- """FlexMeasures: Delete data."""
- @fm_delete_data.command("account-role")
- @with_appcontext
- @click.option("--name", required=True)
- def delete_account_role(name: str):
- """
- Delete an account role.
- If it has accounts connected, print them before deleting the connection.
- """
- role: AccountRole = db.session.execute(
- select(AccountRole).filter_by(name=name)
- ).scalar_one_or_none()
- if role is None:
- abort(f"Account role '{name}' does not exist.")
- accounts = role.accounts.all()
- if len(accounts) > 0:
- click.secho(
- f"The following accounts have role '{role.name}': {','.join([a.name for a in accounts])}. Removing this role from them ...",
- )
- for account in accounts:
- account.account_roles.remove(role)
- db.session.execute(delete(AccountRole).filter_by(id=role.id))
- db.session.commit()
- done(f"Account role '{name}' has been deleted.")
- @fm_delete_data.command("account")
- @with_appcontext
- @click.option("--id", type=int)
- @click.option(
- "--force/--no-force", default=False, help="Skip warning about consequences."
- )
- def delete_account(id: int, force: bool):
- """
- Delete an account, including their users & data.
- """
- account: Account = db.session.get(Account, id)
- if account is None:
- abort(f"Account with ID '{id}' does not exist.")
- if not force:
- prompt = f"Delete account '{account.name}', including generic assets, users and all their data?\n"
- users = db.session.scalars(select(User).filter_by(account_id=id)).all()
- if users:
- prompt += "Affected users: " + ",".join([u.username for u in users]) + "\n"
- generic_assets = db.session.scalars(
- select(GenericAsset).filter_by(account_id=id)
- ).all()
- if generic_assets:
- prompt += (
- "Affected generic assets: "
- + ",".join([ga.name for ga in generic_assets])
- + "\n"
- )
- click.confirm(prompt, abort=True)
- for user in account.users:
- click.secho(f"Deleting user {user} ...")
- delete_user(user)
- for role_account_association in db.session.scalars(
- select(RolesAccounts).filter_by(account_id=account.id)
- ).all():
- role = db.session.get(AccountRole, role_account_association.role_id)
- click.echo(
- f"Deleting association of account {account.name} and role {role.name} ...",
- )
- db.session.execute(
- delete(RolesAccounts).filter_by(
- account_id=role_account_association.account_id
- )
- )
- for asset in account.generic_assets:
- click.echo(f"Deleting generic asset {asset} (and sensors & beliefs) ...")
- db.session.execute(delete(GenericAsset).filter_by(id=asset.id))
- account_name = account.name
- db.session.execute(delete(Account).filter_by(id=account.id))
- db.session.commit()
- done(f"Account {account_name} has been deleted.")
- @fm_delete_data.command("user")
- @with_appcontext
- @click.option("--email")
- @click.option(
- "--force/--no-force", default=False, help="Skip warning about consequences."
- )
- def delete_a_user(email: str, force: bool):
- """
- Delete a user & also their assets and data.
- """
- if not force:
- prompt = f"Delete user '{email}'?"
- click.confirm(prompt, abort=True)
- the_user = find_user_by_email(email)
- if the_user is None:
- abort(f"Could not find user with email address '{email}' ...")
- delete_user(the_user)
- db.session.commit()
- @fm_delete_data.command("asset")
- @with_appcontext
- @click.option("--id", "asset", type=AssetIdField())
- @click.option(
- "--force/--no-force", default=False, help="Skip warning about consequences."
- )
- def delete_asset_and_data(asset: GenericAsset, force: bool):
- """
- Delete an asset & also its sensors and data.
- """
- if not force:
- prompt = (
- f"Delete {asset.__repr__()}, including all its sensors, data and children?"
- )
- click.confirm(prompt, abort=True)
- db.session.execute(delete(GenericAsset).filter_by(id=asset.id))
- db.session.commit()
- @fm_delete_data.command("structure")
- @with_appcontext
- @click.option(
- "--force/--no-force", default=False, help="Skip warning about consequences."
- )
- def delete_structure(force):
- """
- Delete all structural (non time-series) data like assets (types),
- sources, roles and users.
- """
- if not force:
- click.confirm(
- f"Sure to delete all asset(type)s, sources, roles and users from {db.engine}?",
- abort=True,
- )
- from flexmeasures.data.scripts.data_gen import depopulate_structure
- depopulate_structure(db)
- @fm_delete_data.command("measurements", cls=DeprecatedOptionsCommand)
- @with_appcontext
- @click.option(
- "--sensor",
- "--sensor-id",
- "sensor_id",
- type=int,
- cls=DeprecatedOption,
- deprecated=["--sensor-id"],
- preferred="--sensor",
- help="Delete (time series) data for a single sensor only. Follow up with the sensor's ID.",
- )
- @click.option(
- "--force/--no-force", default=False, help="Skip warning about consequences."
- )
- def delete_measurements(
- force: bool,
- sensor_id: int | None = None,
- ):
- """Delete measurements (ex-post beliefs, i.e. with belief_horizon <= 0)."""
- if not force:
- click.confirm(f"Sure to delete all measurements from {db.engine}?", abort=True)
- from flexmeasures.data.scripts.data_gen import depopulate_measurements
- depopulate_measurements(db, sensor_id)
- @fm_delete_data.command("prognoses", cls=DeprecatedOptionsCommand)
- @with_appcontext
- @click.option(
- "--force/--no-force", default=False, help="Skip warning about consequences."
- )
- @click.option(
- "--sensor",
- "--sensor-id",
- "sensor_id",
- type=int,
- cls=DeprecatedOption,
- deprecated=["--sensor-id"],
- preferred="--sensor",
- help="Delete (time series) data for a single sensor only. Follow up with the sensor's ID. ",
- )
- def delete_prognoses(
- force: bool,
- sensor_id: int | None = None,
- ):
- """Delete forecasts and schedules (ex-ante beliefs, i.e. with belief_horizon > 0)."""
- if not force:
- click.confirm(f"Sure to delete all prognoses from {db.engine}?", abort=True)
- from flexmeasures.data.scripts.data_gen import depopulate_prognoses
- depopulate_prognoses(db, sensor_id)
- @fm_delete_data.command("beliefs")
- @with_appcontext
- @click.option(
- "--asset",
- "generic_assets",
- required=False,
- multiple=True,
- type=AssetIdField(),
- help="Delete all beliefs associated with (sensors of) this asset.",
- )
- @click.option(
- "--sensor",
- "sensors",
- required=False,
- multiple=True,
- type=SensorIdField(),
- help="Delete all beliefs associated with this sensor.",
- )
- @click.option(
- "--start",
- "start",
- type=AwareDateTimeField(),
- required=False,
- help="Remove beliefs about events starting at this datetime. Follow up with a timezone-aware datetime in ISO 6801 format.",
- )
- @click.option(
- "--end",
- "end",
- type=AwareDateTimeField(),
- required=False,
- help="Remove beliefs about events ending at this datetime. Follow up with a timezone-aware datetime in ISO 6801 format.",
- )
- @click.option("--offspring", type=bool, required=False, default=False, is_flag=True)
- def delete_beliefs( # noqa: C901
- generic_assets: list[GenericAsset],
- sensors: list[Sensor],
- start: datetime | None = None,
- end: datetime | None = None,
- offspring: bool = False,
- ):
- """Delete all beliefs recorded on a given sensor (or on sensors of a given asset)."""
- # Validate input
- if not generic_assets and not sensors:
- abort("Must pass at least one sensor or asset.")
- elif generic_assets and sensors:
- abort("Passing both sensors and assets at the same time is not supported.")
- if start is not None and end is not None and start > end:
- abort("Start should not exceed end.")
- if offspring and len(generic_assets) == 0:
- abort("Must pass at least one asset when the offspring option is employed.")
- # Time window filter
- event_filters = []
- if start is not None:
- event_filters += [TimedBelief.event_start >= start]
- if end is not None:
- event_filters += [TimedBelief.event_start + Sensor.event_resolution <= end]
- # Entity filter
- entity_filters = []
- if sensors:
- entity_filters += [TimedBelief.sensor_id.in_([sensor.id for sensor in sensors])]
- if generic_assets:
- # get the offspring of all generic assets
- generic_assets_offspring = []
- for asset in generic_assets:
- generic_assets_offspring.extend(asset.offspring)
- generic_assets = list(generic_assets) + generic_assets_offspring
- entity_filters += [
- TimedBelief.sensor_id == Sensor.id,
- Sensor.generic_asset_id.in_([asset.id for asset in generic_assets]),
- ]
- # Create query
- q = select(TimedBelief).join(Sensor).where(*entity_filters, *event_filters)
- # Prompt based on count of query
- num_beliefs_up_for_deletion = db.session.scalar(select(func.count()).select_from(q))
- # repr(entity) includes the IDs, which matters for the confirmation prompt
- if sensors:
- prompt = f"Delete all {num_beliefs_up_for_deletion} beliefs on {join_words_into_a_list([repr(sensor) for sensor in sensors])}?"
- elif generic_assets:
- prompt = f"Delete all {num_beliefs_up_for_deletion} beliefs on sensors of {join_words_into_a_list([repr(asset) for asset in generic_assets])}?"
- click.confirm(prompt, abort=True)
- db.session.execute(delete(TimedBelief).where(*entity_filters, *event_filters))
- click.secho(f"Removing {num_beliefs_up_for_deletion} beliefs ...")
- db.session.commit()
- num_beliefs_after = db.session.scalar(select(func.count()).select_from(q))
- # only show the entity names for the final confirmation
- message = f"{num_beliefs_after} beliefs left on sensors "
- if sensors:
- message += f"{join_words_into_a_list([sensor.name for sensor in sensors])}"
- elif generic_assets:
- message += (
- f"of {join_words_into_a_list([asset.name for asset in generic_assets])}"
- )
- if start is not None or end is not None:
- message += " within the specified time window"
- message += "."
- done(message)
- @fm_delete_data.command("unchanged-beliefs", cls=DeprecatedOptionsCommand)
- @with_appcontext
- @click.option(
- "--sensor",
- "--sensor-id",
- "sensor_id",
- type=int,
- cls=DeprecatedOption,
- deprecated=["--sensor-id"],
- preferred="--sensor",
- help="Delete unchanged (time series) data for a single sensor only. Follow up with the sensor's ID. ",
- )
- @click.option(
- "--delete-forecasts/--keep-forecasts",
- "delete_unchanged_forecasts",
- default=True,
- help="Use the --keep-forecasts flag to keep unchanged beliefs with a positive belief horizon (forecasts).",
- )
- @click.option(
- "--delete-measurements/--keep-measurements",
- "delete_unchanged_measurements",
- default=True,
- help="Use the --keep-measurements flag to keep beliefs with a zero or negative belief horizon (measurements, nowcasts and backcasts).",
- )
- def delete_unchanged_beliefs(
- sensor_id: int | None = None,
- delete_unchanged_forecasts: bool = True,
- delete_unchanged_measurements: bool = True,
- ):
- """Delete unchanged beliefs (i.e. updated beliefs with a later belief time, but with the same event value)."""
- q = select(TimedBelief)
- if sensor_id:
- sensor = db.session.get(Sensor, sensor_id)
- if sensor is None:
- abort(f"Failed to delete any beliefs: no sensor found with id {sensor_id}.")
- q = q.filter_by(sensor_id=sensor.id)
- num_beliefs_before = db.session.scalar(select(func.count()).select_from(q))
- unchanged_queries = []
- num_forecasts_up_for_deletion = 0
- num_measurements_up_for_deletion = 0
- if delete_unchanged_forecasts:
- q_unchanged_forecasts = query_unchanged_beliefs(
- db.session,
- TimedBelief,
- q.filter(
- TimedBelief.belief_horizon > timedelta(0),
- ),
- include_non_positive_horizons=False,
- )
- unchanged_queries.append(q_unchanged_forecasts)
- num_forecasts_up_for_deletion = db.session.scalar(
- select(func.count()).select_from(q_unchanged_forecasts)
- )
- if delete_unchanged_measurements:
- q_unchanged_measurements = query_unchanged_beliefs(
- db.session,
- TimedBelief,
- q.filter(
- TimedBelief.belief_horizon <= timedelta(0),
- ),
- include_positive_horizons=False,
- )
- unchanged_queries.append(q_unchanged_measurements)
- num_measurements_up_for_deletion = db.session.scalar(
- select(func.count()).select_from(q_unchanged_measurements)
- )
- num_beliefs_up_for_deletion = (
- num_forecasts_up_for_deletion + num_measurements_up_for_deletion
- )
- prompt = f"Delete {num_beliefs_up_for_deletion} unchanged beliefs ({num_measurements_up_for_deletion} measurements and {num_forecasts_up_for_deletion} forecasts) out of {num_beliefs_before} beliefs?"
- click.confirm(prompt, abort=True)
- beliefs_up_for_deletion = list(
- chain(*[db.session.scalars(q).all() for q in unchanged_queries])
- )
- batch_size = 10000
- for i, b in enumerate(beliefs_up_for_deletion, start=1):
- if i % batch_size == 0 or i == num_beliefs_up_for_deletion:
- click.echo(f"{i} beliefs processed ...")
- db.session.delete(b)
- click.secho(f"Removing {num_beliefs_up_for_deletion} beliefs ...")
- db.session.commit()
- num_beliefs_after = db.session.scalar(select(func.count()).select_from(q))
- done(f"{num_beliefs_after} beliefs left.")
- @fm_delete_data.command("nan-beliefs", cls=DeprecatedOptionsCommand)
- @with_appcontext
- @click.option(
- "--sensor",
- "--sensor-id",
- "sensor_id",
- type=int,
- cls=DeprecatedOption,
- deprecated=["--sensor-id"],
- preferred="--sensor",
- help="Delete NaN time series data for a single sensor only. Follow up with the sensor's ID.",
- )
- def delete_nan_beliefs(sensor_id: int | None = None):
- """Delete NaN beliefs."""
- q = db.session.query(TimedBelief)
- if sensor_id is not None:
- q = q.filter(TimedBelief.sensor_id == sensor_id)
- query = q.filter(TimedBelief.event_value == float("NaN"))
- prompt = f"Delete {query.count()} NaN beliefs out of {q.count()} beliefs?"
- click.confirm(prompt, abort=True)
- query.delete()
- db.session.commit()
- done(f"Done! {q.count()} beliefs left")
- @fm_delete_data.command("sensor")
- @with_appcontext
- @click.option(
- "--id",
- "sensors",
- type=SensorIdField(),
- required=True,
- multiple=True,
- help="Delete a sensor and its (time series) data. Follow up with the sensor's ID. "
- "This argument can be given multiple times",
- )
- def delete_sensor(
- sensors: list[Sensor],
- ):
- """Delete sensors and their (time series) data."""
- n = delete(TimedBelief).where(
- TimedBelief.sensor_id.in_(sensor.id for sensor in sensors)
- )
- statements = []
- for sensor in sensors:
- statements.append(delete(Sensor).filter_by(id=sensor.id))
- click.confirm(
- f"Delete {', '.join(sensor.__repr__() for sensor in sensors)}, along with {n} beliefs?",
- abort=True,
- )
- for statement in statements:
- db.session.execute(statement)
- db.session.commit()
- app.cli.add_command(fm_delete_data)
|