Description
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
7.4.0
Apache Airflow version
3.0.2
Operating System
Ubuntu 24.04.2 LTS
Deployment
Virtualenv installation
Deployment details
- Deployment Type: Virtualenv installation
- Operating System: Ubuntu 24.04.2 LTS
- Python Version: 3.12.3
- Airflow Version: 3.0.2
- Databricks Provider Version: 7.4.0
- Database Backend: PostgreSQL 16
- Secrets Backend: Microsoft Azure Key Vault
- Authentication: Flask AppBuilder (FAB) with Microsoft Entra ID (SSO)
- SSL Configuration: Enabled with custom certificates
- Timezone: Pacific/Auckland
- Airflow Services Management: systemd unit files for `api-server`, `scheduler`, `dag-processor`, and `triggerer`
- Custom Configuration Highlights:
  - Airflow configuration (`airflow.cfg`) includes `sql_alchemy_conn_secret` for the DB connection string
  - Azure Key Vault integration for secrets
  - SSL cert/key paths
  - FAB auth manager
  - Environment variables for Azure credentials (`AZURE_CLIENT_ID`, `AZURE_TENANT_ID`, `AZURE_CLIENT_SECRET`)
  - Custom `webserver_config.py` for SSO
  - Firewall configured to allow port 8443
Installation followed a manual setup process using a Python virtual environment and systemd for service orchestration. All dependencies were installed using pip with Airflow constraints for version compatibility.
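For reference, the install might look like the following; the constraints URL pattern follows the standard Airflow installation docs, while the venv path and exact pins are assumptions based on the versions listed above:

```shell
# Hypothetical install commands matching the versions above; the
# constraints file pattern follows the official Airflow install docs.
python -m venv /home/airflow/venv && source /home/airflow/venv/bin/activate
pip install "apache-airflow==3.0.2" "apache-airflow-providers-databricks==7.4.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.2/constraints-3.12.txt"
```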
What happened
While running a DAG using `DatabricksRunNowOperator` with `deferrable=True` in Airflow 3.0.2 and Databricks Provider 7.4.0, the task successfully triggered a Databricks job and completed with a `SUCCESS` state. However, during execution, the Airflow logs showed repeated warnings and errors about unclosed `aiohttp` client sessions and connectors:
```
[2025-06-19, 02:21:23] ERROR - Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
[2025-06-19, 02:21:23] ERROR - Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"
```
These errors appeared shortly after the task was deferred, while `DatabricksExecutionTrigger` was polling the job status. Although the job completed successfully, the presence of these warnings suggests a resource leak or improper cleanup of async HTTP sessions in the provider code.
This issue occurred in a production-like environment using:
- Airflow 3.0.2 (virtualenv installation)
- Python 3.12.3
- Ubuntu 24.04.2 LTS
- Databricks Provider 7.4.0
- PostgreSQL 16 as backend
- Azure Key Vault as secrets backend
No custom modifications were made to the operator or trigger logic. The DAG was manually triggered and ran without retries.
What you think should happen instead
The task completes successfully, but the error messages in the logs indicate improper resource management in the Databricks provider:
```
[2025-06-19, 02:21:23] ERROR - Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
[2025-06-19, 02:21:23] ERROR - Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"
```
These errors suggest that the `aiohttp.ClientSession` and `TCPConnector` objects used during async polling in `DatabricksExecutionTrigger` are not being properly closed or garbage collected. This could lead to memory leaks or degraded performance over time, especially in long-running Airflow environments.
Since the operator is marked `deferrable=True`, it should follow best practices for async resource cleanup. I expect that:
- All async HTTP sessions and connectors are properly closed after use.
- No warnings or errors related to unclosed resources appear in the logs.
- The operator and trigger behave cleanly and efficiently without leaving behind dangling resources.
This behavior is erroneous because it violates expected resource lifecycle management in asynchronous Python code and could impact system stability.
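As a minimal sketch of the lifecycle I would expect (using a stdlib stand-in for `aiohttp.ClientSession`, since the real fix belongs in the provider's hook), an `async with` block guarantees the session is closed even if polling raises or the trigger task is cancelled:

```python
import asyncio


class StubSession:
    """Stand-in for aiohttp.ClientSession, only to illustrate the lifecycle."""

    def __init__(self):
        self.closed = False

    async def get_run_state(self):
        await asyncio.sleep(0)  # pretend this is a network call
        return {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"}

    async def close(self):
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        # Runs on normal exit, on exceptions, and on cancellation,
        # so the underlying connector is never left dangling.
        await self.close()


async def poll_once():
    # Scoping the session to the poll guarantees deterministic cleanup.
    async with StubSession() as session:
        state = await session.get_run_state()
    return state, session.closed


state, closed = asyncio.run(poll_once())
```

With this pattern, no "Unclosed client session" report can occur, because `close()` is awaited deterministically rather than left to the garbage collector.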
How to reproduce
To reproduce the issue, follow these steps in a clean Airflow 3.0.2 environment with the Databricks Provider version 7.4.0:
- Set up Airflow:
  - Install Airflow 3.0.2 in a Python 3.12 virtual environment.
  - Use PostgreSQL 16 as the metadata database.
  - Configure Airflow to use Microsoft Azure Key Vault as the secrets backend.
  - Enable SSL and set up Flask AppBuilder (FAB) authentication.
- Create a DAG with the following characteristics:
  - Uses `DatabricksRunNowOperator`
  - Sets `deferrable=True`
  - Specifies a valid `job_id` for a Databricks job
  - Uses a valid `databricks_conn_id` pointing to an Azure Databricks workspace
My DAG snippet:
```python
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)

with DAG(
    dag_id="sit_trigger_databricks_job_run",
    description="System Integration Testing - Trigger a Databricks job run",
    start_date=datetime.now() - timedelta(days=1),
    schedule=None,  # timedelta(days=1),
    catchup=False,
    default_args={
        "retries": 0,
        "retry_delay": timedelta(minutes=1),
        "databricks_conn_id": "databricks-default2",
    },
    tags=["sit", "databricks"],
) as dag:
    trigger_databricks_job_run = DatabricksRunNowOperator(
        task_id="trigger_databricks_job_run",
        job_id=604918372746183,
        deferrable=True,
        execution_timeout=timedelta(hours=3),
    )
```
- Trigger the DAG manually via the Airflow UI or CLI.
- Observe the task logs. Even though the job completes successfully, you will see the following error messages:

  ```
  ERROR - Unclosed client session
  ERROR - Unclosed connector
  ```
This issue is reproducible consistently in environments using async deferrable operators with the Databricks provider.
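To pinpoint where the leaked session is created, the triggerer can be run with asyncio debug mode enabled; with loop debug on, aiohttp records the creation traceback of each `ClientSession` and includes it in the "Unclosed client session" report (standard asyncio/aiohttp behavior; running the service in the foreground here is an assumption, since this deployment normally uses systemd):

```shell
# asyncio debug mode makes aiohttp attach the session's creation
# traceback to the "Unclosed client session" error in the logs.
PYTHONASYNCIODEBUG=1 airflow triggerer
```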
Anything else
This issue occurs every time the DAG is triggered using `DatabricksRunNowOperator` with `deferrable=True`. The error messages consistently appear during the polling phase managed by `DatabricksExecutionTrigger`.
Here is the full relevant log:
```
[2025-06-19, 02:21:16] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-19, 02:21:16] INFO - Filling up the DagBag from /home/airflow/airflow/dags/sit_trigger_databricks_job_run.py: source="airflow.models.dagbag.DagBag"
[2025-06-19, 02:21:16] INFO - Environment is configured for ClientSecretCredential: source="azure.identity._credentials.environment"
[2025-06-19, 02:21:16] INFO - ManagedIdentityCredential will use IMDS with client_id: 3a9f1c84-7b3d-4e2a-a9d1-8f6c3e2b7f90: source="azure.identity._credentials.managed_identity"
[2025-06-19, 02:21:17] INFO - DefaultAzureCredential acquired a token from EnvironmentCredential: source="azure.identity._credentials.chained"
[2025-06-19, 02:21:17] INFO - Connection Retrieved 'databricks-default2': source="airflow.hooks.base"
[2025-06-19, 02:21:17] INFO - Existing AAD token is expired, or going to expire soon. Refreshing...: source="airflow.task.hooks.airflow.providers.databricks.hooks.databricks.DatabricksHook"
[2025-06-19, 02:21:18] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-19, 02:21:19] INFO - Run submitted with run_id: 748193847562019: source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator"
[2025-06-19, 02:21:19] INFO - View run status, Spark UI, and logs at https://adb-8274910385627419.47.azuredatabricks.net/?o=9182736450192837#job/604918372746183/run/748193847562019: source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator"
[2025-06-19, 02:21:20] INFO - Pausing task as DEFERRED. : dag_id="sit_trigger_databricks_job_run": task_id="trigger_databricks_job_run": run_id="manual__2025-06-19T02:21:11.927855+00:00": source="task"
[2025-06-19, 02:21:20] INFO - trigger sit_trigger_databricks_job_run/manual__2025-06-19T02:21:11.927855+00:00/trigger_databricks_job_run/-1/1 (ID 15) starting
[2025-06-19, 02:21:20] INFO - Environment is configured for ClientSecretCredential: source="azure.identity._credentials.environment"
[2025-06-19, 02:21:20] INFO - ManagedIdentityCredential will use IMDS with client_id: 3a9f1c84-7b3d-4e2a-a9d1-8f6c3e2b7f90: source="azure.identity._credentials.managed_identity"
[2025-06-19, 02:21:21] INFO - DefaultAzureCredential acquired a token from EnvironmentCredential: source="azure.identity._credentials.chained"
[2025-06-19, 02:21:23] INFO - Connection Retrieved 'databricks-default2': source="airflow.hooks.base"
[2025-06-19, 02:21:23] INFO - Existing AAD token is expired, or going to expire soon. Refreshing...: source="airflow.task.hooks.airflow.providers.databricks.hooks.databricks.DatabricksHook"
[2025-06-19, 02:21:23] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity.aio._internal.get_token_mixin"
[2025-06-19, 02:21:23] ERROR - Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
[2025-06-19, 02:21:23] ERROR - Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"
[2025-06-19, 02:21:23] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:21:54] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:22:24] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:22:54] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:23:25] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:23:55] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:24:26] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:24:56] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:25:26] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
[2025-06-19, 02:25:56] INFO - Trigger fired event: name="sit_trigger_databricks_job_run/manual__2025-06-19T02:21:11.927855+00:00/trigger_databricks_job_run/-1/1 (ID 15)": result="TriggerEvent<{'run_id': 748193847562019, 'run_page_url': 'https://adb-8274910385627419.47.azuredatabricks.net/?o=9182736450192837#job/604918372746183/run/748193847562019', 'run_state': '{\"life_cycle_state\": \"TERMINATED\", \"result_state\": \"SUCCESS\", \"state_message\": \"\"}', 'repair_run': False, 'errors': []}>"
```
These errors appear shortly after the task is deferred and while the trigger is actively polling the Databricks job status. The job itself completes successfully, but the logs indicate a potential issue with async resource cleanup.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct