Skip to content

Commit

Permalink
Add links for Cloud Datastore operators
Browse files Browse the repository at this point in the history
  • Loading branch information
MaksYermak authored and potiuk committed Apr 25, 2022
1 parent 8cfb2be commit 43ded6c
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 0 deletions.
97 changes: 97 additions & 0 deletions airflow/providers/google/cloud/links/datastore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import TYPE_CHECKING

from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context

BASE_LINK = "https://console.cloud.google.com"
DATASTORE_BASE_LINK = BASE_LINK + "/datastore"
DATASTORE_IMPORT_EXPORT_LINK = DATASTORE_BASE_LINK + "/import-export?project={project_id}"
DATASTORE_EXPORT_ENTITIES_LINK = (
BASE_LINK + "/storage/browser/{bucket_name}/{export_name}?project={project_id}"
)
DATASTORE_ENTITIES_LINK = DATASTORE_BASE_LINK + "/entities/query/kind?project={project_id}"


class CloudDatastoreImportExportLink(BaseGoogleLink):
"""Helper class for constructing Cloud Datastore Import/Export Link"""

name = "Import/Export Page"
key = "import_export_conf"
format_str = DATASTORE_IMPORT_EXPORT_LINK

@staticmethod
def persist(
context: "Context",
task_instance,
):
task_instance.xcom_push(
context=context,
key=CloudDatastoreImportExportLink.key,
value={
"project_id": task_instance.project_id,
},
)


class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
"""Helper class for constructing Cloud Datastore Export Entities Link"""

name = "Export Entities"
key = "export_conf"
format_str = DATASTORE_EXPORT_ENTITIES_LINK

@staticmethod
def persist(
context: "Context",
task_instance,
output_url: str,
):
task_instance.xcom_push(
context=context,
key=CloudDatastoreExportEntitiesLink.key,
value={
"project_id": task_instance.project_id,
"bucket_name": task_instance.bucket,
"export_name": output_url.split('/')[3],
},
)


class CloudDatastoreEntitiesLink(BaseGoogleLink):
"""Helper class for constructing Cloud Datastore Entities Link"""

name = "Entities"
key = "entities_conf"
format_str = DATASTORE_ENTITIES_LINK

@staticmethod
def persist(
context: "Context",
task_instance,
):
task_instance.xcom_push(
context=context,
key=CloudDatastoreEntitiesLink.key,
value={
"project_id": task_instance.project_id,
},
)
17 changes: 17 additions & 0 deletions airflow/providers/google/cloud/operators/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.datastore import (
CloudDatastoreExportEntitiesLink,
CloudDatastoreImportExportLink,
CloudDatastoreEntitiesLink,
)

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -73,6 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
operator_extra_links = (CloudDatastoreExportEntitiesLink(),)

def __init__(
self,
Expand Down Expand Up @@ -132,6 +138,11 @@ def execute(self, context: 'Context') -> dict:
state = result['metadata']['common']['state']
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
CloudDatastoreExportEntitiesLink.persist(
context=context,
task_instance=self,
output_url=result['response']['outputUrl'],
)
return result


Expand Down Expand Up @@ -179,6 +190,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
operator_extra_links = (CloudDatastoreImportExportLink(),)

def __init__(
self,
Expand Down Expand Up @@ -231,6 +243,7 @@ def execute(self, context: 'Context'):
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')

CloudDatastoreImportExportLink.persist(context=context, task_instance=self)
return result


Expand Down Expand Up @@ -265,6 +278,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
"partial_keys",
"impersonation_chain",
)
operator_extra_links = (CloudDatastoreEntitiesLink(),)

def __init__(
self,
Expand Down Expand Up @@ -293,6 +307,7 @@ def execute(self, context: 'Context') -> list:
partial_keys=self.partial_keys,
project_id=self.project_id,
)
CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
return keys


Expand Down Expand Up @@ -389,6 +404,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
"body",
"impersonation_chain",
)
operator_extra_links = (CloudDatastoreEntitiesLink(),)

def __init__(
self,
Expand Down Expand Up @@ -417,6 +433,7 @@ def execute(self, context: 'Context') -> dict:
body=self.body,
project_id=self.project_id,
)
CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
return response


Expand Down
3 changes: 3 additions & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,9 @@ extra-links:
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
- airflow.providers.google.common.links.storage.StorageLink

additional-extras:
Expand Down

0 comments on commit 43ded6c

Please sign in to comment.