test_scheduling_jobs.py
  1. from datetime import datetime, timedelta
  2. import os
  3. import inspect
  4. import numpy as np
  5. import pandas as pd
  6. import pytz
  7. import pytest
  8. from rq.job import Job
  9. from sqlalchemy import select
  10. from flexmeasures.data.models.planning import Scheduler
  11. from flexmeasures.data.models.planning.exceptions import InfeasibleProblemException
  12. from flexmeasures.data.models.planning.utils import initialize_series
  13. from flexmeasures.data.models.data_sources import DataSource
  14. from flexmeasures.data.models.time_series import TimedBelief
  15. from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
  16. from flexmeasures.data.services.scheduling import (
  17. create_scheduling_job,
  18. load_custom_scheduler,
  19. handle_scheduling_exception,
  20. )
  21. from flexmeasures.utils.unit_utils import ur
  22. from flexmeasures.utils.calculations import integrate_time_series
  23. def test_scheduling_a_battery(
  24. fresh_db,
  25. app,
  26. add_battery_assets_fresh_db,
  27. setup_fresh_test_data,
  28. add_market_prices_fresh_db,
  29. ):
  30. """Test one clean run of one scheduling job:
  31. - data source was made,
  32. - schedule has been made
  33. """
  34. battery = add_battery_assets_fresh_db["Test battery"].sensors[0]
  35. tz = pytz.timezone("Europe/Amsterdam")
  36. start = tz.localize(datetime(2015, 1, 2))
  37. end = tz.localize(datetime(2015, 1, 3))
  38. resolution = timedelta(minutes=15)
  39. assert (
  40. fresh_db.session.execute(
  41. select(DataSource).filter_by(name="FlexMeasures", type="scheduler")
  42. ).scalar_one_or_none()
  43. is None
  44. ) # Make sure the scheduler data source isn't there
  45. job = create_scheduling_job(
  46. asset_or_sensor=battery,
  47. start=start,
  48. end=end,
  49. belief_time=start,
  50. resolution=resolution,
  51. flex_model={
  52. "roundtrip-efficiency": "98%",
  53. "storage-efficiency": 0.999,
  54. },
  55. )
  56. print("Job: %s" % job.id)
  57. work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
  58. scheduler_source = fresh_db.session.execute(
  59. select(DataSource).filter_by(name="Seita", type="scheduler")
  60. ).scalar_one_or_none()
  61. assert (
  62. scheduler_source is not None
  63. ) # Make sure the scheduler data source is now there
  64. power_values = fresh_db.session.scalars(
  65. select(TimedBelief)
  66. .filter(TimedBelief.sensor_id == battery.id)
  67. .filter(TimedBelief.source_id == scheduler_source.id)
  68. ).all()
  69. print([v.event_value for v in power_values])
  70. assert len(power_values) == 96
  71. assert (
  72. sum(v.event_value for v in power_values) < -0.5
  73. ), "some cycling should have occurred to make a profit, resulting in overall consumption due to losses"
  74. scheduler_specs = {
  75. "module": None, # use make_module_descr, see below
  76. "class": "DummyScheduler",
  77. }
  78. def make_module_descr(is_path):
  79. if is_path:
  80. path_to_here = os.path.dirname(__file__)
  81. return os.path.join(path_to_here, "dummy_scheduler.py")
  82. else:
  83. return "flexmeasures.data.tests.dummy_scheduler"
  84. @pytest.mark.parametrize("is_path", [False, True])
  85. def test_loading_custom_scheduler(is_path: bool):
  86. """
  87. Simply check if loading a custom scheduler works.
  88. """
  89. scheduler_specs["module"] = make_module_descr(is_path)
  90. custom_scheduler = load_custom_scheduler(scheduler_specs)
  91. assert custom_scheduler.__name__ == "DummyScheduler"
  92. assert "Just a dummy scheduler" in custom_scheduler.compute.__doc__
  93. data_source_info = custom_scheduler.get_data_source_info()
  94. assert data_source_info["name"] == "Test Organization"
  95. assert data_source_info["version"] == "3"
  96. assert data_source_info["model"] == "DummyScheduler"
  97. @pytest.mark.parametrize("is_path", [False, True])
  98. def test_assigning_custom_scheduler(
  99. fresh_db, app, add_battery_assets_fresh_db, is_path: bool
  100. ):
  101. """
  102. Test if the custom scheduler is picked up when we assign it to a Sensor,
  103. and that its dummy values are saved.
  104. """
  105. scheduler_specs["module"] = make_module_descr(is_path)
  106. battery = add_battery_assets_fresh_db["Test battery"].sensors[0]
  107. battery.attributes["custom-scheduler"] = scheduler_specs
  108. tz = pytz.timezone("Europe/Amsterdam")
  109. start = tz.localize(datetime(2015, 1, 2))
  110. end = tz.localize(datetime(2015, 1, 3))
  111. resolution = timedelta(minutes=15)
  112. job = create_scheduling_job(
  113. asset_or_sensor=battery,
  114. start=start,
  115. end=end,
  116. belief_time=start,
  117. resolution=resolution,
  118. )
  119. print("Job: %s" % job.id)
  120. work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
  121. # make sure we saved the data source for later lookup
  122. redis_connection = app.queues["scheduling"].connection
  123. finished_job = Job.fetch(job.id, connection=redis_connection)
  124. assert finished_job.meta["data_source_info"]["model"] == scheduler_specs["class"]
  125. scheduler_source = fresh_db.session.execute(
  126. select(DataSource).filter_by(
  127. type="scheduler",
  128. **finished_job.meta["data_source_info"],
  129. )
  130. ).scalar_one_or_none()
  131. assert (
  132. scheduler_source is not None
  133. ) # Make sure the scheduler data source is now there
  134. power_values = fresh_db.session.scalars(
  135. select(TimedBelief)
  136. .filter(TimedBelief.sensor_id == battery.id)
  137. .filter(TimedBelief.source_id == scheduler_source.id)
  138. ).all()
  139. assert len(power_values) == 96
  140. # test for negative value as we schedule consumption
  141. capacity = battery.get_attribute(
  142. "capacity_in_mw",
  143. ur.Quantity(battery.get_attribute("site-power-capacity")).to("MW").magnitude,
  144. )
  145. assert all([v.event_value == -1 * capacity for v in power_values])
  146. def create_test_scheduler(name, compute_fails=False, fallback_class=None):
  147. def compute(self):
  148. """
  149. This function can be set to fail by using compute_fails=True
  150. """
  151. if compute_fails:
  152. raise InfeasibleProblemException()
  153. capacity = self.sensor.get_attribute(
  154. "capacity_in_mw",
  155. ur.Quantity(self.sensor.get_attribute("site-power-capacity"))
  156. .to("MW")
  157. .magnitude,
  158. )
  159. return initialize_series( # simply creates a Pandas Series repeating one value
  160. data=capacity,
  161. start=self.start,
  162. end=self.end,
  163. resolution=self.resolution,
  164. )
  165. def deserialize_config(self):
  166. """Do not care about any config sent in."""
  167. self.config_deserialized = True
  168. return type(
  169. name,
  170. (Scheduler,),
  171. {
  172. "__author__": "Seita",
  173. "__version__": "1",
  174. "compute": compute,
  175. "deserialize_config": deserialize_config,
  176. "fallback_scheduler_class": fallback_class,
  177. },
  178. )
