implementations.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from datetime import datetime, timezone
  2. import time
  3. from flask import request, current_app
  4. from flask_json import as_json
  5. from sqlalchemy import exc as sqla_exc, select
  6. from flexmeasures.data import db
  7. from flexmeasures.data.models.task_runs import LatestTaskRun
  8. from flexmeasures.auth.error_handling import UNAUTH_STATUS_CODE, FORBIDDEN_STATUS_CODE
  @as_json
  def ping():
      """Liveness check: always answer with an "ok" message and HTTP status 200."""
      payload = {"message": "ok"}
      return payload, 200
  @as_json
  def get_task_run():
      """
      Get latest task runs.
      This endpoint returns output conforming to the task monitoring tool (bobbydams/py-pinger)

      The task is selected with the ``name`` query parameter. The caller must send
      the task-check token (``FLEXMEASURES_TASK_CHECK_AUTH_TOKEN`` setting) in the
      header named by the ``SECURITY_TOKEN_AUTHENTICATION_HEADER`` setting.

      Every response body has the keys ``status``, ``reason``, ``lastrun``,
      ``frequency``, ``process`` and ``server``. Returned (body, HTTP status) pairs:

      * auth header missing: "ERROR", ``UNAUTH_STATUS_CODE``
      * token wrong, or no usable token configured: "ERROR", ``FORBIDDEN_STATUS_CODE``
      * no task name given: "ERROR", 400
      * no run stored for this task: "ERROR", 404
      * otherwise: "OK" or "ERROR" (taken from the stored run status), 200
      """
      # Local import: keeps this function independent of the module's import block.
      import hmac

      task_name: str = request.args.get("name", "")
      # Placeholder "last run" time for responses which have no real run to report.
      no_run_time = datetime(1970, 1, 1)

      def make_response(status: str, reason: str, last_run: datetime) -> dict:
          """Build the response body in the layout the monitoring tool expects."""
          return dict(
              status=status,
              reason=reason,
              lastrun=last_run.isoformat(),
              # Per-task setting, e.g. MONITOR_FREQUENCY_<NAME>; falls back to 10.
              frequency=current_app.config.get(
                  "MONITOR_FREQUENCY_%s" % task_name.upper(), 10
              ),
              process="FlexMeasures",
              server=current_app.config.get("FLEXMEASURES_MODE", ""),
          )

      def fetch_latest_run():
          """Return the stored LatestTaskRun for task_name, or None if there is none."""
          return db.session.scalars(
              select(LatestTaskRun).filter(LatestTaskRun.name == task_name).limit(1)
          ).first()

      # check auth token
      token_name = current_app.config.get("SECURITY_TOKEN_AUTHENTICATION_HEADER")
      token = current_app.config.get("FLEXMEASURES_TASK_CHECK_AUTH_TOKEN", "")
      if token_name not in request.headers:
          return (
              make_response(
                  "ERROR", "Not authenticated to check task status.", no_run_time
              ),
              UNAUTH_STATUS_CODE,
          )
      supplied_token = request.headers.get(token_name, "")
      # An empty or non-string configured token must never authorize anybody
      # (otherwise an empty header value would match an unset token).
      # compare_digest avoids leaking, via response timing, how much of the token matched;
      # both sides are encoded because it only accepts ASCII-only str or bytes.
      token_matches = (
          isinstance(token, str)
          and token != ""
          and hmac.compare_digest(
              supplied_token.encode("utf-8"), token.encode("utf-8")
          )
      )
      if not token_matches:
          return (
              make_response(
                  "ERROR", "Not authorized to check task status.", no_run_time
              ),
              FORBIDDEN_STATUS_CODE,
          )

      if not task_name:
          return make_response("ERROR", "No task name given.", no_run_time), 400

      try:
          last_known_run = fetch_latest_run()
      except (sqla_exc.ResourceClosedError, sqla_exc.DatabaseError):
          # This is an attempt to make this more stable against some rare condition we encounter. Let's try once more.
          # A failure of the second attempt is not caught here and propagates to the caller.
          time.sleep(2)
          last_known_run = fetch_latest_run()

      if not last_known_run:
          return (
              make_response(
                  "ERROR",
                  "Task %s has no last run time." % task_name,
                  no_run_time,
              ),
              404,
          )

      last_status = "OK" if last_known_run.status else "ERROR"
      return make_response(last_status, "", last_known_run.datetime), 200
  @as_json
  def post_task_run():
      """
      Post that a task has been (attempted to) run.
      Form fields to send in: name: str, status: bool [defaults to True], datetime: datetime [defaults to now]

      ``datetime`` must be an ISO 8601 string (a trailing "Z" is accepted as UTC).
      ``status`` counts as True when it spells "true" in any letter case.

      Returned (body, HTTP status) pairs:

      * no task name given: ``{"status": "ERROR", "reason": ...}``, 400
      * datetime field cannot be parsed: ``{"status": "ERROR", "reason": ...}``, 400
      * any exception while looking up or updating the run: ``{"status": "ERROR", "reason": str(e)}``, 500
      * otherwise: ``{"status": "OK"}``, 200
      """
      task_name = request.form.get("name", "")
      if task_name == "":
          return {"status": "ERROR", "reason": "No task name given."}, 400

      raw_datetime = request.form.get("datetime")
      if raw_datetime is None:
          date_time = datetime.now(timezone.utc)
      else:
          # Form values arrive as strings; turn the field into a real datetime
          # instead of handing the raw text to the model.
          iso_text = raw_datetime.strip()
          if iso_text[-1:] in ("Z", "z"):
              # fromisoformat() only understands the "Z" suffix from Python 3.11 on.
              iso_text = iso_text[:-1] + "+00:00"
          try:
              date_time = datetime.fromisoformat(iso_text)
          except ValueError:
              return (
                  {
                      "status": "ERROR",
                      "reason": "Field datetime is not a valid ISO 8601 datetime.",
                  },
                  400,
              )

      # Case-insensitive, so that e.g. "true" is not silently stored as a failed run.
      status = request.form.get("status", "True").strip().lower() == "true"

      try:
          task_run = db.session.execute(
              select(LatestTaskRun).filter(LatestTaskRun.name == task_name)
          ).scalar_one_or_none()
          if task_run is None:
              # First report for this task: create its record.
              task_run = LatestTaskRun(name=task_name)
              db.session.add(task_run)
          task_run.datetime = date_time
          task_run.status = status
          # NOTE(review): nothing is committed here - presumably the session is
          # committed elsewhere (e.g. a request teardown hook); confirm.
      except Exception as e:
          # Keep a server-side trace and leave the session usable, then report the failure.
          current_app.logger.exception("Could not record run of task %r", task_name)
          db.session.rollback()
          return {"status": "ERROR", "reason": str(e)}, 500
      return {"status": "OK"}, 200