from __future__ import annotations

import base64
import functools
import hashlib
import inspect
from copy import deepcopy
from typing import Type

import click
from flask import current_app
from rq.job import Job
from sqlalchemy import JSON, String, cast, literal, select

from flexmeasures import Asset, Sensor
from flexmeasures.data import db
from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
from flexmeasures.data.models.planning import Scheduler


def get_scheduler_instance(
    scheduler_class: Type[Scheduler], asset_or_sensor: Asset | Sensor, scheduler_params
) -> Scheduler:
    """
    Get an instance of a Scheduler, adapting to the scheduler's signature:
    older Scheduler classes expect a `sensor` parameter, while newer ones
    take an `asset_or_sensor` parameter instead.
    """
    _scheduler_params = deepcopy(scheduler_params)
    if "asset_or_sensor" not in inspect.signature(scheduler_class).parameters:
        _scheduler_params["sensor"] = asset_or_sensor
    else:
        _scheduler_params["asset_or_sensor"] = asset_or_sensor
    return scheduler_class(**_scheduler_params)
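
# A minimal usage sketch (hedged: the scheduler class, its import path and the sensor id
# are assumptions for illustration, and an app context is required):
#
#     from flexmeasures.data.models.planning.storage import StorageScheduler
#
#     sensor = db.session.get(Sensor, 1)  # hypothetical sensor id
#     scheduler = get_scheduler_instance(
#         scheduler_class=StorageScheduler,
#         asset_or_sensor=sensor,
#         scheduler_params={},  # e.g. start/end/resolution would go here
#     )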


def get_asset_or_sensor_ref(asset_or_sensor: Asset | Sensor) -> dict:
    return {"id": asset_or_sensor.id, "class": asset_or_sensor.__class__.__name__}


def get_asset_or_sensor_from_ref(asset_or_sensor: dict):
    """
    Fetch the Asset or Sensor object described by the asset_or_sensor dictionary.
    This dictionary needs to contain the class name and row id.

    We currently cannot simplify this by just passing around the object itself,
    as the function arguments need to be serializable as job parameters.

    Examples:

    >> get_asset_or_sensor_from_ref({"class": "Asset", "id": 1})
    Asset(id=1)

    >> get_asset_or_sensor_from_ref({"class": "Sensor", "id": 2})
    Sensor(id=2)
    """
    if asset_or_sensor["class"] == Asset.__name__:
        klass = Asset
    elif asset_or_sensor["class"] == Sensor.__name__:
        klass = Sensor
    else:
        raise ValueError(
            f"Unrecognized class `{asset_or_sensor['class']}`. Please consider using GenericAsset or Sensor."
        )
    return db.session.get(klass, asset_or_sensor["id"])
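
# Round-trip sketch: get_asset_or_sensor_ref produces the serializable reference that
# this function resolves back to a database object (hypothetical id; app context required):
#
#     sensor = db.session.get(Sensor, 2)
#     ref = get_asset_or_sensor_ref(sensor)  # {"id": 2, "class": "Sensor"}
#     assert get_asset_or_sensor_from_ref(ref) == sensor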


def get_or_create_model(
    model_class: Type[GenericAsset | GenericAssetType | Sensor], **kwargs
) -> GenericAsset | GenericAssetType | Sensor:
    """Get a model from the database, or add it if it's missing.

    For example:
    >>> weather_station_type = get_or_create_model(
    ...     GenericAssetType,
    ...     name="weather station",
    ...     description="A weather station with various sensors.",
    ... )
    """

    # unpack custom initialization parameters that map to multiple database columns
    init_kwargs = kwargs.copy()
    lookup_kwargs = kwargs.copy()
    if "knowledge_horizon" in kwargs:
        (
            lookup_kwargs["knowledge_horizon_fnc"],
            lookup_kwargs["knowledge_horizon_par"],
        ) = lookup_kwargs.pop("knowledge_horizon")

    # Find out which attributes are dictionaries mapped to JSON database columns,
    # or callables mapped to String database columns (by their name)
    filter_json_kwargs = {}
    filter_by_kwargs = lookup_kwargs.copy()
    for kw, arg in lookup_kwargs.items():
        model_attribute = getattr(model_class, kw)
        if hasattr(model_attribute, "type") and isinstance(model_attribute.type, JSON):
            filter_json_kwargs[kw] = filter_by_kwargs.pop(kw)
        elif callable(arg) and isinstance(model_attribute.type, String):
            # Callables are stored in the database by their name
            # e.g. knowledge_horizon_fnc = x_days_ago_at_y_oclock
            # is stored as "x_days_ago_at_y_oclock"
            filter_by_kwargs[kw] = filter_by_kwargs[kw].__name__
        else:
            # The kw is already present in filter_by_kwargs and doesn't need to be adapted,
            # i.e. it can be used as an argument to .filter_by()
            pass

    # See if the model already exists as a db row
    model_query = select(model_class).filter_by(**filter_by_kwargs)
    for kw, arg in filter_json_kwargs.items():
        model_query = model_query.filter(
            cast(getattr(model_class, kw), String) == cast(literal(arg, JSON()), String)
        )
    model = db.session.execute(model_query).scalar_one_or_none()

    # Create the model and add it to the database if it didn't already exist
    if model is None:
        model = model_class(**init_kwargs)
        db.session.add(model)
        db.session.flush()  # assign ID
        click.echo(f"Created {repr(model)}")

    return model
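
# A hedged usage sketch building on the docstring example: `attributes` is a JSON
# column on GenericAsset, so the lookup goes through the JSON-cast filter above
# (names and values are hypothetical; app context required):
#
#     weather_station = get_or_create_model(
#         GenericAsset,
#         name="weather station",
#         generic_asset_type=weather_station_type,
#         attributes={"source": "KNMI"},
#     )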


def make_hash_sha256(o):
    """
    Use SHA256 rather than Python's built-in hash() function, because the built-in
    hash is salted per process (see PYTHONHASHSEED) and therefore yields different
    results on restarts.
    Source: https://stackoverflow.com/a/42151923
    """
    hasher = hashlib.sha256()
    hasher.update(repr(make_hashable(o)).encode())
    return base64.b64encode(hasher.digest()).decode()


def make_hashable(o):
    """
    Create a hashable representation of (possibly nested) data structures,
    such as dictionaries with nested objects.
    Source: https://stackoverflow.com/a/42151923
    """
    if isinstance(o, (tuple, list)):
        return tuple(make_hashable(e) for e in o)
    if isinstance(o, dict):
        return tuple(sorted((k, make_hashable(v)) for k, v in o.items()))
    if isinstance(o, (set, frozenset)):
        return tuple(sorted(make_hashable(e) for e in o))
    if callable(getattr(o, "make_hashable", None)):
        # the object o defines its own make_hashable method
        return o.make_hashable()
    return o
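
# A determinism sketch: make_hashable normalizes ordering, so the digest is stable
# across key orderings and process restarts (unlike the built-in hash()):
#
#     a = {"x": 1, "y": [2, 3]}
#     b = {"y": [2, 3], "x": 1}
#     assert make_hash_sha256(a) == make_hash_sha256(b)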


def hash_function_arguments(args, kwargs):
    """Combine the hashes of args and kwargs into a single hash.

    We compute h(x, y) = hash(hash(x) || hash(y)), because:
    1) h(x, y) = hash(x || y) could create collisions: deleting the last n characters
       of x and prepending them to y leaves the concatenation unchanged,
       e.g. h("abc", "d") = h("ab", "cd").
    2) We don't want to sort x and y, because we need h(x, y) != h(y, x).
    3) Hashing the inputs before concatenating means the combined hash cannot be
       decomposed to recover the input arguments, or to track whether the same args
       or kwargs are passed several times. More of a security measure.
    Source: https://crypto.stackexchange.com/questions/55162/best-way-to-hash-two-values-into-one
    """
    return make_hash_sha256(
        make_hash_sha256(args) + make_hash_sha256(kwargs)
    )  # concatenate the two hashes, then hash the result
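
# Illustration of point 1 above: naive concatenation collides, while hashing each
# part first preserves the boundary between args and kwargs (hypothetical values):
#
#     make_hash_sha256("abc" + "d") == make_hash_sha256("ab" + "cd")  # True: both hash "abcd"
#     hash_function_arguments(("abc",), {"x": "d"}) == hash_function_arguments(("ab",), {"x": "cd"})  # False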


def job_cache(queue: str):
    """
    To avoid recomputing the same task multiple times, this decorator checks if the
    decorated function has already been called with the same arguments. Input arguments
    are hashed and stored as Redis keys, with the job IDs as values (`input_arguments_hash:job_id`).

    The benefits of using Redis to store the input arguments, over a local cache such as an LRU cache, are:
    1) It works in distributed environments (computing clusters), where multiple workers
       avoid repeating work because the cache is shared across them.
    2) Cached calls are logged, which makes debugging easier.
    3) The cache survives restarts.

    Arguments
    :param queue: name of the queue
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # get the Redis connection
            connection = current_app.redis_connection

            kwargs_for_hash = kwargs.copy()
            requeue = kwargs_for_hash.pop("requeue", False)

            # pop the flag that forces creating a new job, so it doesn't affect the hash
            force_new_job_creation = kwargs_for_hash.pop(
                "force_new_job_creation", False
            )

            # create a hash from args and kwargs_for_hash
            args_hash = f"{queue}:{func.__name__}:{hash_function_arguments(args, kwargs_for_hash)}"

            # check Redis for whether the hash key exists
            if connection.exists(args_hash) and not force_new_job_creation:
                current_app.logger.info(
                    f"The function {func.__name__} has been called already with the same arguments. Skipping..."
                )

                # get job id
                job_id = connection.get(args_hash).decode()

                # check if the job still exists and, if it doesn't, skip fetching and create a new job
                if Job.exists(job_id, connection=connection):
                    job = Job.fetch(
                        job_id, connection=connection
                    )  # get job object from job id

                    # requeue if the job failed and the requeue flag is true
                    if job.is_failed and requeue:
                        job.requeue()

                    return job  # return the same job regardless of its status (SUCCESS, FAILED, ...)

            # the job description is new -> create a new job
            job = func(*args, **kwargs)

            # store the function call in Redis, mapping the hash of the function arguments to its job id
            connection.set(
                args_hash, job.id, ex=current_app.config["FLEXMEASURES_JOB_CACHE_TTL"]
            )

            return job

        return wrapper

    return decorator
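
# A hedged usage sketch (the function name and enqueueing details are hypothetical;
# the decorated function is expected to create and return an rq Job, and should accept
# the `requeue` and `force_new_job_creation` kwargs that the wrapper passes through):
#
#     @job_cache("scheduling")
#     def create_hypothetical_job(
#         *args, requeue: bool = False, force_new_job_creation: bool = False, **kwargs
#     ):
#         job = Job.create(
#             some_task_function,  # hypothetical task
#             args=args,
#             kwargs=kwargs,
#             connection=current_app.redis_connection,
#         )
#         current_app.queues["scheduling"].enqueue_job(job)
#         return job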