- """
- Populate the database with data we know or read in.
- """
- from __future__ import annotations
- from pathlib import Path
- from shutil import rmtree
- from datetime import datetime, timedelta
- import pandas as pd
- from flask import current_app as app
- from flask_sqlalchemy import SQLAlchemy
- import click
from sqlalchemy import func, and_, select, delete, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.serializer import loads, dumps
from timetomodel.forecasting import make_rolling_forecasts
from timetomodel.exceptions import MissingData, NaNData
from humanize import naturaldelta
import inflect

from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.generic_assets import GenericAssetType, GenericAsset
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.user import User, Role, RolesUsers, AccountRole
from flexmeasures.data.models.forecasting import lookup_model_specs_configurator
from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException
from flexmeasures.utils.time_utils import ensure_local_timezone
from flexmeasures.data.transactional import as_transaction
from flexmeasures.cli.utils import MsgStyle

BACKUP_PATH = app.config.get("FLEXMEASURES_DB_BACKUP_PATH")
LOCAL_TIME_ZONE = app.config.get("FLEXMEASURES_TIMEZONE")

infl_eng = inflect.engine()


def add_default_data_sources(db: SQLAlchemy):
    for source_name, source_type in (
        ("Seita", "demo script"),
        ("Seita", "forecaster"),
        ("Seita", "scheduler"),
    ):
        source = db.session.execute(
            select(DataSource).filter(
                and_(DataSource.name == source_name, DataSource.type == source_type)
            )
        ).scalar_one_or_none()
        if source:
            click.echo(f"Source {source_name} ({source_type}) already exists.")
        else:
            db.session.add(DataSource(name=source_name, type=source_type))


def add_default_asset_types(db: SQLAlchemy) -> dict[str, GenericAssetType]:
    """
    Add a few useful asset types.
    """
    types = {}
    for type_name, type_description in (
        ("solar", "solar panel(s)"),
        ("wind", "wind turbine"),
        ("one-way_evse", "uni-directional Electric Vehicle Supply Equipment"),
        ("two-way_evse", "bi-directional Electric Vehicle Supply Equipment"),
        ("battery", "stationary battery"),
        ("building", "building"),
        ("process", "process"),
    ):
        _type = db.session.execute(
            select(GenericAssetType).filter_by(name=type_name)
        ).scalar_one_or_none()
        if _type is None:
            _type = GenericAssetType(name=type_name, description=type_description)
            db.session.add(_type)
            click.secho(
                f"Generic asset type `{type_name}` created successfully.",
                **MsgStyle.SUCCESS,
            )
        types[type_name] = _type
    return types


def add_default_user_roles(db: SQLAlchemy):
    """
    Add a few useful user roles.
    """
    from flexmeasures.auth import policy as auth_policy

    for role_name, role_description in (
        (auth_policy.ADMIN_ROLE, "Super user"),
        (auth_policy.ADMIN_READER_ROLE, "Can read everything"),
        (
            auth_policy.ACCOUNT_ADMIN_ROLE,
            "Can update and delete data in their account (e.g. assets, sensors, users, beliefs)",
        ),
        (
            auth_policy.CONSULTANT_ROLE,
            "Can read everything in consultancy client accounts",
        ),
    ):
        role = db.session.execute(
            select(Role).filter_by(name=role_name)
        ).scalar_one_or_none()
        if role:
            click.echo(f"Role {role_name} already exists.")
        else:
            db.session.add(Role(name=role_name, description=role_description))


def add_default_account_roles(db: SQLAlchemy):
    """
    Add a few useful account roles, inspired by USEF.
    """
    for role_name, role_description in (
        ("Prosumer", "A consumer who might also produce"),
        ("MDC", "Metering Data Company"),
        ("Supplier", "Supplier of energy"),
        ("Aggregator", "Aggregator of energy flexibility"),
        ("ESCO", "Energy Service Company"),
    ):
        role = db.session.execute(
            select(AccountRole).filter_by(name=role_name)
        ).scalar_one_or_none()
        if role:
            click.echo(f"Account role {role_name} already exists.")
        else:
            db.session.add(AccountRole(name=role_name, description=role_description))


def add_transmission_zone_asset(country_code: str, db: SQLAlchemy) -> GenericAsset:
    """
    Ensure a GenericAsset exists to model a transmission zone for a country.
    """
    transmission_zone_type = db.session.execute(
        select(GenericAssetType).filter_by(name="transmission zone")
    ).scalar_one_or_none()
    if not transmission_zone_type:
        click.echo("Adding transmission zone type ...")
        transmission_zone_type = GenericAssetType(
            name="transmission zone",
            description="A grid regulated & balanced as a whole, usually a national grid.",
        )
        db.session.add(transmission_zone_type)
    ga_name = f"{country_code} transmission zone"
    transmission_zone = db.session.execute(
        select(GenericAsset).filter_by(name=ga_name)
    ).scalar_one_or_none()
    if not transmission_zone:
        click.echo(f"Adding {ga_name} ...")
        transmission_zone = GenericAsset(
            name=ga_name,
            generic_asset_type=transmission_zone_type,
            account_id=None,  # public
        )
        db.session.add(transmission_zone)
    return transmission_zone
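

# Usage sketch (illustrative only; assumes an app context and an open session,
# and committing is left to the caller):
#
#     zone = add_transmission_zone_asset("NL", db)
#     db.session.commit()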


# ------------ Main functions --------------------------------
# These can be registered at the app object as CLI functions


@as_transaction
def populate_initial_structure(db: SQLAlchemy):
    """
    Add initially useful structural data.
    """
    click.echo("Populating the database %s with structural data ..." % db.engine)
    add_default_data_sources(db)
    add_default_user_roles(db)
    add_default_account_roles(db)
    add_default_asset_types(db)
    click.echo(
        "DB now has %d DataSource(s)"
        % db.session.scalar(select(func.count()).select_from(DataSource))
    )
    click.echo(
        "DB now has %d AssetType(s)"
        % db.session.scalar(select(func.count()).select_from(GenericAssetType))
    )
    click.echo(
        "DB now has %d Role(s) for users"
        % db.session.scalar(select(func.count()).select_from(Role))
    )
    click.echo(
        "DB now has %d AccountRole(s)"
        % db.session.scalar(select(func.count()).select_from(AccountRole))
    )
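
# For illustration, a minimal sketch of how such a function could be wired up
# as a Flask CLI command (the command name here is hypothetical; the actual
# registration lives in the flexmeasures.cli modules):
#
#     @app.cli.command("populate-structure")
#     def populate_structure_command():
#         populate_initial_structure(db)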


@as_transaction  # noqa: C901
def populate_time_series_forecasts(  # noqa: C901
    db: SQLAlchemy,
    sensor_ids: list[int],
    horizons: list[timedelta],
    forecast_start: datetime,
    forecast_end: datetime,
    event_resolution: timedelta | None = None,
):
    """
    Make rolling forecasts for the given sensors and horizons,
    and save them to the database as beliefs.
    """
    training_and_testing_period = timedelta(days=30)
    click.echo(
        "Populating the database %s with time series forecasts of %s ahead ..."
        % (db.engine, infl_eng.join([naturaldelta(horizon) for horizon in horizons]))
    )

    # Set a data source for the forecasts
    data_source = db.session.execute(
        select(DataSource).filter_by(name="Seita", type="demo script")
    ).scalar_one_or_none()

    # List all sensors for which to forecast
    sensors = db.session.scalars(
        select(Sensor).filter(Sensor.id.in_(sensor_ids))
    ).all()
    if not sensors:
        click.echo("No such sensors in db, so I will not add any forecasts.")
        return
    # Make a model for each sensor and horizon, make rolling forecasts and save to database.
    # We cannot use (faster) bulk save, as forecasts might become regressors in other forecasts.
    for sensor in sensors:
        for horizon in horizons:
            try:
                default_model = lookup_model_specs_configurator()
                model_specs, model_identifier, model_fallback = default_model(
                    sensor=sensor,
                    forecast_start=forecast_start,
                    forecast_end=forecast_end,
                    forecast_horizon=horizon,
                    custom_model_params=dict(
                        training_and_testing_period=training_and_testing_period,
                        event_resolution=event_resolution,
                    ),
                )
                click.echo(
                    "Computing forecasts of %s ahead for sensor %s, "
                    "from %s to %s with a training and testing period of %s, using %s ..."
                    % (
                        naturaldelta(horizon),
                        sensor.id,
                        forecast_start,
                        forecast_end,
                        naturaldelta(training_and_testing_period),
                        model_identifier,
                    )
                )
                model_specs.creation_time = forecast_start
                forecasts, model_state = make_rolling_forecasts(
                    start=forecast_start, end=forecast_end, model_specs=model_specs
                )
                # Upsample to sensor resolution if needed
                if forecasts.index.freq > pd.Timedelta(sensor.event_resolution):
                    forecasts = model_specs.outcome_var.resample_data(
                        forecasts,
                        time_window=(forecasts.index.min(), forecasts.index.max()),
                        expected_frequency=sensor.event_resolution,
                    )
            except (NotEnoughDataException, MissingData, NaNData) as e:
                click.echo("Skipping forecasts for sensor %s: %s" % (sensor, str(e)))
                continue
            beliefs = [
                TimedBelief(
                    event_start=ensure_local_timezone(dt, tz_name=LOCAL_TIME_ZONE),
                    belief_horizon=horizon,
                    event_value=value,
                    sensor=sensor,
                    source=data_source,
                )
                for dt, value in forecasts.items()
            ]
            click.echo(
                "Saving %s %s-forecasts for sensor %s ..."
                % (len(beliefs), naturaldelta(horizon), sensor.id)
            )
            for belief in beliefs:
                db.session.add(belief)
    click.echo(
        "DB now has %d forecasts"
        % db.session.scalar(
            select(func.count())
            .select_from(TimedBelief)
            .filter(TimedBelief.belief_horizon > timedelta(hours=0))
        )
    )
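
# Example invocation (a sketch; sensor IDs and dates are hypothetical, and
# datetimes should be timezone-aware in the configured FLEXMEASURES_TIMEZONE):
#
#     populate_time_series_forecasts(
#         db,
#         sensor_ids=[1, 2],
#         horizons=[timedelta(hours=6), timedelta(hours=48)],
#         forecast_start=ensure_local_timezone(datetime(2024, 1, 1), tz_name=LOCAL_TIME_ZONE),
#         forecast_end=ensure_local_timezone(datetime(2024, 1, 8), tz_name=LOCAL_TIME_ZONE),
#     )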


@as_transaction
def depopulate_structure(db: SQLAlchemy):
    """
    Delete all structural (non time series) data from the database.
    """
    click.echo("Depopulating structural data from the database %s ..." % db.engine)
    num_assets_deleted = db.session.execute(delete(GenericAsset)).rowcount
    num_asset_types_deleted = db.session.execute(delete(GenericAssetType)).rowcount
    num_data_sources_deleted = db.session.execute(delete(DataSource)).rowcount
    num_roles_deleted = db.session.execute(delete(Role)).rowcount
    num_users_deleted = db.session.execute(delete(User)).rowcount
    click.echo("Deleted %d AssetTypes" % num_asset_types_deleted)
    click.echo("Deleted %d Assets" % num_assets_deleted)
    click.echo("Deleted %d DataSources" % num_data_sources_deleted)
    click.echo("Deleted %d Roles" % num_roles_deleted)
    click.echo("Deleted %d Users" % num_users_deleted)


@as_transaction
def depopulate_measurements(
    db: SQLAlchemy,
    sensor_id: int | None = None,
):
    """
    Delete all measurement data (ex-post beliefs, i.e. with a belief horizon <= 0).
    Pass a sensor ID to restrict to data on one sensor only.
    """
    click.echo("Deleting (time series) data from the database %s ..." % db.engine)
    query = delete(TimedBelief).filter(
        TimedBelief.belief_horizon <= timedelta(hours=0)
    )
    if sensor_id is not None:
        query = query.filter(TimedBelief.sensor_id == sensor_id)
    deletion_result = db.session.execute(query)
    num_measurements_deleted = deletion_result.rowcount
    click.echo("Deleted %d measurements (ex-post beliefs)" % num_measurements_deleted)


@as_transaction
def depopulate_prognoses(
    db: SQLAlchemy,
    sensor_id: int | None = None,
):
    """
    Delete all prognosis data (with a horizon > 0).
    This affects forecasts as well as schedules.

    Pass a sensor ID to restrict to data on one sensor only.
    If no sensor is specified, this function also deletes forecasting and scheduling jobs.
    (Doing this only for jobs which forecast/schedule one sensor is not implemented and also tricky.)
    """
    click.echo(
        "Deleting (time series) forecasts and schedules data from the database %s ..."
        % db.engine
    )
    if not sensor_id:
        num_forecasting_jobs_deleted = app.queues["forecasting"].empty()
        num_scheduling_jobs_deleted = app.queues["scheduling"].empty()

    # Clear all forecasts (data with positive horizon)
    query = delete(TimedBelief).filter(
        TimedBelief.belief_horizon > timedelta(hours=0)
    )
    if sensor_id is not None:
        query = query.filter(TimedBelief.sensor_id == sensor_id)
    deletion_result = db.session.execute(query)
    num_forecasts_deleted = deletion_result.rowcount
    if not sensor_id:
        click.echo("Deleted %d Forecast Jobs" % num_forecasting_jobs_deleted)
        click.echo("Deleted %d Schedule Jobs" % num_scheduling_jobs_deleted)
    click.echo("Deleted %d forecasts (ex-ante beliefs)" % num_forecasts_deleted)


def reset_db(db: SQLAlchemy):
    db.session.commit()  # close any existing sessions
    click.echo("Dropping everything in %s ..." % db.engine)
    db.reflect()  # see http://jrheard.tumblr.com/post/12759432733/dropping-all-tables-on-postgres-using
    db.drop_all()
    click.echo("Recreating everything ...")
    db.create_all()
    click.echo("Committing ...")
    db.session.commit()
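
# reset_db is destructive: it drops and recreates all tables. A guarded-usage
# sketch (the guard shown is illustrative, not part of this module):
#
#     if app.debug:
#         reset_db(db)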


def save_tables(
    db: SQLAlchemy,
    backup_name: str = "",
    structure: bool = True,
    data: bool = False,
    backup_path: str = BACKUP_PATH,
):
    """
    Back up db tables to files, using sqlalchemy's serializer.
    """
    # Make a new folder for the backup
    backup_folder = Path("%s/%s" % (backup_path, backup_name))
    try:
        backup_folder.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        click.echo(
            "Can't save backup, because directory %s/%s already exists."
            % (backup_path, backup_name)
        )
        return

    affected_classes = get_affected_classes(structure, data)
    c = None
    try:
        for c in affected_classes:
            file_path = "%s/%s/%s.obj" % (backup_path, backup_name, c.__tablename__)
            with open(file_path, "xb") as file_handler:
                file_handler.write(dumps(db.session.scalars(select(c)).all()))
- click.echo("Successfully saved %s/%s." % (backup_name, c.__tablename__))
- except SQLAlchemyError as e:
- click.echo(
- "Can't save table %s because of the following error:\n\n\t%s\n\nCleaning up..."
- % (c.__tablename__, e)
- )
- rmtree(backup_folder)
- click.echo("Removed directory %s/%s." % (backup_path, backup_name))


@as_transaction
def load_tables(
    db: SQLAlchemy,
    backup_name: str = "",
    structure: bool = True,
    data: bool = False,
    backup_path: str = BACKUP_PATH,
):
    """
    Load db tables from files, using sqlalchemy's serializer.
    """
    if (
        Path("%s/%s" % (backup_path, backup_name)).exists()
        and Path("%s/%s" % (backup_path, backup_name)).is_dir()
    ):
        affected_classes = get_affected_classes(structure, data)
        statement = text("SELECT sequence_name from information_schema.sequences;")
        sequence_results = db.session.execute(statement).fetchall()
        sequence_names = [s.sequence_name for s in sequence_results]
        for c in affected_classes:
            file_path = "%s/%s/%s.obj" % (backup_path, backup_name, c.__tablename__)
            sequence_name = "%s_id_seq" % c.__tablename__
            try:
                with open(file_path, "rb") as file_handler:
                    for row in loads(file_handler.read()):
                        db.session.merge(row)
                if sequence_name in sequence_names:
                    # Get max id
                    max_id = db.session.execute(
                        select(func.max(c.id)).select_from(c)
                    ).scalar_one_or_none()
                    max_id = 1 if max_id is None else max_id

                    # Set table seq to max id
                    db.session.execute(
                        text("SELECT setval('%s', %s, true);" % (sequence_name, max_id))
                    )
                click.echo(
                    "Successfully loaded %s/%s." % (backup_name, c.__tablename__)
                )
            except FileNotFoundError:
                click.echo(
                    "Can't load table %s, because file %s does not exist."
                    % (c.__tablename__, file_path)
                )
    else:
        click.echo(
            "Can't load backup, because directory %s/%s does not exist."
            % (backup_path, backup_name)
        )
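
# Round-trip sketch (assumes FLEXMEASURES_DB_BACKUP_PATH is configured and
# writable; the backup name is hypothetical):
#
#     save_tables(db, backup_name="pre_migration", structure=True, data=True)
#     # ... later, against a compatible schema:
#     load_tables(db, backup_name="pre_migration", structure=True, data=True)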


def get_affected_classes(structure: bool = True, data: bool = False) -> list:
    affected_classes = []
    if structure:
        affected_classes += [
            Role,
            User,
            RolesUsers,
            Sensor,
            GenericAssetType,
            GenericAsset,
            DataSource,
        ]
    if data:
        affected_classes += [TimedBelief]
    return affected_classes