Description
Apache Airflow Provider(s)
sftp, ssh
Versions of Apache Airflow Providers
apache-airflow-providers-sftp==5.3.0
apache-airflow-providers-ssh==4.1.0
Apache Airflow version
2.11.0
Operating System
RHEL 8.8 x86_64
Deployment
Virtualenv installation
Deployment details
The deployment is a standard Celery-based one, with no other special configuration.
What happened
The issue can be reproduced as follows:
- define a simple DAG that uses the SFTP sensor to check whether a file exists (e.g. /etc/fstab on Linux)
- if the connection is direct, that is without any proxy in the middle, the sensor works well
- if the connection goes through a network proxy, the sensor fails during execution.
What you think should happen instead
Investigating this issue further, we discovered that the SFTP provider version in use (5.3.0) implements an auto-connection-closing logic: when no operation is in progress, the SFTP connection and the underlying SSH connection are closed (providers/sftp/hooks/sftp.py):
@contextmanager
def get_managed_conn(self) -> Generator[SFTPClient, None, None]:
    """Context manager that closes the connection after use."""
    if self._sftp_conn is None:
        ssh_conn: SSHClient = super().get_conn()
        self._ssh_conn = ssh_conn
        self._sftp_conn = ssh_conn.open_sftp()
    self._conn_count += 1
    try:
        yield self._sftp_conn
    finally:
        self._conn_count -= 1
        if self._conn_count == 0 and self._ssh_conn is not None and self._sftp_conn is not None:
            self._sftp_conn.close()
            self._sftp_conn = None
            self._ssh_conn.close()
            self._ssh_conn = None
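To make the close/reopen cycle concrete, here is a minimal sketch of what a single poke effectively does. It is not the sensor's source: SFTPHook and get_managed_conn are the provider names quoted above, while the listdir/stat calls merely stand in for the sensor's actual operations.

# Sketch: two sequential operations within one task, each wrapped in
# its own get_managed_conn() context.
from airflow.providers.sftp.hooks.sftp import SFTPHook

hook = SFTPHook(ssh_conn_id="sftp-via-proxy")

with hook.get_managed_conn() as sftp:  # first operation: enumerate files
    names = sftp.listdir("/etc")
# _conn_count dropped back to 0 here, so both the SFTP and the SSH
# connections were closed on context exit.

with hook.get_managed_conn() as sftp:  # second operation: read attributes
    mtime = sftp.stat("/etc/fstab").st_mtime
# this re-entry calls super().get_conn() again and must rebuild the
# whole transport, including the socket toward the proxy.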
This closing action drops the active socket toward the proxy. When another operation is performed within the same task, the code above tries to recreate the connection. For example, the sensor first builds the list of files matching the pattern (the first operation), then reads the modification time of each of them (the second operation). This second attempt fails because the proxy command is defined as a cached property (providers/ssh/hooks/ssh.py):
@cached_property
def host_proxy(self) -> paramiko.ProxyCommand | None:
    cmd = self.host_proxy_cmd
    return paramiko.ProxyCommand(cmd) if cmd else None
so the new connection reuses a cached paramiko.ProxyCommand that is no longer valid, since the cache entry was never invalidated when the connection was closed. From this point on, every connection attempt raises the error.
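This stale-cache behavior can be demonstrated outside Airflow. Below is a minimal sketch (a toy class, not provider code): paramiko.ProxyCommand likewise spawns the configured command as a subprocess and uses its pipes as the transport socket, so a cached instance whose process has died behaves like the dead Popen here.

import subprocess
from functools import cached_property

class Demo:
    @cached_property
    def proxy(self) -> subprocess.Popen:
        # stand-in for paramiko.ProxyCommand: spawn the command once,
        # keep its stdin/stdout pipes as the "socket"
        return subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

d = Demo()
first = d.proxy
first.terminate()            # simulate the connection teardown
first.wait()

second = d.proxy             # cached_property hands back the SAME dead object
assert first is second       # no new proxy process is spawned
second.stdin.write(b"data")  # writing toward the dead process...
second.stdin.flush()         # ...raises BrokenPipeError: [Errno 32] Broken pipe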
How to reproduce
Define a DAG containing a sensor like the following:
from airflow.providers.sftp.sensors.sftp import SFTPSensor

check_remote_file_without_pattern = SFTPSensor(
    task_id="check_remote_file_without_pattern",
    sftp_conn_id="sftp-via-proxy",
    path="/etc/fstab",
    poke_interval=1,
    mode="poke",
    retries=0,
    timeout=1,
    silent_fail=True,
    soft_fail=True,
)
where the connection sftp-via-proxy points to an SFTP host mySftpHost through a network proxy. The following example uses connect-proxy (from $HOME/.ssh/config):
Host mySftpHost
    Hostname the-sftp.example.com
    User airflow
    PasswordAuthentication no
    IdentityFile /home/mot/.ssh/id_ed25519
    ProxyCommand /usr/bin/connect-proxy -H 192.168.1.100:8888 %h %p
The observed error looks like the following:
[2025-06-25, 18:42:28 CEST] {transport.py:1944} ERROR - Exception (client): ProxyCommand("/usr/bin/connect-proxy -H 192.168.1.100:8888 mySftpHost 22") returned nonzero exit status: Broken pipe
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - Traceback (most recent call last):
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/proxy.py", line 79, in send
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - self.process.stdin.write(content)
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - BrokenPipeError: [Errno 32] Broken pipe
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - During handling of the above exception, another exception occurred:
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - Traceback (most recent call last):
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/transport.py", line 2180, in run
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - self.packetizer.write_all(b(self.local_version + "\r\n"))
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/packet.py", line 354, in write_all
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - n = self.__socket.send(out)
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - ^^^^^^^^^^^^^^^^^^^^^^^
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/proxy.py", line 85, in send
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - raise ProxyCommandFailure(" ".join(self.cmd), e.strerror)
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - paramiko.ssh_exception.ProxyCommandFailure: ProxyCommand("/usr/bin/connect-proxy -H 192.168.1.100:8888 mySftpHost 22") returned nonzero exit status: Broken pipe
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -
[2025-06-25, 18:42:28 CEST] {ssh.py:337} INFO - Failed to connect. Sleeping before retry attempt 1
Anything else
Two simple patches can be implemented to work around this limitation:
- Disabling the cached property via the connection's extra arguments:
--- airflow-2.11.0.vanilla.rhel86/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py	2025-05-22 21:33:52.129132917 +0200
+++ airflow/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py	2025-06-25 19:23:10.059929361 +0200
@@ -143,6 +143,7 @@
         self.allow_host_key_change = False
         self.host_key = None
         self.look_for_keys = True
+        self.proxy_command_cached = True
 
         # Placeholder for future cached connection
         self.client: paramiko.SSHClient | None = None
@@ -219,6 +220,14 @@
                     self.host_key = key_constructor(data=decoded_host_key)
                     self.no_host_key_check = False
 
+                if "auto_connection_closing" in extra_options:
+                    if str(extra_options["auto_connection_closing"]).lower() == "false":
+                        self.proxy_command_cached = False
+
+                if "proxy_command_cached" in extra_options:
+                    if str(extra_options["proxy_command_cached"]).lower() == "false":
+                        self.proxy_command_cached = False
+
         if self.cmd_timeout is NOTSET:
             self.cmd_timeout = CMD_TIMEOUT
@@ -260,6 +269,10 @@
         cmd = self.host_proxy_cmd
         return paramiko.ProxyCommand(cmd) if cmd else None
 
+    def host_proxy_not_cached(self) -> paramiko.ProxyCommand | None:
+        cmd = self.host_proxy_cmd
+        return paramiko.ProxyCommand(cmd) if cmd else None
+
     def get_conn(self) -> paramiko.SSHClient:
         """Establish an SSH connection to the remote host."""
         if self.client:
@@ -305,7 +318,7 @@
             "timeout": self.conn_timeout,
             "compress": self.compress,
             "port": self.port,
-            "sock": self.host_proxy,
+            "sock": self.host_proxy if self.proxy_command_cached is True else self.host_proxy_not_cached(),
             "look_for_keys": self.look_for_keys,
             "banner_timeout": self.banner_timeout,
             "auth_timeout": self.auth_timeout,
- Disabling the auto connection closing altogether, leaving the closing to the interpreter:
--- airflow-2.11.0.vanilla.rhel86/lib/python3.11/site-packages/airflow/providers/sftp/hooks/sftp.py	2025-05-22 21:33:52.346135325 +0200
+++ airflow/lib/python3.11/site-packages/airflow/providers/sftp/hooks/sftp.py	2025-06-25 19:23:23.946165171 +0200
@@ -115,6 +115,16 @@
         self._ssh_conn: SSHClient | None = None
         self._sftp_conn: SFTPClient | None = None
         self._conn_count = 0
+        self._auto_conn_closing = False
+
+        if self.ssh_conn_id is not None:
+            conn = self.get_connection(self.ssh_conn_id)
+
+            if conn.extra is not None:
+                extra_options = conn.extra_dejson
+
+                if "auto_connection_closing" in extra_options and str(extra_options["auto_connection_closing"]).lower() == "true":
+                    self._auto_conn_closing = True
 
         super().__init__(*args, **kwargs)
@@ -133,7 +143,7 @@
     @contextmanager
     def get_managed_conn(self) -> Generator[SFTPClient, None, None]:
         """Context manager that closes the connection after use."""
-        if self._sftp_conn is None:
+        if self._sftp_conn is None or self._ssh_conn is None:
             ssh_conn: SSHClient = super().get_conn()
             self._ssh_conn = ssh_conn
             self._sftp_conn = ssh_conn.open_sftp()
@@ -143,11 +153,16 @@
             yield self._sftp_conn
         finally:
             self._conn_count -= 1
-            if self._conn_count == 0 and self._ssh_conn is not None and self._sftp_conn is not None:
-                self._sftp_conn.close()
-                self._sftp_conn = None
-                self._ssh_conn.close()
-                self._ssh_conn = None
+            if self._auto_conn_closing is False:
+                print(f"SFTP hook - The auto connection closing is disabled, currently {self._conn_count} clients remain")
+                pass
+            else:
+                print(f"SFTP hook - Automatically closing connections, currently {self._conn_count} clients remain")
+                if self._conn_count == 0 and self._ssh_conn is not None and self._sftp_conn is not None:
+                    self._sftp_conn.close()
+                    self._sftp_conn = None
+                    self._ssh_conn.close()
+                    self._ssh_conn = None
 
     def get_conn_count(self) -> int:
         """Get the number of open connections."""
@@ -229,6 +244,18 @@
         except OSError:
             return False
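With this patch, auto connection closing defaults to off: the SSH and SFTP connections stay open across the operations of a task and are only released when the worker process exits, unless the extra auto_connection_closing is set to "true", which restores the current closing behavior.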
The connection's extra arguments should look as follows:
{
    [...],
    "auto_connection_closing": "false",
    "proxy_command_cached": "false"
}
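Note that both values are strings and are parsed case-insensitively by the patched hooks (via str(...).lower()); on the unpatched providers these extras are simply ignored.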
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct