Skip to content

Commit

Permalink
Make generated job_id more informative in BQ insert_job (#9203)
Browse files Browse the repository at this point in the history
* Make generated job_id more informative in BQ insert_job

* fixup! Make generated job_id more informative in BQ insert_job

* fixup! fixup! Make generated job_id more informative in BQ insert_job

* fixup! fixup! fixup! Make generated job_id more informative in BQ insert_job
  • Loading branch information
turbaszek committed Jun 10, 2020
1 parent 7f02e56 commit a26afbf
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"""
import logging
import time
-import uuid
import warnings
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Sequence, Tuple, Type, Union
Expand Down Expand Up @@ -1453,8 +1452,9 @@ def insert_job(
:param location: location the job is running
:type location: str
"""
-job_id = job_id or str(uuid.uuid4())
location = location or self.location
+job_id = job_id or f"airflow_{int(time.time())}"
+
client = self.get_client(project_id=project_id, location=location)
job_data = {
"configuration": configuration,
Expand Down
7 changes: 4 additions & 3 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import enum
import json
import warnings
-from time import sleep
+from time import sleep, time
from typing import Any, Dict, Iterable, List, Optional, SupportsAbs, Union

import attr
Expand Down Expand Up @@ -1630,20 +1630,21 @@ def execute(self, context: Any):
delegate_to=self.delegate_to,
)

+job_id = self.job_id or f"airflow_{self.task_id}_{int(time())}"
try:
job = hook.insert_job(
configuration=self.configuration,
project_id=self.project_id,
location=self.location,
-job_id=self.job_id,
+job_id=job_id,
)
# Start the job and wait for it to complete and get the result.
job.result()
except Conflict:
job = hook.get_job(
project_id=self.project_id,
location=self.location,
-job_id=self.job_id,
+job_id=job_id,
)
# Get existing job and wait for it to be ready
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
Expand Down

0 comments on commit a26afbf

Please sign in to comment.