Enable individual trigger logging (#27758)
FileTaskHandler is updated so that it can be used by the triggerer job to write messages from distinct triggers in a way that makes them visible in the web UI task logs. Messages from all relevant sources are merged and interleaved in the same task log pane.
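
A minimal sketch of the interleaving idea (illustrative only, not the exact code in this change): lines from every source are pooled and ordered by their parsed timestamps before being rendered as a single stream.

```python
def interleave_logs(parse_timestamp, *sources: list[str]) -> str:
    """Merge log lines from several sources into one stream ordered by timestamp.

    ``parse_timestamp`` is assumed to return a comparable datetime for every line.
    """
    all_lines = [line for source in sources for line in source]
    return "\n".join(sorted(all_lines, key=parse_timestamp))
```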

Due to variation in FileTaskHandler implementations, we need some extra params set so the triggerer job can use them appropriately. For some handlers we need to use the QueueListener pattern; for others we need to put many instances (one per trigger) behind a wrapper class. For this purpose we introduce some config params you can set on the handler instance or class. Context vars are used so that the handler can map messages to the correct TI, and these changes to log handling in the triggerer job can be disabled entirely.
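
As a rough sketch of the context-var routing (class and attribute names here are illustrative assumptions, not necessarily the ones used in this change): a context var set around each trigger's execution lets a logging.Filter stamp every record with the owning task instance, and a wrapper handler fans stamped records out to one per-trigger handler.

```python
import logging
from contextvars import ContextVar

ctx_task_instance: ContextVar = ContextVar("ctx_task_instance", default=None)


class TriggerMetadataFilter(logging.Filter):
    """Stamp each record with the task instance from the current trigger context."""

    def filter(self, record: logging.LogRecord) -> bool:
        ti = ctx_task_instance.get()
        if ti is not None:
            record.task_instance = ti
        return True


class TriggererHandlerWrapper(logging.Handler):
    """Route each stamped record to a dedicated handler for its trigger's TI."""

    def __init__(self, make_handler):
        super().__init__()
        self.make_handler = make_handler  # factory: task instance -> logging.Handler
        self.handlers: dict[str, logging.Handler] = {}

    def emit(self, record: logging.LogRecord) -> None:
        ti = getattr(record, "task_instance", None)
        if ti is None:
            return  # not trigger-originated; a real implementation may fall back
        key = f"{ti.dag_id}.{ti.task_id}.{ti.run_id}.{ti.map_index}"
        handler = self.handlers.setdefault(key, self.make_handler(ti))
        handler.emit(record)
```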

Also adds log-serving capability to the triggerer service, so live logs work with non-streaming handlers.

Adds a TaskReturnCode for signaling, so that log messages can be more accurate when a task defers. And when resuming, we now have better messaging, i.e. "resuming from deferral".
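
The new signal is roughly the enum below (the member value shown is an assumption for illustration); the raw task returns it instead of nothing when it defers, and the CLI propagates it so logging can record a deferral rather than a plain exit.

```python
from enum import Enum


class TaskReturnCode(Enum):
    """Non-error return code a raw task run can use to report its outcome."""

    DEFERRED = 100  # assumed value: the task deferred itself to the triggerer
```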

Adds a new method to FileTaskHandler for reading remote logs. It is called along with the other sources in _read and its output is interleaved there. Meta messages and actual log content are now separated so that the messages are displayed at the top after interleaving.
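
A remote handler might plug into this roughly as follows (the hook name, the (messages, logs) return shape, and my_storage_read are assumptions for illustration; check FileTaskHandler for the actual contract):

```python
from airflow.utils.log.file_task_handler import FileTaskHandler


def my_storage_read(key: str) -> str:
    """Placeholder for a real remote-storage fetch (e.g. an S3/GCS download)."""
    raise NotImplementedError


class MyRemoteTaskHandler(FileTaskHandler):
    """Sketch of a remote handler participating in the new read path."""

    def _read_remote_logs(self, ti, try_number, metadata=None):
        messages: list[str] = []
        logs: list[str] = []
        remote_key = self._render_filename(ti, try_number)
        try:
            logs.append(my_storage_read(remote_key))
            messages.append(f"Found remote logs at {remote_key}")
        except Exception as e:
            messages.append(f"Unable to read remote logs at {remote_key}: {e}")
        return messages, logs
```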

Interleaving requires parsing log timestamps, so we make the timestamp parser pluggable in case the user's formatter is non-standard.
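
For example, assuming the default-style prefix "2023-02-04 12:00:00,123 ...", a parser registered via [logging] interleave_timestamp_parser = path.to.my_func might look like this (returning None for lines without a timestamp is an assumption; check the FileTaskHandler contract):

```python
from __future__ import annotations

from datetime import datetime, timezone


def my_func(line: str) -> datetime | None:
    """Parse the leading timestamp of a task log line, assuming UTC."""
    try:
        # e.g. "2023-02-04 12:00:00,123 {taskinstance.py:1234} INFO - message"
        return datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f").replace(
            tzinfo=timezone.utc
        )
    except ValueError:
        return None
```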

TI.next_kwargs is used to infer "has deferred" for the "resuming from deferral" messaging.
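
Illustratively (not the exact check), the inference amounts to:

```python
def resuming_from_deferral(ti) -> bool:
    """Illustrative only: next_kwargs is populated when a deferred task resumes."""
    return ti.next_kwargs is not None
```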

A TabWithTooltip class is added to the web UI JS. It can be used to add tooltips to tabs. We didn't end up needing it, but Brent went through the trouble of writing it and it may come in handy.

In the chart, the triggerer needs to be a service (instead of a deployment) in order for log serving to work out of the box. This is only enabled when airflowVersion in your chart values is >= 2.6.0.

---------

Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Brent Bovenzi <[email protected]>
4 people committed Feb 4, 2023
1 parent 8338926 commit 1b18a50
Showing 54 changed files with 2,390 additions and 488 deletions.
8 changes: 6 additions & 2 deletions airflow/api_connexion/endpoints/log_endpoint.py
@@ -21,6 +21,7 @@
from flask import Response, request
from itsdangerous.exc import BadSignature
from itsdangerous.url_safe import URLSafeSerializer
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.session import Session

from airflow.api_connexion import security
@@ -73,9 +74,10 @@ def get_log(
metadata["download_logs"] = False

task_log_reader = TaskLogReader()

if not task_log_reader.supports_read:
raise BadRequest("Task log handler does not support read logs.")
ti = (
query = (
session.query(TaskInstance)
.filter(
TaskInstance.task_id == task_id,
@@ -84,8 +86,10 @@
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
.one_or_none()
.options(joinedload("trigger"))
.options(joinedload("trigger.triggerer_job"))
)
ti = query.one_or_none()
if ti is None:
metadata["end_of_log"] = True
raise NotFound(title="TaskInstance not found")
1 change: 1 addition & 0 deletions airflow/cli/cli_parser.py
@@ -2108,6 +2108,7 @@ class GroupCommand(NamedTuple):
ARG_LOG_FILE,
ARG_CAPACITY,
ARG_VERBOSE,
ARG_SKIP_SERVE_LOGS,
),
),
ActionCommand(
30 changes: 19 additions & 11 deletions airflow/cli/commands/task_command.py
@@ -44,6 +44,7 @@
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.operator import needs_expansion
from airflow.models.taskinstance import TaskReturnCode
from airflow.settings import IS_K8S_EXECUTOR_POD
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
@@ -58,6 +59,7 @@
suppress_logs_and_warning,
)
from airflow.utils.dates import timezone
from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
from airflow.utils.log.logging_mixin import StreamLogWriter
from airflow.utils.log.secrets_masker import RedactedIO
from airflow.utils.net import get_hostname
@@ -182,7 +184,7 @@ def _get_ti(
return ti, dr_created


def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode:
"""
Runs the task based on a mode.
@@ -193,11 +195,11 @@ def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
- by executor
"""
if args.local:
_run_task_by_local_task_job(args, ti)
return _run_task_by_local_task_job(args, ti)
elif args.raw:
_run_raw_task(args, ti)
return _run_raw_task(args, ti)
else:
_run_task_by_executor(args, dag, ti)
return _run_task_by_executor(args, dag, ti)


def _run_task_by_executor(args, dag, ti):
@@ -239,7 +241,7 @@ def _run_task_by_executor(args, dag, ti):
executor.end()


def _run_task_by_local_task_job(args, ti):
def _run_task_by_local_task_job(args, ti) -> TaskReturnCode | None:
"""Run LocalTaskJob, which monitors the raw task execution process."""
run_job = LocalTaskJob(
task_instance=ti,
@@ -254,11 +256,14 @@ def _run_task_by_local_task_job(args, ti):
external_executor_id=_extract_external_executor_id(args),
)
try:
run_job.run()
ret = run_job.run()

finally:
if args.shut_down_logging:
logging.shutdown()
with suppress(ValueError):
return TaskReturnCode(ret)
return None


RAW_TASK_UNSUPPORTED_OPTION = [
@@ -269,9 +274,9 @@ def _run_task_by_local_task_job(args, ti):
]


def _run_raw_task(args, ti: TaskInstance) -> None:
def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
"""Runs the main task handling code."""
ti._run_raw_task(
return ti._run_raw_task(
mark_success=args.mark_success,
job_id=args.job_id,
pool=args.pool,
@@ -407,18 +412,21 @@ def task_run(args, dag=None):
# this should be last thing before running, to reduce likelihood of an open session
# which can cause trouble if running process in a fork.
settings.reconfigure_orm(disable_connection_pool=True)

task_return_code = None
try:
if args.interactive:
_run_task_by_selected_method(args, dag, ti)
task_return_code = _run_task_by_selected_method(args, dag, ti)
else:
with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
_run_task_by_selected_method(args, dag, ti)
task_return_code = _run_task_by_selected_method(args, dag, ti)
if task_return_code == TaskReturnCode.DEFERRED:
_set_task_deferred_context_var()
finally:
try:
get_listener_manager().hook.before_stopping(component=TaskCommandMarker())
except Exception:
pass
return task_return_code


@cli_utils.action_cli(check_db=False)
29 changes: 25 additions & 4 deletions airflow/cli/commands/triggerer_command.py
@@ -18,14 +18,35 @@
from __future__ import annotations

import signal
from contextlib import contextmanager
from functools import partial
from multiprocessing import Process
from typing import Generator

import daemon
from daemon.pidfile import TimeoutPIDLockFile

from airflow import settings
from airflow.configuration import conf
from airflow.jobs.triggerer_job import TriggererJob
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
from airflow.utils.serve_logs import serve_logs


@contextmanager
def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]:
"""Starts serve_logs sub-process"""
sub_proc = None
if skip_serve_logs is False:
port = conf.getint("logging", "trigger_log_server_port", fallback=8794)
sub_proc = Process(target=partial(serve_logs, port=port))
sub_proc.start()
try:
yield
finally:
if sub_proc:
sub_proc.terminate()


@cli_utils.action_cli
@@ -44,18 +65,18 @@ def triggerer(args):
stdout_handle.truncate(0)
stderr_handle.truncate(0)

ctx = daemon.DaemonContext(
daemon_context = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(pid, -1),
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
with daemon_context, _serve_logs(args.skip_serve_logs):
job.run()

else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
job.run()
with _serve_logs(args.skip_serve_logs):
job.run()
18 changes: 18 additions & 0 deletions airflow/config_templates/config.yml
@@ -788,6 +788,24 @@ logging:
type: string
example: ~
default: "8793"
trigger_log_server_port:
description: |
Port to serve logs from for triggerer. See worker_log_server_port description
for more info.
version_added: 2.6.0
type: string
example: ~
default: "8794"
interleave_timestamp_parser:
description: |
To interleave logs between trigger and task, we must parse timestamps in the log
files. If your log format is non-standard, you may provide an import path to a
callable which takes a string log line and returns the timestamp
(datetime.datetime compatible).
version_added: 2.6.0
type: string
example: path.to.my_func
default: ~
metrics:
description: |
StatsD (https://github.com/etsy/statsd) integration settings.
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -434,6 +434,17 @@ extra_logger_names =
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# Port to serve logs from for triggerer. See worker_log_server_port description
# for more info.
trigger_log_server_port = 8794

# To interleave logs between trigger and task, we must parse timestamps in the log
# files. If your log format is non-standard, you may provide an import path to a
# callable which takes a string log line and returns the timestamp
# (datetime.datetime compatible).
# Example: interleave_timestamp_parser = path.to.my_func
# interleave_timestamp_parser =

[metrics]

# StatsD (https://github.com/etsy/statsd) integration settings.
2 changes: 1 addition & 1 deletion airflow/example_dags/example_time_delta_sensor_async.py
@@ -36,6 +36,6 @@
catchup=False,
tags=["example"],
) as dag:
wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10))
wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=30))
finish = EmptyOperator(task_id="finish")
wait >> finish
3 changes: 2 additions & 1 deletion airflow/executors/base_executor.py
@@ -355,14 +355,15 @@ def execute_async(
"""
raise NotImplementedError()

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
"""
This method can be implemented by any child class to return the task logs.
:param ti: A TaskInstance object
:return: tuple of meta messages and log lines
"""
return [], []

def end(self) -> None: # pragma: no cover
"""Wait synchronously for the previously submitted job to complete."""
6 changes: 3 additions & 3 deletions airflow/executors/celery_kubernetes_executor.py
@@ -141,11 +141,11 @@ def queue_task_instance(
cfg_path=cfg_path,
)

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
"""Fetch task log from Kubernetes executor"""
if ti.queue == self.kubernetes_executor.kubernetes_queue:
return self.kubernetes_executor.get_task_log(ti=ti, log=log)
return None
return self.kubernetes_executor.get_task_log(ti=ti)
return [], []

def has_task(self, task_instance: TaskInstance) -> bool:
"""
19 changes: 9 additions & 10 deletions airflow/executors/kubernetes_executor.py
@@ -781,14 +781,16 @@ def _get_pod_namespace(ti: TaskInstance):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")

def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:

def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
messages = []
log = []
try:
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodGenerator

client = get_kube_client()

log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
messages.append(f"Trying to get logs (last 100 lines) from worker pod {ti.hostname}")
selector = PodGenerator.build_selector_for_k8s_executor_pod(
dag_id=ti.dag_id,
task_id=ti.task_id,
@@ -816,13 +818,10 @@ def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict
)

for line in res:
log += line.decode()

return log

except Exception as f:
log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
return log, {"end_of_log": True}
log.append(line.decode())
except Exception as e:
messages.append(f"Reading from k8s pod logs failed: {str(e)}")
return messages, ["\n".join(log)]

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
7 changes: 3 additions & 4 deletions airflow/executors/local_kubernetes_executor.py
@@ -142,12 +142,11 @@ def queue_task_instance(
cfg_path=cfg_path,
)

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
"""Fetch task log from kubernetes executor"""
if ti.queue == self.kubernetes_executor.kubernetes_queue:
return self.kubernetes_executor.get_task_log(ti=ti, log=log)

return None
return self.kubernetes_executor.get_task_log(ti=ti)
return [], []

def has_task(self, task_instance: TaskInstance) -> bool:
"""
4 changes: 3 additions & 1 deletion airflow/jobs/base_job.py
@@ -249,14 +249,15 @@ def run(self):
"""Starts the job."""
Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
# Adding an entry in the DB
ret = None
with create_session() as session:
self.state = State.RUNNING
session.add(self)
session.commit()
make_transient(self)

try:
self._execute()
ret = self._execute()
# In case of max runs or max duration
self.state = State.SUCCESS
except SystemExit:
@@ -272,6 +273,7 @@ def run(self):
session.commit()

Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)
return ret

def _execute(self):
raise NotImplementedError("This method needs to be overridden")
