Skip to content

Commit

Permalink
Improve GCSToSFTPOperator paths handling (#11284)
Browse files Browse the repository at this point in the history
* Improve GCSToSFTPOperator paths handling
  • Loading branch information
TobKed committed Feb 5, 2021
1 parent f92194b commit 5d7d46b
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 58 deletions.
51 changes: 47 additions & 4 deletions airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@

from airflow import models
from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.utils.dates import days_ago

SFTP_CONN_ID = "ssh_default"
BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sftp")
OBJECT_SRC_1 = "parent-1.bin"
OBJECT_SRC_2 = "parent-2.bin"
OBJECT_SRC_3 = "subdir-1/*"
OBJECT_SRC_2 = "dir-1/parent-2.bin"
OBJECT_SRC_3 = "dir-2/*"
DESTINATION_PATH_1 = "/tmp/single-file/"
DESTINATION_PATH_2 = "/tmp/dirs/"
DESTINATION_PATH_2 = "/tmp/dest-dir-1/"
DESTINATION_PATH_3 = "/tmp/dest-dir-2/"


with models.DAG(
Expand All @@ -39,36 +42,76 @@
# [START howto_operator_gcs_to_sftp_copy_single_file]
copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="file-copy-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_1,
destination_path=DESTINATION_PATH_1,
)
# [END howto_operator_gcs_to_sftp_copy_single_file]

check_copy_file_from_gcs_to_sftp = SFTPSensor(
task_id="check-file-copy-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
timeout=60,
path=os.path.join(DESTINATION_PATH_1, OBJECT_SRC_1),
)

# [START howto_operator_gcs_to_sftp_move_single_file_destination]
move_file_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="file-move-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_2,
destination_path=DESTINATION_PATH_1,
move_object=True,
)
# [END howto_operator_gcs_to_sftp_move_single_file_destination]

check_move_file_from_gcs_to_sftp = SFTPSensor(
task_id="check-file-move-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
timeout=60,
path=os.path.join(DESTINATION_PATH_1, OBJECT_SRC_2),
)

# [START howto_operator_gcs_to_sftp_copy_directory]
copy_dir_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="dir-copy-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_3,
destination_path=DESTINATION_PATH_2,
)
# [END howto_operator_gcs_to_sftp_copy_directory]

check_copy_dir_from_gcs_to_sftp = SFTPSensor(
task_id="check-dir-copy-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
timeout=60,
path=os.path.join(DESTINATION_PATH_2, "dir-2", OBJECT_SRC_1),
)

# [START howto_operator_gcs_to_sftp_move_specific_files]
move_dir_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="dir-move-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_3,
destination_path=DESTINATION_PATH_2,
destination_path=DESTINATION_PATH_3,
keep_directory_structure=False,
)
# [END howto_operator_gcs_to_sftp_move_specific_files]

check_move_dir_from_gcs_to_sftp = SFTPSensor(
task_id="check-dir-move-gsc-to-sftp",
sftp_conn_id=SFTP_CONN_ID,
timeout=60,
path=os.path.join(DESTINATION_PATH_3, OBJECT_SRC_1),
)

# Each sensor runs directly after the transfer task whose output it verifies.
move_file_from_gcs_to_sftp >> check_move_file_from_gcs_to_sftp
# The single-file sensor checks DESTINATION_PATH_1/OBJECT_SRC_1, which is written by
# copy_file_from_gcs_to_sftp (not by the directory copy), so it must follow that task.
copy_file_from_gcs_to_sftp >> check_copy_file_from_gcs_to_sftp

# The directory copy runs before the second transfer of the same source objects.
copy_dir_from_gcs_to_sftp >> move_dir_from_gcs_to_sftp
copy_dir_from_gcs_to_sftp >> check_copy_dir_from_gcs_to_sftp
move_dir_from_gcs_to_sftp >> check_move_dir_from_gcs_to_sftp
44 changes: 42 additions & 2 deletions airflow/providers/google/cloud/transfers/gcs_to_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@ class GCSToSFTPOperator(BaseOperator):
"""
Transfer files from a Google Cloud Storage bucket to SFTP server.
**Example**: ::
with models.DAG(
"example_gcs_to_sftp",
start_date=datetime(2020, 6, 19),
schedule_interval=None,
) as dag:
# downloads file to /tmp/sftp/folder/subfolder/file.txt
copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="file-copy-gsc-to-sftp",
source_bucket="test-gcs-sftp-bucket-name",
source_object="folder/subfolder/file.txt",
destination_path="/tmp/sftp",
)
# moves file to /tmp/data.txt
move_file_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="file-move-gsc-to-sftp",
source_bucket="test-gcs-sftp-bucket-name",
source_object="folder/subfolder/data.txt",
destination_path="/tmp",
move_object=True,
keep_directory_structure=False,
)
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GCSToSFTPOperator`
Expand All @@ -50,6 +75,9 @@ class GCSToSFTPOperator(BaseOperator):
:param destination_path: The sftp remote path. This is the specified directory path for
uploading to the SFTP server.
:type destination_path: str
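:param keep_directory_structure: (Optional) When set to True (the default) the path of the file
on the bucket is recreated within path passed in destination_path. When set to False the
bucket path is not recreated: a single object is stored under its file name only, and objects
matched by a wildcard are stored under their path relative to the directory of the wildcard prefix.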
:type keep_directory_structure: bool
:param move_object: When move object is True, the object is moved instead
of copied to the new location. This is the equivalent of a mv command
as opposed to a cp command.
Expand Down Expand Up @@ -90,6 +118,7 @@ def __init__(
source_bucket: str,
source_object: str,
destination_path: str,
keep_directory_structure: bool = True,
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
sftp_conn_id: str = "ssh_default",
Expand All @@ -102,6 +131,7 @@ def __init__(
self.source_bucket = source_bucket
self.source_object = source_object
self.destination_path = destination_path
self.keep_directory_structure = keep_directory_structure
self.move_object = move_object
self.gcp_conn_id = gcp_conn_id
self.sftp_conn_id = sftp_conn_id
Expand All @@ -127,18 +157,28 @@ def execute(self, context):
)

prefix, delimiter = self.source_object.split(WILDCARD, 1)
prefix_dirname = os.path.dirname(prefix)

objects = gcs_hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)

for source_object in objects:
destination_path = os.path.join(self.destination_path, source_object)
destination_path = self._resolve_destination_path(source_object, prefix=prefix_dirname)
self._copy_single_object(gcs_hook, sftp_hook, source_object, destination_path)

self.log.info("Done. Uploaded '%d' files to %s", len(objects), self.destination_path)
else:
destination_path = os.path.join(self.destination_path, self.source_object)
destination_path = self._resolve_destination_path(self.source_object)
self._copy_single_object(gcs_hook, sftp_hook, self.source_object, destination_path)
self.log.info("Done. Uploaded '%s' file to %s", self.source_object, destination_path)

def _resolve_destination_path(self, source_object: str, prefix: Optional[str] = None) -> str:
    """
    Build the remote SFTP path for a single GCS object.

    :param source_object: Name of the object in the bucket.
    :param prefix: Optional directory part of the wildcard prefix; only consulted
        when ``keep_directory_structure`` is False.
    :return: ``destination_path`` joined with the (possibly shortened) object name.
    """
    if self.keep_directory_structure:
        # Mirror the full object name underneath the destination directory.
        relative_name = source_object
    elif prefix:
        # Drop the leading prefix directory but keep anything nested below it.
        relative_name = os.path.relpath(source_object, start=prefix)
    else:
        # No usable prefix: keep only the file name.
        relative_name = os.path.basename(source_object)
    return os.path.join(self.destination_path, relative_name)

def _copy_single_object(
self,
gcs_hook: GCSHook,
Expand Down

0 comments on commit 5d7d46b

Please sign in to comment.