12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771 |
- """
- CLI commands for listing database contents and classes
- """
- from __future__ import annotations
- from datetime import datetime, timedelta
- import click
- from flask import current_app as app
- from flask.cli import with_appcontext
- from tabulate import tabulate
- from humanize import naturaldelta, naturaltime
- import pandas as pd
- import uniplot
- import vl_convert as vlc
- from string import Template
- import pytz
- import json
- from sqlalchemy import select, func
- from flexmeasures.data import db
- from flexmeasures.data.models.user import Account, AccountRole, User, Role
- from flexmeasures.data.models.data_sources import DataSource
- from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
- from flexmeasures.data.models.time_series import Sensor, TimedBelief
- from flexmeasures.data.schemas.generic_assets import (
- GenericAssetIdField,
- SensorsToShowSchema,
- )
- from flexmeasures.data.schemas.sensors import SensorIdField
- from flexmeasures.data.schemas.account import AccountIdField
- from flexmeasures.data.schemas.sources import DataSourceIdField
- from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField
- from flexmeasures.data.services.time_series import simplify_index
- from flexmeasures.utils.time_utils import determine_minimum_resampling_resolution
- from flexmeasures.cli.utils import MsgStyle, validate_unique
- from flexmeasures.utils.coding_utils import delete_key_recursive
- from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list
- from flexmeasures.cli.utils import (
- DeprecatedOptionsCommand,
- DeprecatedOption,
- get_sensor_aliases,
- )
# Root of the `show` command group. The subcommands defined in this file attach
# themselves to it via `@fm_show_data.command(...)`.
# NOTE: the docstring below doubles as the group's help text in the CLI.
@click.group("show")
def fm_show_data():
    """FlexMeasures: Show data."""
@fm_show_data.command("accounts")
@with_appcontext
def list_accounts():
    """
    List all accounts on this FlexMeasures instance.
    """
    # Accounts come back sorted alphabetically by name.
    all_accounts = db.session.scalars(select(Account).order_by(Account.name)).all()
    if not all_accounts:
        click.secho("No accounts created yet.", **MsgStyle.WARN)
        raise click.Abort()

    click.echo("All accounts on this FlexMeasures instance:\n ")

    table_rows = []
    for acc in all_accounts:
        # One COUNT query per account, to show how many assets it owns.
        asset_count_query = (
            select(func.count()).select_from(GenericAsset).filter_by(account_id=acc.id)
        )
        table_rows.append((acc.id, acc.name, db.session.scalar(asset_count_query)))

    click.echo(tabulate(table_rows, headers=["ID", "Name", "Assets"]))
@fm_show_data.command("roles")
@with_appcontext
def list_roles():
    """
    Show available account and user roles
    """

    def _echo_role_table(roles):
        # Both role models expose id, name and description, so one renderer serves both.
        table = tabulate(
            [(r.id, r.name, r.description) for r in roles],
            headers=["ID", "Name", "Description"],
        )
        click.echo(table)

    # Account roles first; abort before querying user roles if there are none.
    account_role_query = select(AccountRole).order_by(AccountRole.name)
    account_roles = db.session.scalars(account_role_query).all()
    if not account_roles:
        click.secho("No account roles created yet.", **MsgStyle.WARN)
        raise click.Abort()
    click.echo("Account roles:\n")
    _echo_role_table(account_roles)
    click.echo()

    # Then user roles, sorted by name as well.
    user_role_query = select(Role).order_by(Role.name)
    user_roles = db.session.scalars(user_role_query).all()
    if not user_roles:
        click.secho("No user roles created yet, not even admin.", **MsgStyle.WARN)
        raise click.Abort()
    click.echo("User roles:\n")
    _echo_role_table(user_roles)