# Build a three-link fallback chain for test_fallback_chain:
# FailingScheduler1 -> FailingScheduler2 -> SuccessfulScheduler
# (the first two always raise InfeasibleProblemException; the last succeeds).
SuccessfulScheduler = create_test_scheduler("SuccessfulScheduler", compute_fails=False)
FailingScheduler2 = create_test_scheduler(
    "FailingScheduler2", compute_fails=True, fallback_class=SuccessfulScheduler
)
FailingScheduler1 = create_test_scheduler(
    "FailingScheduler1", compute_fails=True, fallback_class=FailingScheduler2
)
  186. def test_fallback_chain(
  187. fresh_db,
  188. app,
  189. add_battery_assets_fresh_db,
  190. ):
  191. """
  192. Check that the chaining fallback schedules works.
  193. FailingScheduler1 -> FailingScheduler2 -> SuccessfulScheduler
  194. """
  195. app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = True
  196. battery = add_battery_assets_fresh_db["Test battery"].sensors[0]
  197. fresh_db.session.flush()
  198. tz = pytz.timezone("Europe/Amsterdam")
  199. start = tz.localize(datetime(2015, 1, 2))
  200. end = tz.localize(datetime(2015, 1, 3))
  201. resolution = timedelta(minutes=15)
  202. scheduler_class = FailingScheduler1
  203. scheduler_specs = {
  204. "class": scheduler_class.__name__,
  205. "module": inspect.getmodule(scheduler_class).__name__,
  206. }
  207. job = create_scheduling_job(
  208. asset_or_sensor=battery,
  209. start=start,
  210. end=end,
  211. belief_time=start,
  212. resolution=resolution,
  213. scheduler_specs=scheduler_specs,
  214. )
  215. for scheduler_class in ["FailingScheduler1", "FailingScheduler2"]:
  216. assert len(app.queues["scheduling"]) == 1
  217. job = app.queues["scheduling"].jobs[0]
  218. work_on_rq(
  219. app.queues["scheduling"],
  220. exc_handler=exception_reporter,
  221. max_jobs=1,
  222. )
  223. job.refresh()
  224. assert job.kwargs["scheduler_specs"]["class"] == scheduler_class
  225. assert job.is_failed
  226. assert isinstance(job.meta["exception"], InfeasibleProblemException)
  227. success_job = app.queues["scheduling"].jobs[0]
  228. # check that success
  229. work_on_rq(
  230. app.queues["scheduling"],
  231. exc_handler=exception_reporter,
  232. max_jobs=1,
  233. )
  234. success_job.refresh()
  235. assert success_job.is_finished
  236. assert success_job.kwargs["scheduler_specs"]["class"] == "SuccessfulScheduler"
  237. assert len(app.queues["scheduling"]) == 0
  238. app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = False
