Add D401 fixes (#37348)
rawwar committed Feb 12, 2024
1 parent d43c804 commit 2d0d78b
Showing 35 changed files with 93 additions and 130 deletions.
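
This commit applies pydocstyle rule D401, which requires the first line of a docstring to be phrased in the imperative mood ("Return X", not "Returns X"). A minimal illustration of the pattern repeated throughout the diff below, using a hypothetical function rather than code from the changed files:

def bad(config):
    """Returns the bucket name."""  # D401: first word is third person
    return config["bucket"]

def good(config):
    """Return the bucket name."""  # compliant: imperative mood
    return config["bucket"]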
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/links/vertex_ai.py
@@ -109,7 +109,7 @@ class VertexAIModelExportLink(BaseGoogleLink):

@staticmethod
def extract_bucket_name(config):
- """Returns bucket name from output configuration."""
+ """Return bucket name from output configuration."""
return config["artifact_destination"]["output_uri_prefix"].rpartition("gs://")[-1]

@staticmethod
10 changes: 5 additions & 5 deletions airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -174,7 +174,7 @@ def emit(self, record: logging.LogRecord) -> None:

def set_context(self, task_instance: TaskInstance) -> None:
"""
- Configures the logger to add information with information about the current task.
+ Configure the logger to add information with information about the current task.
:param task_instance: Currently executed task
"""
@@ -224,7 +224,7 @@ def read(

def _prepare_log_filter(self, ti_labels: dict[str, str]) -> str:
"""
- Prepares the filter that chooses which log entries to fetch.
+ Prepare the filter that chooses which log entries to fetch.
More information:
https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list#body.request_body.FIELDS.filter
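
For context, the label-based filter this method prepares might look like the sketch below. This is an illustrative guess based on the linked filter documentation; the resource type and label keys are assumptions, not values taken from the handler:

# Hypothetical Cloud Logging filter; newline-separated clauses are ANDed.
log_filter = "\n".join(
    [
        'resource.type="cloud_composer_environment"',  # assumed resource type
        'labels.task_id="my_task"',  # assumed label keys
        'labels.dag_id="my_dag"',
    ]
)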
@@ -258,7 +258,7 @@ def _read_logs(
self, log_filter: str, next_page_token: str | None, all_pages: bool
) -> tuple[str, bool, str | None]:
"""
- Sends requests to the Stackdriver service and downloads logs.
+ Send requests to the Stackdriver service and downloads logs.
:param log_filter: Filter specifying the logs to be downloaded.
:param next_page_token: The token of the page from which the log download will start.
@@ -293,7 +293,7 @@ def _read_logs(

def _read_single_logs_page(self, log_filter: str, page_token: str | None = None) -> tuple[str, str]:
"""
- Sends requests to the Stackdriver service and downloads single pages with logs.
+ Send requests to the Stackdriver service and downloads single pages with logs.
:param log_filter: Filter specifying the logs to be downloaded.
:param page_token: The token of the page to be downloaded. If None is passed, the first page will be
@@ -344,7 +344,7 @@ def _resource_path(self):

def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str:
"""
- Creates an address for an external log collecting service.
+ Create an address for an external log collecting service.
:param task_instance: task instance object
:param try_number: task instance try_number to read logs from
10 changes: 5 additions & 5 deletions airflow/providers/google/cloud/operators/bigquery.py
@@ -322,7 +322,7 @@ def execute(self, context: Context):
self.log.info("Current state of job %s is %s", job.job_id, job.state)

def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -461,7 +461,7 @@ def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")

def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -609,7 +609,7 @@ def execute(self, context: Context):
)

def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -1080,7 +1080,7 @@ def execute(self, context: Context):
)

def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -2890,7 +2890,7 @@ def execute(self, context: Any):
self._handle_job_error(job)

def execute_complete(self, context: Context, event: dict[str, Any]) -> str | None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
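
The execute_complete docstrings above all describe the same deferrable-operator contract: the method runs after the trigger fires, inspects the event payload, raises if the trigger reported a failure, and otherwise returns immediately. A minimal sketch of that contract, as a hypothetical operator method rather than the actual BigQuery implementation:

from airflow.exceptions import AirflowException

def execute_complete(self, context, event):
    # The trigger signals failure through the event payload; re-raise it
    # here so the task fails, otherwise assume execution was successful.
    if event["status"] == "error":
        raise AirflowException(event["message"])
    self.log.info("Trigger reported success.")
    return event.get("job_id")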
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -392,7 +392,7 @@ def _job_is_done(state: TransferState) -> bool:
return state in finished_job_statuses

def execute_completed(self, context: Context, event: dict):
- """Method to be executed after invoked trigger in defer method finishes its job."""
+ """Execute after invoked trigger in defer method finishes its job."""
if event["status"] in ("failed", "cancelled"):
self.log.error("Trigger finished its work with status: %s.", event["status"])
raise AirflowException(event["message"])
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/cloud_base.py
@@ -28,7 +28,7 @@ class GoogleCloudBaseOperator(BaseOperator):

def __deepcopy__(self, memo):
"""
- Updating the memo to fix the non-copyable global constant.
+ Update the memo to fix the non-copyable global constant.
This constant can be specified in operator parameters as a retry configuration to indicate a default.
See https://github.com/apache/airflow/issues/28751 for details.
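
The memo update described above is a standard way to keep copy.deepcopy from recursing into an object that must remain shared. A minimal sketch of the technique with a stand-in sentinel (assumed shape, not the operator's exact code):

import copy

_DEFAULT = object()  # stand-in for the non-copyable global constant

class SomeOperator:
    def __init__(self, retry=_DEFAULT):
        self.retry = retry

    def __deepcopy__(self, memo):
        # Seed the memo so deepcopy returns the shared sentinel itself
        # instead of trying to produce a copy of it.
        memo[id(self.retry)] = self.retry
        new = type(self).__new__(type(self))
        memo[id(self)] = new
        new.__dict__ = copy.deepcopy(self.__dict__, memo)
        return new

op = SomeOperator()
assert copy.deepcopy(op).retry is op.retry  # the sentinel is preserved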
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/cloud_build.py
@@ -1046,7 +1046,7 @@ def _reformat_storage_source(self) -> None:

def process_body(self) -> Build:
"""
- Processes the body passed in the constructor.
+ Process the body passed in the constructor.
:return: the body.
"""
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/cloud_sql.py
@@ -1031,7 +1031,7 @@ def execute(self, context: Context) -> None:

def execute_complete(self, context, event=None) -> None:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
@@ -161,7 +161,7 @@ def _restrict_aws_credentials(self) -> None:

def validate_body(self) -> None:
"""
- Validates the body.
+ Validate the body.
Checks if body specifies `transferSpec` if yes, then check if AWS credentials
are passed correctly and no more than 1 data source was selected.
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/dataflow.py
@@ -716,7 +716,7 @@ def set_current_job(current_job):
)

def execute_complete(self, context: Context, event: dict[str, Any]):
- """Method which executes after trigger finishes its work."""
+ """Execute after trigger finishes its work."""
if event["status"] in ("error", "stopped"):
self.log.info("status: %s, msg: %s", event["status"], event["message"])
raise AirflowException(event["message"])
@@ -906,7 +906,7 @@ def _append_uuid_to_job_name(self):
self.log.info("Job name was changed to %s", job_name)

def execute_complete(self, context: Context, event: dict):
- """Method which executes after trigger finishes its work."""
+ """Execute after trigger finishes its work."""
if event["status"] in ("error", "stopped"):
self.log.info("status: %s, msg: %s", event["status"], event["message"])
raise AirflowException(event["message"])
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/datafusion.py
@@ -872,7 +872,7 @@ def execute(self, context: Context) -> str:

def execute_complete(self, context: Context, event: dict[str, Any]):
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/operators/dataplex.py
@@ -1003,7 +1003,7 @@ def execute(self, context: Context) -> str:

def execute_complete(self, context, event=None) -> None:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
@@ -1180,7 +1180,7 @@ def execute(self, context: Context) -> dict:

def execute_complete(self, context, event=None) -> None:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
@@ -1578,7 +1578,7 @@ def execute(self, context: Context) -> dict:

def execute_complete(self, context, event=None) -> None:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
@@ -1713,7 +1713,7 @@ def execute(self, context: Context) -> dict:

def execute_complete(self, context, event=None) -> None:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
34 changes: 17 additions & 17 deletions airflow/providers/google/cloud/operators/dataproc.py
@@ -541,7 +541,7 @@ def _build_cluster_data(self):

def make(self):
"""
- Helper method for easier migration.
+ Act as a helper method for easier migration.
:return: Dict representing Dataproc cluster.
"""
@@ -859,7 +859,7 @@ def execute(self, context: Context) -> dict:

def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
@@ -1108,7 +1108,7 @@ def execute(self, context: Context) -> None:

def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> Any:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
@@ -1470,7 +1470,7 @@ def execute(self, context: Context):

def execute_complete(self, context, event=None) -> None:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
@@ -1484,7 +1484,7 @@ def execute_complete(self, context, event=None) -> None:
return job_id

def on_kill(self) -> None:
- """Callback called when the operator is killed; cancel any running job."""
+ """Act as a callback called when the operator is killed; cancel any running job."""
if self.dataproc_job_id:
self.hook.cancel_job(project_id=self.project_id, job_id=self.dataproc_job_id, region=self.region)

@@ -1587,7 +1587,7 @@ def __init__(

def generate_job(self):
"""
- Helper method for easier migration to `DataprocSubmitJobOperator`.
+ Act as a helper method for easier migration to `DataprocSubmitJobOperator`.
:return: Dict representing Dataproc job
"""
@@ -1681,7 +1681,7 @@ def __init__(

def generate_job(self):
"""
- Helper method for easier migration to `DataprocSubmitJobOperator`.
+ Act as a helper method for easier migration to `DataprocSubmitJobOperator`.
:return: Dict representing Dataproc job
"""
@@ -1774,7 +1774,7 @@ def __init__(

def generate_job(self):
"""
- Helper method for easier migration to `DataprocSubmitJobOperator`.
+ Act as a helper method for easier migration to `DataprocSubmitJobOperator`.
:return: Dict representing Dataproc job
"""
@@ -1869,7 +1869,7 @@ def __init__(

def generate_job(self):
"""
- Helper method for easier migration to `DataprocSubmitJobOperator`.
+ Act as a helper method for easier migration to `DataprocSubmitJobOperator`.
:return: Dict representing Dataproc job
"""
@@ -1959,7 +1959,7 @@ def __init__(
self.files = files

def generate_job(self):
- """Helper method for easier migration to `DataprocSubmitJobOperator`.
+ """Act as a helper method for easier migration to `DataprocSubmitJobOperator`.
:return: Dict representing Dataproc job
"""
@@ -2073,7 +2073,7 @@ def __init__(
self.pyfiles = pyfiles

def generate_job(self):
- """Helper method for easier migration to :class:`DataprocSubmitJobOperator`.
+ """Act as a helper method for easier migration to :class:`DataprocSubmitJobOperator`.
:return: Dict representing Dataproc job
"""
@@ -2305,7 +2305,7 @@ def execute(self, context: Context):
)

def execute_complete(self, context, event=None) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -2447,7 +2447,7 @@ def execute(self, context: Context):
)

def execute_complete(self, context, event=None) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -2601,7 +2601,7 @@ def execute(self, context: Context):
return self.job_id

def execute_complete(self, context, event=None) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -2757,7 +2757,7 @@ def execute(self, context: Context):

def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
"""
- Callback for when the trigger fires - returns immediately.
+ Act as a callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
@@ -2887,7 +2887,7 @@ def execute(self, context: Context):
)

def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
@@ -3085,7 +3085,7 @@ def execute(self, context: Context):
return Batch.to_dict(result)

def execute_complete(self, context, event=None) -> None:
- """Callback for when the trigger fires.
+ """Act as a callback for when the trigger fires.
This returns immediately. It relies on trigger to throw an exception,
otherwise it assumes execution was successful.
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/functions.py
@@ -305,13 +305,13 @@ def _verify_archive_url_and_zip_path(self) -> None:
)

def should_upload_function(self) -> bool:
- """Checks if function source should be uploaded."""
+ """Check if function source should be uploaded."""
if self.upload_function is None:
raise AirflowException("validate() method has to be invoked before should_upload_function")
return self.upload_function

def preprocess_body(self) -> None:
- """Modifies sourceUploadUrl body field in special way when zip_path is not empty."""
+ """Modify sourceUploadUrl body field in special way when zip_path is not empty."""
self._verify_archive_url_and_zip_path()
self._verify_upload_url_and_zip_path()
self._verify_upload_url_and_no_zip_path()
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/gcs.py
@@ -331,7 +331,7 @@ def execute(self, context: Context) -> None:
hook.delete(bucket_name=self.bucket_name, object_name=object_name)

def get_openlineage_facets_on_complete(self, task_instance):
- """Implementing on_complete as execute() resolves object names."""
+ """Implement on_complete as execute() resolves object names."""
from openlineage.client.facet import (
LifecycleStateChange,
LifecycleStateChangeDatasetFacet,
@@ -904,7 +904,7 @@ def execute(self, context: Context) -> list[str]:
return self._destination_object_names

def get_openlineage_facets_on_complete(self, task_instance):
- """Implementing on_complete as execute() resolves object names."""
+ """Implement on_complete as execute() resolves object names."""
from openlineage.client.run import Dataset

from airflow.providers.openlineage.extractors import OperatorLineage
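
As a closing note, the sketch below gives a rough, heuristic version of the check D401 performs: walk a module's AST and flag function docstrings whose first word looks third person. It is an illustration only, not pydocstyle's or ruff's actual implementation:

import ast

def looks_third_person(word: str) -> bool:
    # Crude heuristic: "Returns", "Creates", "Prepares" end in "s",
    # while imperatives such as "Return" (or "Pass", which ends in "ss")
    # do not trip the check.
    return word.endswith("s") and not word.endswith("ss")

def check_d401(source: str) -> list[str]:
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            doc = ast.get_docstring(node)
            if doc and doc.split() and looks_third_person(doc.split()[0]):
                findings.append(f"{node.name}: first line may not be imperative")
    return findings

print(check_d401('def f():\n    """Returns a value."""\n'))
# ['f: first line may not be imperative']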
