Skip to content

Commit

Permalink
Spanner assets & system tests migration (AIP-47) (#23957)
Browse files Browse the repository at this point in the history
  • Loading branch information
wojsamjan committed Jun 1, 2022
1 parent fedab9d commit 841ed27
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 158 deletions.
23 changes: 0 additions & 23 deletions airflow/providers/google/cloud/example_dags/example_spanner.sql

This file was deleted.

74 changes: 74 additions & 0 deletions airflow/providers/google/cloud/links/spanner.py
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Spanner links."""
from typing import TYPE_CHECKING, Optional

from airflow.models import BaseOperator
from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context

SPANNER_BASE_LINK = "https://console.cloud.google.com/spanner/instances"
SPANNER_INSTANCE_LINK = SPANNER_BASE_LINK + "/{instance_id}/details/databases?project={project_id}"
SPANNER_DATABASE_LINK = (
SPANNER_BASE_LINK + "/{instance_id}/databases/{database_id}/details/tables?project={project_id}"
)


class SpannerInstanceLink(BaseGoogleLink):
"""Helper class for constructing Spanner Instance Link"""

name = "Spanner Instance"
key = "spanner_instance"
format_str = SPANNER_INSTANCE_LINK

@staticmethod
def persist(
context: "Context",
task_instance: BaseOperator,
instance_id: str,
project_id: Optional[str],
):
task_instance.xcom_push(
context,
key=SpannerInstanceLink.key,
value={"instance_id": instance_id, "project_id": project_id},
)


class SpannerDatabaseLink(BaseGoogleLink):
"""Helper class for constructing Spanner Database Link"""

name = "Spanner Database"
key = "spanner_database"
format_str = SPANNER_DATABASE_LINK

@staticmethod
def persist(
context: "Context",
task_instance: BaseOperator,
instance_id: str,
database_id: str,
project_id: Optional[str],
):
task_instance.xcom_push(
context,
key=SpannerDatabaseLink.key,
value={"instance_id": instance_id, "database_id": database_id, "project_id": project_id},
)
32 changes: 32 additions & 0 deletions airflow/providers/google/cloud/operators/spanner.py
Expand Up @@ -21,6 +21,7 @@
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.spanner import SpannerHook
from airflow.providers.google.cloud.links.spanner import SpannerDatabaseLink, SpannerInstanceLink

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -67,6 +68,7 @@ class SpannerDeployInstanceOperator(BaseOperator):
'impersonation_chain',
)
# [END gcp_spanner_deploy_template_fields]
operator_extra_links = (SpannerInstanceLink(),)

def __init__(
self,
Expand Down Expand Up @@ -114,6 +116,12 @@ def execute(self, context: 'Context') -> None:
node_count=self.node_count,
display_name=self.display_name,
)
SpannerInstanceLink.persist(
context=context,
task_instance=self,
instance_id=self.instance_id,
project_id=self.project_id or hook.project_id,
)


class SpannerDeleteInstanceOperator(BaseOperator):
Expand Down Expand Up @@ -223,6 +231,7 @@ class SpannerQueryDatabaseInstanceOperator(BaseOperator):
template_ext: Sequence[str] = ('.sql',)
template_fields_renderers = {'query': 'sql'}
# [END gcp_spanner_query_template_fields]
operator_extra_links = (SpannerDatabaseLink(),)

def __init__(
self,
Expand Down Expand Up @@ -277,6 +286,13 @@ def execute(self, context: 'Context'):
database_id=self.database_id,
queries=queries,
)
SpannerDatabaseLink.persist(
context=context,
task_instance=self,
instance_id=self.instance_id,
database_id=self.database_id,
project_id=self.project_id or hook.project_id,
)

@staticmethod
def sanitize_queries(queries: List[str]) -> None:
Expand Down Expand Up @@ -327,6 +343,7 @@ class SpannerDeployDatabaseInstanceOperator(BaseOperator):
template_ext: Sequence[str] = ('.sql',)
template_fields_renderers = {'ddl_statements': 'sql'}
# [END gcp_spanner_database_deploy_template_fields]
operator_extra_links = (SpannerDatabaseLink(),)

def __init__(
self,
Expand Down Expand Up @@ -361,6 +378,13 @@ def execute(self, context: 'Context') -> Optional[bool]:
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
)
SpannerDatabaseLink.persist(
context=context,
task_instance=self,
instance_id=self.instance_id,
database_id=self.database_id,
project_id=self.project_id or hook.project_id,
)
if not hook.get_database(
project_id=self.project_id, instance_id=self.instance_id, database_id=self.database_id
):
Expand Down Expand Up @@ -425,6 +449,7 @@ class SpannerUpdateDatabaseInstanceOperator(BaseOperator):
template_ext: Sequence[str] = ('.sql',)
template_fields_renderers = {'ddl_statements': 'sql'}
# [END gcp_spanner_database_update_template_fields]
operator_extra_links = (SpannerDatabaseLink(),)

def __init__(
self,
Expand Down Expand Up @@ -472,6 +497,13 @@ def execute(self, context: 'Context') -> None:
f"Create the database first before you can update it."
)
else:
SpannerDatabaseLink.persist(
context=context,
task_instance=self,
instance_id=self.instance_id,
database_id=self.database_id,
project_id=self.project_id or hook.project_id,
)
return hook.update_database(
project_id=self.project_id,
instance_id=self.instance_id,
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
Expand Up @@ -916,6 +916,8 @@ extra-links:
- airflow.providers.google.cloud.links.bigtable.BigtableInstanceLink
- airflow.providers.google.cloud.links.bigtable.BigtableClusterLink
- airflow.providers.google.cloud.links.bigtable.BigtableTablesLink
- airflow.providers.google.cloud.links.spanner.SpannerDatabaseLink
- airflow.providers.google.cloud.links.spanner.SpannerInstanceLink
- airflow.providers.google.cloud.links.stackdriver.StackdriverNotificationsLink
- airflow.providers.google.cloud.links.stackdriver.StackdriverPoliciesLink
- airflow.providers.google.common.links.storage.StorageLink
Expand Down
14 changes: 7 additions & 7 deletions docs/apache-airflow-providers-google/operators/cloud/spanner.rst
Expand Up @@ -41,7 +41,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spanner_deploy]
Expand Down Expand Up @@ -80,7 +80,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spanner_database_delete]
Expand Down Expand Up @@ -120,7 +120,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spanner_database_deploy]
Expand Down Expand Up @@ -164,13 +164,13 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spanner_database_update]
:end-before: [END howto_operator_spanner_database_update]

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spanner_database_update_idempotent]
Expand Down Expand Up @@ -207,7 +207,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spanner_query]
Expand Down Expand Up @@ -246,7 +246,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spanner_delete]
Expand Down

0 comments on commit 841ed27

Please sign in to comment.