Skip to content

Commit

Permalink
[AIRFLOW-6676] added GCSDeleteBucketOperator (#7307)
Browse files Browse the repository at this point in the history
  • Loading branch information
michalslowikowski00 committed Feb 3, 2020
1 parent 92c72f4 commit 04c1fef
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 5 deletions.
30 changes: 28 additions & 2 deletions airflow/providers/google/cloud/example_dags/example_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import (
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteObjectsOperator,
GcsFileTransformOperator, GCSListObjectsOperator, GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteBucketOperator,
GCSDeleteObjectsOperator, GcsFileTransformOperator, GCSListObjectsOperator,
GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
)
from airflow.providers.google.cloud.operators.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.operators.local_to_gcs import LocalFilesystemToGCSOperator
Expand Down Expand Up @@ -126,7 +127,32 @@
task_id="delete_files", bucket_name=BUCKET_1, objects=[BUCKET_FILE_LOCATION]
)

# [START howto_operator_gcs_delete_bucket]
# Task ids must be unique within a DAG, so each bucket-deletion task gets its own id.
delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket_1", bucket_name=BUCKET_1)
delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket_2", bucket_name=BUCKET_2)
# [END howto_operator_gcs_delete_bucket]

# Both buckets are created before anything lists them or uploads into them.
[create_bucket1, create_bucket2] >> list_buckets >> list_buckets_result
[create_bucket1, create_bucket2] >> upload_file

# Tasks that run once the file has been uploaded.
upload_file >> download_file
[create_bucket1, create_bucket2, upload_file] >> copy_file
upload_file >> gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task >> delete_files

# Bucket deletion is declared downstream of the tasks listed here. Each edge is
# declared exactly once so that no duplicate dependency is registered.
[
    create_bucket1,
    list_buckets,
    upload_file,
    transform_file,
    gcs_bucket_create_acl_entry_task,
    gcs_object_create_acl_entry_task,
    download_file,
    copy_file,
    delete_files,
] >> delete_bucket_1
[create_bucket2, copy_file] >> delete_bucket_2


# When this file is executed directly, clear the DAG (resetting its DAG runs) and then run it.
if __name__ == '__main__':
dag.clear(reset_dag_runs=True)
dag.run()
22 changes: 22 additions & 0 deletions airflow/providers/google/cloud/hooks/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from typing import Optional, Set, Tuple, Union
from urllib.parse import urlparse

from google.api_core.exceptions import NotFound
from google.cloud import storage

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -322,6 +323,27 @@ def delete(self, bucket_name, object_name):

self.log.info('Blob %s deleted.', object_name)

def delete_bucket(self, bucket_name: str, force: bool = False):
    """
    Delete a bucket from Google Cloud Storage.

    A missing bucket is not treated as an error: the ``NotFound`` raised by
    the delete call is caught and only logged.

    :param bucket_name: name of the bucket which will be deleted
    :type bucket_name: str
    :param force: forwarded to the bucket's ``delete`` call; ``False`` does not
        allow deleting a non-empty bucket, ``True`` allows it
    :type force: bool
    """
    target_bucket = self.get_conn().bucket(bucket_name)

    self.log.info("Deleting %s bucket", bucket_name)
    try:
        target_bucket.delete(force=force)
    except NotFound:
        self.log.info("Bucket %s not exists", bucket_name)
    else:
        self.log.info("Bucket %s has been deleted", bucket_name)

def list(self, bucket_name, versions=None, max_results=None, prefix=None, delimiter=None):
"""
List all objects from the bucket with the give string prefix in name
Expand Down
34 changes: 34 additions & 0 deletions airflow/providers/google/cloud/operators/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,3 +624,37 @@ def execute(self, context: Dict):
object_name=self.destination_object,
filename=destination_file.name
)


class GCSDeleteBucketOperator(BaseOperator):
    """
    Deletes a bucket from Google Cloud Storage.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GCSDeleteBucketOperator`

    :param bucket_name: name of the bucket which will be deleted
    :type bucket_name: str
    :param force: forwarded to ``GCSHook.delete_bucket``; ``False`` does not allow
        deleting a non-empty bucket, ``True`` (the default of this operator) allows it
    :type force: bool
    :param gcp_conn_id: connection id handed to ``GCSHook`` when the task executes
    :type gcp_conn_id: str
    """

    template_fields = ('bucket_name', 'gcp_conn_id')

    @apply_defaults
    def __init__(self,
                 bucket_name: str,
                 force: bool = True,
                 gcp_conn_id: str = 'google_cloud_default',
                 *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        self.gcp_conn_id = gcp_conn_id
        self.bucket_name = bucket_name
        self.force: bool = force

    def execute(self, context):
        """Delete ``bucket_name`` through a ``GCSHook`` built from ``gcp_conn_id``."""
        GCSHook(gcp_conn_id=self.gcp_conn_id).delete_bucket(
            bucket_name=self.bucket_name,
            force=self.force,
        )
27 changes: 27 additions & 0 deletions docs/howto/operator/gcp/gcs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,30 @@ For further information, look at:

* `Client Library Documentation <https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`__
* `Product Documentation <https://cloud.google.com/storage/docs/>`__

