  1. """
  2. CLI commands for controlling jobs
  3. """
  4. from __future__ import annotations
  5. import os
  6. import random
  7. import string
  8. from types import TracebackType
  9. from typing import Type
  10. import click
  11. from flask import current_app as app
  12. from flask.cli import with_appcontext
  13. from rq import Queue, Worker
  14. from rq.job import Job
  15. from rq.registry import (
  16. CanceledJobRegistry,
  17. DeferredJobRegistry,
  18. FailedJobRegistry,
  19. FinishedJobRegistry,
  20. ScheduledJobRegistry,
  21. StartedJobRegistry,
  22. )
  23. from sqlalchemy.orm import configure_mappers
  24. from tabulate import tabulate
  25. import pandas as pd
  26. from flexmeasures.data.schemas import AssetIdField, SensorIdField
  27. from flexmeasures.data.services.scheduling import handle_scheduling_exception
  28. from flexmeasures.data.services.forecasting import handle_forecasting_exception
  29. from flexmeasures.cli.utils import MsgStyle
  30. REGISTRY_MAP = dict(
  31. canceled=CanceledJobRegistry,
  32. deferred=DeferredJobRegistry,
  33. failed=FailedJobRegistry,
  34. finished=FinishedJobRegistry,
  35. started=StartedJobRegistry,
  36. scheduled=ScheduledJobRegistry,
  37. )
# Top-level click group: all job-related commands below register themselves under "jobs".
@click.group("jobs")
def fm_jobs():
    """FlexMeasures: Job queueing."""
  41. @fm_jobs.command("run-worker")
  42. @with_appcontext
  43. @click.option(
  44. "--queue",
  45. default=None,
  46. required=True,
  47. help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
  48. )
  49. @click.option(
  50. "--name",
  51. default=None,
  52. required=False,
  53. help="Give your worker a recognizable name. Defaults to random string. Defaults to fm-worker-<randomstring>",
  54. )
  55. def run_worker(queue: str, name: str | None):
  56. """
  57. Start a worker process for forecasting and/or scheduling jobs.
  58. We use the app context to find out which redis queues to use.
  59. """
  60. q_list = parse_queue_list(queue)
  61. # https://stackoverflow.com/questions/50822822/high-sqlalchemy-initialization-overhead
  62. configure_mappers()
  63. connection = app.queues["forecasting"].connection
  64. # provide a random name if none was given
  65. if name is None:
  66. name = "fm-worker-" + "".join(random.sample(string.ascii_lowercase * 6, 6))
  67. worker_names = [w.name for w in Worker.all(connection=connection)]
  68. # making sure the name is unique
  69. used_name = name
  70. name_suffixes = iter(range(1, 51))
  71. while used_name in worker_names:
  72. used_name = f"{name}-{next(name_suffixes)}"
  73. error_handler = handle_worker_exception
  74. if queue == "scheduling":
  75. error_handler = handle_scheduling_exception
  76. elif queue == "forecasting":
  77. error_handler = handle_forecasting_exception
  78. worker = Worker(
  79. q_list,
  80. connection=connection,
  81. name=used_name,
  82. exception_handlers=[error_handler],
  83. )
  84. click.echo("\n=========================================================")
  85. click.secho(
  86. 'Worker "%s" initialised: %s ― processing %s queue(s)'
  87. % (worker.name, worker, len(q_list)),
  88. **MsgStyle.SUCCESS,
  89. )
  90. for q in q_list:
  91. click.echo("Running against %s on %s" % (q, q.connection))
  92. click.echo("=========================================================\n")
  93. worker.work()
  94. @fm_jobs.command("show-queues")
  95. @with_appcontext
  96. def show_queues():
  97. """
  98. Show the job queues and their job counts (including the "failed" registry).
  99. To inspect contents, go to the RQ-Dashboard at <flexmeasures-URL>/tasks
  100. We use the app context to find out which redis queues to use.
  101. """
  102. configure_mappers()
  103. queue_data = [
  104. (
  105. q.name,
  106. q.started_job_registry.count,
  107. q.count,
  108. q.deferred_job_registry.count,
  109. q.scheduled_job_registry.count,
  110. q.failed_job_registry.count,
  111. )
  112. for q in app.queues.values()
  113. ]
  114. click.echo(
  115. tabulate(
  116. queue_data,
  117. headers=["Queue", "Started", "Queued", "Deferred", "Scheduled", "Failed"],
  118. )
  119. )
