test_job_cache.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # flake8: noqa: E402
  2. from __future__ import annotations
  3. import pytest
  4. import pytz
  5. import unittest
  6. from datetime import datetime, timedelta
  7. from unittest.mock import MagicMock, patch
  8. from redis.exceptions import ConnectionError
  9. from rq.job import NoSuchJobError
  10. from flexmeasures.data.models.time_series import Sensor
  11. from flexmeasures.data.services.job_cache import JobCache, NoRedisConfigured
  12. from flexmeasures.data.services.forecasting import create_forecasting_jobs
  13. from flexmeasures.data.services.scheduling import create_scheduling_job
  14. from flexmeasures.utils.time_utils import as_server_time
  15. def custom_model_params():
  16. """little training as we have little data, turn off transformations until they let this test run (TODO)"""
  17. return dict(
  18. training_and_testing_period=timedelta(hours=2),
  19. outcome_var_transformation=None,
  20. regressor_transformation={},
  21. )
  22. def test_cache_on_create_forecasting_jobs(db, run_as_cli, app, setup_test_data):
  23. """Test we add job to cache on creating forecasting job + get job from cache"""
  24. wind_device_1: Sensor = setup_test_data["wind-asset-1"].sensors[0]
  25. job = create_forecasting_jobs(
  26. start_of_roll=as_server_time(datetime(2015, 1, 1, 6)),
  27. end_of_roll=as_server_time(datetime(2015, 1, 1, 7)),
  28. horizons=[timedelta(hours=1)],
  29. sensor_id=wind_device_1.id,
  30. custom_model_params=custom_model_params(),
  31. )
  32. assert app.job_cache.get(wind_device_1.id, "forecasting", "sensor") == [job[0]]
  33. def test_cache_on_create_scheduling_jobs(db, app, add_battery_assets, setup_test_data):
  34. """Test we add job to cache on creating scheduling job + get job from cache"""
  35. battery = add_battery_assets["Test battery"].sensors[0]
  36. tz = pytz.timezone("Europe/Amsterdam")
  37. start, end = tz.localize(datetime(2015, 1, 2)), tz.localize(datetime(2015, 1, 3))
  38. job = create_scheduling_job(
  39. asset_or_sensor=battery,
  40. start=start,
  41. end=end,
  42. belief_time=start,
  43. resolution=timedelta(minutes=15),
  44. )
  45. assert app.job_cache.get(battery.id, "scheduling", "sensor") == [job]
  46. class TestJobCache(unittest.TestCase):
  47. def setUp(self):
  48. self.connection = MagicMock(spec_set=["sadd", "smembers", "srem", "ping"])
  49. self.job_cache = JobCache(self.connection)
  50. self.cache_key = "forecasting:sensor:sensor_id"
  51. self.mock_redis_job = MagicMock(spec_set=["fetch"])
  52. def test_no_redis_configured(self):
  53. """Test raising NoRedisConfigured"""
  54. self.connection.ping.side_effect = ConnectionError
  55. with pytest.raises(NoRedisConfigured):
  56. self.job_cache.add(
  57. "sensor_id",
  58. "job_id",
  59. queue="forecasting",
  60. asset_or_sensor_type="sensor",
  61. )
  62. self.connection.sadd.assert_not_called()
  63. with pytest.raises(NoRedisConfigured):
  64. self.job_cache.get("sensor_id", "forecasting", "sensor")
  65. self.connection.smembers.assert_not_called()
  66. def test_add(self):
  67. """Test adding to cache"""
  68. self.job_cache.add(
  69. "sensor_id", "job_id", queue="forecasting", asset_or_sensor_type="sensor"
  70. )
  71. self.connection.sadd.assert_called_with(self.cache_key, "job_id")
  72. def test_get_empty_queue(self):
  73. """Test getting from cache with empty queue"""
  74. self.job_cache.add(
  75. "sensor_id", "job_id", queue="forecasting", asset_or_sensor_type="sensor"
  76. )
  77. self.connection.smembers.return_value = [b"job_id"]
  78. self.mock_redis_job.fetch.side_effect = NoSuchJobError
  79. with patch("flexmeasures.data.services.job_cache.Job", new=self.mock_redis_job):
  80. assert self.job_cache.get("sensor_id", "forecasting", "sensor") == []
  81. assert self.connection.srem.call_count == 1
  82. def test_get_non_empty_queue(self):
  83. """Test getting from cache with non empty forecasting queue"""
  84. self.job_cache.add(
  85. "sensor_id", "job_id", queue="forecasting", asset_or_sensor_type="sensor"
  86. )
  87. forecasting_job = MagicMock()
  88. self.connection.smembers.return_value = [b"job_id"]
  89. self.mock_redis_job.fetch.return_value = forecasting_job
  90. with patch("flexmeasures.data.services.job_cache.Job", new=self.mock_redis_job):
  91. assert self.job_cache.get("sensor_id", "forecasting", "sensor") == [
  92. forecasting_job
  93. ]
  94. assert self.connection.srem.call_count == 0