Skip to content

Commit

Permalink
Workaround job race bug on biguery to gcs transfer (#24330)
Browse files Browse the repository at this point in the history
Fixes: #24277
  • Loading branch information
potiuk committed Jun 8, 2022
1 parent 29e10c5 commit 047a616
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
6 changes: 5 additions & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1894,7 +1894,8 @@ def run_extract(
field_delimiter: str = ',',
print_header: bool = True,
labels: Optional[Dict] = None,
) -> str:
return_full_job: bool = False,
) -> Union[str, BigQueryJob]:
"""
Executes a BigQuery extract command to copy data from BigQuery to
Google Cloud Storage. See here:
Expand All @@ -1915,6 +1916,7 @@ def run_extract(
:param print_header: Whether to print a header for a CSV file extract.
:param labels: a dictionary containing labels for the job/query,
passed to BigQuery
:param return_full_job: return full job instead of job id only
"""
warnings.warn(
"This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning
Expand Down Expand Up @@ -1953,6 +1955,8 @@ def run_extract(

job = self.insert_job(configuration=configuration, project_id=self.project_id)
self.running_job_id = job.job_id
if return_full_job:
return job
return job.job_id

def run_query(
Expand Down
7 changes: 3 additions & 4 deletions airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink

if TYPE_CHECKING:
Expand Down Expand Up @@ -115,17 +115,16 @@ def execute(self, context: 'Context'):
location=self.location,
impersonation_chain=self.impersonation_chain,
)
job_id = hook.run_extract(
job: BigQueryJob = hook.run_extract(
source_project_dataset_table=self.source_project_dataset_table,
destination_cloud_storage_uris=self.destination_cloud_storage_uris,
compression=self.compression,
export_format=self.export_format,
field_delimiter=self.field_delimiter,
print_header=self.print_header,
labels=self.labels,
return_full_job=True,
)

job = hook.get_job(job_id=job_id).to_api_repr()
conf = job["configuration"]["extract"]["sourceTable"]
dataset_id, project_id, table_id = conf["datasetId"], conf["projectId"], conf["tableId"]
BigQueryTableLink.persist(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ def test_execute(self, mock_hook):
field_delimiter=field_delimiter,
print_header=print_header,
labels=labels,
return_full_job=True,
)

0 comments on commit 047a616

Please sign in to comment.