Skip to content

Commit

Permalink
Fix GCStoGCS operator with replace disabled and existing destination o…
Browse files Browse the repository at this point in the history
…bject (#16991)
  • Loading branch information
aslantar committed Jul 25, 2021
1 parent e7bd82a commit 966b250
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
14 changes: 13 additions & 1 deletion airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,19 @@ def _copy_source_with_wildcard(self, hook, prefix):
# and only keep those files which are present in
# Source GCS bucket and not in Destination GCS bucket

existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
if self.destination_object is None:
existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
else:
self.log.info("Replaced destination_object with source_object prefix.")
destination_objects = hook.list(
self.destination_bucket,
prefix=self.destination_object,
delimiter=delimiter,
)
existing_objects = [
dest_object.replace(self.destination_object, prefix_, 1)
for dest_object in destination_objects
]

objects = set(objects) - set(existing_objects)
if len(objects) > 0:
Expand Down
18 changes: 18 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,21 @@ def test_wc_with_last_modified_time_with_all_true_cond_no_file(self, mock_hook):
mock.call(TEST_BUCKET, 'test_object/file3.json', DESTINATION_BUCKET, 'test_object/file3.json'),
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls_none)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_wildcard_with_replace_flag_false_with_destination_object(self, mock_hook):
    """Wildcard copy with ``replace=False`` and an explicit destination object.

    Executes the operator against a patched ``GCSHook`` and verifies that the
    hook's ``list`` method is called for the source bucket with the
    ``test_object`` prefix and then for the destination bucket with the
    ``foo/bar`` prefix.
    """
    transfer = GCSToGCSOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
        destination_bucket=DESTINATION_BUCKET,
        destination_object=DESTINATION_OBJECT_PREFIX,
        replace=False,
    )
    transfer.execute(None)

    # Listing calls expected in this order: source first, then destination.
    source_listing = mock.call(TEST_BUCKET, prefix="test_object", delimiter="")
    destination_listing = mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter="")

    list_mock = mock_hook.return_value.list
    list_mock.assert_has_calls([source_listing, destination_listing])

0 comments on commit 966b250

Please sign in to comment.