Skip to content

Commit

Permalink
[AIRFLOW-7082] Remove catch_http_exception decorator in GCP hooks (#7756
Browse files Browse the repository at this point in the history
)

* [AIRFLOW-7082] Remove catch_http_exception decorator in GCP hooks

This decorator makes it impossible to catch 409 or similar errors from Google APIs

* fixup! [AIRFLOW-7082] Remove catch_http_exception decorator in GCP hooks

* fixup! fixup! [AIRFLOW-7082] Remove catch_http_exception decorator in GCP hooks

* fixup! fixup! fixup! [AIRFLOW-7082] Remove catch_http_exception decorator in GCP hooks
  • Loading branch information
turbaszek committed Mar 19, 2020
1 parent 97dbc1b commit ae854ca
Show file tree
Hide file tree
Showing 12 changed files with 10 additions and 180 deletions.
12 changes: 0 additions & 12 deletions airflow/providers/google/cloud/hooks/automl.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def prediction_client(self) -> PredictionServiceClient:
credentials=self._get_credentials(), client_info=self.client_info
)

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def create_model(
self,
Expand Down Expand Up @@ -122,7 +121,6 @@ def create_model(
parent=parent, model=model, retry=retry, timeout=timeout, metadata=metadata
)

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def batch_predict(
self,
Expand Down Expand Up @@ -185,7 +183,6 @@ def batch_predict(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def predict(
self,
Expand Down Expand Up @@ -240,7 +237,6 @@ def predict(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def create_dataset(
self,
Expand Down Expand Up @@ -286,7 +282,6 @@ def create_dataset(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def import_data(
self,
Expand Down Expand Up @@ -337,7 +332,6 @@ def import_data(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def list_column_specs( # pylint: disable=too-many-arguments
self,
Expand Down Expand Up @@ -406,7 +400,6 @@ def list_column_specs( # pylint: disable=too-many-arguments
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def get_model(
self,
Expand Down Expand Up @@ -447,7 +440,6 @@ def get_model(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def delete_model(
self,
Expand Down Expand Up @@ -488,7 +480,6 @@ def delete_model(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def update_dataset(
self,
Expand Down Expand Up @@ -534,7 +525,6 @@ def update_dataset(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def deploy_model(
self,
Expand Down Expand Up @@ -650,7 +640,6 @@ def list_table_specs(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def list_datasets(
self,
Expand Down Expand Up @@ -691,7 +680,6 @@ def list_datasets(
)
return result

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def delete_dataset(
self,
Expand Down
37 changes: 1 addition & 36 deletions airflow/providers/google/cloud/hooks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,10 @@
import google_auth_httplib2
import httplib2
import tenacity
from google.api_core.exceptions import (
AlreadyExists, Forbidden, GoogleAPICallError, ResourceExhausted, RetryError, TooManyRequests,
)
from google.api_core.exceptions import Forbidden, ResourceExhausted, TooManyRequests
from google.api_core.gapic_v1.client_info import ClientInfo
from google.auth import _cloud_sdk
from google.auth.environment_vars import CREDENTIALS
from googleapiclient.errors import HttpError
from googleapiclient.http import set_user_agent

from airflow import version
Expand Down Expand Up @@ -314,38 +311,6 @@ def decorator(fun: Callable):
)(fun)
return decorator

@staticmethod
def catch_http_exception(func: Callable[..., RT]) -> Callable[..., RT]:
"""
Function decorator that intercepts HTTP Errors and raises AirflowException
with more informative message.
"""

@functools.wraps(func)
def wrapper_decorator(self: CloudBaseHook, *args, **kwargs) -> RT:
try:
return func(self, *args, **kwargs)
except GoogleAPICallError as e:
if isinstance(e, AlreadyExists):
raise e
else:
self.log.error('The request failed:\n%s', str(e))
raise AirflowException(e)
except RetryError as e:
self.log.error('The request failed due to a retryable error and retry attempts failed.')
raise AirflowException(e)
except ValueError as e:
self.log.error('The request failed, the parameters are invalid.')
raise AirflowException(e)
except HttpError as e:
if hasattr(e, "content"):
self.log.error('The request failed:\n%s', e.content.decode(encoding="utf-8"))
else:
self.log.error('The request failed:\n%s', str(e))
raise AirflowException(e)

return wrapper_decorator

@staticmethod
def fallback_to_default_project_id(func: Callable[..., RT]) -> Callable[..., RT]:
"""
Expand Down
21 changes: 1 addition & 20 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ def table_exists(self, project_id: str, dataset_id: str, table_id: str) -> bool:
return False
raise

@CloudBaseHook.catch_http_exception
def create_empty_table(self, # pylint: disable=too-many-arguments
project_id: str,
dataset_id: str,
Expand Down Expand Up @@ -266,7 +265,6 @@ def create_empty_table(self, # pylint: disable=too-many-arguments
datasetId=dataset_id,
body=table_resource).execute(num_retries=num_retries)

@CloudBaseHook.catch_http_exception
def create_empty_dataset(self,
dataset_id: str = "",
project_id: str = "",
Expand Down Expand Up @@ -341,7 +339,6 @@ def create_empty_dataset(self,
projectId=dataset_project_id,
body=dataset_reference).execute(num_retries=self.num_retries)

@CloudBaseHook.catch_http_exception
def get_dataset_tables(self, dataset_id: str, project_id: Optional[str] = None,
max_results: Optional[int] = None,
page_token: Optional[str] = None) -> Dict[str, Union[str, int, List]]:
Expand Down Expand Up @@ -377,7 +374,6 @@ def get_dataset_tables(self, dataset_id: str, project_id: Optional[str] = None,
datasetId=dataset_id,
**optional_params).execute(num_retries=self.num_retries))

@CloudBaseHook.catch_http_exception
def delete_dataset(self, project_id: str, dataset_id: str, delete_contents: bool = False) -> None:
"""
Delete a dataset of Big query in your project.
Expand Down Expand Up @@ -411,7 +407,6 @@ def delete_dataset(self, project_id: str, dataset_id: str, delete_contents: bool
'BigQuery job failed. Error was: {}'.format(err.content)
)

@CloudBaseHook.catch_http_exception
def create_external_table(self, # pylint: disable=too-many-locals,too-many-arguments
external_project_dataset_table: str,
schema_fields: List,
Expand Down Expand Up @@ -618,7 +613,6 @@ def create_external_table(self, # pylint: disable=too-many-locals,too-many-argu
self.log.info('External table created successfully: %s',
external_project_dataset_table)

@CloudBaseHook.catch_http_exception
def patch_table(self, # pylint: disable=too-many-arguments
dataset_id: str,
table_id: str,
Expand Down Expand Up @@ -731,7 +725,6 @@ def patch_table(self, # pylint: disable=too-many-arguments
self.log.info('Table patched successfully: %s:%s.%s',
project_id, dataset_id, table_id)

@CloudBaseHook.catch_http_exception
def insert_all(self, project_id: str, dataset_id: str, table_id: str,
rows: List, ignore_unknown_values: bool = False,
skip_invalid_rows: bool = False, fail_on_error: bool = False) -> None:
Expand Down Expand Up @@ -803,7 +796,6 @@ def insert_all(self, project_id: str, dataset_id: str, table_id: str,
)
self.log.info(error_msg)

@CloudBaseHook.catch_http_exception
def update_dataset(self, dataset_id: str,
dataset_resource: Dict, project_id: Optional[str] = None) -> Dict:
"""
Expand Down Expand Up @@ -846,7 +838,6 @@ def update_dataset(self, dataset_id: str,

return dataset

@CloudBaseHook.catch_http_exception
def patch_dataset(self, dataset_id: str, dataset_resource: str, project_id: Optional[str] = None) -> Dict:
"""
Patches information in an existing dataset.
Expand Down Expand Up @@ -888,7 +879,6 @@ def patch_dataset(self, dataset_id: str, dataset_resource: str, project_id: Opti

return dataset

@CloudBaseHook.catch_http_exception
def get_dataset_tables_list(self, dataset_id, project_id=None, table_prefix=None, max_results=None):
"""
Method returns tables list of a BigQuery dataset. If table prefix is specified,
Expand Down Expand Up @@ -951,7 +941,6 @@ def get_dataset_tables_list(self, dataset_id, project_id=None, table_prefix=None

return dataset_tables_list

@CloudBaseHook.catch_http_exception
def get_datasets_list(self, project_id: Optional[str] = None) -> List:
"""
Method returns full list of BigQuery datasets in the current project
Expand Down Expand Up @@ -996,7 +985,6 @@ def get_datasets_list(self, project_id: Optional[str] = None) -> List:

return datasets_list

@CloudBaseHook.catch_http_exception
def get_dataset(self, dataset_id: str, project_id: Optional[str] = None) -> Dict:
"""
Method returns dataset_resource if dataset exist
Expand Down Expand Up @@ -1025,7 +1013,6 @@ def get_dataset(self, dataset_id: str, project_id: Optional[str] = None) -> Dict

return dataset_resource

@CloudBaseHook.catch_http_exception
def run_grant_dataset_view_access(self,
source_dataset: str,
view_dataset: str,
Expand Down Expand Up @@ -1090,7 +1077,6 @@ def run_grant_dataset_view_access(self,
view_project, view_dataset, view_table, source_project, source_dataset)
return source_dataset_resource

@CloudBaseHook.catch_http_exception
def run_table_upsert(self, dataset_id: str, table_resource: Dict,
project_id: Optional[str] = None) -> Dict:
"""
Expand Down Expand Up @@ -1142,7 +1128,6 @@ def run_table_upsert(self, dataset_id: str, table_resource: Dict,
datasetId=dataset_id,
body=table_resource).execute(num_retries=self.num_retries)

@CloudBaseHook.catch_http_exception
def run_table_delete(self, deletion_dataset_table: str,
ignore_if_missing: bool = False) -> None:
"""
Expand Down Expand Up @@ -1180,7 +1165,6 @@ def run_table_delete(self, deletion_dataset_table: str,
else:
raise e

@CloudBaseHook.catch_http_exception
def get_tabledata(self, dataset_id: str, table_id: str,
max_results: Optional[int] = None, selected_fields: Optional[str] = None,
page_token: Optional[str] = None, start_index: Optional[int] = None) -> Dict:
Expand Down Expand Up @@ -1216,7 +1200,6 @@ def get_tabledata(self, dataset_id: str, table_id: str,
tableId=table_id,
**optional_params).execute(num_retries=self.num_retries))

@CloudBaseHook.catch_http_exception
def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] = None) -> Dict:
"""
Get the schema for a given datset.table.
Expand All @@ -1238,7 +1221,6 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
)
return tables_resource['schema']

@CloudBaseHook.catch_http_exception
def poll_job_complete(self, job_id: str) -> bool:
"""
Check if jobs completed.
Expand Down Expand Up @@ -1268,7 +1250,6 @@ def poll_job_complete(self, job_id: str) -> bool:
raise err
return False

@CloudBaseHook.catch_http_exception
def cancel_query(self) -> None:
"""
Cancel all started queries that have not yet completed
Expand Down Expand Up @@ -2304,7 +2285,7 @@ def cancel_query(self, *args, **kwargs) -> None:
"This method is deprecated. "
"Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.cancel_query`",
DeprecationWarning, stacklevel=3)
return self.hook.cancel_query(*args, **kwargs)
return self.hook.cancel_query(*args, **kwargs) # type: ignore # noqa

def run_with_configuration(self, *args, **kwargs) -> str:
"""
Expand Down
4 changes: 0 additions & 4 deletions airflow/providers/google/cloud/hooks/bigquery_dts.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def get_conn(self) -> DataTransferServiceClient:
)
return self._conn

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def create_transfer_config(
self,
Expand Down Expand Up @@ -139,7 +138,6 @@ def create_transfer_config(
metadata=metadata,
)

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def delete_transfer_config(
self,
Expand Down Expand Up @@ -178,7 +176,6 @@ def delete_transfer_config(
name=name, retry=retry, timeout=timeout, metadata=metadata
)

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def start_manual_transfer_runs(
self,
Expand Down Expand Up @@ -236,7 +233,6 @@ def start_manual_transfer_runs(
metadata=metadata,
)

@CloudBaseHook.catch_http_exception
@CloudBaseHook.fallback_to_default_project_id
def get_transfer_run(
self,
Expand Down

0 comments on commit ae854ca

Please sign in to comment.