@fm_jobs.command("save-last")
@with_appcontext
@click.option(
    "--n",
    type=int,
    default=10,
    help="The number of last jobs to save.",
)
@click.option(
    "--queue",
    "queue_name",
    type=str,
    default="scheduling",
    help="The queue to look in.",
)
@click.option(
    "--registry",
    "registry_name",
    type=click.Choice(REGISTRY_MAP.keys()),
    default="failed",
    help="The registry to look in.",
)
@click.option(
    "--asset",
    "asset_id",
    type=AssetIdField(),
    callback=lambda ctx, param, value: value.id if value else None,
    required=False,
    help="The asset ID to filter by.",
)
@click.option(
    "--sensor",
    "sensor_id",
    type=SensorIdField(),
    callback=lambda ctx, param, value: value.id if value else None,
    required=False,
    help="The sensor ID to filter by.",
)
@click.option(
    "--file",
    type=click.Path(),
    default="last_jobs.csv",
    help="The CSV file to save the found jobs.",
)
def save_last(
    n: int,
    queue_name: str,
    registry_name: str,
    asset_id: int | None,
    sensor_id: int | None,
    file: str,
):
    """
    Save the last n jobs to a file (by default, the last 10 failed jobs).
    """
    available_queues = app.queues
    if queue_name not in available_queues.keys():
        click.secho(
            f"Unknown queue '{queue_name}'. Available queues: {available_queues.keys()}",
            **MsgStyle.ERROR,
        )
        raise click.Abort()
    else:
        queue = available_queues[queue_name]
    # Instantiate the requested registry (failed/finished/...) on the chosen queue.
    registry = REGISTRY_MAP[registry_name](queue=queue)
    # Only consider the n most recently registered job IDs.
    job_ids = registry.get_job_ids()[-n:]
    found_jobs = []
    for job_id in job_ids:
        try:
            job = Job.fetch(job_id, connection=queue.connection)
            kwargs = job.kwargs or {}
            # Jobs are assumed to carry an "asset_or_sensor" dict in their kwargs,
            # with "class" ("Asset"/"Sensor") and "id" keys — TODO confirm with the job producers.
            entity_info = kwargs.get("asset_or_sensor", {})
            # Keep the job if no asset/sensor filter was given,
            # or if it matches the given asset or sensor filter.
            if (
                (not asset_id and not sensor_id)
                or (
                    entity_info.get("class") == "Asset"
                    and entity_info.get("id") == asset_id
                )
                or (
                    entity_info.get("class") == "Sensor"
                    and entity_info.get("id") == sensor_id
                )
            ):
                found_jobs.append(
                    {
                        "Job ID": job.id,
                        "ID": entity_info.get("id", "N/A"),
                        "Class": entity_info.get("class", "N/A"),
                        "Error": job.exc_info,
                        "All kwargs": kwargs,
                        "Function name": getattr(job, "func_name", "N/A"),
                        "Started at": getattr(job, "started_at", "N/A"),
                        "Ended at": getattr(job, "ended_at", "N/A"),
                    }
                )
        except Exception as e:
            # Best effort: report jobs that could not be fetched, but keep going.
            click.secho(
                f"Job {job_id} failed to fetch with error: {str(e)}", fg="yellow"
            )
    if found_jobs:
        if os.path.exists(file):
            # If the user declines to overwrite, keep prompting until we get a .csv filename.
            if not click.confirm(f"{file} already exists. Overwrite?", default=False):
                new_file = click.prompt(
                    "Enter a new filename (must end with .csv)", type=str
                )
                while not new_file.lower().endswith(".csv"):
                    click.secho("Invalid filename. It must end with .csv.", fg="red")
                    new_file = click.prompt(
                        "Enter a new filename (must end with .csv)", type=str
                    )
                file = new_file
        # Save the found jobs to a CSV file
        pd.DataFrame(found_jobs).sort_values("Started at", ascending=False).to_csv(
            file, index=False
        )
        click.secho(
            f"Saved {len(found_jobs)} {registry_name} jobs to {file}.", fg="green"
        )
        return
    # Nothing found: explain which filter (if any) was applied.
    elif asset_id:
        filter_message = f" for asset {asset_id} among the last {n} jobs"
    elif sensor_id:
        filter_message = f" for sensor {sensor_id} among the last {n} jobs"
    else:
        filter_message = ""
    click.secho(f"No {registry_name} jobs found{filter_message}.", fg="yellow")
