Upgrade ruff to 0.0.262 (#30809)
potiuk committed Apr 22, 2023
1 parent 676a95b commit c585ad5
Showing 30 changed files with 46 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -179,7 +179,7 @@ repos:
# Since ruff makes use of multiple cores we _purposefully_ don't run this in docker so it can use the
# host CPU to it's fullest
entry: ruff --fix --no-update-check --force-exclude
additional_dependencies: ['ruff==0.0.226']
additional_dependencies: ['ruff==0.0.262']
files: \.pyi?$
exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py
- repo: https://github.com/asottile/blacken-docs
2 changes: 1 addition & 1 deletion airflow/kubernetes/pod.py
@@ -29,7 +29,7 @@

with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources # noqa: autoflake
from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources

warnings.warn(
"This module is deprecated. Please use `kubernetes.client.models` for `V1ResourceRequirements` and `Port`.",
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -87,7 +87,7 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
:param dynamodb_scan_kwargs: kwargs pass to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of s3 object key
:param process_func: How we transforms a dynamodb item to bytes. By default we dump the json
""" # noqa: E501
"""

template_fields: Sequence[str] = (
*AwsToAwsBaseOperator.template_fields,
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/triggers/redshift_cluster.py
@@ -55,7 +55,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
hook = RedshiftAsyncHook(aws_conn_id=self.aws_conn_id)
while self.attempts >= 1:
self.attempts = self.attempts - 1
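
The bulk of this commit is the same mechanical change repeated across the trigger and hook modules: ruff 0.0.262 removes now-unnecessary quotes around names in type annotations (pyupgrade rule UP037 in ruff's rule set; the rule attribution is an inference, not something the commit states). Dropping the quotes is safe because these modules use `from __future__ import annotations`, so annotations are never evaluated eagerly. A minimal sketch of the before/after pattern, using placeholder classes rather than the real Airflow ones:

from __future__ import annotations  # annotations stay as strings, evaluated lazily

from typing import AsyncIterator


class TriggerEvent:
    """Stand-in for the real TriggerEvent class, for illustration only."""

    def __init__(self, payload: dict) -> None:
        self.payload = payload


class ExampleTrigger:
    # Before: async def run(self) -> AsyncIterator["TriggerEvent"]:
    # After the upgrade the quotes are dropped; behaviour is identical because
    # the __future__ import keeps the annotation from being evaluated at runtime.
    async def run(self) -> AsyncIterator[TriggerEvent]:
        yield TriggerEvent({"status": "success"})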
2 changes: 1 addition & 1 deletion airflow/providers/apache/kafka/hooks/base.py
@@ -58,7 +58,7 @@ def _get_client(self, config):

@cached_property
def get_conn(self) -> Any:
"""get the configuration object"""
"""Get the configuration object"""
config = self.get_connection(self.kafka_config_id).extra_dejson

if not (config.get("bootstrap.servers", None)):
2 changes: 1 addition & 1 deletion airflow/providers/apache/kafka/hooks/client.py
@@ -41,7 +41,7 @@ def create_topic(
self,
topics: Sequence[Sequence[Any]],
) -> None:
"""creates a topic
"""Creates a topic
:param topics: a list of topics to create including the number of partitions for the topic
and the replication factor. Format: [ ("topic_name", number of partitions, replication factor)]
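
The docstring edits in the Kafka hooks ("get the configuration object" becomes "Get ...", "creates a topic" becomes "Creates ...") line up with pydocstyle-style capitalization checks (D403 in ruff's rule set is the likely trigger, though the commit does not say). A throwaway sketch of the pattern:

def get_conn_before():
    """get the configuration object"""  # lowercase first word is flagged


def get_conn_after():
    """Get the configuration object"""  # capitalized first word passes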
2 changes: 1 addition & 1 deletion airflow/providers/apache/livy/triggers/livy.py
@@ -78,7 +78,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Checks if the _polling_interval > 0, in that case it pools Livy for
batch termination asynchronously.
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -116,7 +116,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current pod status and yields a TriggerEvent"""
hook = self._get_async_hook()
self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
2 changes: 1 addition & 1 deletion airflow/providers/dbt/cloud/operators/dbt.py
@@ -173,7 +173,7 @@ def execute(self, context: Context):
)
return self.run_id

def execute_complete(self, context: "Context", event: dict[str, Any]) -> int:
def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
2 changes: 1 addition & 1 deletion airflow/providers/dbt/cloud/triggers/dbt.py
@@ -64,7 +64,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Dbt, polls for the pipeline run status"""
hook = DbtCloudHook(self.conn_id)
try:
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigtable.py
@@ -248,7 +248,7 @@ def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None
"""
instance = self.get_instance(instance_id=instance_id, project_id=project_id)
if instance is None:
raise RuntimeError("Instance %s did not exist; unable to delete table %s" % instance_id, table_id)
raise RuntimeError(f"Instance {instance_id} did not exist; unable to delete table {table_id}")
table = instance.table(table_id=table_id)
table.delete()
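
The bigtable hook change is more than cosmetic. In the old line the % operator binds tighter than the comma, so the format string received only instance_id (raising "not enough arguments for format string") and table_id became a stray second argument to RuntimeError; the f-string builds the intended message. A small sketch with placeholder values:

instance_id, table_id = "my-instance", "my-table"  # placeholders, illustration only

# Old form: "..." % instance_id is evaluated first and fails, because the
# string has two %s placeholders but receives a single value.
try:
    raise RuntimeError("Instance %s did not exist; unable to delete table %s" % instance_id, table_id)
except TypeError as err:
    print(err)  # not enough arguments for format string

# New form: both values are interpolated as intended.
try:
    raise RuntimeError(f"Instance {instance_id} did not exist; unable to delete table {table_id}")
except RuntimeError as err:
    print(err)  # Instance my-instance did not exist; unable to delete table my-table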

14 changes: 7 additions & 7 deletions airflow/providers/google/cloud/triggers/bigquery.py
@@ -71,7 +71,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
@@ -122,7 +122,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
@@ -181,7 +181,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent with response data"""
hook = self._get_async_hook()
while True:
@@ -286,7 +286,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
@@ -414,7 +414,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
@@ -487,7 +487,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
def _get_async_hook(self) -> BigQueryTableAsyncHook:
return BigQueryTableAsyncHook(gcp_conn_id=self.gcp_conn_id)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Will run until the table exists in the Google Big Query."""
while True:
try:
@@ -562,7 +562,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Will run until the table exists in the Google Big Query."""
hook = BigQueryAsyncHook(gcp_conn_id=self.gcp_conn_id)
job_id = None
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/cloud_build.py
@@ -75,7 +75,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current build execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/datafusion.py
@@ -80,7 +80,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current pipeline status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/triggers/dataproc.py
@@ -143,7 +143,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
while True:
cluster = await self.get_async_hook().get_cluster(
project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
@@ -261,7 +261,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Wait until cluster is deleted completely"""
while self.end_time > time.time():
try:
@@ -309,7 +309,7 @@ def serialize(self):
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
hook = self.get_async_hook()
while True:
try:
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/triggers/gcs.py
@@ -66,8 +66,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
"""loop until the relevant file/folder is found."""
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Loop until the relevant file/folder is found."""
try:
hook = self._get_async_hook()
while True:
@@ -144,7 +144,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Loop until the object updated time is greater than target datetime"""
try:
hook = self._get_async_hook()
@@ -157,7 +157,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets operation status and yields corresponding event."""
hook = self._get_hook()
while True:
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/mlengine.py
@@ -88,7 +88,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
2 changes: 1 addition & 1 deletion airflow/providers/http/hooks/http.py
@@ -306,7 +306,7 @@ async def run(
data: dict[str, Any] | str | None = None,
headers: dict[str, Any] | None = None,
extra_options: dict[str, Any] | None = None,
) -> "ClientResponse":
) -> ClientResponse:
r"""
Performs an asynchronous HTTP request call
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/triggers/data_factory.py
@@ -67,7 +67,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Azure Data Factory, polls for the pipeline run status"""
hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
try:
@@ -140,7 +140,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Azure Data Factory, polls for the pipeline run status"""
hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
try:
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/triggers/wasb.py
@@ -63,7 +63,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Makes async connection to Azure WASB and polls for existence of the given blob name."""
blob_exists = False
hook = WasbAsyncHook(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
@@ -138,7 +138,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Makes async connection to Azure WASB and polls for existence of a blob with given prefix."""
prefix_exists = False
hook = WasbAsyncHook(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
2 changes: 1 addition & 1 deletion airflow/stats.py
@@ -301,7 +301,7 @@ def wrapper(
if stat is not None and tags is not None:
for k, v in tags.items():
if self.metric_tags_validator.test(k):
if all((c not in [",", "="] for c in v + k)):
if all(c not in [",", "="] for c in v + k):
stat += f",{k}={v}"
else:
log.error("Dropping invalid tag: %s=%s.", k, v)
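
The stats.py change drops a redundant pair of parentheses around a generator expression that is already the sole argument to all(); the newer ruff reports this kind of cleanup (the exact rule code is an assumption here), and the two forms are equivalent. A quick sketch with placeholder tag data:

k, v = "region", "us-east-1"  # placeholder tag key/value

with_parens = all((c not in [",", "="] for c in v + k))  # old style, extra parentheses
without_parens = all(c not in [",", "="] for c in v + k)  # new style, generator passed directly

assert with_parens == without_parens  # both lazily check every character the same way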
2 changes: 1 addition & 1 deletion airflow/triggers/base.py
@@ -60,7 +60,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
raise NotImplementedError("Triggers must implement serialize()")

@abc.abstractmethod
async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Runs the trigger in an asynchronous context.
4 changes: 2 additions & 2 deletions airflow/triggers/external_task.py
@@ -72,7 +72,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""
Checks periodically in the database to see if the task exists, and has
hit one of the states yet, or not.
@@ -136,7 +136,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""
Checks periodically in the database to see if the dag run exists, and has
hit one of the states yet, or not.
2 changes: 1 addition & 1 deletion airflow/triggers/file.py
@@ -58,7 +58,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""Loop until the relevant files are found."""
while True:
for path in glob(self.filepath, recursive=self.recursive):
2 changes: 1 addition & 1 deletion docs/exts/exampleinclude.py
@@ -36,7 +36,7 @@
from sphinx.util.nodes import set_source_info

try:
import sphinx_airflow_theme # noqa: autoflake
import sphinx_airflow_theme

airflow_theme_is_available = True
except ImportError:
4 changes: 2 additions & 2 deletions scripts/ci/pre_commit/pre_commit_insert_extras.py
@@ -27,8 +27,8 @@
sys.path.insert(0, str(AIRFLOW_SOURCES_DIR)) # make sure setup is imported from Airflow
# flake8: noqa: F401

from common_precommit_utils import insert_documentation # isort: skip # noqa E402
from setup import EXTRAS_DEPENDENCIES # isort:skip # noqa
from common_precommit_utils import insert_documentation # isort: skip
from setup import EXTRAS_DEPENDENCIES # isort:skip

sys.path.append(str(AIRFLOW_SOURCES_DIR))
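
The pre-commit helper scripts also shed several suppression comments (`# noqa E402`, bare `# noqa`). Newer ruff audits suppressions, for example RUF100 flags `# noqa` directives that no longer suppress anything, which is presumably why these stale ones were dropped; the exact motivation is an inference. A sketch of the convention on a hypothetical late import:

import sys

sys.path.insert(0, "/some/extra/path")  # placeholder path, illustration only

# A module-level import placed after executable code can trigger E402, so a
# targeted suppression names the code explicitly after a colon:
from json import loads  # noqa: E402

# A bare "# noqa" silences every rule on its line; once nothing on the line
# needs suppressing, ruff's RUF100 (unused noqa) check flags it for removal.
print(loads("[1, 2, 3]"))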

6 changes: 3 additions & 3 deletions scripts/ci/pre_commit/pre_commit_local_yml_mounts.py
@@ -22,16 +22,16 @@

sys.path.insert(0, str(Path(__file__).parent.resolve())) # make sure common_precommit_utils is imported

from common_precommit_utils import AIRFLOW_SOURCES_ROOT_PATH # isort: skip # noqa E402
from common_precommit_utils import AIRFLOW_SOURCES_ROOT_PATH # isort: skip

sys.path.insert(0, str(AIRFLOW_SOURCES_ROOT_PATH)) # make sure setup is imported from Airflow
sys.path.insert(
0, str(AIRFLOW_SOURCES_ROOT_PATH / "dev" / "breeze" / "src")
) # make sure setup is imported from Airflow
# flake8: noqa: F401
from airflow_breeze.utils.docker_command_utils import VOLUMES_FOR_SELECTED_MOUNTS # isort: skip # noqa E402
from airflow_breeze.utils.docker_command_utils import VOLUMES_FOR_SELECTED_MOUNTS # isort: skip

from common_precommit_utils import insert_documentation # isort: skip # noqa E402
from common_precommit_utils import insert_documentation # isort: skip

sys.path.append(str(AIRFLOW_SOURCES_ROOT_PATH))

2 changes: 1 addition & 1 deletion tests/cli/conftest.py
@@ -27,7 +27,7 @@
from tests.test_utils.config import conf_vars

# Create custom executors here because conftest is imported first
custom_executor_module = type(sys)("custom_executor") # noqa
custom_executor_module = type(sys)("custom_executor")
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
)
@@ -40,7 +40,7 @@
See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.cluster
* CBT_TABLE_ID - desired ID of the Table
* CBT_POKE_INTERVAL - number of seconds between every attempt of Sensor check
""" # noqa: E501
"""
from __future__ import annotations

import os