Skip to content

Commit

Permalink
Fix GCSToGCSOperator cannot copy a single file/folder without copying…
Browse files Browse the repository at this point in the history
… other files/folders with that prefix (#24039)
  • Loading branch information
gitstart-airflow committed Jun 6, 2022
1 parent 5e6997e commit ec84ffe
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
6 changes: 6 additions & 0 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class GCSToGCSOperator(BaseOperator):
account from the list granting this role to the originating account (templated).
:param source_object_required: Whether you want to raise an exception when the source object
doesn't exist. It doesn't have any effect when the source objects are folders or patterns.
:param exact_match: When specified, only exact match of the source object (filename) will be
copied.
:Example:
Expand Down Expand Up @@ -189,6 +191,7 @@ def __init__(
is_older_than=None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
source_object_required=False,
exact_match=False,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -208,6 +211,7 @@ def __init__(
self.is_older_than = is_older_than
self.impersonation_chain = impersonation_chain
self.source_object_required = source_object_required
self.exact_match = exact_match

def execute(self, context: 'Context'):

Expand Down Expand Up @@ -341,6 +345,8 @@ def _copy_source_without_wildcard(self, hook, prefix):
raise AirflowException(msg)

for source_obj in objects:
if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
continue
if self.destination_object is None:
destination_object = source_obj
else:
Expand Down
22 changes: 22 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_copy_file_with_exact_match(self, mock_hook):
SOURCE_FILES = [
'test_object.txt',
'test_object.txt.copy/',
'test_object.txt.folder/',
]
mock_hook.return_value.list.return_value = SOURCE_FILES
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object=SOURCE_OBJECT_NO_WILDCARD,
destination_bucket=DESTINATION_BUCKET,
exact_match=True,
)

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_prefix_and_suffix(self, mock_hook):
operator = GCSToGCSOperator(
Expand Down

0 comments on commit ec84ffe

Please sign in to comment.