Unclosed aiohttp ClientSession and TCPConnector in DatabricksRunNowOperator with deferrable=True (Airflow 3.0.2, Databricks Provider 7.4.0) #51910

Closed
@albertwangnz

Description

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

7.4.0

Apache Airflow version

3.0.2

Operating System

Ubuntu 24.04.2 LTS

Deployment

Virtualenv installation

Deployment details

  • Deployment Type: Virtualenv installation
  • Operating System: Ubuntu 24.04.2 LTS
  • Python Version: 3.12.3
  • Airflow Version: 3.0.2
  • Databricks Provider Version: 7.4.0
  • Database Backend: PostgreSQL 16
  • Secrets Backend: Microsoft Azure Key Vault
  • Authentication: Flask AppBuilder (FAB) with Microsoft Entra ID (SSO)
  • SSL Configuration: Enabled with custom certificates
  • Timezone: Pacific/Auckland
  • Airflow Services Management: systemd unit files for api-server, scheduler, dag-processor, and triggerer
  • Custom Configuration Highlights:
    • Airflow configuration (airflow.cfg) includes (sketched after this list):
      • sql_alchemy_conn_secret for DB connection string
      • Azure Key Vault integration for secrets
      • SSL cert/key paths
      • FAB auth manager
    • Environment variables for Azure credentials (AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET)
    • Custom webserver_config.py for SSO
    • Firewall configured to allow port 8443
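
For illustration, the configuration highlights above correspond roughly to airflow.cfg entries like the following. This is a sketch only: the secrets backend and auth manager class paths are the documented ones, but the secret name is a placeholder, and the SSL cert/key and port settings are omitted as deployment-specific.

[database]
# Connection string resolved from the secrets backend via the _secret suffix
sql_alchemy_conn_secret = sql-alchemy-conn

[secrets]
backend = airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend

[core]
auth_manager = airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager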

Installation followed a manual setup process using a Python virtual environment and systemd for service orchestration. All dependencies were installed using pip with Airflow constraints for version compatibility.
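
For reference, such an install follows the standard Airflow constraints pattern; a representative command (assuming the documented constraint URL for Airflow 3.0.2 on Python 3.12) looks like:

pip install "apache-airflow==3.0.2" "apache-airflow-providers-databricks==7.4.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.2/constraints-3.12.txt"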

What happened

While running a DAG using DatabricksRunNowOperator with deferrable=True on Airflow 3.0.2 and Databricks Provider 7.4.0, the task successfully triggered a Databricks job and completed in the SUCCESS state. During execution, however, the Airflow logs showed repeated warnings and errors about unclosed aiohttp client sessions and connectors:

[2025-06-19, 02:21:23] ERROR - Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
[2025-06-19, 02:21:23] ERROR - Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"

These errors appeared shortly after the task was deferred and while the DatabricksExecutionTrigger was polling the job status. Despite the job completing successfully, the presence of these warnings suggests a potential resource leak or improper cleanup of async HTTP sessions in the provider code.
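
For context, these asyncio messages are what aiohttp emits when a ClientSession (and the pooled connections held by its TCPConnector) is garbage collected without being closed. A minimal standalone sketch, unrelated to the provider code, that provokes the same pair of warnings:

import asyncio

import aiohttp


async def main():
    # Session created without "async with" and never closed.
    session = aiohttp.ClientSession()
    async with session.get("https://example.com") as resp:
        await resp.read()
    # The keep-alive connection is returned to the TCPConnector's
    # pool, but session.close() is never awaited.


asyncio.run(main())
# On garbage collection / interpreter shutdown, asyncio logs roughly:
#   ERROR - Unclosed client session
#   ERROR - Unclosed connector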

This issue occurred in a production-like environment using:

  • Airflow 3.0.2 (virtualenv installation)
  • Python 3.12.3
  • Ubuntu 24.04.2 LTS
  • Databricks Provider 7.4.0
  • PostgreSQL 16 as backend
  • Azure Key Vault as secrets backend

No custom modifications were made to the operator or trigger logic. The DAG was manually triggered and ran without retries.

What you think should happen instead

The task completes successfully, but the error messages in the logs indicate improper resource management in the Databricks provider:

[2025-06-19, 02:21:23] ERROR - Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
[2025-06-19, 02:21:23] ERROR - Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"

These errors suggest that the aiohttp.ClientSession and TCPConnector objects used during async polling in DatabricksExecutionTrigger are not being closed before they are garbage collected. This could lead to memory leaks or degraded performance over time, especially in long-running Airflow environments.

Since the operator is marked as deferrable=True, it should follow best practices for async resource cleanup. I expect that:

  • All async HTTP sessions and connectors are properly closed after use.
  • No warnings or errors related to unclosed resources appear in the logs.
  • The operator and trigger behave cleanly and efficiently without leaving behind dangling resources.

This behavior is erroneous because it violates expected resource lifecycle management in asynchronous Python code and could impact system stability.
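
For comparison, this is the kind of lifecycle management I would expect inside the trigger's polling loop. The sketch below is hypothetical (the url and headers parameters and the poll_run_state / wait_for_terminal_state names are illustrative, not the provider's actual API); the essential point is that async with guarantees the session and its TCPConnector are closed:

import asyncio

import aiohttp

POLL_INTERVAL = 30  # seconds, matching the trigger's observed sleep


async def poll_run_state(url: str, headers: dict) -> dict:
    # "async with" always awaits session.close(), which also closes
    # the underlying TCPConnector, so no "Unclosed client session"
    # or "Unclosed connector" warnings can be emitted.
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.json()


async def wait_for_terminal_state(url: str, headers: dict) -> dict:
    while True:
        state = await poll_run_state(url, headers)
        if state.get("life_cycle_state") == "TERMINATED":
            return state
        await asyncio.sleep(POLL_INTERVAL)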

How to reproduce

To reproduce the issue, follow these steps in a clean Airflow 3.0.2 environment with the Databricks Provider version 7.4.0:

  1. Set up Airflow:

    • Install Airflow 3.0.2 in a Python 3.12 virtual environment.
    • Use PostgreSQL 16 as the metadata database.
    • Configure Airflow to use Microsoft Azure Key Vault as the secrets backend.
    • Enable SSL and set up Flask AppBuilder (FAB) authentication.
  2. Create a DAG with the following characteristics:

    • Uses DatabricksRunNowOperator
    • Sets deferrable=True
    • Specifies a valid job_id for a Databricks job
    • Uses a valid databricks_conn_id pointing to an Azure Databricks workspace

My DAG snippet:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)
with DAG(
    dag_id="sit_trigger_databricks_job_run",
    description="System Integration Testing - Trigger a Databricks job run",
    start_date=datetime.now() - timedelta(days=1),
    schedule=None,  # timedelta(days=1),
    catchup=False,
    default_args={
        "retries": 0,
        "retry_delay": timedelta(minutes=1),
        "databricks_conn_id": "databricks-default2",
    },
    tags=["sit", "databricks"],
) as dag:

    trigger_databricks_job_run = DatabricksRunNowOperator(
        task_id="trigger_databricks_job_run",
        job_id=604918372746183,
        deferrable=True,
        execution_timeout=timedelta(hours=3),
    )
  3. Trigger the DAG manually via the Airflow UI or CLI (a CLI example follows this list).

  4. Observe the task logs. You will see the following error messages even though the job completes successfully:

    ERROR - Unclosed client session
    ERROR - Unclosed connector
    
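The CLI trigger mentioned in step 3 is the standard command:

airflow dags trigger sit_trigger_databricks_job_run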

This issue is reproducible consistently in environments using async deferrable operators with the Databricks provider.

Anything else

This issue occurs every time the DAG is triggered using DatabricksRunNowOperator with deferrable=True. The error messages consistently appear during the polling phase managed by DatabricksExecutionTrigger.

Here is the relevant full log:

[2025-06-19, 02:21:16] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-19, 02:21:16] INFO - Filling up the DagBag from /home/airflow/airflow/dags/sit_trigger_databricks_job_run.py: source="airflow.models.dagbag.DagBag"
[2025-06-19, 02:21:16] INFO - Environment is configured for ClientSecretCredential: source="azure.identity._credentials.environment"
[2025-06-19, 02:21:16] INFO - ManagedIdentityCredential will use IMDS with client_id: 3a9f1c84-7b3d-4e2a-a9d1-8f6c3e2b7f90: source="azure.identity._credentials.managed_identity"
[2025-06-19, 02:21:17] INFO - DefaultAzureCredential acquired a token from EnvironmentCredential: source="azure.identity._credentials.chained"
[2025-06-19, 02:21:17] INFO - Connection Retrieved 'databricks-default2': source="airflow.hooks.base"
[2025-06-19, 02:21:17] INFO - Existing AAD token is expired, or going to expire soon. Refreshing...: source="airflow.task.hooks.airflow.providers.databricks.hooks.databricks.DatabricksHook"
[2025-06-19, 02:21:18] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-19, 02:21:19] INFO - Run submitted with run_id: 748193847562019: source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator"
[2025-06-19, 02:21:19] INFO - View run status, Spark UI, and logs at https://adb-8274910385627419.47.azuredatabricks.net/?o=9182736450192837#job/604918372746183/run/748193847562019: source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator"
[2025-06-19, 02:21:20] INFO - Pausing task as DEFERRED. : dag_id="sit_trigger_databricks_job_run": task_id="trigger_databricks_job_run": run_id="manual__2025-06-19T02:21:11.927855+00:00": source="task"
[2025-06-19, 02:21:20] INFO - trigger sit_trigger_databricks_job_run/manual__2025-06-19T02:21:11.927855+00:00/trigger_databricks_job_run/-1/1 (ID 15) starting
[2025-06-19, 02:21:20] INFO - Environment is configured for ClientSecretCredential: source="azure.identity._credentials.environment"
[2025-06-19, 02:21:20] INFO - ManagedIdentityCredential will use IMDS with client_id: 3a9f1c84-7b3d-4e2a-a9d1-8f6c3e2b7f90: source="azure.identity._credentials.managed_identity"
[2025-06-19, 02:21:21] INFO - DefaultAzureCredential acquired a token from EnvironmentCredential: source="azure.identity._credentials.chained"
[2025-06-19, 02:21:23] INFO - Connection Retrieved 'databricks-default2': source="airflow.hooks.base"
[2025-06-19, 02:21:23] INFO - Existing AAD token is expired, or going to expire soon. Refreshing...: source="airflow.task.hooks.airflow.providers.databricks.hooks.databricks.DatabricksHook"
[2025-06-19, 02:21:23] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity.aio._internal.get_token_mixin"
[2025-06-19, 02:21:23] ERROR - Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
[2025-06-19, 02:21:23] ERROR - Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"
[2025-06-19, 02:21:23] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:21:54] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:22:24] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:22:54] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:23:25] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:23:55] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:24:26] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:24:56] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:25:26] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:25:56] INFO - Trigger fired event: name="sit_trigger_databricks_job_run/manual__2025-06-19T02:21:11.927855+00:00/trigger_databricks_job_run/-1/1 (ID 15)": result="TriggerEvent<{'run_id': 748193847562019, 'run_page_url': 'https://adb-8274910385627419.47.azuredatabricks.net/?o=9182736450192837#job/604918372746183/run/748193847562019', 'run_state': '{\"life_cycle_state\": \"TERMINATED\", \"result_state\": \"SUCCESS\", \"state_message\": \"\"}', 'repair_run': False, 'errors': []}>"

These errors appear shortly after the task is deferred and while the trigger is actively polling the Databricks job status. The job itself completes successfully, but the logs indicate a potential issue with async resource cleanup.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
