data_gen.py

  1. """
  2. Populate the database with data we know or read in.
  3. """
  4. from __future__ import annotations
  5. from pathlib import Path
  6. from shutil import rmtree
  7. from datetime import datetime, timedelta
  8. import pandas as pd
  9. from flask import current_app as app
  10. from flask_sqlalchemy import SQLAlchemy
  11. import click
  12. from sqlalchemy import func, and_, select, delete
  13. from sqlalchemy.exc import SQLAlchemyError
  14. from sqlalchemy.ext.serializer import loads, dumps
  15. from timetomodel.forecasting import make_rolling_forecasts
  16. from timetomodel.exceptions import MissingData, NaNData
  17. from humanize import naturaldelta
  18. import inflect
  19. from flexmeasures.data.models.time_series import Sensor, TimedBelief
  20. from flexmeasures.data.models.generic_assets import GenericAssetType, GenericAsset
  21. from flexmeasures.data.models.data_sources import DataSource
  22. from flexmeasures.data.models.user import User, Role, RolesUsers, AccountRole
  23. from flexmeasures.data.models.forecasting import lookup_model_specs_configurator
  24. from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException
  25. from flexmeasures.utils.time_utils import ensure_local_timezone
  26. from flexmeasures.data.transactional import as_transaction
  27. from flexmeasures.cli.utils import MsgStyle
  28. BACKUP_PATH = app.config.get("FLEXMEASURES_DB_BACKUP_PATH")
  29. LOCAL_TIME_ZONE = app.config.get("FLEXMEASURES_TIMEZONE")
  30. infl_eng = inflect.engine()


def add_default_data_sources(db: SQLAlchemy):
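    """
    Ensure the default Seita data sources (demo script, forecaster, scheduler) exist,
    adding any that are missing.
    """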
    for source_name, source_type in (
        ("Seita", "demo script"),
        ("Seita", "forecaster"),
        ("Seita", "scheduler"),
    ):
        source = db.session.execute(
            select(DataSource).filter(
                and_(DataSource.name == source_name, DataSource.type == source_type)
            )
        ).scalar_one_or_none()
        if source:
            click.echo(f"Source {source_name} ({source_type}) already exists.")
        else:
            db.session.add(DataSource(name=source_name, type=source_type))


def add_default_asset_types(db: SQLAlchemy) -> dict[str, GenericAssetType]:
    """
    Add a few useful asset types.
    """
    types = {}
    for type_name, type_description in (
        ("solar", "solar panel(s)"),
        ("wind", "wind turbine"),
        ("one-way_evse", "uni-directional Electric Vehicle Supply Equipment"),
        ("two-way_evse", "bi-directional Electric Vehicle Supply Equipment"),
        ("battery", "stationary battery"),
        ("building", "building"),
        ("process", "process"),
    ):
        _type = db.session.execute(
            select(GenericAssetType).filter_by(name=type_name)
        ).scalar_one_or_none()
        if _type is None:
            _type = GenericAssetType(name=type_name, description=type_description)
            db.session.add(_type)
            click.secho(
                f"Generic asset type `{type_name}` created successfully.",
                **MsgStyle.SUCCESS,
            )
        types[type_name] = _type
    return types


def add_default_user_roles(db: SQLAlchemy):
    """
    Add a few useful user roles.
    """
    from flexmeasures.auth import policy as auth_policy

    for role_name, role_description in (
        (auth_policy.ADMIN_ROLE, "Super user"),
        (auth_policy.ADMIN_READER_ROLE, "Can read everything"),
        (
            auth_policy.ACCOUNT_ADMIN_ROLE,
            "Can update and delete data in their account (e.g. assets, sensors, users, beliefs)",
        ),
        (
            auth_policy.CONSULTANT_ROLE,
            "Can read everything in consultancy client accounts",
        ),
    ):
        role = db.session.execute(
            select(Role).filter_by(name=role_name)
        ).scalar_one_or_none()
        if role:
            click.echo(f"Role {role_name} already exists.")
        else:
            db.session.add(Role(name=role_name, description=role_description))


def add_default_account_roles(db: SQLAlchemy):
    """
    Add a few useful account roles, inspired by USEF.
    """
    for role_name, role_description in (
        ("Prosumer", "A consumer who might also produce"),
        ("MDC", "Metering Data Company"),
        ("Supplier", "Supplier of energy"),
        ("Aggregator", "Aggregator of energy flexibility"),
        ("ESCO", "Energy Service Company"),
    ):
        role = db.session.execute(
            select(AccountRole).filter_by(name=role_name)
        ).scalar_one_or_none()
        if role:
            click.echo(f"Account role {role_name} already exists.")
        else:
            db.session.add(AccountRole(name=role_name, description=role_description))


def add_transmission_zone_asset(country_code: str, db: SQLAlchemy) -> GenericAsset:
    """
    Ensure a GenericAsset exists to model a transmission zone for a country.
    """
    transmission_zone_type = db.session.execute(
        select(GenericAssetType).filter_by(name="transmission zone")
    ).scalar_one_or_none()
    if not transmission_zone_type:
        click.echo("Adding transmission zone type ...")
        transmission_zone_type = GenericAssetType(
            name="transmission zone",
            description="A grid regulated & balanced as a whole, usually a national grid.",
        )
        db.session.add(transmission_zone_type)
    ga_name = f"{country_code} transmission zone"
    transmission_zone = db.session.execute(
        select(GenericAsset).filter_by(name=ga_name)
    ).scalar_one_or_none()
    if not transmission_zone:
        click.echo(f"Adding {ga_name} ...")
        transmission_zone = GenericAsset(
            name=ga_name,
            generic_asset_type=transmission_zone_type,
            account_id=None,  # public
        )
        db.session.add(transmission_zone)
    return transmission_zone


# ------------ Main functions --------------------------------
# These can be registered at the app object as CLI functions


@as_transaction
def populate_initial_structure(db: SQLAlchemy):
    """
    Add initially useful structural data.
    """
    click.echo("Populating the database %s with structural data ..." % db.engine)
    add_default_data_sources(db)
    add_default_user_roles(db)
    add_default_account_roles(db)
    add_default_asset_types(db)
    click.echo(
        "DB now has %d DataSource(s)"
        % db.session.scalar(select(func.count()).select_from(DataSource))
    )
    click.echo(
        "DB now has %d AssetType(s)"
        % db.session.scalar(select(func.count()).select_from(GenericAssetType))
    )
    click.echo(
        "DB now has %d Role(s) for users"
        % db.session.scalar(select(func.count()).select_from(Role))
    )
    click.echo(
        "DB now has %d AccountRole(s)"
        % db.session.scalar(select(func.count()).select_from(AccountRole))
    )


@as_transaction  # noqa: C901
def populate_time_series_forecasts(  # noqa: C901
    db: SQLAlchemy,
    sensor_ids: list[int],
    horizons: list[timedelta],
    forecast_start: datetime,
    forecast_end: datetime,
    event_resolution: timedelta | None = None,
):
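    """
    Make rolling forecasts for the given sensors and horizons, between forecast_start
    and forecast_end, and save them to the database as beliefs of the demo script
    data source.

    Example call (hypothetical sensor IDs and datetimes):

        populate_time_series_forecasts(
            db,
            sensor_ids=[1, 2],
            horizons=[timedelta(hours=6), timedelta(days=2)],
            forecast_start=datetime(2024, 1, 1),  # use timezone-aware datetimes in practice
            forecast_end=datetime(2024, 1, 8),
        )
    """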
    training_and_testing_period = timedelta(days=30)
    click.echo(
        "Populating the database %s with time series forecasts of %s ahead ..."
        % (db.engine, infl_eng.join([naturaldelta(horizon) for horizon in horizons]))
    )

    # Set a data source for the forecasts
    data_source = db.session.execute(
        select(DataSource).filter_by(name="Seita", type="demo script")
    ).scalar_one_or_none()

    # List all sensors for which to forecast.
    sensors = db.session.scalars(
        select(Sensor).filter(Sensor.id.in_(sensor_ids))
    ).all()
    if not sensors:
        click.echo("No such sensors in db, so I will not add any forecasts.")
        return

    # Make a model for each sensor and horizon, make rolling forecasts and save to database.
    # We cannot use (faster) bulk save, as forecasts might become regressors in other forecasts.
    for sensor in sensors:
        for horizon in horizons:
            try:
                default_model = lookup_model_specs_configurator()
                model_specs, model_identifier, model_fallback = default_model(
                    sensor=sensor,
                    forecast_start=forecast_start,
                    forecast_end=forecast_end,
                    forecast_horizon=horizon,
                    custom_model_params=dict(
                        training_and_testing_period=training_and_testing_period,
                        event_resolution=event_resolution,
                    ),
                )
                click.echo(
                    "Computing forecasts of %s ahead for sensor %s, "
                    "from %s to %s with a training and testing period of %s, using %s ..."
                    % (
                        naturaldelta(horizon),
                        sensor.id,
                        forecast_start,
                        forecast_end,
                        naturaldelta(training_and_testing_period),
                        model_identifier,
                    )
                )
                model_specs.creation_time = forecast_start
                forecasts, model_state = make_rolling_forecasts(
                    start=forecast_start, end=forecast_end, model_specs=model_specs
                )
                # Upsample to sensor resolution if needed
                if forecasts.index.freq > pd.Timedelta(sensor.event_resolution):
                    forecasts = model_specs.outcome_var.resample_data(
                        forecasts,
                        time_window=(forecasts.index.min(), forecasts.index.max()),
                        expected_frequency=sensor.event_resolution,
                    )
            except (NotEnoughDataException, MissingData, NaNData) as e:
                click.echo("Skipping forecasts for sensor %s: %s" % (sensor, str(e)))
                continue

            beliefs = [
                TimedBelief(
                    event_start=ensure_local_timezone(dt, tz_name=LOCAL_TIME_ZONE),
                    belief_horizon=horizon,
                    event_value=value,
                    sensor=sensor,
                    source=data_source,
                )
                for dt, value in forecasts.items()
            ]
            click.echo(
                "Saving %s %s-forecasts for %s..."
                % (len(beliefs), naturaldelta(horizon), sensor.id)
            )
            for belief in beliefs:
                db.session.add(belief)
    click.echo(
        "DB now has %d forecasts"
        % db.session.scalar(
            select(func.count())
            .select_from(TimedBelief)
            .filter(TimedBelief.belief_horizon > timedelta(hours=0))
        )
    )


@as_transaction
def depopulate_structure(db: SQLAlchemy):
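    """
    Delete all structural data (assets, asset types, data sources, roles and users)
    from the database.
    """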
    click.echo("Depopulating structural data from the database %s ..." % db.engine)
    num_assets_deleted = db.session.execute(delete(GenericAsset)).rowcount
    num_asset_types_deleted = db.session.execute(delete(GenericAssetType)).rowcount
    num_data_sources_deleted = db.session.execute(delete(DataSource)).rowcount
    num_roles_deleted = db.session.execute(delete(Role)).rowcount
    num_users_deleted = db.session.execute(delete(User)).rowcount
    click.echo("Deleted %d AssetTypes" % num_asset_types_deleted)
    click.echo("Deleted %d Assets" % num_assets_deleted)
    click.echo("Deleted %d DataSources" % num_data_sources_deleted)
    click.echo("Deleted %d Roles" % num_roles_deleted)
    click.echo("Deleted %d Users" % num_users_deleted)


@as_transaction
def depopulate_measurements(
    db: SQLAlchemy,
    sensor_id: int | None = None,
):
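    """
    Delete measurement data (ex-post beliefs, i.e. with a belief horizon <= 0).
    Pass a sensor ID to restrict the deletion to data on one sensor only.
    """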
    click.echo("Deleting (time series) data from the database %s ..." % db.engine)
    query = delete(TimedBelief).filter(TimedBelief.belief_horizon <= timedelta(hours=0))
    if sensor_id is not None:
        query = query.filter(TimedBelief.sensor_id == sensor_id)
    deletion_result = db.session.execute(query)
    num_measurements_deleted = deletion_result.rowcount
    click.echo("Deleted %d measurements (ex-post beliefs)" % num_measurements_deleted)


@as_transaction
def depopulate_prognoses(
    db: SQLAlchemy,
    sensor_id: int | None = None,
):
  292. """
  293. Delete all prognosis data (with an horizon > 0).
  294. This affects forecasts as well as schedules.
  295. Pass a sensor ID to restrict to data on one sensor only.
  296. If no sensor is specified, this function also deletes forecasting and scheduling jobs.
  297. (Doing this only for jobs which forecast/schedule one sensor is not implemented and also tricky.)
  298. """
    click.echo(
        "Deleting (time series) forecasts and schedules data from the database %s ..."
        % db.engine
    )
    if not sensor_id:
        num_forecasting_jobs_deleted = app.queues["forecasting"].empty()
        num_scheduling_jobs_deleted = app.queues["scheduling"].empty()

    # Clear all forecasts (data with positive horizon)
    query = delete(TimedBelief).filter(TimedBelief.belief_horizon > timedelta(hours=0))
    if sensor_id is not None:
        query = query.filter(TimedBelief.sensor_id == sensor_id)
    deletion_result = db.session.execute(query)
    num_forecasts_deleted = deletion_result.rowcount
    if not sensor_id:
        click.echo("Deleted %d Forecast Jobs" % num_forecasting_jobs_deleted)
        click.echo("Deleted %d Schedule Jobs" % num_scheduling_jobs_deleted)
    click.echo("Deleted %d forecasts (ex-ante beliefs)" % num_forecasts_deleted)


def reset_db(db: SQLAlchemy):
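    """
    Drop and recreate all tables in the database.
    """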
    db.session.commit()  # close any existing sessions
    click.echo("Dropping everything in %s ..." % db.engine)
    db.reflect()  # see http://jrheard.tumblr.com/post/12759432733/dropping-all-tables-on-postgres-using
    db.drop_all()
    click.echo("Recreating everything ...")
    db.create_all()
    click.echo("Committing ...")
    db.session.commit()


def save_tables(
    db: SQLAlchemy,
    backup_name: str = "",
    structure: bool = True,
    data: bool = False,
    backup_path: str = BACKUP_PATH,
):
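    """
    Serialize the affected tables (structure and/or data) to files in a new backup
    folder, cleaning up the folder if any table fails to save.
    """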
    # Make a new folder for the backup
    backup_folder = Path("%s/%s" % (backup_path, backup_name))
    try:
        backup_folder.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        click.echo(
            "Can't save backup, because directory %s/%s already exists."
            % (backup_path, backup_name)
        )
        return
    affected_classes = get_affected_classes(structure, data)
    c = None
    try:
        for c in affected_classes:
            file_path = "%s/%s/%s.obj" % (backup_path, backup_name, c.__tablename__)
            with open(file_path, "xb") as file_handler:
                file_handler.write(dumps(db.session.scalars(select(c)).all()))
            click.echo("Successfully saved %s/%s." % (backup_name, c.__tablename__))
    except SQLAlchemyError as e:
        click.echo(
            "Can't save table %s because of the following error:\n\n\t%s\n\nCleaning up..."
            % (c.__tablename__, e)
        )
        rmtree(backup_folder)
        click.echo("Removed directory %s/%s." % (backup_path, backup_name))


@as_transaction
def load_tables(
    db: SQLAlchemy,
    backup_name: str = "",
    structure: bool = True,
    data: bool = False,
    backup_path: str = BACKUP_PATH,
):
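    """
    Load the affected tables (structure and/or data) from files in a backup folder,
    resetting each table's ID sequence to the highest loaded ID.
    """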
    backup_folder = Path("%s/%s" % (backup_path, backup_name))
    if backup_folder.exists() and backup_folder.is_dir():
        affected_classes = get_affected_classes(structure, data)
        statement = text("SELECT sequence_name from information_schema.sequences;")
        sequence_rows = db.session.execute(statement).fetchall()
        sequence_names = [s.sequence_name for s in sequence_rows]
        for c in affected_classes:
            file_path = "%s/%s/%s.obj" % (backup_path, backup_name, c.__tablename__)
            sequence_name = "%s_id_seq" % c.__tablename__
            try:
                with open(file_path, "rb") as file_handler:
                    for row in loads(file_handler.read()):
                        db.session.merge(row)
                if sequence_name in sequence_names:
                    # Get max id
                    max_id = db.session.execute(
                        select(func.max(c.id)).select_from(c)
                    ).scalar_one_or_none()
                    max_id = 1 if max_id is None else max_id
                    # Set table seq to max id
                    db.session.execute(
                        text("SELECT setval('%s', %s, true);" % (sequence_name, max_id))
                    )
                click.echo(
                    "Successfully loaded %s/%s." % (backup_name, c.__tablename__)
                )
            except FileNotFoundError:
                click.echo(
                    "Can't load table %s, because file %s does not exist."
                    % (c.__tablename__, file_path)
                )
    else:
        click.echo(
            "Can't load backup, because directory %s/%s does not exist."
            % (backup_path, backup_name)
        )


def get_affected_classes(structure: bool = True, data: bool = False) -> list:
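    """
    Return the model classes to back up or load, for structural data and/or time series data.
    """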
    affected_classes = []
    if structure:
        affected_classes += [
            Role,
            User,
            RolesUsers,
            Sensor,
            GenericAssetType,
            GenericAsset,
            DataSource,
        ]
    if data:
        affected_classes += [TimedBelief]
    return affected_classes