data_delete.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. """
  2. CLI commands for removing data
  3. """
  4. from __future__ import annotations
  5. from datetime import datetime, timedelta
  6. from itertools import chain
  7. import click
  8. from flask import current_app as app
  9. from flask.cli import with_appcontext
  10. from timely_beliefs.beliefs.queries import query_unchanged_beliefs
  11. from sqlalchemy import delete, func, select
  12. from flexmeasures.data import db
  13. from flexmeasures.data.models.user import Account, AccountRole, RolesAccounts, User
  14. from flexmeasures.data.models.generic_assets import GenericAsset
  15. from flexmeasures.data.models.time_series import Sensor, TimedBelief
  16. from flexmeasures.data.schemas import AwareDateTimeField, SensorIdField, AssetIdField
  17. from flexmeasures.data.services.users import find_user_by_email, delete_user
  18. from flexmeasures.cli.utils import (
  19. abort,
  20. done,
  21. DeprecatedOption,
  22. DeprecatedOptionsCommand,
  23. )
  24. from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list
  25. @click.group("delete")
  26. def fm_delete_data():
  27. """FlexMeasures: Delete data."""
  28. @fm_delete_data.command("account-role")
  29. @with_appcontext
  30. @click.option("--name", required=True)
  31. def delete_account_role(name: str):
  32. """
  33. Delete an account role.
  34. If it has accounts connected, print them before deleting the connection.
  35. """
  36. role: AccountRole = db.session.execute(
  37. select(AccountRole).filter_by(name=name)
  38. ).scalar_one_or_none()
  39. if role is None:
  40. abort(f"Account role '{name}' does not exist.")
  41. accounts = role.accounts.all()
  42. if len(accounts) > 0:
  43. click.secho(
  44. f"The following accounts have role '{role.name}': {','.join([a.name for a in accounts])}. Removing this role from them ...",
  45. )
  46. for account in accounts:
  47. account.account_roles.remove(role)
  48. db.session.execute(delete(AccountRole).filter_by(id=role.id))
  49. db.session.commit()
  50. done(f"Account role '{name}' has been deleted.")
  51. @fm_delete_data.command("account")
  52. @with_appcontext
  53. @click.option("--id", type=int)
  54. @click.option(
  55. "--force/--no-force", default=False, help="Skip warning about consequences."
  56. )
  57. def delete_account(id: int, force: bool):
  58. """
  59. Delete an account, including their users & data.
  60. """
  61. account: Account = db.session.get(Account, id)
  62. if account is None:
  63. abort(f"Account with ID '{id}' does not exist.")
  64. if not force:
  65. prompt = f"Delete account '{account.name}', including generic assets, users and all their data?\n"
  66. users = db.session.scalars(select(User).filter_by(account_id=id)).all()
  67. if users:
  68. prompt += "Affected users: " + ",".join([u.username for u in users]) + "\n"
  69. generic_assets = db.session.scalars(
  70. select(GenericAsset).filter_by(account_id=id)
  71. ).all()
  72. if generic_assets:
  73. prompt += (
  74. "Affected generic assets: "
  75. + ",".join([ga.name for ga in generic_assets])
  76. + "\n"
  77. )
  78. click.confirm(prompt, abort=True)
  79. for user in account.users:
  80. click.secho(f"Deleting user {user} ...")
  81. delete_user(user)
  82. for role_account_association in db.session.scalars(
  83. select(RolesAccounts).filter_by(account_id=account.id)
  84. ).all():
  85. role = db.session.get(AccountRole, role_account_association.role_id)
  86. click.echo(
  87. f"Deleting association of account {account.name} and role {role.name} ...",
  88. )
  89. db.session.execute(
  90. delete(RolesAccounts).filter_by(
  91. account_id=role_account_association.account_id
  92. )
  93. )
  94. for asset in account.generic_assets:
  95. click.echo(f"Deleting generic asset {asset} (and sensors & beliefs) ...")
  96. db.session.execute(delete(GenericAsset).filter_by(id=asset.id))
  97. account_name = account.name
  98. db.session.execute(delete(Account).filter_by(id=account.id))
  99. db.session.commit()
  100. done(f"Account {account_name} has been deleted.")
  101. @fm_delete_data.command("user")
  102. @with_appcontext
  103. @click.option("--email")
  104. @click.option(
  105. "--force/--no-force", default=False, help="Skip warning about consequences."
  106. )
  107. def delete_a_user(email: str, force: bool):
  108. """
  109. Delete a user & also their assets and data.
  110. """
  111. if not force:
  112. prompt = f"Delete user '{email}'?"
  113. click.confirm(prompt, abort=True)
  114. the_user = find_user_by_email(email)
  115. if the_user is None:
  116. abort(f"Could not find user with email address '{email}' ...")
  117. delete_user(the_user)
  118. db.session.commit()
  119. @fm_delete_data.command("asset")
  120. @with_appcontext
  121. @click.option("--id", "asset", type=AssetIdField())
  122. @click.option(
  123. "--force/--no-force", default=False, help="Skip warning about consequences."
  124. )
  125. def delete_asset_and_data(asset: GenericAsset, force: bool):
  126. """
  127. Delete an asset & also its sensors and data.
  128. """
  129. if not force:
  130. prompt = (
  131. f"Delete {asset.__repr__()}, including all its sensors, data and children?"
  132. )
  133. click.confirm(prompt, abort=True)
  134. db.session.execute(delete(GenericAsset).filter_by(id=asset.id))
  135. db.session.commit()
  136. @fm_delete_data.command("structure")
  137. @with_appcontext
  138. @click.option(
  139. "--force/--no-force", default=False, help="Skip warning about consequences."
  140. )
  141. def delete_structure(force):
  142. """
  143. Delete all structural (non time-series) data like assets (types),
  144. sources, roles and users.
  145. """
  146. if not force:
  147. click.confirm(
  148. f"Sure to delete all asset(type)s, sources, roles and users from {db.engine}?",
  149. abort=True,
  150. )
  151. from flexmeasures.data.scripts.data_gen import depopulate_structure
  152. depopulate_structure(db)
  153. @fm_delete_data.command("measurements", cls=DeprecatedOptionsCommand)
  154. @with_appcontext
  155. @click.option(
  156. "--sensor",
  157. "--sensor-id",
  158. "sensor_id",
  159. type=int,
  160. cls=DeprecatedOption,
  161. deprecated=["--sensor-id"],
  162. preferred="--sensor",
  163. help="Delete (time series) data for a single sensor only. Follow up with the sensor's ID.",
  164. )
  165. @click.option(
  166. "--force/--no-force", default=False, help="Skip warning about consequences."
  167. )
  168. def delete_measurements(
  169. force: bool,
  170. sensor_id: int | None = None,
  171. ):
  172. """Delete measurements (ex-post beliefs, i.e. with belief_horizon <= 0)."""
  173. if not force:
  174. click.confirm(f"Sure to delete all measurements from {db.engine}?", abort=True)
  175. from flexmeasures.data.scripts.data_gen import depopulate_measurements
  176. depopulate_measurements(db, sensor_id)
  177. @fm_delete_data.command("prognoses", cls=DeprecatedOptionsCommand)
  178. @with_appcontext
  179. @click.option(
  180. "--force/--no-force", default=False, help="Skip warning about consequences."
  181. )
  182. @click.option(
  183. "--sensor",
  184. "--sensor-id",
  185. "sensor_id",
  186. type=int,
  187. cls=DeprecatedOption,
  188. deprecated=["--sensor-id"],
  189. preferred="--sensor",
  190. help="Delete (time series) data for a single sensor only. Follow up with the sensor's ID. ",
  191. )
  192. def delete_prognoses(
  193. force: bool,
  194. sensor_id: int | None = None,
  195. ):
  196. """Delete forecasts and schedules (ex-ante beliefs, i.e. with belief_horizon > 0)."""
  197. if not force:
  198. click.confirm(f"Sure to delete all prognoses from {db.engine}?", abort=True)
  199. from flexmeasures.data.scripts.data_gen import depopulate_prognoses
  200. depopulate_prognoses(db, sensor_id)
  201. @fm_delete_data.command("beliefs")
  202. @with_appcontext
  203. @click.option(
  204. "--asset",
  205. "generic_assets",
  206. required=False,
  207. multiple=True,
  208. type=AssetIdField(),
  209. help="Delete all beliefs associated with (sensors of) this asset.",
  210. )
  211. @click.option(
  212. "--sensor",
  213. "sensors",
  214. required=False,
  215. multiple=True,
  216. type=SensorIdField(),
  217. help="Delete all beliefs associated with this sensor.",
  218. )
  219. @click.option(
  220. "--start",
  221. "start",
  222. type=AwareDateTimeField(),
  223. required=False,
  224. help="Remove beliefs about events starting at this datetime. Follow up with a timezone-aware datetime in ISO 6801 format.",
  225. )
  226. @click.option(
  227. "--end",
  228. "end",
  229. type=AwareDateTimeField(),
  230. required=False,
  231. help="Remove beliefs about events ending at this datetime. Follow up with a timezone-aware datetime in ISO 6801 format.",
  232. )
  233. @click.option("--offspring", type=bool, required=False, default=False, is_flag=True)
  234. def delete_beliefs( # noqa: C901
  235. generic_assets: list[GenericAsset],
  236. sensors: list[Sensor],
  237. start: datetime | None = None,
  238. end: datetime | None = None,
  239. offspring: bool = False,
  240. ):
  241. """Delete all beliefs recorded on a given sensor (or on sensors of a given asset)."""
  242. # Validate input
  243. if not generic_assets and not sensors:
  244. abort("Must pass at least one sensor or asset.")
  245. elif generic_assets and sensors:
  246. abort("Passing both sensors and assets at the same time is not supported.")
  247. if start is not None and end is not None and start > end:
  248. abort("Start should not exceed end.")
  249. if offspring and len(generic_assets) == 0:
  250. abort("Must pass at least one asset when the offspring option is employed.")
  251. # Time window filter
  252. event_filters = []
  253. if start is not None:
  254. event_filters += [TimedBelief.event_start >= start]
  255. if end is not None:
  256. event_filters += [TimedBelief.event_start + Sensor.event_resolution <= end]
  257. # Entity filter
  258. entity_filters = []
  259. if sensors:
  260. entity_filters += [TimedBelief.sensor_id.in_([sensor.id for sensor in sensors])]
  261. if generic_assets:
  262. # get the offspring of all generic assets
  263. generic_assets_offspring = []
  264. for asset in generic_assets:
  265. generic_assets_offspring.extend(asset.offspring)
  266. generic_assets = list(generic_assets) + generic_assets_offspring
  267. entity_filters += [
  268. TimedBelief.sensor_id == Sensor.id,
  269. Sensor.generic_asset_id.in_([asset.id for asset in generic_assets]),
  270. ]
  271. # Create query
  272. q = select(TimedBelief).join(Sensor).where(*entity_filters, *event_filters)
  273. # Prompt based on count of query
  274. num_beliefs_up_for_deletion = db.session.scalar(select(func.count()).select_from(q))
  275. # repr(entity) includes the IDs, which matters for the confirmation prompt
  276. if sensors:
  277. prompt = f"Delete all {num_beliefs_up_for_deletion} beliefs on {join_words_into_a_list([repr(sensor) for sensor in sensors])}?"
  278. elif generic_assets:
  279. prompt = f"Delete all {num_beliefs_up_for_deletion} beliefs on sensors of {join_words_into_a_list([repr(asset) for asset in generic_assets])}?"
  280. click.confirm(prompt, abort=True)
  281. db.session.execute(delete(TimedBelief).where(*entity_filters, *event_filters))
  282. click.secho(f"Removing {num_beliefs_up_for_deletion} beliefs ...")
  283. db.session.commit()
  284. num_beliefs_after = db.session.scalar(select(func.count()).select_from(q))
  285. # only show the entity names for the final confirmation
  286. message = f"{num_beliefs_after} beliefs left on sensors "
  287. if sensors:
  288. message += f"{join_words_into_a_list([sensor.name for sensor in sensors])}"
  289. elif generic_assets:
  290. message += (
  291. f"of {join_words_into_a_list([asset.name for asset in generic_assets])}"
  292. )
  293. if start is not None or end is not None:
  294. message += " within the specified time window"
  295. message += "."
  296. done(message)
  297. @fm_delete_data.command("unchanged-beliefs", cls=DeprecatedOptionsCommand)
  298. @with_appcontext
  299. @click.option(
  300. "--sensor",
  301. "--sensor-id",
  302. "sensor_id",
  303. type=int,
  304. cls=DeprecatedOption,
  305. deprecated=["--sensor-id"],
  306. preferred="--sensor",
  307. help="Delete unchanged (time series) data for a single sensor only. Follow up with the sensor's ID. ",
  308. )
  309. @click.option(
  310. "--delete-forecasts/--keep-forecasts",
  311. "delete_unchanged_forecasts",
  312. default=True,
  313. help="Use the --keep-forecasts flag to keep unchanged beliefs with a positive belief horizon (forecasts).",
  314. )
  315. @click.option(
  316. "--delete-measurements/--keep-measurements",
  317. "delete_unchanged_measurements",
  318. default=True,
  319. help="Use the --keep-measurements flag to keep beliefs with a zero or negative belief horizon (measurements, nowcasts and backcasts).",
  320. )
  321. def delete_unchanged_beliefs(
  322. sensor_id: int | None = None,
  323. delete_unchanged_forecasts: bool = True,
  324. delete_unchanged_measurements: bool = True,
  325. ):
  326. """Delete unchanged beliefs (i.e. updated beliefs with a later belief time, but with the same event value)."""
  327. q = select(TimedBelief)
  328. if sensor_id:
  329. sensor = db.session.get(Sensor, sensor_id)
  330. if sensor is None:
  331. abort(f"Failed to delete any beliefs: no sensor found with id {sensor_id}.")
  332. q = q.filter_by(sensor_id=sensor.id)
  333. num_beliefs_before = db.session.scalar(select(func.count()).select_from(q))
  334. unchanged_queries = []
  335. num_forecasts_up_for_deletion = 0
  336. num_measurements_up_for_deletion = 0
  337. if delete_unchanged_forecasts:
  338. q_unchanged_forecasts = query_unchanged_beliefs(
  339. db.session,
  340. TimedBelief,
  341. q.filter(
  342. TimedBelief.belief_horizon > timedelta(0),
  343. ),
  344. include_non_positive_horizons=False,
  345. )
  346. unchanged_queries.append(q_unchanged_forecasts)
  347. num_forecasts_up_for_deletion = db.session.scalar(
  348. select(func.count()).select_from(q_unchanged_forecasts)
  349. )
  350. if delete_unchanged_measurements:
  351. q_unchanged_measurements = query_unchanged_beliefs(
  352. db.session,
  353. TimedBelief,
  354. q.filter(
  355. TimedBelief.belief_horizon <= timedelta(0),
  356. ),
  357. include_positive_horizons=False,
  358. )
  359. unchanged_queries.append(q_unchanged_measurements)
  360. num_measurements_up_for_deletion = db.session.scalar(
  361. select(func.count()).select_from(q_unchanged_measurements)
  362. )
  363. num_beliefs_up_for_deletion = (
  364. num_forecasts_up_for_deletion + num_measurements_up_for_deletion
  365. )
  366. prompt = f"Delete {num_beliefs_up_for_deletion} unchanged beliefs ({num_measurements_up_for_deletion} measurements and {num_forecasts_up_for_deletion} forecasts) out of {num_beliefs_before} beliefs?"
  367. click.confirm(prompt, abort=True)
  368. beliefs_up_for_deletion = list(
  369. chain(*[db.session.scalars(q).all() for q in unchanged_queries])
  370. )
  371. batch_size = 10000
  372. for i, b in enumerate(beliefs_up_for_deletion, start=1):
  373. if i % batch_size == 0 or i == num_beliefs_up_for_deletion:
  374. click.echo(f"{i} beliefs processed ...")
  375. db.session.delete(b)
  376. click.secho(f"Removing {num_beliefs_up_for_deletion} beliefs ...")
  377. db.session.commit()
  378. num_beliefs_after = db.session.scalar(select(func.count()).select_from(q))
  379. done(f"{num_beliefs_after} beliefs left.")
  380. @fm_delete_data.command("nan-beliefs", cls=DeprecatedOptionsCommand)
  381. @with_appcontext
  382. @click.option(
  383. "--sensor",
  384. "--sensor-id",
  385. "sensor_id",
  386. type=int,
  387. cls=DeprecatedOption,
  388. deprecated=["--sensor-id"],
  389. preferred="--sensor",
  390. help="Delete NaN time series data for a single sensor only. Follow up with the sensor's ID.",
  391. )
  392. def delete_nan_beliefs(sensor_id: int | None = None):
  393. """Delete NaN beliefs."""
  394. q = db.session.query(TimedBelief)
  395. if sensor_id is not None:
  396. q = q.filter(TimedBelief.sensor_id == sensor_id)
  397. query = q.filter(TimedBelief.event_value == float("NaN"))
  398. prompt = f"Delete {query.count()} NaN beliefs out of {q.count()} beliefs?"
  399. click.confirm(prompt, abort=True)
  400. query.delete()
  401. db.session.commit()
  402. done(f"Done! {q.count()} beliefs left")
  403. @fm_delete_data.command("sensor")
  404. @with_appcontext
  405. @click.option(
  406. "--id",
  407. "sensors",
  408. type=SensorIdField(),
  409. required=True,
  410. multiple=True,
  411. help="Delete a sensor and its (time series) data. Follow up with the sensor's ID. "
  412. "This argument can be given multiple times",
  413. )
  414. def delete_sensor(
  415. sensors: list[Sensor],
  416. ):
  417. """Delete sensors and their (time series) data."""
  418. n = delete(TimedBelief).where(
  419. TimedBelief.sensor_id.in_(sensor.id for sensor in sensors)
  420. )
  421. statements = []
  422. for sensor in sensors:
  423. statements.append(delete(Sensor).filter_by(id=sensor.id))
  424. click.confirm(
  425. f"Delete {', '.join(sensor.__repr__() for sensor in sensors)}, along with {n} beliefs?",
  426. abort=True,
  427. )
  428. for statement in statements:
  429. db.session.execute(statement)
  430. db.session.commit()
# Register the "delete" command group (with all subcommands attached to it in
# this module) on the current Flask app's CLI.
app.cli.add_command(fm_delete_data)