- """
- CLI commands for controlling jobs
- """
- from __future__ import annotations
- import os
- import random
- import string
- from types import TracebackType
- from typing import Type
- import click
- from flask import current_app as app
- from flask.cli import with_appcontext
- from rq import Queue, Worker
- from rq.job import Job
- from rq.registry import (
- CanceledJobRegistry,
- DeferredJobRegistry,
- FailedJobRegistry,
- FinishedJobRegistry,
- ScheduledJobRegistry,
- StartedJobRegistry,
- )
- from sqlalchemy.orm import configure_mappers
- from tabulate import tabulate
- import pandas as pd
- from flexmeasures.data.schemas import AssetIdField, SensorIdField
- from flexmeasures.data.services.scheduling import handle_scheduling_exception
- from flexmeasures.data.services.forecasting import handle_forecasting_exception
- from flexmeasures.cli.utils import MsgStyle
- REGISTRY_MAP = dict(
- canceled=CanceledJobRegistry,
- deferred=DeferredJobRegistry,
- failed=FailedJobRegistry,
- finished=FinishedJobRegistry,
- started=StartedJobRegistry,
- scheduled=ScheduledJobRegistry,
- )


@click.group("jobs")
def fm_jobs():
    """FlexMeasures: Job queueing."""


@fm_jobs.command("run-worker")
@with_appcontext
@click.option(
    "--queue",
    default=None,
    required=True,
    help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
)
@click.option(
    "--name",
    default=None,
    required=False,
    help="Give your worker a recognizable name. Defaults to fm-worker-<randomstring>.",
)
def run_worker(queue: str, name: str | None):
- """
- Start a worker process for forecasting and/or scheduling jobs.
- We use the app context to find out which redis queues to use.
- """
- q_list = parse_queue_list(queue)
- # https://stackoverflow.com/questions/50822822/high-sqlalchemy-initialization-overhead
- configure_mappers()
- connection = app.queues["forecasting"].connection
- # provide a random name if none was given
- if name is None:
- name = "fm-worker-" + "".join(random.sample(string.ascii_lowercase * 6, 6))
- worker_names = [w.name for w in Worker.all(connection=connection)]
- # making sure the name is unique
- used_name = name
- name_suffixes = iter(range(1, 51))
- while used_name in worker_names:
- used_name = f"{name}-{next(name_suffixes)}"
- error_handler = handle_worker_exception
- if queue == "scheduling":
- error_handler = handle_scheduling_exception
- elif queue == "forecasting":
- error_handler = handle_forecasting_exception
- worker = Worker(
- q_list,
- connection=connection,
- name=used_name,
- exception_handlers=[error_handler],
- )
- click.echo("\n=========================================================")
- click.secho(
- 'Worker "%s" initialised: %s ― processing %s queue(s)'
- % (worker.name, worker, len(q_list)),
- **MsgStyle.SUCCESS,
- )
- for q in q_list:
- click.echo("Running against %s on %s" % (q, q.connection))
- click.echo("=========================================================\n")
- worker.work()
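
# Usage sketch (hypothetical invocation; assumes the package exposes a
# `flexmeasures` CLI entry point):
#
#     flexmeasures jobs run-worker --queue "forecasting|scheduling" --name my-worker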


@fm_jobs.command("show-queues")
@with_appcontext
def show_queues():
    """
    Show the job queues and their job counts (including the "failed" registry).

    To inspect contents, go to the RQ-Dashboard at <flexmeasures-URL>/tasks
    We use the app context to find out which redis queues to use.
    """
    configure_mappers()
    queue_data = [
        (
            q.name,
            q.started_job_registry.count,
            q.count,
            q.deferred_job_registry.count,
            q.scheduled_job_registry.count,
            q.failed_job_registry.count,
        )
        for q in app.queues.values()
    ]
    click.echo(
        tabulate(
            queue_data,
            headers=["Queue", "Started", "Queued", "Deferred", "Scheduled", "Failed"],
        )
    )
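
# Usage sketch (hypothetical invocation, same assumed entry point); prints one
# row per queue with its Started/Queued/Deferred/Scheduled/Failed counts:
#
#     flexmeasures jobs show-queues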


@fm_jobs.command("save-last")
@with_appcontext
@click.option(
    "--n",
    type=int,
    default=10,
    help="The number of last jobs to save.",
)
@click.option(
    "--queue",
    "queue_name",
    type=str,
    default="scheduling",
    help="The queue to look in.",
)
@click.option(
    "--registry",
    "registry_name",
    type=click.Choice(REGISTRY_MAP.keys()),
    default="failed",
    help="The registry to look in.",
)
@click.option(
    "--asset",
    "asset_id",
    type=AssetIdField(),
    callback=lambda ctx, param, value: value.id if value else None,
    required=False,
    help="The asset ID to filter by.",
)
@click.option(
    "--sensor",
    "sensor_id",
    type=SensorIdField(),
    callback=lambda ctx, param, value: value.id if value else None,
    required=False,
    help="The sensor ID to filter by.",
)
@click.option(
    "--file",
    type=click.Path(),
    default="last_jobs.csv",
    help="The CSV file to save the found jobs to.",
)
def save_last(
    n: int,
    queue_name: str,
    registry_name: str,
    asset_id: int | None,
    sensor_id: int | None,
    file: str,
):
    """
    Save the last n jobs to a file (by default, the last 10 failed jobs).
    """
    available_queues = app.queues
    if queue_name not in available_queues.keys():
        click.secho(
            f"Unknown queue '{queue_name}'. Available queues: {available_queues.keys()}",
            **MsgStyle.ERROR,
        )
        raise click.Abort()
    else:
        queue = available_queues[queue_name]

    registry = REGISTRY_MAP[registry_name](queue=queue)
    job_ids = registry.get_job_ids()[-n:]

    found_jobs = []
    for job_id in job_ids:
        try:
            job = Job.fetch(job_id, connection=queue.connection)
            kwargs = job.kwargs or {}
            entity_info = kwargs.get("asset_or_sensor", {})
            if (
                (not asset_id and not sensor_id)
                or (
                    entity_info.get("class") == "Asset"
                    and entity_info.get("id") == asset_id
                )
                or (
                    entity_info.get("class") == "Sensor"
                    and entity_info.get("id") == sensor_id
                )
            ):
                found_jobs.append(
                    {
                        "Job ID": job.id,
                        "ID": entity_info.get("id", "N/A"),
                        "Class": entity_info.get("class", "N/A"),
                        "Error": job.exc_info,
                        "All kwargs": kwargs,
                        "Function name": getattr(job, "func_name", "N/A"),
                        "Started at": getattr(job, "started_at", "N/A"),
                        "Ended at": getattr(job, "ended_at", "N/A"),
                    }
                )
        except Exception as e:
            click.secho(
                f"Job {job_id} failed to fetch with error: {str(e)}", fg="yellow"
            )

    if found_jobs:
        if os.path.exists(file):
            if not click.confirm(f"{file} already exists. Overwrite?", default=False):
                new_file = click.prompt(
                    "Enter a new filename (must end with .csv)", type=str
                )
                while not new_file.lower().endswith(".csv"):
                    click.secho("Invalid filename. It must end with .csv.", fg="red")
                    new_file = click.prompt(
                        "Enter a new filename (must end with .csv)", type=str
                    )
                file = new_file

        # Save the found jobs to a CSV file
        pd.DataFrame(found_jobs).sort_values("Started at", ascending=False).to_csv(
            file, index=False
        )
        click.secho(
            f"Saved {len(found_jobs)} {registry_name} jobs to {file}.", fg="green"
        )
        return
    elif asset_id:
        filter_message = f" for asset {asset_id} among the last {n} jobs"
    elif sensor_id:
        filter_message = f" for sensor {sensor_id} among the last {n} jobs"
    else:
        filter_message = ""
    click.secho(f"No {registry_name} jobs found{filter_message}.", fg="yellow")
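
# Usage sketch (hypothetical invocation; the sensor ID is made up):
#
#     flexmeasures jobs save-last --n 20 --queue scheduling --registry failed \
#         --sensor 42 --file failed_scheduling_jobs.csv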


@fm_jobs.command("clear-queue")
@with_appcontext
@click.option(
    "--queue",
    default=None,
    required=True,
    help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
)
@click.option(
    "--deferred",
    is_flag=True,
    default=False,
    help="If set, the deferred registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
    "--scheduled",
    is_flag=True,
    default=False,
    help="If set, the scheduled registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
    "--failed",
    is_flag=True,
    default=False,
    help="If set, the failed registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
def clear_queue(queue: str, deferred: bool, scheduled: bool, failed: bool):
    """
    Clear a job queue (or its registry of deferred/scheduled/failed jobs).

    We use the app context to find out which redis queues to use.
    """
    q_list = parse_queue_list(queue)
    registries = dict(
        deferred=("deferred_job_registry", deferred),
        scheduled=("scheduled_job_registry", scheduled),
        failed=("failed_job_registry", failed),
    )
    configure_mappers()
    for the_queue in q_list:
        for _type, (registry, needs_clearing) in registries.items():
            if needs_clearing:
                reg = getattr(the_queue, registry)
                count_before = reg.count
                for job_id in reg.get_job_ids():
                    reg.remove(job_id)  # not actually deleting the job
                count_after = reg.count
                click.secho(
                    f"Cleared {count_before - count_after} {_type} jobs from the {registry} at {the_queue}.",
                    **MsgStyle.WARN,
                )
                wrap_up_message(count_after)
        if not any([deferred, scheduled, failed]):
            count_before = the_queue.count
            if count_before > 0:
                the_queue.empty()
            count_after = the_queue.count
            click.secho(
                f"Cleared {count_before - count_after} jobs from {the_queue}.",
                **MsgStyle.SUCCESS,
            )
            wrap_up_message(count_after)
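
# Usage sketch (hypothetical invocation): clear only the failed registries of
# both queues, leaving the queued jobs themselves untouched:
#
#     flexmeasures jobs clear-queue --queue "forecasting|scheduling" --failed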


@fm_jobs.command("delete-queue")
@with_appcontext
@click.option(
    "--queue",
    default=None,
    required=True,
    help="State which queue to delete.",
)
def delete_queue(queue: str):
    """
    Delete a job queue.
    """
    if not app.redis_connection.sismember("rq:queues", f"rq:queue:{queue}"):
        click.secho(
            f"Queue '{queue}' does not exist.",
            **MsgStyle.ERROR,
        )
        raise click.Abort()
    success = app.redis_connection.srem("rq:queues", f"rq:queue:{queue}")
    if success:
        click.secho(
            f"Queue '{queue}' removed.",
            **MsgStyle.SUCCESS,
        )
    else:
        click.secho(
            f"Failed to remove queue '{queue}'.",
            **MsgStyle.ERROR,
        )
        raise click.Abort()
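
# Usage sketch (hypothetical invocation). Note that, as the code above shows,
# this only unregisters the queue from Redis' "rq:queues" set; it does not
# delete the jobs themselves:
#
#     flexmeasures jobs delete-queue --queue some-obsolete-queue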


def wrap_up_message(count_after: int):
    if count_after > 0:
        click.secho(
            f"There are {count_after} jobs that could not be removed for some reason.",
            **MsgStyle.WARN,
        )
    else:
        click.echo("No jobs left.")


def handle_worker_exception(
    job: Job,
    exc_type: Type[Exception],
    exc_value: Exception,
    traceback: TracebackType,
) -> None:
    """
    Just a fallback, usually we would use the per-queue handler.
    """
    queue_name = job.origin
    click.echo(f"HANDLING RQ {queue_name.upper()} EXCEPTION: {exc_type}: {exc_value}")
    job.meta["exception"] = str(exc_value)  # meta must contain JSON serializable data
    job.save_meta()
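
# Note on the signature above: RQ calls exception handlers with
# (job, exc_type, exc_value, traceback). Per RQ's documented contract, returning
# False stops the handler chain, while returning True or None (as this handler
# implicitly does) falls through to the next handler, which by default moves the
# job to the failed job registry.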


def parse_queue_list(queue_names_str: str) -> list[Queue]:
    """Parse a | separated string of queue names against the app.queues dict.

    The app.queues dict is expected to have queue names as keys, and rq.Queue objects as values.

    :param queue_names_str: a string with queue names separated by the | character
    :returns: a list of Queue objects.
    """
    q_list = []
    for q_name in queue_names_str.split("|"):
        if q_name in app.queues:
            q_list.append(app.queues[q_name])
        else:
            raise ValueError(f"Unknown queue '{q_name}'.")
    return q_list
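
# Usage sketch (requires an active app context with app.queues populated):
#
#     q_list = parse_queue_list("forecasting|scheduling")  # -> [Queue, Queue]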


app.cli.add_command(fm_jobs)