@pytest.mark.parametrize(
    "charging_eff, discharging_eff, storage_eff, expected_avg_power",
    [
        # With perfect efficiencies, average charging exactly covers the 9 kW net usage.
        ("100%", "100%", "100%", 0.009),
        # A lossy charger must draw proportionally more power.
        ("95%", "100%", "100%", 0.009 / 0.95),
        ("95%", "100%", "95%", 0.009 / 0.95),
        # charging-efficiency > 100% acts as a gain, so less grid power is needed.
        ("125%", "100%", "95%", 0.009 / 1.25),
    ],
)
def test_save_state_of_charge(
    fresh_db,
    app,
    smart_building,
    charging_eff,
    discharging_eff,
    storage_eff,
    expected_avg_power,
):
    """
    Test saving state of charge of a Heat Buffer with a constant SOC net usage of 9 kW (10kW usage and 1kW gain)
    """
    assets, sensors, soc_sensors = smart_building
    # No SOC beliefs should exist before the scheduling job runs.
    assert len(soc_sensors["Test Heat Buffer"].search_beliefs()) == 0
    queue = app.queues["scheduling"]
    start = pd.Timestamp("2015-01-03").tz_localize("Europe/Amsterdam")
    end = pd.Timestamp("2015-01-04").tz_localize("Europe/Amsterdam")
    scheduler_specs = {
        "module": "flexmeasures.data.models.planning.storage",
        "class": "StorageScheduler",
    }
    flex_model = {
        "power-capacity": "10kW",
        "soc-at-start": "0kWh",
        "soc-unit": "kWh",
        "soc-min": 0.0,
        "soc-max": "100kWh",
        "soc-usage": ["10kW"],
        "soc-gain": ["1kW"],
        # Ask the scheduler to save SOC values to this dedicated sensor.
        "state-of-charge": {"sensor": soc_sensors["Test Heat Buffer"].id},
        "prefer-charging-sooner": True,
        "storage-efficiency": storage_eff,
        "charging-efficiency": charging_eff,
        "discharging-efficiency": discharging_eff,
    }
    flex_context = {
        "consumption-price": "100 EUR/MWh",
        "production-price": "0 EUR/MWh",
        "site-production-capacity": "1MW",
        "site-consumption-capacity": "1MW",
    }
    create_scheduling_job(
        asset_or_sensor=sensors["Test Heat Buffer"],
        scheduler_specs=scheduler_specs,
        flex_model=flex_model,
        flex_context=flex_context,
        enqueue=True,
        start=start,
        end=end,
        round_to_decimals=12,
        resolution=timedelta(minutes=15),
    )
    # Work on jobs
    work_on_rq(queue, handle_scheduling_exception)
    # Check that the SOC data is saved
    soc_schedule = (
        soc_sensors["Test Heat Buffer"]
        .search_beliefs(resolution=timedelta(0))
        .reset_index()
    )
    power_schedule = sensors["Test Heat Buffer"].search_beliefs().reset_index()
    # Rebuild the power schedule as a plain Series on a 15-minute DatetimeIndex.
    power_schedule = pd.Series(
        power_schedule.event_value.tolist(),
        index=pd.DatetimeIndex(power_schedule.event_start.tolist(), freq="15min"),
    )
    assert np.isclose(
        -power_schedule.mean(), expected_avg_power
    )  # charge to cover for the net usage (in average)
    # Re-integrate the power schedule independently, then compare against the
    # SOC values the scheduler saved.
    soc_schedule_from_power = integrate_time_series(
        -power_schedule,
        0.0,
        decimal_precision=16,
        # presumably the 9 kW net usage over a 15-minute step (in MWh) — TODO confirm
        stock_delta=-0.009 * 0.25,
        up_efficiency=ur.Quantity(charging_eff).to("dimensionless").magnitude,
        down_efficiency=ur.Quantity(discharging_eff).to("dimensionless").magnitude,
        storage_efficiency=ur.Quantity(storage_eff).to("dimensionless").magnitude,
    )
    assert all(
        np.isclose(soc_schedule.event_value.values, soc_schedule_from_power.values)
    )