Skip to content

Commit

Permalink
Fix GCSToGCSOperator ignores replace parameter when there is no wildc…
Browse files Browse the repository at this point in the history
…ard (#23340)
  • Loading branch information
gitstart-airflow committed May 8, 2022
1 parent 0371819 commit 82c244f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 19 deletions.
50 changes: 31 additions & 19 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,32 @@ def execute(self, context: 'Context'):
else:
self._copy_source_without_wildcard(hook=hook, prefix=prefix)

def _ignore_existing_files(self, hook, prefix, **kwargs):
# list all files in the Destination GCS bucket
# and only keep those files which are present in
# Source GCS bucket and not in Destination GCS bucket
delimiter = kwargs.get('delimiter')
objects = kwargs.get('objects')
if self.destination_object is None:
existing_objects = hook.list(self.destination_bucket, prefix=prefix, delimiter=delimiter)
else:
self.log.info("Replaced destination_object with source_object prefix.")
destination_objects = hook.list(
self.destination_bucket,
prefix=self.destination_object,
delimiter=delimiter,
)
existing_objects = [
dest_object.replace(self.destination_object, prefix, 1) for dest_object in destination_objects
]

objects = set(objects) - set(existing_objects)
if len(objects) > 0:
self.log.info('%s files are going to be synced: %s.', len(objects), objects)
else:
self.log.info('There are no new files to sync. Have a nice day!')
return objects

def _copy_source_without_wildcard(self, hook, prefix):
"""
For source_objects with no wildcard, this operator would first list
Expand Down Expand Up @@ -298,6 +324,10 @@ def _copy_source_without_wildcard(self, hook, prefix):
"""
objects = hook.list(self.source_bucket, prefix=prefix, delimiter=self.delimiter)

if not self.replace:
# If we are not replacing, ignore files already existing in source buckets
objects = self._ignore_existing_files(hook, prefix, objects=objects, delimiter=self.delimiter)

# If objects is empty and we have prefix, let's check if prefix is a blob
# and copy directly
if len(objects) == 0 and prefix:
Expand Down Expand Up @@ -335,26 +365,8 @@ def _copy_source_with_wildcard(self, hook, prefix):
# If we are not replacing, list all files in the Destination GCS bucket
# and only keep those files which are present in
# Source GCS bucket and not in Destination GCS bucket
objects = self._ignore_existing_files(hook, prefix_, delimiter=delimiter, objects=objects)

if self.destination_object is None:
existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
else:
self.log.info("Replaced destination_object with source_object prefix.")
destination_objects = hook.list(
self.destination_bucket,
prefix=self.destination_object,
delimiter=delimiter,
)
existing_objects = [
dest_object.replace(self.destination_object, prefix_, 1)
for dest_object in destination_objects
]

objects = set(objects) - set(existing_objects)
if len(objects) > 0:
self.log.info('%s files are going to be synced: %s.', len(objects), objects)
else:
self.log.info('There are no new files to sync. Have a nice day!')
for source_object in objects:
if self.destination_object is None:
destination_object = source_object
Expand Down
17 changes: 17 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook):
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object=SOURCE_OBJECT_NO_WILDCARD,
destination_bucket=DESTINATION_BUCKET,
replace=False,
)

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
mock.call(DESTINATION_BUCKET, prefix="test_object.txt", delimiter=None),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_prefix_and_suffix(self, mock_hook):
operator = GCSToGCSOperator(
Expand Down

0 comments on commit 82c244f

Please sign in to comment.