test_scheduling_jobs_fresh_db.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from datetime import timedelta, datetime
  2. import pytz
  3. import pandas as pd
  4. from sqlalchemy import select
  5. from flexmeasures.data.models.data_sources import DataSource
  6. from flexmeasures.data.models.time_series import TimedBelief
  7. from flexmeasures.data.services.scheduling import create_scheduling_job
  8. from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
  9. def test_scheduling_a_charging_station(
  10. db, app, add_charging_station_assets, setup_test_data
  11. ):
  12. """Test one clean run of one scheduling job:
  13. - data source was made,
  14. - schedule has been made
  15. Starting with a state of charge 1 kWh, within 2 hours we should be able to reach 5 kWh.
  16. """
  17. soc_at_start = 1
  18. target_soc = 5
  19. duration_until_target = timedelta(hours=2)
  20. charging_station = add_charging_station_assets["Test charging station"].sensors[0]
  21. tz = pytz.timezone("Europe/Amsterdam")
  22. start = tz.localize(datetime(2015, 1, 2))
  23. end = tz.localize(datetime(2015, 1, 3))
  24. target_datetime = start + duration_until_target
  25. resolution = timedelta(minutes=15)
  26. soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]
  27. assert (
  28. db.session.execute(
  29. select(DataSource).filter_by(name="Seita", type="scheduler")
  30. ).scalar_one_or_none()
  31. is None
  32. ) # Make sure the scheduler data source isn't there
  33. job = create_scheduling_job(
  34. asset_or_sensor=charging_station,
  35. start=start,
  36. end=end,
  37. belief_time=start,
  38. resolution=resolution,
  39. flex_model={
  40. "soc-at-start": soc_at_start,
  41. "soc-targets": soc_targets,
  42. "roundtrip-efficiency": "100%",
  43. "storage-efficiency": 1,
  44. },
  45. )
  46. print("Job: %s" % job.id)
  47. work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
  48. scheduler_source = db.session.execute(
  49. select(DataSource).filter_by(name="Seita", type="scheduler")
  50. ).scalar_one_or_none()
  51. assert (
  52. scheduler_source is not None
  53. ) # Make sure the scheduler data source is now there
  54. power_values = db.session.scalars(
  55. select(TimedBelief)
  56. .filter(TimedBelief.sensor_id == charging_station.id)
  57. .filter(TimedBelief.source_id == scheduler_source.id)
  58. ).all()
  59. consumption_schedule = pd.Series(
  60. [-v.event_value for v in power_values],
  61. index=pd.DatetimeIndex([v.event_start for v in power_values]),
  62. ) # For consumption schedules, positive values denote consumption. For the db, consumption is negative
  63. assert len(consumption_schedule) == 96
  64. print(consumption_schedule.head(12))
  65. assert (
  66. consumption_schedule.head(8).sum() * (resolution / timedelta(hours=1)) == 4.0
  67. ) # The first 2 hours should consume 4 kWh to charge from 1 to 5 kWh