.. _howto/operator:GCSDeleteBucketOperator:

Deleting Bucket
^^^^^^^^^^^^^^^

Deleting a bucket removes the bucket from Google Cloud Storage.
It is performed through the
:class:`~airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator` operator.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_delete_bucket]
:end-before: [END howto_operator_gcs_delete_bucket]

You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator`
parameters which allows you to dynamically determine values.

Reference
^^^^^^^^^

For further information, look at:

* `Client Library Documentation <https://googleapis.dev/python/storage/latest/buckets.html>`__
* `Product Documentation <https://cloud.google.com/storage/docs/json_api/v1/buckets>`__
22 changes: 21 additions & 1 deletion tests/providers/google/cloud/hooks/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import tempfile
import unittest
from datetime import datetime
from unittest import mock

import dateutil
import mock
from google.cloud import exceptions, storage

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -334,6 +334,26 @@ def test_delete_nonexisting_object(self, mock_service):
with self.assertRaises(exceptions.NotFound):
self.gcs_hook.delete(bucket_name=test_bucket, object_name=test_object)

@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_delete_bucket(self, mock_service):
    """The hook looks the bucket up by name and deletes it with ``force=False`` by default."""
    test_bucket = "test bucket"

    self.gcs_hook.delete_bucket(bucket_name=test_bucket)

    mock_service.return_value.bucket.assert_called_once_with(test_bucket)
    # Pin the arguments too: an argument-less ``assert_called_once()`` would still
    # pass if the default ever changed to force-deleting non-empty buckets.
    mock_service.return_value.bucket.return_value.delete.assert_called_once_with(force=False)

@mock.patch(GCS_STRING.format('GCSHook.get_conn'), **{
    'return_value.bucket.return_value.delete.side_effect': exceptions.NotFound(message="Not Found")
})
def test_delete_nonexisting_bucket(self, mock_service):
    """A ``NotFound`` raised by the delete call must not propagate out of the hook."""
    test_bucket = "test bucket"

    # Must return normally even though the patched delete raises NotFound.
    self.gcs_hook.delete_bucket(bucket_name=test_bucket)

    mock_service.return_value.bucket.assert_called_once_with(test_bucket)
    # The delete was still attempted exactly once, with the default ``force`` value.
    mock_service.return_value.bucket.return_value.delete.assert_called_once_with(force=False)

@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_object_get_size(self, mock_service):
test_bucket = 'test_bucket'
Expand Down
15 changes: 13 additions & 2 deletions tests/providers/google/cloud/operators/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import mock

from airflow.providers.google.cloud.operators.gcs import (
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteObjectsOperator,
GcsFileTransformOperator, GCSListObjectsOperator, GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteBucketOperator,
GCSDeleteObjectsOperator, GcsFileTransformOperator, GCSListObjectsOperator,
GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
)

TASK_ID = "test-gcs-operator"
Expand Down Expand Up @@ -230,3 +231,13 @@ def test_execute(self, mock_hook, mock_subprocess, mock_tempfile):
object_name=destination_object,
filename=destination,
)


class TestGCSDeleteBucketOperator(unittest.TestCase):
    """Tests for ``GCSDeleteBucketOperator``."""

    @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
    def test_delete_bucket(self, mock_hook):
        """``execute`` builds the hook from the connection id and forwards bucket name and ``force``."""
        operator = GCSDeleteBucketOperator(task_id=TASK_ID, bucket_name=TEST_BUCKET)

        operator.execute(None)

        # The hook must be created with the operator's (default) connection id ...
        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
        # ... and asked to delete the bucket with the operator's default ``force=True``.
        mock_hook.return_value.delete_bucket.assert_called_once_with(bucket_name=TEST_BUCKET, force=True)

0 comments on commit 04c1fef

Please sign in to comment.