@fm_show_data.command("account")
@with_appcontext
@click.option("--id", "account", type=AccountIdField(), required=True)
def show_account(account):
    """
    Show information about an account, including users and assets.
    """
    # Banner: a rule sized to the account name, above and below the title line.
    rule = f"========{len(account.name) * '='}========"
    click.echo(rule)
    click.echo(f"Account {account.name} (ID: {account.id})")
    click.echo(rule + "\n")

    # Account roles (warn when the account has none).
    if not account.account_roles:
        click.secho("Account has no roles.", **MsgStyle.WARN)
    else:
        click.echo(
            f"Account role(s): {','.join([role.name for role in account.account_roles])}"
        )
    click.echo()

    # Users of this account, sorted by username.
    user_query = select(User).filter_by(account_id=account.id).order_by(User.username)
    account_users = db.session.scalars(user_query).all()
    if account_users:
        click.echo("All users:\n ")
        user_rows = []
        for member in account_users:
            user_rows.append(
                (
                    member.id,
                    member.username,
                    member.email,
                    naturaltime(member.last_login_at),
                    naturaltime(member.last_seen_at),
                    ",".join([role.name for role in member.roles]),
                )
            )
        user_headers = ["ID", "Name", "Email", "Last Login", "Last Seen", "Roles"]
        click.echo(tabulate(user_rows, headers=user_headers))
    else:
        click.secho("No users in account ...", **MsgStyle.WARN)
    click.echo()

    # Assets owned by this account, sorted by name.
    asset_query = (
        select(GenericAsset)
        .filter_by(account_id=account.id)
        .order_by(GenericAsset.name)
    )
    account_assets = db.session.scalars(asset_query).all()
    if account_assets:
        click.echo("All assets:\n ")
        asset_rows = []
        for item in account_assets:
            asset_rows.append(
                (item.id, item.name, item.generic_asset_type.name, item.location)
            )
        click.echo(tabulate(asset_rows, headers=["ID", "Name", "Type", "Location"]))
    else:
        click.secho("No assets in account ...", **MsgStyle.WARN)
@fm_show_data.command("asset-types")
@with_appcontext
def list_asset_types():
    """
    Show available asset types
    """
    # Asset types are listed alphabetically by name.
    type_query = select(GenericAssetType).order_by(GenericAssetType.name)
    known_types = db.session.scalars(type_query).all()
    if not known_types:
        click.secho("No asset types created yet.", **MsgStyle.WARN)
        raise click.Abort()

    type_rows = []
    for asset_type in known_types:
        type_rows.append((asset_type.id, asset_type.name, asset_type.description))
    click.echo(tabulate(type_rows, headers=["ID", "Name", "Description"]))
