Skip to content

Commit

Permalink
Add Google Deployment Manager Hook (#9159)
Browse files Browse the repository at this point in the history
Co-authored-by: Ephraim Anierobi <[email protected]>
  • Loading branch information
SamWheating and ephraimbuddy committed Jul 15, 2020
1 parent 4a547ee commit 9f01795
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 0 deletions.
100 changes: 100 additions & 0 deletions airflow/providers/google/cloud/hooks/gdm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from typing import Any, Dict, List, Optional

from googleapiclient.discovery import build

from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook


class GoogleDeploymentManagerHook(GoogleBaseHook): # pylint: disable=abstract-method
"""
Interact with Google Cloud Deployment Manager using the Google Cloud Platform connection.
This allows for scheduled and programatic inspection and deletion fo resources managed by GDM.
"""

def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
super(GoogleDeploymentManagerHook, self).__init__(gcp_conn_id, delegate_to=delegate_to)

def get_conn(self):
"""
Returns a Google Deployment Manager service object.
:rtype: googleapiclient.discovery.Resource
"""
http_authorized = self._authorize()
return build('deploymentmanager', 'v2', http=http_authorized, cache_discovery=False)

@GoogleBaseHook.fallback_to_default_project_id
def list_deployments(self, project_id: Optional[str] = None, # pylint: disable=too-many-arguments
deployment_filter: Optional[str] = None,
order_by: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Lists deployments in a google cloud project.
:param project_id: The project ID for this request.
:type project_id: str
:param deployment_filter: A filter expression which limits resources returned in the response.
:type deployment_filter: str
:param order_by: A field name to order by, ex: "creationTimestamp desc"
:type order_by: Optional[str]
:rtype: list
"""
deployments = [] # type: List[Dict]
conn = self.get_conn()
request = conn.deployments().list(project=project_id, # pylint: disable=no-member
filter=deployment_filter,
orderBy=order_by)

while request is not None:
response = request.execute(num_retries=self.num_retries)
deployments.extend(response.get("deployments", []))
request = conn.deployments().list_next( # pylint: disable=no-member
previous_request=request, previous_response=response
)

return deployments

@GoogleBaseHook.fallback_to_default_project_id
def delete_deployment(self,
project_id: Optional[str],
deployment: Optional[str] = None,
delete_policy: Optional[str] = None):
"""
Deletes a deployment and all associated resources in a google cloud project.
:param project_id: The project ID for this request.
:type project_id: str
:param deployment: The name of the deployment for this request.
:type deployment: str
:param delete_policy: Sets the policy to use for deleting resources. (ABANDON | DELETE)
:type delete_policy: string
:rtype: None
"""
conn = self.get_conn()
request = conn.deployments().delete(project=project_id, # pylint: disable=no-member
deployment=deployment,
deletePolicy=delete_policy)
resp = request.execute()
if 'error' in resp.keys():
raise AirflowException('Errors deleting deployment: ',
', '.join([err['message'] for err in resp['error']['errors']]))
6 changes: 6 additions & 0 deletions docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,12 @@ These integrations allow you to perform various operations within the Google Clo
- :mod:`airflow.providers.google.cloud.operators.datastore`
-

* - `Deployment Manager <https://cloud.google.com/deployment-manager/>`__
-
- :mod:`airflow.providers.google.cloud.hooks.gdm`
-
-

* - `Cloud Functions <https://cloud.google.com/functions/>`__
- :doc:`How to use <howto/operator/google/cloud/functions>`
- :mod:`airflow.providers.google.cloud.hooks.functions`
Expand Down
100 changes: 100 additions & 0 deletions tests/providers/google/cloud/hooks/test_gdm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest import mock

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.gdm import GoogleDeploymentManagerHook


def mock_init(self, gcp_conn_id, delegate_to=None): # pylint: disable=unused-argument
pass


TEST_PROJECT = 'my-project'
TEST_DEPLOYMENT = 'my-deployment'


class TestDeploymentManagerHook(unittest.TestCase):

def setUp(self):
with mock.patch(
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
new=mock_init,
):
self.gdm_hook = GoogleDeploymentManagerHook(gcp_conn_id="test")

@mock.patch("airflow.providers.google.cloud.hooks.gdm.GoogleDeploymentManagerHook.get_conn")
def test_list_deployments(self, mock_get_conn):

response1 = {'deployments': [{'id': 'deployment1', 'name': 'test-deploy1'}], 'pageToken': None}
response2 = {'deployments': [{'id': 'deployment2', 'name': 'test-deploy2'}], 'pageToken': None}

mock_get_conn.return_value.deployments.return_value.list.return_value.execute.return_value = response1

request_mock = mock.MagicMock()
request_mock.execute.return_value = response2
mock_get_conn.return_value.deployments.return_value.list_next.side_effect = [
request_mock,
None,
]

deployments = self.gdm_hook.list_deployments(project_id=TEST_PROJECT,
deployment_filter='filter',
order_by='name')

mock_get_conn.assert_called_once_with()

mock_get_conn.return_value.deployments.return_value.list.assert_called_once_with(
project=TEST_PROJECT,
filter='filter',
orderBy='name',
)

self.assertEqual(mock_get_conn.return_value.deployments.return_value.list_next.call_count, 2)

self.assertEqual(deployments, [{'id': 'deployment1', 'name': 'test-deploy1'},
{'id': 'deployment2', 'name': 'test-deploy2'}])

@mock.patch("airflow.providers.google.cloud.hooks.gdm.GoogleDeploymentManagerHook.get_conn")
def test_delete_deployment(self, mock_get_conn):
self.gdm_hook.delete_deployment(project_id=TEST_PROJECT, deployment=TEST_DEPLOYMENT)
mock_get_conn.assert_called_once_with()
mock_get_conn.return_value.deployments().delete.assert_called_once_with(
project=TEST_PROJECT,
deployment=TEST_DEPLOYMENT,
deletePolicy=None
)

@mock.patch("airflow.providers.google.cloud.hooks.gdm.GoogleDeploymentManagerHook.get_conn")
def test_delete_deployment_delete_fails(self, mock_get_conn):

resp = {'error': {'errors': [{'message': 'error deleting things.', 'domain': 'global'}]}}

mock_get_conn.return_value.deployments.return_value.delete.return_value.execute.return_value = resp

with self.assertRaises(AirflowException):
self.gdm_hook.delete_deployment(project_id=TEST_PROJECT, deployment=TEST_DEPLOYMENT)

mock_get_conn.assert_called_once_with()
mock_get_conn.return_value.deployments().delete.assert_called_once_with(
project=TEST_PROJECT,
deployment=TEST_DEPLOYMENT,
deletePolicy=None
)

0 comments on commit 9f01795

Please sign in to comment.