Skip to content

Commit

Permalink
Fix MyPy errors for google.cloud.tasks (#20233)
Browse files Browse the repository at this point in the history
Part of #19891
  • Loading branch information
potiuk committed Dec 15, 2021
1 parent 77813b4 commit cdaa9a2
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
42 changes: 30 additions & 12 deletions airflow/providers/google/cloud/hooks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
self._client = None
self._client: Optional[CloudTasksClient] = None

def get_conn(self) -> CloudTasksClient:
"""
Expand All @@ -78,7 +78,7 @@ def get_conn(self) -> CloudTasksClient:
:return: Google Cloud Tasks API Client
:rtype: google.cloud.tasks_v2.CloudTasksClient
"""
if not self._client:
if self._client is None:
self._client = CloudTasksClient(credentials=self._get_credentials(), client_info=self.client_info)
return self._client

Expand Down Expand Up @@ -232,7 +232,10 @@ def get_queue(

full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.get_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
request={'name': full_queue_name},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)

@GoogleBaseHook.fallback_to_default_project_id
Expand Down Expand Up @@ -315,7 +318,10 @@ def delete_queue(

full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
client.delete_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
request={'name': full_queue_name},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)

@GoogleBaseHook.fallback_to_default_project_id
Expand Down Expand Up @@ -353,7 +359,10 @@ def purge_queue(

full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.purge_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
request={'name': full_queue_name},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)

@GoogleBaseHook.fallback_to_default_project_id
Expand Down Expand Up @@ -391,7 +400,10 @@ def pause_queue(

full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.pause_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
request={'name': full_queue_name},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)

@GoogleBaseHook.fallback_to_default_project_id
Expand Down Expand Up @@ -429,7 +441,10 @@ def resume_queue(

full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.resume_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
request={'name': full_queue_name},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)

@GoogleBaseHook.fallback_to_default_project_id
Expand All @@ -440,7 +455,7 @@ def create_task(
task: Union[Dict, Task],
project_id: str,
task_name: Optional[str] = None,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
Expand Down Expand Up @@ -502,7 +517,7 @@ def get_task(
queue_name: str,
task_name: str,
project_id: str,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
Expand Down Expand Up @@ -549,7 +564,7 @@ def list_tasks(
location: str,
queue_name: str,
project_id: str,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
page_size: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
Expand Down Expand Up @@ -629,7 +644,10 @@ def delete_task(

full_task_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
client.delete_task(
request={'name': full_task_name}, retry=retry, timeout=timeout, metadata=metadata or ()
request={'name': full_task_name},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)

@GoogleBaseHook.fallback_to_default_project_id
Expand All @@ -639,7 +657,7 @@ def run_task(
queue_name: str,
task_name: str,
project_id: str,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
Expand Down
10 changes: 5 additions & 5 deletions airflow/providers/google/cloud/operators/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def __init__(
project_id: Optional[str] = None,
location: Optional[str] = None,
queue_name: Optional[str] = None,
update_mask: Union[Dict, FieldMask] = None,
update_mask: Optional[Union[Dict, FieldMask]] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
Expand Down Expand Up @@ -828,7 +828,7 @@ def __init__(
task: Union[Dict, Task],
project_id: Optional[str] = None,
task_name: Optional[str] = None,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
Expand Down Expand Up @@ -928,7 +928,7 @@ def __init__(
queue_name: str,
task_name: str,
project_id: Optional[str] = None,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
Expand Down Expand Up @@ -1025,7 +1025,7 @@ def __init__(
location: str,
queue_name: str,
project_id: Optional[str] = None,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
page_size: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
Expand Down Expand Up @@ -1213,7 +1213,7 @@ def __init__(
queue_name: str,
task_name: str,
project_id: Optional[str] = None,
response_view: Optional = None,
response_view: Optional[Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ pretty = True

[mypy-airflow.migrations.*]
ignore_errors = True
[mypy-google.cloud.tasks_v2.*]
no_implicit_optional = False

[isort]
line_length=110
Expand Down

0 comments on commit cdaa9a2

Please sign in to comment.