@fm_show_data.command("asset")
@with_appcontext
@click.option("--id", "asset", type=GenericAssetIdField(), required=True)
def show_generic_asset(asset):
    """
    Show asset info and list sensors
    """
    # --- Title banner; the rule is wider when a "Child of asset" line is shown.
    has_parent = asset.parent_asset is not None
    if has_parent:
        separator_num = 18
    else:
        separator_num = 8
    click.echo(f"======{len(asset.name) * '='}{separator_num * '='}")
    click.echo(f"Asset {asset.name} (ID: {asset.id})")
    if has_parent:
        click.echo(
            f"Child of asset {asset.parent_asset.name} (ID: {asset.parent_asset.id})"
        )
    click.echo(f"======{len(asset.name) * '='}{separator_num * '='}\n")

    # --- Single-row overview table of the asset itself.
    standardized_sensors_to_show = SensorsToShowSchema().deserialize(
        asset.sensors_to_show
    )
    type_name = asset.generic_asset_type.name
    location = asset.location
    flex_context_text = "".join(
        [f"{k}: {v}\n" for k, v in asset.flex_context.items()]
    )
    graphs_text = "".join(
        [
            f"{graph['title']}: {graph['sensors']} \n"
            for graph in standardized_sensors_to_show
        ]
    )
    attributes_text = "".join([f"{k}: {v}\n" for k, v in asset.attributes.items()])
    overview_headers = [
        "Type",
        "Location",
        "Flex-Context",
        "Sensors to show",
        "Attributes",
    ]
    overview_row = (
        type_name,
        location,
        flex_context_text,
        graphs_text,
        attributes_text,
    )
    click.echo(tabulate([overview_row], headers=overview_headers))

    # --- Child assets.
    child_rows = []
    for child in asset.child_assets:
        child_rows.append((child.id, child.name, child.generic_asset_type.name))
    click.echo()
    click.echo(f"======{len(asset.name) * '='}===================")
    click.echo(f"Child assets of {asset.name} (ID: {asset.id})")
    click.echo(f"======{len(asset.name) * '='}===================\n")
    if not child_rows:
        click.secho("No children assets ...", **MsgStyle.WARN)
    else:
        click.echo(tabulate(child_rows, headers=["Id", "Name", "Type"]))
    click.echo()

    # --- Sensors of this asset, sorted by name; abort when there are none.
    sensor_query = (
        select(Sensor).filter_by(generic_asset_id=asset.id).order_by(Sensor.name)
    )
    asset_sensors = db.session.scalars(sensor_query).all()
    if not asset_sensors:
        click.secho("No sensors in asset ...", **MsgStyle.WARN)
        raise click.Abort()

    click.echo("All sensors in asset:\n ")
    sensor_rows = []
    for sensor in asset_sensors:
        sensor_rows.append(
            (
                sensor.id,
                sensor.name,
                sensor.unit,
                naturaldelta(sensor.event_resolution),
                sensor.timezone,
                "".join([f"{k}: {v}\n" for k, v in sensor.attributes.items()]),
            )
        )
    sensor_headers = ["ID", "Name", "Unit", "Resolution", "Timezone", "Attributes"]
    click.echo(tabulate(sensor_rows, headers=sensor_headers))
@fm_show_data.command("data-sources")
@with_appcontext
@click.option(
    "--id",
    "source",
    required=False,
    type=DataSourceIdField(),
    help="ID of data source.",
)
@click.option(
    "--show-attributes",
    "show_attributes",
    type=bool,
    help="Whether to show the attributes of the DataSource.",
    is_flag=True,
)
def list_data_sources(source: DataSource | None = None, show_attributes: bool = False):
    """
    Show available data sources
    """
    # Either show the one requested source, or all of them in a stable order.
    if source is not None:
        sources = [source]
    else:
        all_sources_query = (
            select(DataSource)
            .order_by(DataSource.type)
            .order_by(DataSource.name)
            .order_by(DataSource.model)
            .order_by(DataSource.version)
        )
        sources = db.session.scalars(all_sources_query).all()

    if not sources:
        click.secho("No data sources created yet.", **MsgStyle.WARN)
        raise click.Abort()

    headers = ["ID", "Name", "User ID", "Model", "Version"]
    if show_attributes:
        headers.append("Attributes")

    # Group the table rows by source type; one table is printed per type.
    rows_by_type = {}
    for src in sources:
        table_row = [src.id, src.name, src.user_id, src.model, src.version]
        if show_attributes:
            table_row.append(json.dumps(src.attributes, indent=4))
        rows_by_type.setdefault(src.type, []).append(table_row)

    for ds_type, type_rows in rows_by_type.items():
        click.echo(f"type: {ds_type}")
        click.echo("=" * len(ds_type))
        click.echo(tabulate(type_rows, headers=headers))
        click.echo("\n")
