Skip to content

Commit

Permalink
Optionally raise an error if source file does not exist in GCSToGCSOp…
Browse files Browse the repository at this point in the history
…erator (#21391)
  • Loading branch information
davidpr91 committed Feb 10, 2022
1 parent a2abf66 commit 51aff27
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
9 changes: 9 additions & 0 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Expand Up @@ -90,6 +90,8 @@ class GCSToGCSOperator(BaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param source_object_required: Whether you want to raise an exception when the source object
doesn't exist. It doesn't have any effect when the source objects are folders or patterns.
:Example:
Expand Down Expand Up @@ -190,6 +192,7 @@ def __init__(
maximum_modified_time=None,
is_older_than=None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
source_object_required=False,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -216,6 +219,7 @@ def __init__(
self.maximum_modified_time = maximum_modified_time
self.is_older_than = is_older_than
self.impersonation_chain = impersonation_chain
self.source_object_required = source_object_required

def execute(self, context: 'Context'):

Expand Down Expand Up @@ -313,6 +317,11 @@ def _copy_source_without_wildcard(self, hook, prefix):
self._copy_single_object(
hook=hook, source_object=prefix, destination_object=self.destination_object
)
elif self.source_object_required:
msg = f"{prefix} does not exist in bucket {self.source_bucket}"
self.log.warning(msg)
raise AirflowException(msg)

for source_obj in objects:
if self.destination_object is None:
destination_object = source_obj
Expand Down
17 changes: 17 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Expand Up @@ -546,3 +546,20 @@ def test_execute_wildcard_with_replace_flag_false_with_destination_object(self,
mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_source_object_required_flag_true(self, mock_hook):
mock_hook.return_value.exists.return_value = False
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_objects=SOURCE_OBJECTS_SINGLE_FILE,
destination_bucket=DESTINATION_BUCKET,
destination_object=DESTINATION_OBJECT_PREFIX,
source_object_required=True,
)

with pytest.raises(
AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}"
):
operator.execute(None)

0 comments on commit 51aff27

Please sign in to comment.