
SFTP hook drops the connection to the proxy during operations #52289

Open
@Mottimo

Description


Apache Airflow Provider(s)

sftp, ssh

Versions of Apache Airflow Providers

apache-airflow-providers-sftp==5.3.0
apache-airflow-providers-ssh==4.1.0

Apache Airflow version

2.11.0

Operating System

RHEL 8.8 x86_64

Deployment

Virtualenv installation

Deployment details

The deployment is a standard Celery-based one; no other special configuration is used.

What happened

The issue can be reproduced as follows:

  • define a simple DAG that uses the SFTP sensor to check whether a file exists (e.g. /etc/fstab on Linux)
  • if the connection is direct, i.e. without any proxy in the middle, the sensor works fine
  • if the connection goes through a network proxy, the sensor fails during execution.

What you think should happen instead

Investigating this issue further, we discovered that the SFTP provider version in use (5.3.0) implements auto-connection-closing logic: when no operation is in progress, the SFTP connection and the underlying SSH connection are closed (providers/sftp/hooks/sftp.py):

    @contextmanager
    def get_managed_conn(self) -> Generator[SFTPClient, None, None]:
        """Context manager that closes the connection after use."""
        if self._sftp_conn is None:
            ssh_conn: SSHClient = super().get_conn()
            self._ssh_conn = ssh_conn
            self._sftp_conn = ssh_conn.open_sftp()
        self._conn_count += 1

        try:
            yield self._sftp_conn
        finally:
            self._conn_count -= 1
            if self._conn_count == 0 and self._ssh_conn is not None and self._sftp_conn is not None:
                self._sftp_conn.close()
                self._sftp_conn = None
                self._ssh_conn.close()
                self._ssh_conn = None

Closing the connection also drops the active socket toward the proxy. When another operation runs within the same task, the hook tries to recreate the connection, as visible in the code above. For example, the sensor first builds the list of files matching the pattern (the first operation), then reads the modification time of each of them (the second operation). This second attempt fails because the proxy command is defined as a cached property (providers/ssh/hooks/ssh.py):

    @cached_property
    def host_proxy(self) -> paramiko.ProxyCommand | None:
        cmd = self.host_proxy_cmd
        return paramiko.ProxyCommand(cmd) if cmd else None

the new connection attempt reuses the cached paramiko.ProxyCommand, whose underlying proxy process is no longer valid, because the cache entry was never invalidated. From this point on, every connection attempt raises the error.
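To illustrate the caching behavior at the root of this: functools.cached_property stores the first computed value on the instance and keeps returning it until the attribute is explicitly deleted. A minimal sketch (the Hook class and names here are illustrative, not the provider's code):

```python
from functools import cached_property


class Hook:
    calls = 0  # counts how many times the property body actually runs

    @cached_property
    def host_proxy(self):
        Hook.calls += 1
        return object()  # stands in for paramiko.ProxyCommand(cmd)


h = Hook()
first = h.host_proxy
second = h.host_proxy   # cache hit: same object, no new proxy process
assert first is second

del h.host_proxy        # invalidating the cache entry...
third = h.host_proxy    # ...forces a fresh value to be computed
assert third is not first
```

In the provider, nothing performs the equivalent of `del self.host_proxy` when the connection is closed, so the stale ProxyCommand is handed to the next connection attempt.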

How to reproduce

Define a DAG with a sensor like the following:

	check_remote_file_without_pattern = SFTPSensor(
	    task_id="check_remote_file_without_pattern",
	    sftp_conn_id="sftp-via-proxy",
	    path="/etc/fstab",
	    poke_interval=1,
	    mode="poke",
	    retries=0,
	    timeout=1,
	    silent_fail=True,
	    soft_fail=True,
	)

where the connection sftp-via-proxy points to an SFTP host mySftpHost through a network proxy. The following example uses connect-proxy (from $HOME/.ssh/config):

Host mySftpHost
     Hostname the-sftp.example.com
     User airflow
     PasswordAuthentication no
     IdentityFile /home/mot/.ssh/id_ed25519
     ProxyCommand /usr/bin/connect-proxy -H 192.168.1.100:8888 %h %p

The observed error looks like the following:

[2025-06-25, 18:42:28 CEST] {transport.py:1944} ERROR - Exception (client): ProxyCommand("/usr/bin/connect-proxy -H 192.168.1.100:8888 mySftpHost 22") returned nonzero exit status: Broken pipe
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - Traceback (most recent call last):
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -   File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/proxy.py", line 79, in send
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -     self.process.stdin.write(content)
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - BrokenPipeError: [Errno 32] Broken pipe
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - 
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - During handling of the above exception, another exception occurred:
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - 
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - Traceback (most recent call last):
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -   File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/transport.py", line 2180, in run
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -     self.packetizer.write_all(b(self.local_version + "\r\n"))
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -   File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/packet.py", line 354, in write_all
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -     n = self.__socket.send(out)
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -         ^^^^^^^^^^^^^^^^^^^^^^^
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -   File "/home/mot/airflow/lib/python3.12/site-packages/paramiko/proxy.py", line 85, in send
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR -     raise ProxyCommandFailure(" ".join(self.cmd), e.strerror)
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - paramiko.ssh_exception.ProxyCommandFailure: ProxyCommand("/usr/bin/connect-proxy -H 192.168.1.100:8888 mySftpHost 22") returned nonzero exit status: Broken pipe
[2025-06-25, 18:42:28 CEST] {transport.py:1942} ERROR - 
[2025-06-25, 18:42:28 CEST] {ssh.py:337} INFO - Failed to connect. Sleeping before retry attempt 1
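The underlying failure mode can be reproduced without any proxy or SSH server: paramiko.ProxyCommand wraps a subprocess, and writing to the pipe of a process that has already exited yields EPIPE, which paramiko surfaces as ProxyCommandFailure. A hedged simulation of the same condition using a plain subprocess (cat stands in for the proxy process):

```python
import subprocess

# Spawn a stand-in for the proxy process.
proc = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL)
proc.stdin.write(b"hello\n")
proc.stdin.flush()              # works while the process is alive

proc.terminate()                # the proxy side goes away...
proc.wait()

failed = False
try:
    proc.stdin.write(b"hello again\n")  # ...but we still hold the stale pipe
    proc.stdin.flush()                  # the kernel reports EPIPE
except BrokenPipeError:
    failed = True

print("write after proxy death failed:", failed)
```

This is exactly what happens when the hook hands the cached, already-dead ProxyCommand to the new SSH transport.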

Anything else

Two simple patches can be implemented to bypass this limitation:

  1. Disabling the proxy-command cache via the connection's extra arguments:
--- airflow-2.11.0.vanilla.rhel86/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py	2025-05-22 21:33:52.129132917 +0200
+++ airflow/lib/python3.11/site-packages/airflow/providers/ssh/hooks/ssh.py	2025-06-25 19:23:10.059929361 +0200
@@ -143,6 +143,7 @@
         self.allow_host_key_change = False
         self.host_key = None
         self.look_for_keys = True
+        self.proxy_command_cached = True
 
         # Placeholder for future cached connection
         self.client: paramiko.SSHClient | None = None
@@ -219,6 +220,14 @@
                     self.host_key = key_constructor(data=decoded_host_key)
                     self.no_host_key_check = False
 
+                if "auto_connection_closing" in extra_options:
+                    if str(extra_options["auto_connection_closing"]).lower() == "false":
+                        self.proxy_command_cached = False
+
+                if "proxy_command_cached" in extra_options:
+                    if str(extra_options["proxy_command_cached"]).lower() == "false":
+                        self.proxy_command_cached = False
+
         if self.cmd_timeout is NOTSET:
             self.cmd_timeout = CMD_TIMEOUT
 
@@ -260,6 +269,10 @@
         cmd = self.host_proxy_cmd
         return paramiko.ProxyCommand(cmd) if cmd else None
 
+    def host_proxy_not_cached(self) -> paramiko.ProxyCommand | None:
+        cmd = self.host_proxy_cmd
+        return paramiko.ProxyCommand(cmd) if cmd else None
+
     def get_conn(self) -> paramiko.SSHClient:
         """Establish an SSH connection to the remote host."""
         if self.client:
@@ -305,7 +318,7 @@
             "timeout": self.conn_timeout,
             "compress": self.compress,
             "port": self.port,
-            "sock": self.host_proxy,
+            "sock": self.host_proxy if self.proxy_command_cached is True else self.host_proxy_not_cached(),
             "look_for_keys": self.look_for_keys,
             "banner_timeout": self.banner_timeout,
             "auth_timeout": self.auth_timeout,

  2. Disabling auto connection closing entirely, leaving connection cleanup to the interpreter:
--- airflow-2.11.0.vanilla.rhel86/lib/python3.11/site-packages/airflow/providers/sftp/hooks/sftp.py	2025-05-22 21:33:52.346135325 +0200
+++ airflow/lib/python3.11/site-packages/airflow/providers/sftp/hooks/sftp.py	2025-06-25 19:23:23.946165171 +0200
@@ -115,6 +115,16 @@
         self._ssh_conn: SSHClient | None = None
         self._sftp_conn: SFTPClient | None = None
         self._conn_count = 0
+        self._auto_conn_closing = False
+
+        if self.ssh_conn_id is not None:
+            conn = self.get_connection(self.ssh_conn_id)
+
+            if conn.extra is not None:
+                extra_options = conn.extra_dejson
+
+            if "auto_connection_closing" in extra_options and str(extra_options["auto_connection_closing"]).lower() == "true":
+                self._auto_conn_closing = True
 
         super().__init__(*args, **kwargs)
 
@@ -133,7 +143,7 @@
     @contextmanager
     def get_managed_conn(self) -> Generator[SFTPClient, None, None]:
         """Context manager that closes the connection after use."""
-        if self._sftp_conn is None:
+        if self._sftp_conn is None or self._ssh_conn is None:
             ssh_conn: SSHClient = super().get_conn()
             self._ssh_conn = ssh_conn
             self._sftp_conn = ssh_conn.open_sftp()
@@ -143,11 +153,16 @@
             yield self._sftp_conn
         finally:
             self._conn_count -= 1
-            if self._conn_count == 0 and self._ssh_conn is not None and self._sftp_conn is not None:
-                self._sftp_conn.close()
-                self._sftp_conn = None
-                self._ssh_conn.close()
-                self._ssh_conn = None
+            if self._auto_conn_closing is False:
+                print(f"SFTP hook - The auto connection closing is disabled, currently {self._conn_count} clients remain")
+                pass
+            else:
+                print(f"SFTP hook - Automatically closing connections, currently {self._conn_count} clients remain")
+                if self._conn_count == 0 and self._ssh_conn is not None and self._sftp_conn is not None:
+                    self._sftp_conn.close()
+                    self._sftp_conn = None
+                    self._ssh_conn.close()
+                    self._ssh_conn = None
 
     def get_conn_count(self) -> int:
         """Get the number of open connections."""
@@ -229,6 +244,18 @@
             except OSError:
                 return False

The connection's extra arguments should look like the following:

{
  [...],
  "auto_connection_closing": "false",
  "proxy_command_cached": "false"
}
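For reference, the patches read these extras through Connection.extra_dejson, which returns the extra JSON string as a dict, and compare the stringified values case-insensitively. A minimal sketch of that parsing pattern (extra_flag is an illustrative helper, not an Airflow API; the two patches pick different defaults, so the defaults here are illustrative too):

```python
import json

# What Connection.extra_dejson would return for the extras shown above.
extra_json = '{"auto_connection_closing": "false", "proxy_command_cached": "false"}'
extra_options = json.loads(extra_json)


def extra_flag(options: dict, key: str, default: bool) -> bool:
    """Mirror the str(...).lower() comparison used in the patches."""
    if key not in options:
        return default
    return str(options[key]).lower() == "true"


auto_connection_closing = extra_flag(extra_options, "auto_connection_closing", True)
proxy_command_cached = extra_flag(extra_options, "proxy_command_cached", True)
print(auto_connection_closing, proxy_command_cached)  # False False
```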

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
