from __future__ import annotations from timely_beliefs.beliefs.classes import BeliefsDataFrame from typing import Sequence from datetime import timedelta from flask import current_app from numpy import array from psycopg2.errors import UniqueViolation from rq.job import Job from sqlalchemy.exc import IntegrityError from flexmeasures.data import db from flexmeasures.data.utils import save_to_db from flexmeasures.api.common.responses import ( invalid_replacement, ResponseTuple, request_processed, already_received_and_successfully_processed, ) from flexmeasures.utils.error_utils import error_handling_router def upsample_values( value_groups: list[list[float]] | list[float], from_resolution: timedelta, to_resolution: timedelta, ) -> list[list[float]] | list[float]: """Upsample the values (in value groups) to a smaller resolution. from_resolution has to be a multiple of to_resolution""" if from_resolution % to_resolution == timedelta(hours=0): n = from_resolution // to_resolution if isinstance(value_groups[0], list): value_groups = [ list(array(value_group).repeat(n)) for value_group in value_groups ] else: value_groups = list(array(value_groups).repeat(n)) return value_groups def unique_ever_seen(iterable: Sequence, selector: Sequence): """ Return unique iterable elements with corresponding lists of selector elements, preserving order. >>> a, b = unique_ever_seen([[10, 20], [10, 20], [20, 40]], [1, 2, 3]) >>> print(a) [[10, 20], [20, 40]] >>> print(b) [[1, 2], 3] """ u = [] s = [] for iterable_element, selector_element in zip(iterable, selector): if iterable_element not in u: u.append(iterable_element) s.append(selector_element) else: us = s[u.index(iterable_element)] if not isinstance(us, list): us = [us] us.append(selector_element) s[u.index(iterable_element)] = us return u, s def enqueue_forecasting_jobs( forecasting_jobs: list[Job] | None = None, ): """Enqueue forecasting jobs. :param forecasting_jobs: list of forecasting Jobs for redis queues. """ if forecasting_jobs is not None: [current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs] def save_and_enqueue( data: BeliefsDataFrame | list[BeliefsDataFrame], forecasting_jobs: list[Job] | None = None, save_changed_beliefs_only: bool = True, ) -> ResponseTuple: # Attempt to save status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only) db.session.commit() # Only enqueue forecasting jobs upon successfully saving new data if status[:7] == "success" and status != "success_but_nothing_new": enqueue_forecasting_jobs(forecasting_jobs) # Pick a response if status == "success": return request_processed() elif status in ( "success_with_unchanged_beliefs_skipped", "success_but_nothing_new", ): return already_received_and_successfully_processed() return invalid_replacement() def catch_timed_belief_replacements(error: IntegrityError): """Catch IntegrityErrors due to a UniqueViolation on the TimedBelief primary key. Return a more informative message. """ if isinstance(error.orig, UniqueViolation) and "timed_belief_pkey" in str( error.orig ): # Some beliefs represented replacements, which was forbidden return invalid_replacement() # Forward to our generic error handler return error_handling_router(error)