@fm_show_data.command("chart")
@with_appcontext
@click.option(
    "--sensor",
    "sensors",
    required=False,
    multiple=True,
    type=SensorIdField(),
    help="ID of sensor(s). This argument can be given multiple times.",
)
@click.option(
    "--asset",
    "assets",
    required=False,
    multiple=True,
    type=GenericAssetIdField(),
    help="ID of asset(s). This argument can be given multiple times.",
)
@click.option(
    "--start",
    "start",
    type=AwareDateTimeField(),
    required=True,
    help="Plot starting at this datetime. Follow up with a timezone-aware datetime in ISO 8601 format.",
)
@click.option(
    "--end",
    "end",
    type=AwareDateTimeField(),
    required=True,
    help="Plot ending at this datetime. Follow up with a timezone-aware datetime in ISO 8601 format.",
)
@click.option(
    "--belief-time",
    "belief_time",
    type=AwareDateTimeField(),
    required=False,
    help="Time at which beliefs had been known. Follow up with a timezone-aware datetime in ISO 8601 format.",
)
@click.option(
    "--height",
    "height",
    required=False,
    type=int,
    default=200,
    help="Height of the image in pixels..",
)
@click.option(
    "--width",
    "width",
    required=False,
    type=int,
    default=500,
    help="Width of the image in pixels.",
)
@click.option(
    "--filename",
    "filename_template",
    required=False,
    type=str,
    default="chart-$now.png",
    help="Format of the output file. Use dollar sign ($) to interpolate values among the following ones:"
    " now (current time), id (id of the sensor or asset), entity_type (either 'asset' or 'sensor')"
    " Example: 'result_file_$entity_type_$id_$now.csv' -> 'result_file_asset_1_2023-08-24T14:47:08' ",
)
@click.option(
    "--resolution",
    "resolution",
    type=DurationField(),
    required=False,
    help="Resolution of the data in ISO 8601 format. If not set, defaults to the minimum resolution of the sensor data. Note: Nominal durations like 'P1D' are converted to absolute timedeltas.",
)
def chart(
    sensors: list[Sensor] | None = None,
    assets: list[GenericAsset] | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    belief_time: datetime | None = None,
    height: int | None = None,
    width: int | None = None,
    filename_template: str | None = None,
    resolution: timedelta | None = None,
):
    """
    Export sensor or asset charts in PNG or SVG formats. For example:

        flexmeasures show chart --start 2023-08-15T00:00:00+02:00 --end 2023-08-16T00:00:00+02:00 --asset 1 --sensor 3 --resolution P1D
    """
    # NOTE(review): the help text above mentions SVG, but this command only
    # renders PNG data (via vl_convert.vegalite_to_png).

    # :param sensors:           sensors to chart (one file per sensor)
    # :param assets:            assets to chart (one file per asset)
    # :param start, end:        event time window of the chart
    # :param belief_time:       only use beliefs known before this time, if given
    # :param height, width:     size set on the Vega-Lite spec before rendering
    # :param filename_template: string.Template pattern for the output filename;
    #                           substituted keys: id, entity_type, now, start, end, belief_time
    # :param resolution:        event resolution passed on to the entity's chart method
    # :raises click.Abort:      if neither sensors nor assets were given
    datetime_format = "%Y-%m-%dT%H:%M:%S"

    # Options declared with `multiple=True` arrive as (possibly empty) tuples, not
    # as None, so test for emptiness. Converting both to lists also avoids a
    # TypeError from concatenating a list with a tuple.
    entities = list(sensors or ()) + list(assets or ())
    if not entities:
        click.secho(
            "No sensor or asset IDs provided. Please, try passing them using the options `--asset` or `--sensor`.",
            **MsgStyle.ERROR,
        )
        raise click.Abort()

    # Loop-invariant values: configured timezone, belief time string, filename template.
    tz = pytz.timezone(zone=app.config["FLEXMEASURES_TIMEZONE"])
    belief_time_str = ""
    if belief_time is not None:
        belief_time_str = belief_time.strftime(datetime_format)
    template = Template(str(filename_template))

    for entity in entities:
        entity_type = "asset" if isinstance(entity, GenericAsset) else "sensor"

        # Take the current time directly in the configured timezone. Localizing a
        # naive `datetime.now()` would label the server's local wall time with
        # that zone, which is wrong whenever the two zones differ.
        now = datetime.now(tz=tz)

        # safe_substitute leaves unknown placeholders untouched instead of raising.
        filename = template.safe_substitute(
            id=entity.id,
            entity_type=entity_type,
            now=now.strftime(datetime_format),
            start=start.strftime(datetime_format),
            end=end.strftime(datetime_format),
            belief_time=belief_time_str,
        )

        click.echo(f"Generating a chart for `{entity}`...")

        # need to fetch the entities as they get detached
        # and we get the (in)famous detached instance error.
        if entity_type == "asset":
            entity = db.session.get(GenericAsset, entity.id)
        else:
            entity = db.session.get(Sensor, entity.id)

        chart_description = entity.chart(
            event_starts_after=start,
            event_ends_before=end,
            beliefs_before=belief_time,
            include_data=True,
            resolution=resolution,
        )

        # remove formatType as it relies on a custom JavaScript function
        chart_description = delete_key_recursive(chart_description, "formatType")

        # set width and height
        chart_description["width"] = width
        chart_description["height"] = height

        png_data = vlc.vegalite_to_png(vl_spec=chart_description, scale=2)

        with open(filename, "wb") as f:
            f.write(png_data)

        click.secho(
            f"Chart for `{entity}` has been saved successfully as `{filename}`.",
            **MsgStyle.SUCCESS,
        )
