Use python client in BQ hook create_empty_table/dataset and table_exists (#8377)

* Use python client in BQ hook create_empty_table method

* Refactor table_exists and create_empty_dataset

* Add note in UPDATING
turbaszek committed Apr 22, 2020
1 parent 93ea058 commit 57c8c05
Showing 4 changed files with 202 additions and 258 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
@@ -62,6 +62,14 @@ https://developers.google.com/style/inclusive-documentation
 -->
 
+### Changes in BigQueryHook
+- `create_empty_table` method now accepts a `table_resource` parameter. If provided, all
+  other parameters are ignored.
+- `create_empty_dataset` will now use values from `dataset_reference` instead of raising an
+  error if parameters were passed both in `dataset_reference` and as arguments to the method.
+  Additionally, validation of `dataset_reference` is done using `Dataset.from_api_repr`.
+  Exception and log messages have been changed.
+
 ### Added mypy plugin to preserve types of decorated functions
 
 Mypy currently doesn't support precise type information for decorated
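The first note above translates to the call pattern below. This is a minimal sketch, not part of the commit; it assumes a configured default GCP connection, and the project, dataset, and table names are placeholders:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook()  # assumes a configured default GCP connection

# When `table_resource` is supplied, the other table-shape arguments
# (schema_fields, time_partitioning, labels, view, ...) are ignored.
hook.create_empty_table(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
    table_resource={
        "tableReference": {
            "projectId": "my-project",
            "datasetId": "my_dataset",
            "tableId": "my_table",
        },
        "schema": {
            "fields": [
                {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
                {"name": "name", "type": "STRING", "mode": "NULLABLE"},
            ]
        },
    },
)
```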
@@ -219,10 +219,10 @@
     # [START howto_operator_bigquery_create_view]
     create_view = BigQueryCreateEmptyTableOperator(
         task_id="create_view",
-        dataset_id=LOCATION_DATASET_NAME,
+        dataset_id=DATASET_NAME,
         table_id="test_view",
         view={
-            "query": "SELECT * FROM `{}.test_table`".format(DATASET_NAME),
+            "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
             "useLegacySql": False
         }
     )
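For comparison, the same view could be created directly through the hook's `view` keyword documented later in this diff. A hypothetical sketch with placeholder names; standard-SQL view definitions cannot rely on a default dataset, which is why the DAG above switches to a fully qualified `project.dataset.table`:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook()

# The view query must qualify the table name itself; views do not inherit
# a default dataset the way interactive queries can.
hook.create_empty_table(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="test_view",
    view={
        "query": "SELECT * FROM `my-project.my_dataset.test_table`",
        "useLegacySql": False,
    },
)
```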
171 changes: 81 additions & 90 deletions airflow/providers/google/cloud/hooks/bigquery.py
@@ -26,6 +26,9 @@
 from copy import deepcopy
 from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Tuple, Type, Union
 
+from google.api_core.retry import Retry
+from google.cloud.bigquery import DEFAULT_RETRY, Client, Dataset, Table
+from google.cloud.exceptions import NotFound
 from googleapiclient.discovery import build
 from googleapiclient.errors import HttpError
 from pandas import DataFrame
@@ -38,6 +41,7 @@
 from airflow.exceptions import AirflowException
 from airflow.hooks.dbapi_hook import DbApiHook
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 log = logging.getLogger(__name__)
@@ -136,7 +140,8 @@ def get_pandas_df(
             verbose=False,
             credentials=credentials)
 
-    def table_exists(self, project_id: str, dataset_id: str, table_id: str) -> bool:
+    @GoogleBaseHook.fallback_to_default_project_id
+    def table_exists(self, dataset_id: str, table_id: str, project_id: str) -> bool:
         """
         Checks for the existence of a table in Google BigQuery.
@@ -150,28 +155,30 @@ def table_exists(self, project_id: str, dataset_id: str, table_id: str) -> bool:
         :param table_id: The name of the table to check the existence of.
         :type table_id: str
         """
-        service = self.get_service()
+        table_reference = f"{project_id}.{dataset_id}.{table_id}"
 
         try:
-            service.tables().get(  # pylint: disable=no-member
-                projectId=project_id, datasetId=dataset_id,
-                tableId=table_id).execute(num_retries=self.num_retries)
+            Client(client_info=self.client_info).get_table(table_reference)
             return True
-        except HttpError as e:
-            if e.resp['status'] == '404':
-                return False
-            raise
-
-    def create_empty_table(self,  # pylint: disable=too-many-arguments
-                           project_id: str,
-                           dataset_id: str,
-                           table_id: str,
-                           schema_fields: Optional[List] = None,
-                           time_partitioning: Optional[Dict] = None,
-                           cluster_fields: Optional[List[str]] = None,
-                           labels: Optional[Dict] = None,
-                           view: Optional[Dict] = None,
-                           encryption_configuration: Optional[Dict] = None,
-                           num_retries: int = 5) -> None:
+        except NotFound:
+            return False
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_empty_table(  # pylint: disable=too-many-arguments
+        self,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        table_resource: Optional[Dict[str, Any]] = None,
+        schema_fields: Optional[List] = None,
+        time_partitioning: Optional[Dict] = None,
+        cluster_fields: Optional[List[str]] = None,
+        labels: Optional[Dict] = None,
+        view: Optional[Dict] = None,
+        encryption_configuration: Optional[Dict] = None,
+        retry: Optional[Retry] = DEFAULT_RETRY,
+        num_retries: Optional[int] = None
+    ) -> None:
         """
         Creates a new, empty table in the dataset.
         To create a view, which is defined by a SQL query, parse a dictionary to 'view' kwarg
@@ -182,11 +189,17 @@ def create_empty_table(self,  # pylint: disable=too-many-arguments
         :type dataset_id: str
         :param table_id: The Name of the table to be created.
         :type table_id: str
+        :param table_resource: Table resource as described in documentation:
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table
+            If provided, all other parameters are ignored.
+        :type table_resource: Dict[str, Any]
         :param schema_fields: If set, the schema field list as defined here:
             https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
         :type schema_fields: list
         :param labels: a dictionary containing labels for the table, passed to BigQuery
         :type labels: dict
+        :param retry: Optional. How to retry the RPC.
+        :type retry: google.api_core.retry.Retry
         **Example**: ::
@@ -227,119 +240,97 @@ def create_empty_table(self,  # pylint: disable=too-many-arguments
         :type num_retries: int
         :return: None
         """
-        service = self.get_service()
-
-        project_id = project_id if project_id is not None else self.project_id
+        if num_retries:
+            warnings.warn("Parameter `num_retries` is deprecated", DeprecationWarning)
 
-        table_resource = {
+        _table_resource: Dict[str, Any] = {
             'tableReference': {
-                'tableId': table_id
+                'tableId': table_id,
+                'projectId': project_id,
+                'datasetId': dataset_id,
             }
-        }  # type: Dict[str, Any]
+        }
 
         if self.location:
-            table_resource['location'] = self.location
+            _table_resource['location'] = self.location
 
         if schema_fields:
-            table_resource['schema'] = {'fields': schema_fields}
+            _table_resource['schema'] = {'fields': schema_fields}
 
         if time_partitioning:
-            table_resource['timePartitioning'] = time_partitioning
+            _table_resource['timePartitioning'] = time_partitioning
 
         if cluster_fields:
-            table_resource['clustering'] = {
+            _table_resource['clustering'] = {
                 'fields': cluster_fields
             }
 
         if labels:
-            table_resource['labels'] = labels
+            _table_resource['labels'] = labels
 
         if view:
-            table_resource['view'] = view
+            _table_resource['view'] = view
 
         if encryption_configuration:
-            table_resource["encryptionConfiguration"] = encryption_configuration
+            _table_resource["encryptionConfiguration"] = encryption_configuration
 
-        num_retries = num_retries if num_retries else self.num_retries
-
-        service.tables().insert(  # pylint: disable=no-member
-            projectId=project_id,
-            datasetId=dataset_id,
-            body=table_resource).execute(num_retries=num_retries)
+        table_resource = table_resource or _table_resource
+        table = Table.from_api_repr(table_resource)
+        Client(client_info=self.client_info).create_table(table=table, exists_ok=True, retry=retry)
 
+    @GoogleBaseHook.fallback_to_default_project_id
     def create_empty_dataset(self,
-                             dataset_id: str = "",
-                             project_id: str = "",
+                             dataset_id: Optional[str] = None,
+                             project_id: Optional[str] = None,
                              location: Optional[str] = None,
-                             dataset_reference: Optional[Dict] = None) -> None:
+                             dataset_reference: Optional[Dict[str, Any]] = None) -> None:
         """
         Create a new empty dataset:
         https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
         :param project_id: The name of the project where we want to create
             an empty a dataset. Don't need to provide, if projectId in dataset_reference.
         :type project_id: str
-        :param dataset_id: The id of dataset. Don't need to provide,
-            if datasetId in dataset_reference.
+        :param dataset_id: The id of dataset. Don't need to provide, if datasetId in dataset_reference.
         :type dataset_id: str
         :param location: (Optional) The geographic location where the dataset should reside.
             There is no default value but the dataset will be created in US if nothing is provided.
         :type location: str
-        :param dataset_reference: Dataset reference that could be provided
-            with request body. More info:
+        :param dataset_reference: Dataset reference that could be provided with request body. More info:
             https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
         :type dataset_reference: dict
         """
-        service = self.get_service()
-
-        if dataset_reference:
-            _validate_value('dataset_reference', dataset_reference, dict)
-        else:
-            dataset_reference = {}
-
-        if "datasetReference" not in dataset_reference:
-            dataset_reference["datasetReference"] = {}
+        dataset_reference = dataset_reference or {"datasetReference": {}}
 
         if self.location:
             dataset_reference['location'] = dataset_reference.get('location') or self.location
 
-        if not dataset_reference["datasetReference"].get("datasetId") and not dataset_id:
-            raise ValueError(
-                "dataset_id not provided and datasetId not exist in the datasetReference. "
-                "Impossible to create dataset")
-
-        dataset_required_params = [(dataset_id, "datasetId", ""),
-                                   (project_id, "projectId", self.project_id)]
-        for param_tuple in dataset_required_params:
-            param, param_name, param_default = param_tuple
-            if param_name not in dataset_reference['datasetReference']:
-                if param_default and not param:
+        for param, value in zip(["datasetId", "projectId"], [dataset_id, project_id]):
+            specified_param = dataset_reference["datasetReference"].get(param)
+            if specified_param:
+                if value:
                     self.log.info(
-                        "%s was not specified. Will be used default value %s.",
-                        param_name, param_default
+                        "`%s` was provided in both `dataset_reference` and as `%s`. "
+                        "Using value from `dataset_reference`",
+                        param, convert_camel_to_snake(param)
                     )
-                    param = param_default
-                dataset_reference['datasetReference'].update(
-                    {param_name: param})
-            elif param:
-                _api_resource_configs_duplication_check(
-                    param_name, param,
-                    dataset_reference['datasetReference'], 'dataset_reference')
+                continue  # use specified value
+            if not value:
+                raise ValueError(
+                    f"Please specify `{param}` either in `dataset_reference` "
+                    f"or by providing `{convert_camel_to_snake(param)}`",
+                )
+            # dataset_reference has no param but we can fallback to default value
+            self.log.info(
+                "%s was not specified in `dataset_reference`. Will use default value %s.",
+                param, value
+            )
+            dataset_reference["datasetReference"][param] = value
 
         location = location or self.location
         if location:
-            if 'location' not in dataset_reference:
-                dataset_reference.update({'location': location})
-            else:
-                _api_resource_configs_duplication_check(
-                    'location', location,
-                    dataset_reference, 'dataset_reference')
+            dataset_reference["location"] = dataset_reference.get("location", location)
 
-        dataset_id = dataset_reference.get("datasetReference").get("datasetId")  # type: ignore
-        dataset_project_id = dataset_reference.get("datasetReference").get("projectId")  # type: ignore
-
-        service.datasets().insert(  # pylint: disable=no-member
-            projectId=dataset_project_id,
-            body=dataset_reference).execute(num_retries=self.num_retries)
+        dataset = Dataset.from_api_repr(dataset_reference)
+        Client(client_info=self.client_info).create_dataset(dataset=dataset, exists_ok=True)
 
     def get_dataset_tables(self, dataset_id: str, project_id: Optional[str] = None,
                            max_results: Optional[int] = None,
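Taken together, the rewritten methods behave as in the following sketch (a minimal illustration, not from the commit; connection and names are placeholders). Note the precedence rule: values already present in `dataset_reference` win over the corresponding keyword arguments:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook()  # assumes a configured default GCP connection

# table_exists now calls google-cloud-bigquery's Client.get_table and maps
# its NotFound exception to False instead of parsing HttpError status codes.
if not hook.table_exists(dataset_id="my_dataset", table_id="users", project_id="my-project"):
    # `projectId` is taken from dataset_reference ("my-project"); the
    # conflicting project_id argument is logged and ignored. `datasetId`
    # is absent from the reference, so the dataset_id argument fills it in.
    hook.create_empty_dataset(
        dataset_id="my_dataset",
        project_id="some-other-project",
        dataset_reference={"datasetReference": {"projectId": "my-project"}},
    )
```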
