job_cache.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. """
  2. Logic around storing and retrieving jobs from redis cache.
  3. """
  4. from __future__ import annotations
  5. import redis
  6. from redis.exceptions import ConnectionError
  7. from rq.job import Job, NoSuchJobError
  8. class NoRedisConfigured(Exception):
  9. def __init__(self, message="Redis not configured"):
  10. super().__init__(message)
  11. class JobCache:
  12. """
  13. Class is used for storing jobs and retrieving them from redis cache.
  14. Need it to be able to get jobs for particular asset (and display them on status page).
  15. Keeps cache up to date by removing jobs that are not found in redis - were removed by TTL.
  16. Stores jobs by asset or sensor id, queue and asset or sensor type, cache key can look like this
  17. - forecasting:sensor:1 (forecasting jobs can be stored by sensor only)
  18. - scheduling:sensor:2
  19. - scheduling:asset:3
  20. """
  21. def __init__(self, connection: redis.Redis):
  22. self.connection = connection
  23. def _get_cache_key(
  24. self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str
  25. ) -> str:
  26. return f"{queue}:{asset_or_sensor_type}:{asset_or_sensor_id}"
  27. def _check_redis_connection(self):
  28. try:
  29. self.connection.ping() # Check if the Redis connection is okay
  30. except (ConnectionError, ConnectionRefusedError):
  31. raise NoRedisConfigured
  32. def add(
  33. self,
  34. asset_or_sensor_id: int,
  35. job_id: str,
  36. queue: str = None,
  37. asset_or_sensor_type: str = None,
  38. ):
  39. self._check_redis_connection()
  40. cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type)
  41. self.connection.sadd(cache_key, job_id)
  42. def _get_job(self, job_id: str) -> Job:
  43. try:
  44. job = Job.fetch(job_id, connection=self.connection)
  45. except NoSuchJobError:
  46. return None
  47. return job
  48. def get(
  49. self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str
  50. ) -> list[Job]:
  51. self._check_redis_connection()
  52. job_ids_to_remove, jobs = list(), list()
  53. cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type)
  54. for job_id in self.connection.smembers(cache_key):
  55. job_id = job_id.decode("utf-8")
  56. job = self._get_job(job_id)
  57. # remove job from cache if cant be found - was removed by TTL
  58. if job is None:
  59. job_ids_to_remove.append(job_id)
  60. continue
  61. jobs.append(job)
  62. if job_ids_to_remove:
  63. self.connection.srem(cache_key, *job_ids_to_remove)
  64. return jobs