Skip to content

Commit 5d7d46b

Browse files
authored
Improve GCSToSFTPOperator paths handling (#11284)
* Improve GCSToSFTPOperator paths handling
1 parent f92194b commit 5d7d46b

File tree

5 files changed

+296
-58
lines changed

5 files changed

+296
-58
lines changed

airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@
2323

2424
from airflow import models
2525
from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator
26+
from airflow.providers.sftp.sensors.sftp import SFTPSensor
2627
from airflow.utils.dates import days_ago
2728

29+
SFTP_CONN_ID = "ssh_default"
2830
BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sftp")
2931
OBJECT_SRC_1 = "parent-1.bin"
30-
OBJECT_SRC_2 = "parent-2.bin"
31-
OBJECT_SRC_3 = "subdir-1/*"
32+
OBJECT_SRC_2 = "dir-1/parent-2.bin"
33+
OBJECT_SRC_3 = "dir-2/*"
3234
DESTINATION_PATH_1 = "/tmp/single-file/"
33-
DESTINATION_PATH_2 = "/tmp/dirs/"
35+
DESTINATION_PATH_2 = "/tmp/dest-dir-1/"
36+
DESTINATION_PATH_3 = "/tmp/dest-dir-2/"
3437

3538

3639
with models.DAG(
@@ -39,36 +42,76 @@
3942
# [START howto_operator_gcs_to_sftp_copy_single_file]
4043
copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
4144
task_id="file-copy-gsc-to-sftp",
45+
sftp_conn_id=SFTP_CONN_ID,
4246
source_bucket=BUCKET_SRC,
4347
source_object=OBJECT_SRC_1,
4448
destination_path=DESTINATION_PATH_1,
4549
)
4650
# [END howto_operator_gcs_to_sftp_copy_single_file]
4751

52+
check_copy_file_from_gcs_to_sftp = SFTPSensor(
53+
task_id="check-file-copy-gsc-to-sftp",
54+
sftp_conn_id=SFTP_CONN_ID,
55+
timeout=60,
56+
path=os.path.join(DESTINATION_PATH_1, OBJECT_SRC_1),
57+
)
58+
4859
# [START howto_operator_gcs_to_sftp_move_single_file_destination]
4960
move_file_from_gcs_to_sftp = GCSToSFTPOperator(
5061
task_id="file-move-gsc-to-sftp",
62+
sftp_conn_id=SFTP_CONN_ID,
5163
source_bucket=BUCKET_SRC,
5264
source_object=OBJECT_SRC_2,
5365
destination_path=DESTINATION_PATH_1,
5466
move_object=True,
5567
)
5668
# [END howto_operator_gcs_to_sftp_move_single_file_destination]
5769

70+
check_move_file_from_gcs_to_sftp = SFTPSensor(
71+
task_id="check-file-move-gsc-to-sftp",
72+
sftp_conn_id=SFTP_CONN_ID,
73+
timeout=60,
74+
path=os.path.join(DESTINATION_PATH_1, OBJECT_SRC_2),
75+
)
76+
5877
# [START howto_operator_gcs_to_sftp_copy_directory]
5978
copy_dir_from_gcs_to_sftp = GCSToSFTPOperator(
6079
task_id="dir-copy-gsc-to-sftp",
80+
sftp_conn_id=SFTP_CONN_ID,
6181
source_bucket=BUCKET_SRC,
6282
source_object=OBJECT_SRC_3,
6383
destination_path=DESTINATION_PATH_2,
6484
)
6585
# [END howto_operator_gcs_to_sftp_copy_directory]
6686

87+
check_copy_dir_from_gcs_to_sftp = SFTPSensor(
88+
task_id="check-dir-copy-gsc-to-sftp",
89+
sftp_conn_id=SFTP_CONN_ID,
90+
timeout=60,
91+
path=os.path.join(DESTINATION_PATH_2, "dir-2", OBJECT_SRC_1),
92+
)
93+
6794
# [START howto_operator_gcs_to_sftp_move_specific_files]
6895
move_dir_from_gcs_to_sftp = GCSToSFTPOperator(
6996
task_id="dir-move-gsc-to-sftp",
97+
sftp_conn_id=SFTP_CONN_ID,
7098
source_bucket=BUCKET_SRC,
7199
source_object=OBJECT_SRC_3,
72-
destination_path=DESTINATION_PATH_2,
100+
destination_path=DESTINATION_PATH_3,
101+
keep_directory_structure=False,
73102
)
74103
# [END howto_operator_gcs_to_sftp_move_specific_files]
104+
105+
check_move_dir_from_gcs_to_sftp = SFTPSensor(
106+
task_id="check-dir-move-gsc-to-sftp",
107+
sftp_conn_id=SFTP_CONN_ID,
108+
timeout=60,
109+
path=os.path.join(DESTINATION_PATH_3, OBJECT_SRC_1),
110+
)
111+
112+
move_file_from_gcs_to_sftp >> check_move_file_from_gcs_to_sftp
113+
copy_dir_from_gcs_to_sftp >> check_copy_file_from_gcs_to_sftp
114+
115+
copy_dir_from_gcs_to_sftp >> move_dir_from_gcs_to_sftp
116+
copy_dir_from_gcs_to_sftp >> check_copy_dir_from_gcs_to_sftp
117+
move_dir_from_gcs_to_sftp >> check_move_dir_from_gcs_to_sftp

airflow/providers/google/cloud/transfers/gcs_to_sftp.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,31 @@ class GCSToSFTPOperator(BaseOperator):
3333
"""
3434
Transfer files from a Google Cloud Storage bucket to SFTP server.
3535
36+
**Example**: ::
37+
38+
with models.DAG(
39+
"example_gcs_to_sftp",
40+
start_date=datetime(2020, 6, 19),
41+
schedule_interval=None,
42+
) as dag:
43+
# downloads file to /tmp/sftp/folder/subfolder/file.txt
44+
copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
45+
task_id="file-copy-gsc-to-sftp",
46+
source_bucket="test-gcs-sftp-bucket-name",
47+
source_object="folder/subfolder/file.txt",
48+
destination_path="/tmp/sftp",
49+
)
50+
51+
# moves file to /tmp/data.txt
52+
move_file_from_gcs_to_sftp = GCSToSFTPOperator(
53+
task_id="file-move-gsc-to-sftp",
54+
source_bucket="test-gcs-sftp-bucket-name",
55+
source_object="folder/subfolder/data.txt",
56+
destination_path="/tmp",
57+
move_object=True,
58+
keep_directory_structure=False,
59+
)
60+
3661
.. seealso::
3762
For more information on how to use this operator, take a look at the guide:
3863
:ref:`howto/operator:GCSToSFTPOperator`
@@ -50,6 +75,9 @@ class GCSToSFTPOperator(BaseOperator):
5075
:param destination_path: The sftp remote path. This is the specified directory path for
5176
uploading to the SFTP server.
5277
:type destination_path: str
78+
:param keep_directory_structure: (Optional) When set to False the path of the file
79+
on the bucket is recreated within path passed in destination_path.
80+
:type keep_directory_structure: bool
5381
:param move_object: When move object is True, the object is moved instead
5482
of copied to the new location. This is the equivalent of a mv command
5583
as opposed to a cp command.
@@ -90,6 +118,7 @@ def __init__(
90118
source_bucket: str,
91119
source_object: str,
92120
destination_path: str,
121+
keep_directory_structure: bool = True,
93122
move_object: bool = False,
94123
gcp_conn_id: str = "google_cloud_default",
95124
sftp_conn_id: str = "ssh_default",
@@ -102,6 +131,7 @@ def __init__(
102131
self.source_bucket = source_bucket
103132
self.source_object = source_object
104133
self.destination_path = destination_path
134+
self.keep_directory_structure = keep_directory_structure
105135
self.move_object = move_object
106136
self.gcp_conn_id = gcp_conn_id
107137
self.sftp_conn_id = sftp_conn_id
@@ -127,18 +157,28 @@ def execute(self, context):
127157
)
128158

129159
prefix, delimiter = self.source_object.split(WILDCARD, 1)
160+
prefix_dirname = os.path.dirname(prefix)
161+
130162
objects = gcs_hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)
131163

132164
for source_object in objects:
133-
destination_path = os.path.join(self.destination_path, source_object)
165+
destination_path = self._resolve_destination_path(source_object, prefix=prefix_dirname)
134166
self._copy_single_object(gcs_hook, sftp_hook, source_object, destination_path)
135167

136168
self.log.info("Done. Uploaded '%d' files to %s", len(objects), self.destination_path)
137169
else:
138-
destination_path = os.path.join(self.destination_path, self.source_object)
170+
destination_path = self._resolve_destination_path(self.source_object)
139171
self._copy_single_object(gcs_hook, sftp_hook, self.source_object, destination_path)
140172
self.log.info("Done. Uploaded '%s' file to %s", self.source_object, destination_path)
141173

174+
def _resolve_destination_path(self, source_object: str, prefix: Optional[str] = None) -> str:
175+
if not self.keep_directory_structure:
176+
if prefix:
177+
source_object = os.path.relpath(source_object, start=prefix)
178+
else:
179+
source_object = os.path.basename(source_object)
180+
return os.path.join(self.destination_path, source_object)
181+
142182
def _copy_single_object(
143183
self,
144184
gcs_hook: GCSHook,

0 commit comments

Comments
 (0)