  1. """
  2. Logic around scheduling (jobs)
  3. """
  4. from __future__ import annotations
  5. from datetime import datetime, timedelta
  6. import os
  7. import sys
  8. import importlib.util
  9. from importlib.abc import Loader
  10. from typing import Callable, Type
  11. import inspect
  12. from copy import deepcopy
  13. from traceback import print_tb
  14. from flask import current_app
  15. import click
  16. from rq import get_current_job, Callback
  17. from rq.exceptions import InvalidJobOperation
  18. from rq.job import Job
  19. import timely_beliefs as tb
  20. import pandas as pd
  21. from sqlalchemy import select
  22. from flexmeasures.data import db
  23. from flexmeasures.data.models.planning import Scheduler, SchedulerOutputType
  24. from flexmeasures.data.models.planning.storage import StorageScheduler
  25. from flexmeasures.data.models.planning.exceptions import InfeasibleProblemException
  26. from flexmeasures.data.models.planning.process import ProcessScheduler
  27. from flexmeasures.data.models.time_series import Sensor, TimedBelief
  28. from flexmeasures.data.models.generic_assets import GenericAsset as Asset
  29. from flexmeasures.data.models.data_sources import DataSource
  30. from flexmeasures.data.schemas.scheduling import MultiSensorFlexModelSchema
  31. from flexmeasures.data.utils import get_data_source, save_to_db
  32. from flexmeasures.utils.time_utils import server_now
  33. from flexmeasures.data.services.utils import (
  34. job_cache,
  35. get_asset_or_sensor_ref,
  36. get_asset_or_sensor_from_ref,
  37. get_scheduler_instance,
  38. )


def load_custom_scheduler(scheduler_specs: dict) -> type:
    """
    Read in custom scheduling spec.
    Attempt to load the Scheduler class to use.

    The scheduler class should be derived from flexmeasures.data.models.planning.Scheduler.
    The scheduler class should have a class method named "compute".

    Example specs:

    {
        "module": "/path/to/module.py",  # or sthg importable, e.g. "package.module"
        "class": "NameOfSchedulerClass",
    }
    """
    assert isinstance(
        scheduler_specs, dict
    ), f"Scheduler specs is {type(scheduler_specs)}, should be a dict"
    assert "module" in scheduler_specs, "scheduler specs have no 'module'."
    assert "class" in scheduler_specs, "scheduler specs have no 'class'."

    scheduler_name = scheduler_specs["class"]

    # import module
    module_descr = scheduler_specs["module"]
    if os.path.exists(module_descr):
        spec = importlib.util.spec_from_file_location(scheduler_name, module_descr)
        assert spec, f"Could not load specs for scheduling module at {module_descr}."
        module = importlib.util.module_from_spec(spec)
        sys.modules[scheduler_name] = module
        assert isinstance(spec.loader, Loader)
        spec.loader.exec_module(module)
    else:  # assume importable module
        try:
            module = importlib.import_module(module_descr)
        except TypeError as te:
            current_app.logger.error(f"Cannot load {module_descr}: {te}.")
            raise
        except ModuleNotFoundError:
            current_app.logger.error(
                f"Attempted to import module {module_descr} (as it is not a valid file path), but it is not installed."
            )
            raise
        assert module, f"Module {module_descr} could not be loaded."

    # get scheduling function
    assert hasattr(
        module, scheduler_specs["class"]
    ), f"Module at {module_descr} has no class {scheduler_specs['class']}"
    scheduler_class = getattr(module, scheduler_specs["class"])
    schedule_function_name = "compute"
    if not hasattr(scheduler_class, schedule_function_name):
        raise NotImplementedError(
            f"No function {schedule_function_name} in {scheduler_class}. Cannot load custom scheduler."
        )
    return scheduler_class
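

# A minimal usage sketch (hedged; the module path and class name below are
# hypothetical, not part of FlexMeasures):
#
#     scheduler_class = load_custom_scheduler(
#         {"module": "my_package.schedulers", "class": "MyScheduler"}
#     )
#     assert issubclass(scheduler_class, Scheduler)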


def success_callback(job, connection, result, *args, **kwargs):
    """On success of a fallback job, requeue jobs that were deferred because they depend on the original job."""
    queue = current_app.queues["scheduling"]
    original_job = Job.fetch(job.meta["original_job_id"], connection=connection)

    # requeue deferred jobs
    for dependent_job_id in original_job.dependent_ids:
        queue.deferred_job_registry.requeue(dependent_job_id)


def trigger_optional_fallback(job, connection, type, value, traceback):
    """Create a fallback schedule job when the error is of type InfeasibleProblemException."""
    job.meta["exception"] = value
    job.save_meta()

    if type is InfeasibleProblemException:
        asset_or_sensor = get_asset_or_sensor_from_ref(job.meta.get("asset_or_sensor"))
        scheduler_kwargs = job.meta["scheduler_kwargs"]

        if ("scheduler_specs" in job.kwargs) and (
            job.kwargs["scheduler_specs"] is not None
        ):
            scheduler_class: Type[Scheduler] = load_custom_scheduler(
                job.kwargs["scheduler_specs"]
            )
        else:
            scheduler_class: Type[Scheduler] = find_scheduler_class(asset_or_sensor)

        # only schedule a fallback schedule job if the original job has a fallback mechanism
        if scheduler_class.fallback_scheduler_class is not None:
            scheduler_class = scheduler_class.fallback_scheduler_class

            scheduler_specs = {
                "class": scheduler_class.__name__,
                "module": inspect.getmodule(scheduler_class).__name__,
            }

            fallback_job = create_scheduling_job(
                asset_or_sensor,
                force_new_job_creation=True,
                enqueue=False,
                scheduler_specs=scheduler_specs,
                success_callback=Callback(success_callback),
                **scheduler_kwargs,
            )

            # keep track of the id of the original (non-fallback) job
            fallback_job.meta["original_job_id"] = job.meta.get(
                "original_job_id", job.id
            )
            fallback_job.save_meta()

            job.meta["fallback_job_id"] = fallback_job.id
            job.save_meta()

            current_app.queues["scheduling"].enqueue_job(fallback_job)
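

# Sketch of the fallback mechanism from the outside (hedged; `my_sensor`, `start`
# and `end` are placeholders): create_scheduling_job attaches this callback via
# on_failure, so when a scheduler raises InfeasibleProblemException, a fallback
# job is enqueued and linked in the failed job's meta:
#
#     job = create_scheduling_job(asset_or_sensor=my_sensor, start=start, end=end)
#     # ... after the job fails with an infeasible problem:
#     fallback_job_id = job.meta.get("fallback_job_id")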


@job_cache("scheduling")
def create_scheduling_job(
    asset_or_sensor: Asset | Sensor | None = None,
    sensor: Sensor | None = None,
    job_id: str | None = None,
    enqueue: bool = True,
    requeue: bool = False,
    force_new_job_creation: bool = False,
    scheduler_specs: dict | None = None,
    depends_on: Job | list[Job] | None = None,
    success_callback: Callable | None = None,
    **scheduler_kwargs,
) -> Job:
    """
    Create a new Job, which is queued for later execution.

    To support quick retrieval of the scheduling job, the job id is the unique entity address of the UDI event.
    That means one event leads to one job (i.e. actions are event-driven).

    As a rule of thumb, keep arguments to the job simple, and deserializable.

    The life cycle of a scheduling job:
    1. A scheduling job is born here (in create_scheduling_job).
    2. It is run in make_schedule, which writes results to the db.
    3. If an error occurs (and the worker is configured accordingly), handle_scheduling_exception comes in.

    Arguments:
    :param asset_or_sensor: Asset or sensor for which the schedule is computed.
    :param job_id: Optionally, set a job id explicitly.
    :param enqueue: If True, enqueues the job in case it is new.
    :param requeue: If True, requeues the job in case it is not new and had previously failed
                    (this argument is used by the @job_cache decorator).
    :param force_new_job_creation: If True, this attribute forces a new job to be created (skipping cache).
    :param success_callback: Callback function that runs on success
                             (this argument is used by the @job_cache decorator).
    :returns: The job.
    """
    # We first create a scheduler and check if deserializing works, so the flex config is checked
    # and errors are raised before the job is enqueued (so users get a meaningful response right away).
    # Note: We are putting still serialized scheduler_kwargs into the job!
    if sensor is not None:
        current_app.logger.warning(
            "The `sensor` keyword argument is deprecated. Please consider using the argument `asset_or_sensor`."
        )
        asset_or_sensor = sensor

    if scheduler_specs:
        scheduler_class: Type[Scheduler] = load_custom_scheduler(scheduler_specs)
    else:
        scheduler_class: Type[Scheduler] = find_scheduler_class(asset_or_sensor)

    scheduler = get_scheduler_instance(
        scheduler_class=scheduler_class,
        asset_or_sensor=asset_or_sensor,
        scheduler_params=scheduler_kwargs,
    )
    scheduler.deserialize_config()

    asset_or_sensor = get_asset_or_sensor_ref(asset_or_sensor)
    job = Job.create(
        make_schedule,
        kwargs=dict(
            asset_or_sensor=asset_or_sensor,
            scheduler_specs=scheduler_specs,
            **scheduler_kwargs,
        ),
        id=job_id,
        connection=current_app.queues["scheduling"].connection,
        ttl=int(
            current_app.config.get(
                "FLEXMEASURES_JOB_TTL", timedelta(-1)
            ).total_seconds()
        ),
        result_ttl=int(
            current_app.config.get(
                "FLEXMEASURES_PLANNING_TTL", timedelta(-1)
            ).total_seconds()
        ),  # NB the job.cleanup docs say a negative number of seconds means persisting forever
        on_failure=Callback(trigger_optional_fallback),
        on_success=success_callback,
        depends_on=depends_on,
    )

    job.meta["asset_or_sensor"] = asset_or_sensor
    job.meta["scheduler_kwargs"] = scheduler_kwargs
    job.save_meta()

    # in case the function enqueues it
    try:
        job_status = job.get_status(refresh=True)
    except InvalidJobOperation:
        job_status = None

    # with job_status=None, we ensure that only fresh new jobs are enqueued (otherwise, they should be requeued instead)
    if enqueue and not job_status:
        current_app.queues["scheduling"].enqueue_job(job)
        current_app.job_cache.add(
            asset_or_sensor["id"],
            job.id,
            queue="scheduling",
            asset_or_sensor_type=asset_or_sensor["class"].lower(),
        )

    return job
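

# A minimal sketch of creating a job (hedged; `my_sensor` is a placeholder, and
# the flex-model field shown is just an illustrative, still serialized value):
#
#     job = create_scheduling_job(
#         asset_or_sensor=my_sensor,
#         start=server_now(),
#         end=server_now() + timedelta(hours=12),
#         resolution=timedelta(minutes=15),
#         flex_model={"soc-at-start": "0.2 MWh"},
#     )
#     print(job.get_status())  # e.g. "queued"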


def cb_done_sequential_scheduling_job(jobs_ids: list[str]):
    """
    TODO: maybe check if any of the subjobs used a fallback scheduler or accrued a relaxation penalty.
    """
    current_app.logger.info("Sequential scheduling job finished its chain of subjobs.")
    # jobs = [Job.fetch(job_id) for job_id in jobs_ids]


@job_cache("scheduling")
def create_sequential_scheduling_job(
    asset: Asset,
    job_id: str | None = None,
    enqueue: bool = True,
    requeue: bool = False,
    force_new_job_creation: bool = False,
    scheduler_specs: dict | None = None,
    depends_on: list[Job] | None = None,
    success_callback: Callable | None = None,
    **scheduler_kwargs,
) -> Job:
    """Create a chain of underlying jobs, one for each device, with one additional job to wrap up.

    :param asset: Asset (e.g. a site) for which the schedule is computed.
    :param job_id: Optionally, set a job id explicitly.
    :param enqueue: If True, enqueues the job in case it is new.
    :param requeue: If True, requeues the job in case it is not new and had previously failed
                    (this argument is used by the @job_cache decorator).
    :param force_new_job_creation: If True, this attribute forces a new job to be created (skipping cache).
    :param success_callback: Callback function that runs on success
                             (this argument is used by the @job_cache decorator).
    :param scheduler_kwargs: Dict containing start and end (both deserialized), the flex-context (serialized),
                             and the flex-model (partially deserialized, see example below).
    :returns: The wrap-up job.

    Example of a partially deserialized flex-model per sensor:

        scheduler_kwargs["flex_model"] = [
            dict(
                sensor=<Sensor 5: power, unit: MW res.: 0:15:00>,
                sensor_flex_model={
                    'consumption-capacity': '10 kW',
                },
            ),
            dict(
                sensor=<deserialized sensor object>,
                sensor_flex_model=<still serialized flex-model>,
            ),
        ]
    """
    if enqueue is False:
        raise NotImplementedError(
            "See why: https://github.com/FlexMeasures/flexmeasures/pull/1313/files#r1971479492"
        )
    flex_model = scheduler_kwargs["flex_model"]
    jobs = []
    previous_sensors = []
    previous_job = depends_on
    for child_flex_model in flex_model:
        sensor = child_flex_model.pop("sensor")
        current_scheduler_kwargs = deepcopy(scheduler_kwargs)
        current_scheduler_kwargs["flex_model"] = child_flex_model["sensor_flex_model"]

        # treat the sensors of previously scheduled subjobs as inflexible devices
        if "inflexible-device-sensors" not in current_scheduler_kwargs["flex_context"]:
            current_scheduler_kwargs["flex_context"]["inflexible-device-sensors"] = []
        current_scheduler_kwargs["flex_context"]["inflexible-device-sensors"].extend(
            previous_sensors
        )
        current_scheduler_kwargs["resolution"] = sensor.event_resolution
        current_scheduler_kwargs["sensor"] = sensor

        job = create_scheduling_job(
            **current_scheduler_kwargs,
            scheduler_specs=scheduler_specs,
            requeue=requeue,
            job_id=job_id,
            enqueue=enqueue,
            depends_on=previous_job,
            force_new_job_creation=force_new_job_creation,
        )
        jobs.append(job)
        previous_sensors.append(sensor.id)
        previous_job = job

    # create a job that triggers when the last job is done
    job = Job.create(
        func=cb_done_sequential_scheduling_job,
        args=([j.id for j in jobs],),
        depends_on=previous_job,
        ttl=int(
            current_app.config.get(
                "FLEXMEASURES_JOB_TTL", timedelta(-1)
            ).total_seconds()
        ),
        result_ttl=int(
            current_app.config.get(
                "FLEXMEASURES_PLANNING_TTL", timedelta(-1)
            ).total_seconds()
        ),  # NB the job.cleanup docs say a negative number of seconds means persisting forever
        on_success=success_callback,
        connection=current_app.queues["scheduling"].connection,
    )

    try:
        job_status = job.get_status(refresh=True)
    except InvalidJobOperation:
        job_status = None

    # with job_status=None, we ensure that only fresh new jobs are enqueued (otherwise, they should be requeued instead)
    if enqueue and not job_status:
        current_app.queues["scheduling"].enqueue_job(job)
        current_app.job_cache.add(
            asset.id,
            job.id,
            queue="scheduling",
            asset_or_sensor_type="asset",
        )

    return job
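

# A minimal sketch of sequential scheduling (hedged; the site and sensor objects
# are placeholders). Each scheduled device is added to the next device's
# inflexible-device-sensors, so devices are planned one after the other:
#
#     job = create_sequential_scheduling_job(
#         asset=my_site,
#         start=start,
#         end=end,
#         flex_context={},
#         flex_model=[
#             dict(sensor=battery_sensor, sensor_flex_model={"soc-at-start": "0.2 MWh"}),
#             dict(sensor=ev_sensor, sensor_flex_model={"soc-at-start": "5 kWh"}),
#         ],
#     )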


@job_cache("scheduling")
def create_simultaneous_scheduling_job(
    asset: Asset,
    job_id: str | None = None,
    enqueue: bool = True,
    requeue: bool = False,
    force_new_job_creation: bool = False,
    scheduler_specs: dict | None = None,
    depends_on: list[Job] | None = None,
    success_callback: Callable | None = None,
    **scheduler_kwargs,
) -> Job:
    """Create a single job to schedule all devices at once.

    :param asset: Asset (e.g. a site) for which the schedule is computed.
    :param job_id: Optionally, set a job id explicitly.
    :param enqueue: If True, enqueues the job in case it is new.
    :param requeue: If True, requeues the job in case it is not new and had previously failed
                    (this argument is used by the @job_cache decorator).
    :param force_new_job_creation: If True, this attribute forces a new job to be created (skipping cache).
    :param success_callback: Callback function that runs on success
                             (this argument is used by the @job_cache decorator).
    :param scheduler_kwargs: Dict containing start and end (both deserialized), the flex-context (serialized),
                             and the flex-model (partially deserialized, see example below).
    :returns: The scheduling job.

    Example of a partially deserialized flex-model per sensor:

        scheduler_kwargs["flex_model"] = [
            dict(
                sensor=<Sensor 5: power, unit: MW res.: 0:15:00>,
                sensor_flex_model={
                    'consumption-capacity': '10 kW',
                },
            ),
            dict(
                sensor=<deserialized sensor object>,
                sensor_flex_model=<still serialized flex-model>,
            ),
        ]
    """
    # Convert (partially) deserialized fields back to serialized form
    scheduler_kwargs["flex_model"] = MultiSensorFlexModelSchema(many=True).dump(
        scheduler_kwargs["flex_model"]
    )

    job = create_scheduling_job(
        asset_or_sensor=asset,
        **scheduler_kwargs,
        scheduler_specs=scheduler_specs,
        requeue=requeue,
        job_id=job_id,
        enqueue=False,  # we enqueue the job below
        depends_on=depends_on,
        success_callback=success_callback,
        force_new_job_creation=force_new_job_creation,
    )

    try:
        job_status = job.get_status(refresh=True)
    except InvalidJobOperation:
        job_status = None

    # with job_status=None, we ensure that only fresh new jobs are enqueued (otherwise, they should be requeued instead)
    if enqueue and not job_status:
        current_app.queues["scheduling"].enqueue_job(job)
        current_app.job_cache.add(
            asset.id,
            job.id,
            queue="scheduling",
            asset_or_sensor_type="asset",
        )

    return job
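

# A minimal sketch of simultaneous scheduling (hedged; same placeholder inputs
# as the sequential example above). All devices are handed to one scheduler run,
# after the flex-model list is re-serialized by MultiSensorFlexModelSchema:
#
#     job = create_simultaneous_scheduling_job(
#         asset=my_site,
#         start=start,
#         end=end,
#         flex_context={},
#         flex_model=[...],  # same shape as in the sequential example
#     )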


def make_schedule(
    sensor_id: int | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    resolution: timedelta | None = None,
    asset_or_sensor: dict | None = None,
    belief_time: datetime | None = None,
    flex_model: dict | None = None,
    flex_context: dict | None = None,
    flex_config_has_been_deserialized: bool = False,
    scheduler_specs: dict | None = None,
    **scheduler_kwargs: dict,
) -> bool:
    """
    This function computes a schedule. It returns True if it ran successfully.

    It can be queued as a job (see create_scheduling_job).
    In that case, it will probably run on a different FlexMeasures node than where the job is created.
    In any case, this function expects flex_model and flex_context to not have been deserialized yet.

    This is what this function does:
    - Find out which scheduler should be used & compute the schedule
    - Turn scheduled values into beliefs and save them to db
    """
    # https://docs.sqlalchemy.org/en/13/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork
    db.engine.dispose()

    if sensor_id is not None:
        current_app.logger.warning(
            "The `sensor_id` keyword argument is deprecated. Please consider using the argument `asset_or_sensor`."
        )
        asset_or_sensor = {"class": "Sensor", "id": sensor_id}

    asset_or_sensor: Asset | Sensor = get_asset_or_sensor_from_ref(asset_or_sensor)

    rq_job = get_current_job()
    if rq_job:
        click.echo(
            "Running Scheduling Job %s: %s, from %s to %s"
            % (rq_job.id, asset_or_sensor, start, end)
        )

    if scheduler_specs:
        scheduler_class: Type[Scheduler] = load_custom_scheduler(scheduler_specs)
    else:
        scheduler_class: Type[Scheduler] = find_scheduler_class(asset_or_sensor)

    data_source_info = scheduler_class.get_data_source_info()

    if belief_time is None:
        belief_time = server_now()

    scheduler_params = dict(
        start=start,
        end=end,
        resolution=resolution,
        belief_time=belief_time,
        flex_model=flex_model,
        flex_context=flex_context,
        return_multiple=True,
        **scheduler_kwargs,
    )
    scheduler: Scheduler = get_scheduler_instance(
        scheduler_class=scheduler_class,
        asset_or_sensor=asset_or_sensor,
        scheduler_params=scheduler_params,
    )
    if flex_config_has_been_deserialized:
        scheduler.config_deserialized = True

    # we store the default scheduler info on the job, in case it fails in the compute step
    if rq_job:
        rq_job.meta["scheduler_info"] = scheduler.info

    consumption_schedule: SchedulerOutputType = scheduler.compute()

    # in case we are dealing with a custom Scheduler that hasn't implemented the multiple output return,
    # wrap its single schedule; this should only happen when the Scheduler applies to a Sensor
    if isinstance(consumption_schedule, pd.Series):
        assert isinstance(
            asset_or_sensor, Sensor
        ), "A single-output schedule should apply to a sensor."
        consumption_schedule = [
            {
                "name": "consumption_schedule",
                "data": consumption_schedule,
                "sensor": asset_or_sensor,
            }
        ]

    if rq_job:
        click.echo("Job %s made schedule." % rq_job.id)
        rq_job.meta["scheduler_info"] = scheduler.info

    data_source = get_data_source(
        data_source_name=data_source_info["name"],
        data_source_model=data_source_info["model"],
        data_source_version=data_source_info["version"],
        data_source_type="scheduler",
    )

    # saving info on the job, so the API for a job can look the data up
    if rq_job:
        data_source_info["id"] = data_source.id
        rq_job.meta["data_source_info"] = data_source_info
        rq_job.save_meta()

    # Save any result that specifies a sensor to save it to
    for result in consumption_schedule:
        if "sensor" not in result:
            continue

        sign = 1
        if result["sensor"].measures_power and result["sensor"].get_attribute(
            "consumption_is_positive", True
        ):
            sign = -1

        # For consumption schedules, positive values denote consumption. For the db, consumption is negative.
        ts_value_schedule = [
            TimedBelief(
                event_start=dt,
                belief_time=belief_time,
                event_value=sign * value,
                sensor=result["sensor"],
                source=data_source,
            )
            for dt, value in result["data"].items()
        ]
        bdf = tb.BeliefsDataFrame(ts_value_schedule)
        save_to_db(bdf)

    scheduler.persist_flex_model()
    db.session.commit()

    return True
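

# Worked example of the sign convention above (hedged): for a power sensor with
# the "consumption_is_positive" attribute left at its default (True), a scheduled
# consumption of 0.5 MW at time t is stored as a TimedBelief with
# event_value = -0.5, since the db convention is that consumption is negative.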


def find_scheduler_class(asset_or_sensor: Asset | Sensor) -> type:
    """
    Find out which scheduler to use, given an asset or sensor.
    This will morph into a logic store utility, and schedulers should be registered for asset types there,
    instead of this fixed lookup logic.
    """
    # Choose which algorithm to use. TODO: unify loading this into a func store concept.
    # First, look if there's a "custom-scheduler" defined.
    if "custom-scheduler" in asset_or_sensor.attributes:
        scheduler_specs = asset_or_sensor.attributes.get("custom-scheduler")
        scheduler_class = load_custom_scheduler(scheduler_specs)
        return scheduler_class

    if isinstance(asset_or_sensor, Sensor):
        asset = asset_or_sensor.generic_asset
    else:
        asset = asset_or_sensor

    if asset.generic_asset_type.name in (
        "battery",
        "one-way_evse",
        "two-way_evse",
    ):
        scheduler_class = StorageScheduler
    elif asset.generic_asset_type.name in ("process", "load"):
        scheduler_class = ProcessScheduler
    else:
        raise ValueError(
            "Scheduling is not (yet) supported for asset type %s."
            % asset.generic_asset_type
        )

    return scheduler_class
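

# Sketch: pointing an asset (or sensor) at a custom scheduler via its attributes
# (hedged; the module path and class name are hypothetical, and how attribute
# changes are persisted depends on your setup):
#
#     my_asset.attributes["custom-scheduler"] = {
#         "module": "my_package.schedulers",
#         "class": "MyScheduler",
#     }
#     db.session.commit()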


def handle_scheduling_exception(job, exc_type, exc_value, traceback):
    """
    Store exception as job meta data.
    """
    click.echo(
        "HANDLING RQ SCHEDULING WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value)
    )
    print_tb(traceback)
    job.meta["exception"] = exc_value
    job.save_meta()


def get_data_source_for_job(job: Job) -> DataSource | None:
    """
    Try to find the data source linked by this scheduling job.

    We expect that enough info on the source was placed in the meta dict, either:
    - the DataSource ID itself (i.e. the normal situation), or
    - enough info to facilitate a DataSource query (as a fallback).
    """
    data_source_info = job.meta.get("data_source_info")
    if data_source_info and "id" in data_source_info:
        # this is the expected outcome
        return db.session.get(DataSource, data_source_info["id"])
    if data_source_info is None:
        raise ValueError(
            "Cannot look up scheduling data without knowing the full data_source_info (version)."
        )
    scheduler_sources = db.session.scalars(
        select(DataSource)
        .filter_by(
            type="scheduler",
            **data_source_info,
        )
        .order_by(DataSource.version.desc())
    ).all()  # might still be more than one, e.g. per user
    if len(scheduler_sources) == 0:
        return None
    return scheduler_sources[0]
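

# Sketch: looking up the data source behind a finished job (hedged; `job_id` is
# a placeholder, and Job.fetch is the same call used in success_callback above):
#
#     job = Job.fetch(job_id, connection=current_app.queues["scheduling"].connection)
#     source = get_data_source_for_job(job)
#     # ... then query TimedBelief for the relevant sensor and this source
#     # to retrieve the saved schedule.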