Skip to content

Commit

Permalink
Persist DataprocLink for workflow operators regardless of job status (#…
Browse files Browse the repository at this point in the history
…26986)

For DataprocInstantiateInlineWorkflowTemplateOperator and DataprocInstantiateWorkflowTemplateOperator, the dataproc link is available only for the jobs that have succeeded. Incase of Failure, the DataprocLink is not available
  • Loading branch information
vksunilk committed Nov 16, 2022
1 parent 7cfa1be commit 0cb6450
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 97 deletions.
18 changes: 10 additions & 8 deletions airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1679,12 +1679,13 @@ def execute(self, context: Context):
timeout=self.timeout,
metadata=self.metadata,
)
operation.result()
workflow_id = operation.operation.name.split("/")[-1]
self.workflow_id = operation.operation.name.split("/")[-1]
DataprocLink.persist(
context=context, task_instance=self, url=DATAPROC_WORKFLOW_LINK, resource=workflow_id
context=context, task_instance=self, url=DATAPROC_WORKFLOW_LINK, resource=self.workflow_id
)
self.log.info("Template instantiated.")
self.log.info("Template instantiated. Workflow Id : %s", self.workflow_id)
operation.result()
self.log.info("Workflow %s completed successfully", self.workflow_id)


class DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator):
Expand Down Expand Up @@ -1770,12 +1771,13 @@ def execute(self, context: Context):
timeout=self.timeout,
metadata=self.metadata,
)
operation.result()
workflow_id = operation.operation.name.split("/")[-1]
self.workflow_id = operation.operation.name.split("/")[-1]
DataprocLink.persist(
context=context, task_instance=self, url=DATAPROC_WORKFLOW_LINK, resource=workflow_id
context=context, task_instance=self, url=DATAPROC_WORKFLOW_LINK, resource=self.workflow_id
)
self.log.info("Template instantiated.")
self.log.info("Template instantiated. Workflow Id : %s", self.workflow_id)
operation.result()
self.log.info("Workflow %s completed successfully", self.workflow_id)


class DataprocSubmitJobOperator(BaseOperator):
Expand Down

0 comments on commit 0cb6450

Please sign in to comment.