@fm_jobs.command("clear-queue")
@with_appcontext
@click.option(
    "--queue",
    default=None,
    required=True,
    help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
)
@click.option(
    "--deferred",
    is_flag=True,
    default=False,
    help="If True, the deferred registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
    "--scheduled",
    is_flag=True,
    default=False,
    help="If True, the scheduled registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
    "--failed",
    is_flag=True,
    default=False,
    help="If True, the failed registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
def clear_queue(queue: str, deferred: bool, scheduled: bool, failed: bool):
    """
    Clear a job queue (or its registry of deferred/scheduled/failed jobs).
    We use the app context to find out which redis queues to use.
    """
    q_list = parse_queue_list(queue)
    # Map each flag name to (queue attribute holding that registry, whether it should be cleared).
    registries = dict(
        deferred=("deferred_job_registry", deferred),
        scheduled=("scheduled_job_registry", scheduled),
        failed=("failed_job_registry", failed),
    )
    configure_mappers()
    for the_queue in q_list:
        for _type, (registry, needs_clearing) in registries.items():
            if needs_clearing:
                reg = getattr(the_queue, registry)
                # Count before/after so we can report how many were actually removed.
                count_before = reg.count
                for job_id in reg.get_job_ids():
                    reg.remove(job_id)  # not actually deleting the job
                count_after = reg.count
                click.secho(
                    f"Cleared {count_before - count_after} {_type} jobs from the {registry} at {the_queue}.",
                    **MsgStyle.WARN,
                )
                wrap_up_message(count_after)
        # Without any registry flag, clear the queue itself (the jobs waiting to be done).
        if not any([deferred, scheduled, failed]):
            count_before = the_queue.count
            if count_before > 0:
                the_queue.empty()
            count_after = the_queue.count
            click.secho(
                f"Cleared {count_before - count_after} jobs from {the_queue}.",
                **MsgStyle.SUCCESS,
            )
            wrap_up_message(count_after)
  307. @fm_jobs.command("delete-queue")
  308. @with_appcontext
  309. @click.option(
  310. "--queue",
  311. default=None,
  312. required=True,
  313. help="State which queue to delete.",
  314. )
  315. def delete_queue(queue: str):
  316. """
  317. Delete a job queue.
  318. """
  319. if not app.redis_connection.sismember("rq:queues", f"rq:queue:{queue}"):
  320. click.secho(
  321. f"Queue '{queue}' does not exist.",
  322. **MsgStyle.ERROR,
  323. )
  324. raise click.Abort()
  325. success = app.redis_connection.srem("rq:queues", f"rq:queue:{queue}")
  326. if success:
  327. click.secho(
  328. f"Queue '{queue}' removed.",
  329. **MsgStyle.SUCCESS,
  330. )
  331. else:
  332. click.secho(
  333. f"Failed to remove queue '{queue}'.",
  334. **MsgStyle.ERROR,
  335. )
  336. raise click.Abort()
  337. def wrap_up_message(count_after: int):
  338. if count_after > 0:
  339. click.secho(
  340. f"There are {count_after} jobs which could not be removed for some reason.",
  341. **MsgStyle.WARN,
  342. )
  343. else:
  344. click.echo("No jobs left.")
def handle_worker_exception(
    job: Job,
    exc_type: Type[Exception],
    exc_value: Exception,
    traceback: TracebackType,
) -> None:
    """
    Just a fallback, usually we would use the per-queue handler.

    Echoes the exception to the console and stores its string representation
    on the job's meta data, so it can be inspected later.
    """
    queue_name = job.origin
    click.echo(f"HANDLING RQ {queue_name.upper()} EXCEPTION: {exc_type}: {exc_value}")
    job.meta["exception"] = str(exc_value)  # meta must contain JSON serializable data
    job.save_meta()
  358. def parse_queue_list(queue_names_str: str) -> list[Queue]:
  359. """Parse a | separated string of queue names against the app.queues dict.
  360. The app.queues dict is expected to have queue names as keys, and rq.Queue objects as values.
  361. :param queue_names_str: a string with queue names separated by the | character
  362. :returns: a list of Queue objects.
  363. """
  364. q_list = []
  365. for q_name in queue_names_str.split("|"):
  366. if q_name in app.queues:
  367. q_list.append(app.queues[q_name])
  368. else:
  369. raise ValueError(f"Unknown queue '{q_name}'.")
  370. return q_list
  371. app.cli.add_command(fm_jobs)