@fm_show_data.command("beliefs", cls=DeprecatedOptionsCommand)
@with_appcontext
@click.option(
    "--sensor",
    "--sensor-id",
    "sensors",
    required=True,
    multiple=True,
    callback=validate_unique,
    type=SensorIdField(),
    cls=DeprecatedOption,
    preferred="--sensor",
    deprecated=["--sensor-id"],
    help="ID of sensor(s). This argument can be given multiple times.",
)
@click.option(
    "--start",
    "start",
    type=AwareDateTimeField(),
    required=True,
    help="Plot starting at this datetime. Follow up with a timezone-aware datetime in ISO 8601 format.",
)
@click.option(
    "--duration",
    "duration",
    type=DurationField(),
    required=True,
    help="Duration of the plot, after --start. Follow up with a duration in ISO 8601 format, e.g. PT1H (1 hour) or PT45M (45 minutes).",
)
@click.option(
    "--belief-time-before",
    "belief_time_before",
    type=AwareDateTimeField(),
    required=False,
    help="Time at which beliefs had been known. Follow up with a timezone-aware datetime in ISO 8601 format.",
)
@click.option(
    "--source",
    "--source-id",
    "source",
    required=False,
    type=DataSourceIdField(),
    cls=DeprecatedOption,
    preferred="--source",
    deprecated=["--source-id"],
    help="Source of the beliefs (an existing source id).",
)
@click.option(
    "--source-type",
    "source_types",
    required=False,
    type=str,
    help="Only show beliefs from this type of source, for example, 'user', 'forecaster' or 'scheduler'.",
)
@click.option(
    "--resolution",
    "resolution",
    type=DurationField(),
    required=False,
    help="Resolution of the data. If not set, defaults to the minimum resolution of the sensor data.",
)
@click.option(
    "--timezone",
    "timezone",
    type=str,
    required=False,
    help="Timezone of the data. If not set, defaults to the timezone of the first non-empty sensor.",
)
@click.option(
    "--to-file",
    "filepath",
    required=False,
    type=str,
    help="Set a filepath to store the beliefs as a CSV file.",
)
@click.option(
    "--include-ids/--exclude-ids",
    "include_ids",
    default=False,
    type=bool,
    help="Include sensor IDs in the plot's legend labels and the file's column headers. "
    "NB non-unique sensor names will always show an ID.",
)
@click.option(
    "--reduced-paths/--full-paths",
    "reduce_paths",
    default=True,
    type=bool,
    help="Whether to include the full path to the asset that the sensor belongs to, "
    "which shows any parent assets and their account, "
    "or a reduced version of the path, which shows as much detail as is needed to distinguish the sensors.",
)
def plot_beliefs(
    sensors: list[Sensor],
    start: datetime,
    duration: timedelta,
    resolution: timedelta | None,
    timezone: str | None,
    belief_time_before: datetime | None,
    source: DataSource | None,
    filepath: str | None,
    source_types: list[str] | str | None = None,
    include_ids: bool = False,
    reduce_paths: bool = True,
):
    """
    Show a simple plot of belief data directly in the terminal, and optionally, save the data to a CSV file.
    """
    # :param sensors:            sensors whose beliefs are plotted (one line per sensor)
    # :param start, duration:    event time window [start, start + duration]
    # :param resolution:         resampling resolution; derived from the sensors if None
    # :param timezone:           timezone for the time axis; first non-empty sensor's if None
    # :param belief_time_before: only use beliefs known before this time, if given
    # :param source:             only use beliefs from this data source, if given
    # :param filepath:           if given, also write the plotted data to this CSV file
    # :param source_types:       only use beliefs from these source types, if given
    # :param include_ids:        label each series with sensor name and ID
    # :param reduce_paths:       passed on to get_sensor_aliases when labelling by alias
    # :raises click.Abort:       if no sensor has data in the requested window
    sensors = list(sensors)

    # The --source-type option delivers a single string, whereas this parameter is
    # annotated (and passed to the search) as a list of types.
    if isinstance(source_types, str):
        source_types = [source_types]

    if resolution is None:
        resolution = determine_minimum_resampling_resolution(
            [sensor.event_resolution for sensor in sensors]
        )

    # query data
    beliefs_by_sensor = TimedBelief.search(
        sensors=sensors,
        event_starts_after=start,
        event_ends_before=start + duration,
        beliefs_before=belief_time_before,
        source=source,
        source_types=source_types,
        one_deterministic_belief_per_event=True,
        resolution=resolution,
        sum_multiple=False,
    )

    # Only keep non-empty (and abort in case of no data)
    for s in sensors:
        if beliefs_by_sensor[s].empty:
            click.secho(f"No data found for sensor {s.id} ({s.name})", **MsgStyle.WARN)
            beliefs_by_sensor.pop(s)
    if not beliefs_by_sensor:
        click.secho("No data found!", **MsgStyle.WARN)
        raise click.Abort()
    # From here on, `sensors` lists only sensors with data, in the same order as
    # the columns of the concatenated frame below.
    sensors = list(beliefs_by_sensor.keys())

    # Concatenate data
    df = pd.concat(
        [simplify_index(bdf) for bdf in beliefs_by_sensor.values()], axis=1
    )

    # Find out whether the Y-axis should show a shared unit
    if all(sensor.unit == sensors[0].unit for sensor in sensors):
        shared_unit = sensors[0].unit
    else:
        shared_unit = ""
        click.secho(
            "The y-axis shows no unit, because the selected sensors do not share the same unit.",
            **MsgStyle.WARN,
        )

    # Decide whether to include sensor IDs
    if include_ids:
        df.columns = [f"{s.name} (ID {s.id})" for s in sensors]
    else:
        # In case of non-unique sensor names, show more of the sensor's ancestry
        duplicates = find_duplicates(sensors, "name")
        if duplicates:
            message = "The following sensor name"
            message += "s are " if len(duplicates) > 1 else " is "
            message += (
                f"duplicated: {join_words_into_a_list(duplicates)}. "
                f"To distinguish the sensors, their plot labels will include more parent assets and their account, as needed. "
                f"To show the full path for each sensor, use the --full-paths flag. "
                f"Or to uniquely label them by their ID instead, use the --include-ids flag."
            )
            click.secho(message, **MsgStyle.WARN)
        sensor_aliases = get_sensor_aliases(sensors, reduce_paths=reduce_paths)
        df.columns = [sensor_aliases.get(s.id, s.name) for s in sensors]

    # Convert to the requested or default timezone:
    # an explicit --timezone wins; otherwise use the first non-empty sensor's timezone.
    if timezone is None:
        timezone = sensors[0].timezone
    df.index = df.index.tz_convert(timezone)

    # Build title
    if len(sensors) == 1:
        title = f"Beliefs for Sensor '{sensors[0].name}' (ID {sensors[0].id}).\n"
    else:
        title = f"Beliefs for Sensors {join_words_into_a_list([s.name + ' (ID ' + str(s.id) + ')' for s in sensors])}.\n"
    title += f"Data spans {naturaldelta(duration)} and starts at {start}.\n"
    title += f"The time resolution (x-axis) is {naturaldelta(resolution)}.\n"
    if belief_time_before:
        title += f"\nOnly beliefs made before: {belief_time_before}."
    if source:
        title += f"\nSource: {source.description}"

    # Without a shared unit, each legend label states its own sensor's unit.
    # Columns and sensors are index-aligned (see above).
    if shared_unit:
        legend_labels = df.columns
    else:
        legend_labels = [
            f"{col} in {sensor.unit}" for col, sensor in zip(df.columns, sensors)
        ]

    uniplot.plot(
        ys=[df[col] for col in df.columns],
        xs=[df.index for _ in df.columns],
        title=title,
        color=True,
        lines=True,
        y_unit=shared_unit,
        legend_labels=legend_labels,
    )

    if filepath is not None:
        # Two header rows in the CSV: series label, then the sensor's unit.
        df.columns = pd.MultiIndex.from_arrays(
            [df.columns, [bdf.sensor.unit for bdf in beliefs_by_sensor.values()]]
        )
        df.to_csv(filepath)
        click.secho("Data saved to file.", **MsgStyle.SUCCESS)
