testing.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. # flake8: noqa: E402
  2. from __future__ import annotations
  3. from datetime import datetime, timedelta
  4. import os
  5. from flask import current_app as app
  6. import click
  7. from timetomodel import ModelState, create_fitted_model, evaluate_models
  8. if os.name == "nt":
  9. from rq_win import WindowsWorker as Worker
  10. else:
  11. from rq import Worker
  12. from flexmeasures.data.models.forecasting import lookup_model_specs_configurator
  13. from flexmeasures.data.models.time_series import TimedBelief
  14. from flexmeasures.data.queries.sensors import (
  15. query_sensor_by_name_and_generic_asset_type_name,
  16. )
  17. from flexmeasures.utils.time_utils import as_server_time
  18. from flexmeasures.data.services.forecasting import (
  19. create_forecasting_jobs,
  20. handle_forecasting_exception,
  21. )
"""
These functions are meant for FlexMeasures developers to manually test some internal
functionality.
They are not registered as app commands by default, as we don't need to show them to users.
"""
  27. # un-comment to use as CLI function
  28. # @app.cli.command()
  29. def test_making_forecasts():
  30. """
  31. Manual test to enqueue and process a forecasting job via redis queue
  32. """
  33. click.echo("Manual forecasting job queuing started ...")
  34. sensor_id = 1
  35. forecast_filter = (
  36. TimedBelief.query.filter(TimedBelief.sensor_id == sensor_id)
  37. .filter(TimedBelief.belief_horizon == timedelta(hours=6))
  38. .filter(
  39. (TimedBelief.event_start >= as_server_time(datetime(2015, 4, 1, 6)))
  40. & (TimedBelief.event_start < as_server_time(datetime(2015, 4, 3, 6)))
  41. )
  42. )
  43. click.echo("Delete forecasts ...")
  44. forecast_filter.delete()
  45. click.echo("Forecasts found before : %d" % forecast_filter.count())
  46. create_forecasting_jobs(
  47. sensor_id=sensor_id,
  48. horizons=[timedelta(hours=6)],
  49. start_of_roll=as_server_time(datetime(2015, 4, 1)),
  50. end_of_roll=as_server_time(datetime(2015, 4, 3)),
  51. )
  52. click.echo("Queue before working: %s" % app.queues["forecasting"].jobs)
  53. worker = Worker(
  54. [app.queues["forecasting"]],
  55. connection=app.queues["forecasting"].connection,
  56. name="Test CLI Forecaster",
  57. exception_handlers=[handle_forecasting_exception],
  58. )
  59. worker.work()
  60. click.echo("Queue after working: %s" % app.queues["forecasting"].jobs)
  61. click.echo(
  62. "Forecasts found after (should be 24 * 2 * 4 = 192): %d"
  63. % forecast_filter.count()
  64. )
  65. # un-comment to use as CLI function
  66. # @app.cli.command()
  67. @click.option(
  68. "--asset-type",
  69. "generic_asset_type_names",
  70. multiple=True,
  71. required=True,
  72. help="Name of generic asset type.",
  73. )
  74. @click.option("--sensor", "sensor_name", help="Name of sensor.")
  75. @click.option(
  76. "--from_date",
  77. default="2015-03-10",
  78. help="Forecast from date. Follow up with a date in the form yyyy-mm-dd.",
  79. )
  80. @click.option("--period", default=3, help="Forecasting period in days.")
  81. @click.option(
  82. "--horizon", "horizon_hours", default=1, help="Forecasting horizon in hours."
  83. )
  84. @click.option(
  85. "--training", default=30, help="Number of days in the training and testing period."
  86. )
  87. def test_generic_model(
  88. generic_asset_type_names: list[str],
  89. sensor_name: str | None = None,
  90. from_date: str = "2015-03-10",
  91. period: int = 3,
  92. horizon_hours: int = 1,
  93. training: int = 30,
  94. ):
  95. """Manually test integration of timetomodel for our generic model."""
  96. start = as_server_time(datetime.strptime(from_date, "%Y-%m-%d"))
  97. end = start + timedelta(days=period)
  98. training_and_testing_period = timedelta(days=training)
  99. horizon = timedelta(hours=horizon_hours)
  100. with app.app_context():
  101. sensors = query_sensor_by_name_and_generic_asset_type_name(
  102. sensor_name=sensor_name,
  103. generic_asset_type_names=generic_asset_type_names,
  104. ).all()
  105. if len(sensors) == 0:
  106. click.echo("No such sensor in db, so I will not add any forecasts.")
  107. raise click.Abort()
  108. elif len(sensors) > 1:
  109. click.echo("No unique sensor found in db, so I will not add any forecasts.")
  110. raise click.Abort()
  111. linear_model_configurator = lookup_model_specs_configurator("linear")
  112. (
  113. model_specs,
  114. model_identifier,
  115. fallback_model_identifier,
  116. ) = linear_model_configurator(
  117. sensor=sensors[0],
  118. forecast_start=start,
  119. forecast_end=end,
  120. forecast_horizon=horizon,
  121. custom_model_params=dict(
  122. training_and_testing_period=training_and_testing_period
  123. ),
  124. )
  125. # Create and train the model
  126. model = create_fitted_model(model_specs, model_identifier)
  127. print("\n\nparams:\n%s\n\n" % model.params)
  128. evaluate_models(m1=ModelState(model, model_specs), plot_path=None)
  129. return ModelState(model, model_specs)