123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- from __future__ import annotations
- from timely_beliefs.beliefs.classes import BeliefsDataFrame
- from typing import Sequence
- from datetime import timedelta
- from flask import current_app
- from numpy import array
- from psycopg2.errors import UniqueViolation
- from rq.job import Job
- from sqlalchemy.exc import IntegrityError
- from flexmeasures.data import db
- from flexmeasures.data.utils import save_to_db
- from flexmeasures.api.common.responses import (
- invalid_replacement,
- ResponseTuple,
- request_processed,
- already_received_and_successfully_processed,
- )
- from flexmeasures.utils.error_utils import error_handling_router
def upsample_values(
    value_groups: list[list[float]] | list[float],
    from_resolution: timedelta,
    to_resolution: timedelta,
) -> list[list[float]] | list[float]:
    """Upsample the values (in value groups) to a smaller resolution.

    Every value is repeated ``from_resolution // to_resolution`` times, keeping
    the original order. The input may be a flat list of values or a list of
    value groups; which one it is gets decided by looking at the first element.

    from_resolution has to be a multiple of to_resolution. If it is not, or if
    the input is empty, the input is returned unchanged.

    :param value_groups:    flat list of values, or list of lists of values
    :param from_resolution: resolution of the given values
    :param to_resolution:   resolution to upsample to
    :returns:               upsampled values, in the same (flat or grouped) layout
    """
    # Nothing to repeat; this also protects the value_groups[0] probe below.
    if len(value_groups) == 0:
        return value_groups

    # Only whole-number upsampling factors are supported.
    if from_resolution % to_resolution != timedelta(0):
        return value_groups

    n = from_resolution // to_resolution
    if isinstance(value_groups[0], list):
        return [list(array(value_group).repeat(n)) for value_group in value_groups]
    return list(array(value_groups).repeat(n))
def unique_ever_seen(iterable: Sequence, selector: Sequence):
    """
    Return unique iterable elements with corresponding lists of selector elements, preserving order.

    A selector element that belongs to an iterable element seen only once is
    kept as is; selector elements are collected in a list as soon as their
    iterable element shows up a second time.

    >>> a, b = unique_ever_seen([[10, 20], [10, 20], [20, 40]], [1, 2, 3])
    >>> print(a)
    [[10, 20], [20, 40]]
    >>> print(b)
    [[1, 2], 3]
    """
    seen = []
    selected = []
    for element, pick in zip(iterable, selector):
        if element in seen:
            # Elements may be unhashable (e.g. lists), hence the linear lookup.
            position = seen.index(element)
            collected = selected[position]
            if isinstance(collected, list):
                collected.append(pick)
            else:
                # Second occurrence: promote the lone selector element to a list.
                selected[position] = [collected, pick]
            continue
        seen.append(element)
        selected.append(pick)
    return seen, selected
def enqueue_forecasting_jobs(
    forecasting_jobs: list[Job] | None = None,
):
    """Enqueue forecasting jobs.

    Nothing happens when no jobs are passed (``None`` or an empty list).

    :param forecasting_jobs: list of forecasting Jobs for redis queues.
    """
    if not forecasting_jobs:
        return

    # Plain loop rather than a comprehension: we only want the side effect,
    # not a throwaway list of return values.
    forecasting_queue = current_app.queues["forecasting"]
    for job in forecasting_jobs:
        forecasting_queue.enqueue_job(job)
def save_and_enqueue(
    data: BeliefsDataFrame | list[BeliefsDataFrame],
    forecasting_jobs: list[Job] | None = None,
    save_changed_beliefs_only: bool = True,
) -> ResponseTuple:
    """Save the data, commit the session, and pick a matching API response.

    Forecasting jobs are enqueued only if the save status reports success
    and something new was actually saved.

    :param data:                      beliefs to save
    :param forecasting_jobs:          jobs to enqueue after saving new data
    :param save_changed_beliefs_only: passed on to save_to_db
    :returns:                         response tuple chosen from the save status
    """
    # Attempt to save
    status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only)
    db.session.commit()

    # Only enqueue forecasting jobs upon successfully saving new data
    nothing_new = status == "success_but_nothing_new"
    if status.startswith("success") and not nothing_new:
        enqueue_forecasting_jobs(forecasting_jobs)

    # Pick a response
    if status == "success":
        return request_processed()
    if nothing_new or status == "success_with_unchanged_beliefs_skipped":
        return already_received_and_successfully_processed()
    return invalid_replacement()
def catch_timed_belief_replacements(error: IntegrityError):
    """Catch IntegrityErrors due to a UniqueViolation on the TimedBelief primary key.

    Such errors get a more informative response; any other IntegrityError is
    handed to the generic error handler.
    """
    cause = error.orig
    violates_belief_pkey = isinstance(
        cause, UniqueViolation
    ) and "timed_belief_pkey" in str(cause)

    if not violates_belief_pkey:
        # Forward to our generic error handler
        return error_handling_router(error)

    # Some beliefs represented replacements, which was forbidden
    return invalid_replacement()
|