Skip to content

Commit

Permalink
Change CloudDatastoreExportEntitiesLink to StorageLink
Browse files Browse the repository at this point in the history
  • Loading branch information
MaksYermak authored and potiuk committed Apr 25, 2022
1 parent b3cc2f5 commit 544d658
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 29 deletions.
24 changes: 0 additions & 24 deletions airflow/providers/google/cloud/links/datastore.py
Expand Up @@ -52,30 +52,6 @@ def persist(
)


class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
"""Helper class for constructing Cloud Datastore Export Entities Link"""

name = "Export Entities"
key = "export_conf"
format_str = DATASTORE_EXPORT_ENTITIES_LINK

@staticmethod
def persist(
context: "Context",
task_instance,
output_url: str,
):
task_instance.xcom_push(
context=context,
key=CloudDatastoreExportEntitiesLink.key,
value={
"project_id": task_instance.project_id,
"bucket_name": task_instance.bucket,
"export_name": output_url.split('/')[3],
},
)


class CloudDatastoreEntitiesLink(BaseGoogleLink):
"""Helper class for constructing Cloud Datastore Entities Link"""

Expand Down
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/operators/datastore.py
Expand Up @@ -25,9 +25,9 @@
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.datastore import (
CloudDatastoreEntitiesLink,
CloudDatastoreExportEntitiesLink,
CloudDatastoreImportExportLink,
)
from airflow.providers.google.common.links.storage import StorageLink

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -78,7 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
operator_extra_links = (StorageLink(),)

def __init__(
self,
Expand Down Expand Up @@ -138,10 +138,10 @@ def execute(self, context: 'Context') -> dict:
state = result['metadata']['common']['state']
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
CloudDatastoreExportEntitiesLink.persist(
StorageLink.persist(
context=context,
task_instance=self,
output_url=result['response']['outputUrl'],
uri=f"{self.bucket}/{result['response']['outputUrl'].split('/')[3]}",
)
return result

Expand Down
1 change: 0 additions & 1 deletion airflow/providers/google/provider.yaml
Expand Up @@ -905,7 +905,6 @@ extra-links:
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
- airflow.providers.google.common.links.storage.StorageLink

Expand Down

0 comments on commit 544d658

Please sign in to comment.