def find_duplicates(_list: list, attr: str | None = None) -> list:
    """Find duplicates in a list, optionally based on a specified attribute.

    :param _list: The input list to search for duplicates. The compared values
                  (the elements, or their ``attr`` values) must be hashable.
    :param attr: The attribute name to consider when identifying duplicates.
                 If None, the function will check for duplicates based on the elements themselves.
    :returns: A list containing each duplicated value once, in order of first appearance.
    """
    # Function-scope import keeps this block self-contained.
    from collections import Counter

    values = [getattr(item, attr) for item in _list] if attr else _list
    # A single counting pass replaces calling list.count() for every distinct
    # value; Counter keeps first-seen order, so the result is deterministic.
    occurrences = Counter(values)
    return [value for value, count in occurrences.items() if count > 1]
def list_items(item_type):
    """
    Show available items of a specific type.
    """
    # `item_type` names an attribute of the app that maps item names to classes;
    # each class supplies its own version, author and module metadata.
    click.echo(f"{item_type.capitalize()}:\n")

    registry = getattr(app, item_type)
    table_rows = []
    for item_name, item_class in registry.items():
        table_rows.append(
            (
                item_name,
                item_class.__version__,
                item_class.__author__,
                item_class.__module__,
            )
        )

    column_names = ["name", "version", "author", "module"]
    click.echo(tabulate(table_rows, headers=column_names))
@fm_show_data.command("reporters")
@with_appcontext
def list_reporters():
    """
    Show available reporters.
    """
    # Name of the app attribute that list_items reads the registered classes from.
    registry_name = "reporters"
    with app.app_context():
        list_items(registry_name)
@fm_show_data.command("schedulers")
@with_appcontext
def list_schedulers():
    """
    Show available schedulers.
    """
    # Name of the app attribute that list_items reads the registered classes from.
    registry_name = "schedulers"
    with app.app_context():
        list_items(registry_name)
- app.cli.add_command(fm_show_data)
|