api_utils.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. from __future__ import annotations
  2. from timely_beliefs.beliefs.classes import BeliefsDataFrame
  3. from typing import Sequence
  4. from datetime import timedelta
  5. from flask import current_app
  6. from numpy import array
  7. from psycopg2.errors import UniqueViolation
  8. from rq.job import Job
  9. from sqlalchemy.exc import IntegrityError
  10. from flexmeasures.data import db
  11. from flexmeasures.data.utils import save_to_db
  12. from flexmeasures.api.common.responses import (
  13. invalid_replacement,
  14. ResponseTuple,
  15. request_processed,
  16. already_received_and_successfully_processed,
  17. )
  18. from flexmeasures.utils.error_utils import error_handling_router
  19. def upsample_values(
  20. value_groups: list[list[float]] | list[float],
  21. from_resolution: timedelta,
  22. to_resolution: timedelta,
  23. ) -> list[list[float]] | list[float]:
  24. """Upsample the values (in value groups) to a smaller resolution.
  25. from_resolution has to be a multiple of to_resolution"""
  26. if from_resolution % to_resolution == timedelta(hours=0):
  27. n = from_resolution // to_resolution
  28. if isinstance(value_groups[0], list):
  29. value_groups = [
  30. list(array(value_group).repeat(n)) for value_group in value_groups
  31. ]
  32. else:
  33. value_groups = list(array(value_groups).repeat(n))
  34. return value_groups
  35. def unique_ever_seen(iterable: Sequence, selector: Sequence):
  36. """
  37. Return unique iterable elements with corresponding lists of selector elements, preserving order.
  38. >>> a, b = unique_ever_seen([[10, 20], [10, 20], [20, 40]], [1, 2, 3])
  39. >>> print(a)
  40. [[10, 20], [20, 40]]
  41. >>> print(b)
  42. [[1, 2], 3]
  43. """
  44. u = []
  45. s = []
  46. for iterable_element, selector_element in zip(iterable, selector):
  47. if iterable_element not in u:
  48. u.append(iterable_element)
  49. s.append(selector_element)
  50. else:
  51. us = s[u.index(iterable_element)]
  52. if not isinstance(us, list):
  53. us = [us]
  54. us.append(selector_element)
  55. s[u.index(iterable_element)] = us
  56. return u, s
  57. def enqueue_forecasting_jobs(
  58. forecasting_jobs: list[Job] | None = None,
  59. ):
  60. """Enqueue forecasting jobs.
  61. :param forecasting_jobs: list of forecasting Jobs for redis queues.
  62. """
  63. if forecasting_jobs is not None:
  64. [current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs]
  65. def save_and_enqueue(
  66. data: BeliefsDataFrame | list[BeliefsDataFrame],
  67. forecasting_jobs: list[Job] | None = None,
  68. save_changed_beliefs_only: bool = True,
  69. ) -> ResponseTuple:
  70. # Attempt to save
  71. status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only)
  72. db.session.commit()
  73. # Only enqueue forecasting jobs upon successfully saving new data
  74. if status[:7] == "success" and status != "success_but_nothing_new":
  75. enqueue_forecasting_jobs(forecasting_jobs)
  76. # Pick a response
  77. if status == "success":
  78. return request_processed()
  79. elif status in (
  80. "success_with_unchanged_beliefs_skipped",
  81. "success_but_nothing_new",
  82. ):
  83. return already_received_and_successfully_processed()
  84. return invalid_replacement()
  85. def catch_timed_belief_replacements(error: IntegrityError):
  86. """Catch IntegrityErrors due to a UniqueViolation on the TimedBelief primary key.
  87. Return a more informative message.
  88. """
  89. if isinstance(error.orig, UniqueViolation) and "timed_belief_pkey" in str(
  90. error.orig
  91. ):
  92. # Some beliefs represented replacements, which was forbidden
  93. return invalid_replacement()
  94. # Forward to our generic error handler
  95. return error_handling_router(error)