Commit
D400 first line should end with period batch02 (#25268)
edithturn committed Sep 9, 2022
1 parent 4178545 commit 5066844
Showing 10 changed files with 63 additions and 43 deletions.
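For context: D400 is pydocstyle's "First line should end with a period" check, and every hunk below applies the same one-character fix. A minimal sketch of the pattern on a stub function (the bodies are placeholders):

```python
def get_default_executor():
    """Creates a new instance of the configured executor if none exists and returns it"""  # D400 violation
    ...


def get_default_executor_fixed():
    """Creates a new instance of the configured executor if none exists and returns it."""  # compliant
    ...
```

Running ``pydocstyle --select=D400 <path>`` limits a run to this single rule.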
4 changes: 1 addition & 3 deletions airflow/executors/executor_loader.py
@@ -65,7 +65,7 @@ class ExecutorLoader:

@classmethod
def get_default_executor(cls) -> "BaseExecutor":
"""Creates a new instance of the configured executor if none exists and returns it"""
"""Creates a new instance of the configured executor if none exists and returns it."""
if cls._default_executor is not None:
return cls._default_executor

@@ -134,7 +134,6 @@ def import_executor_cls(cls, executor_name: str) -> Tuple[Type["BaseExecutor"],

@classmethod
def __load_celery_kubernetes_executor(cls) -> "BaseExecutor":
""":return: an instance of CeleryKubernetesExecutor"""
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

@@ -143,7 +142,6 @@ def __load_celery_kubernetes_executor(cls) -> "BaseExecutor":

@classmethod
def __load_local_kubernetes_executor(cls) -> "BaseExecutor":
""":return: an instance of LocalKubernetesExecutor"""
local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

"""Merge migrations Heads
"""Merge migrations Heads.
Revision ID: 03bc53e68815
Revises: 0a2a5b66e19d, bf00311e1990
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.

"""Update dag.default_view to grid
"""Update dag.default_view to grid.
Revision ID: b1b348e02d07
Revises: 75d5ed6c2b43
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -29,7 +29,7 @@

class RedshiftToS3Operator(BaseOperator):
"""
- Executes an UNLOAD command to s3 as a CSV with headers
+ Execute an UNLOAD command to s3 as a CSV with headers.
.. seealso::
For more information on how to use this operator, take a look at the guide:
24 changes: 14 additions & 10 deletions airflow/providers/databricks/hooks/databricks_base.py
@@ -169,8 +169,7 @@ async def __aexit__(self, *err):
@staticmethod
def _parse_host(host: str) -> str:
"""
- The purpose of this function is to be robust to improper connections
- settings provided by users, specifically in the host field.
+ This function is resistant to incorrect connection settings provided by users, in the host field.
For example -- when users supply ``https://xx.cloud.databricks.com`` as the
host, we must strip out the protocol to get the host.::
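The example the docstring points at (the literal block is truncated in this hunk) amounts to stripping the URL scheme. A sketch of that behavior, not the hook's exact implementation:

```python
from urllib.parse import urlparse


def parse_host(host: str) -> str:
    # If the user pasted a full URL into the host field, keep only the
    # network location; otherwise assume the value is already a bare host.
    parsed = urlparse(host)
    return parsed.netloc if parsed.netloc else host


assert parse_host("https://xx.cloud.databricks.com") == "xx.cloud.databricks.com"
assert parse_host("xx.cloud.databricks.com") == "xx.cloud.databricks.com"
```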
@@ -195,21 +194,23 @@ def _parse_host(host: str) -> str:

def _get_retry_object(self) -> Retrying:
"""
- Instantiates a retry object
+ Instantiate a retry object.
:return: instance of Retrying class
"""
return Retrying(**self.retry_args)

def _a_get_retry_object(self) -> AsyncRetrying:
"""
- Instantiates an async retry object
+ Instantiate an async retry object.
:return: instance of AsyncRetrying class
"""
return AsyncRetrying(**self.retry_args)
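Both helpers build on ``tenacity``. A minimal sketch of what ``Retrying(**self.retry_args)`` does at a call site; the shape of ``retry_args`` and the flaky call are assumptions, not the hook's actual configuration:

```python
import random

from tenacity import Retrying, stop_after_attempt, wait_exponential


def flaky_call() -> str:
    # Hypothetical stand-in for an HTTP request to Databricks.
    if random.random() < 0.5:
        raise ConnectionError("transient failure")
    return "ok"


# Assumed shape of self.retry_args; the real hook derives it from constructor arguments.
retry_args = {"stop": stop_after_attempt(3), "wait": wait_exponential(min=1, max=10)}

for attempt in Retrying(**retry_args):
    with attempt:
        result = flaky_call()  # re-executed on exception, up to 3 attempts
```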

def _get_aad_token(self, resource: str) -> str:
"""
- Function to get AAD token for given resource. Supports managed identity or service principal auth
+ Function to get AAD token for given resource.
+ Supports managed identity or service principal auth.
:param resource: resource to issue token to
:return: AAD token, or raise an exception
"""
@@ -340,7 +341,7 @@ async def _a_get_aad_token(self, resource: str) -> str:

def _get_aad_headers(self) -> dict:
"""
- Fills AAD headers if necessary (SPN is outside of the workspace)
+ Fill AAD headers if necessary (SPN is outside of the workspace).
:return: dictionary with filled AAD headers
"""
headers = {}
@@ -369,7 +370,8 @@ async def _a_get_aad_headers(self) -> dict:
@staticmethod
def _is_aad_token_valid(aad_token: dict) -> bool:
"""
- Utility function to check AAD token hasn't expired yet
+ Utility function to check AAD token hasn't expired yet.
:param aad_token: dict with properties of AAD token
:return: true if token is valid, false otherwise
:rtype: bool
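A sketch of such a validity check, assuming the token dict carries an ``expires_on`` POSIX timestamp (both the field name and the lead-time constant are assumptions):

```python
import time

TOKEN_REFRESH_LEAD_TIME = 120  # seconds of slack before actual expiry (assumed constant)


def is_aad_token_valid(aad_token: dict) -> bool:
    # Treat the token as already invalid shortly before it expires, so an
    # in-flight request cannot race the expiry.
    expires_on = int(aad_token["expires_on"])  # assumed field name
    return expires_on > time.time() + TOKEN_REFRESH_LEAD_TIME


print(is_aad_token_valid({"token": "...", "expires_on": int(time.time()) + 3600}))  # True
```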
@@ -382,7 +384,7 @@ def _is_aad_token_valid(aad_token: dict) -> bool:
@staticmethod
def _check_azure_metadata_service() -> None:
"""
- Check for Azure Metadata Service
+ Check for Azure Metadata Service.
https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service
"""
try:
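The body of that check is truncated above; probing the instance metadata endpoint generally looks like this sketch (the ``api-version`` value is an assumption):

```python
import requests

# Link-local address of the Azure Instance Metadata Service (IMDS).
AZURE_METADATA_SERVICE_URL = "http://169.254.169.254/metadata/instance"


def check_azure_metadata_service() -> None:
    # IMDS only answers requests that carry the Metadata header.
    response = requests.get(
        AZURE_METADATA_SERVICE_URL,
        params={"api-version": "2021-02-01"},
        headers={"Metadata": "true"},
        timeout=2,
    )
    response.raise_for_status()
```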
@@ -472,7 +474,7 @@ def _do_api_call(
wrap_http_errors: bool = True,
):
"""
- Utility function to perform an API call with retries
+ Utility function to perform an API call with retries.
:param endpoint_info: Tuple of method and endpoint
:param json: Parameters for this API call.
@@ -617,7 +619,9 @@ def _retryable_error(exception: BaseException) -> bool:

class _TokenAuth(AuthBase):
"""
- Helper class for requests Auth field. AuthBase requires you to implement the __call__
+ Helper class for requests Auth field.
+ AuthBase requires you to implement the ``__call__``
magic function.
"""

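The contract mentioned in that docstring comes from ``requests``: a custom auth object mutates the outgoing request in ``__call__``. A minimal sketch of a token auth class along those lines (not the hook's exact class):

```python
import requests
from requests.auth import AuthBase


class TokenAuth(AuthBase):
    """Attach a bearer token to outgoing requests (sketch)."""

    def __init__(self, token: str) -> None:
        self.token = token

    def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
        # requests invokes this just before sending: mutate and return the request.
        r.headers["Authorization"] = "Bearer " + self.token
        return r


# Usage: requests.get("https://example.cloud.databricks.com/api/2.0/jobs/list",
#                     auth=TokenAuth("my-token"))
```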
19 changes: 10 additions & 9 deletions airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -90,7 +90,7 @@ def get_conn(self) -> CloudRedisClient:
@staticmethod
def _append_label(instance: Instance, key: str, val: str) -> Instance:
"""
- Append labels to provided Instance type
+ Append labels to provided Instance type.
Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
airflow version string follows semantic versioning spec: x.y.z).
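As a rough illustration, a version string like ``2.3.4`` can be made to fit that regex by lowercasing and replacing the dots; this is a sketch of the sanitization idea, not the hook's exact code (the ``v`` prefix is my choice, since label values must start with a letter):

```python
import re

LABEL_RE = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")


def version_to_label(version: str) -> str:
    # "2.3.4" -> "v2-3-4": dots are not allowed in label values.
    label = "v" + version.lower().replace(".", "-")
    assert LABEL_RE.fullmatch(label), label
    return label


print(version_to_label("2.3.4"))  # v2-3-4
```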
@@ -275,6 +275,8 @@ def failover_instance(
metadata: Sequence[Tuple[str, str]] = (),
):
"""
+ Failover of the primary node to current replica node.
Initiates a failover of the primary node to current replica node for a specific STANDARD tier Cloud
Memorystore for Redis instance.
@@ -392,8 +394,7 @@ def list_instances(
metadata: Sequence[Tuple[str, str]] = (),
):
"""
- Lists all Redis instances owned by a project in either the specified location (region) or all
- locations.
+ List Redis instances owned by a project at the specified location (region) or all locations.
:param location: The location of the Cloud Memorystore instance (for example europe-west1)
@@ -528,7 +529,7 @@ def get_conn(
@staticmethod
def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
"""
- Append labels to provided Instance type
+ Append labels to provided Instance type.
Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
airflow version string follows semantic versioning spec: x.y.z).
@@ -752,8 +753,7 @@ def list_instances(
metadata: Sequence[Tuple[str, str]] = (),
):
"""
- Lists all Memcached instances owned by a project in either the specified location (region) or all
- locations.
+ List Memcached instances owned by a project at the specified location (region) or all locations.
:param location: The location of the Cloud Memorystore instance (for example europe-west1)
@@ -851,9 +851,10 @@ def update_parameters(
metadata: Sequence[Tuple[str, str]] = (),
):
"""
- Updates the defined Memcached Parameters for an existing Instance. This method only stages the
- parameters, it must be followed by apply_parameters to apply the parameters to nodes of
- the Memcached Instance.
+ Update the defined Memcached Parameters for an existing Instance.
+ This method only stages the parameters, it must be followed by apply_parameters
+ to apply the parameters to nodes of the Memcached Instance.
:param update_mask: Required. Mask of fields to update.
If a dict is provided, it must be of the same form as the protobuf message
22 changes: 16 additions & 6 deletions airflow/providers/mysql/hooks/mysql.py
@@ -61,8 +61,10 @@ def __init__(self, *args, **kwargs) -> None:

def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> None:
"""
- The MySQLdb (mysqlclient) client uses an `autocommit` method rather
- than an `autocommit` property to set the autocommit setting
+ Set *autocommit*.
+ *mysqlclient* uses an *autocommit* method rather than an *autocommit*
+ property, so we need to override this to support it.
:param conn: connection to set autocommit setting
:param autocommit: autocommit setting
@@ -75,8 +77,10 @@ def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> None:

def get_autocommit(self, conn: MySQLConnectionTypes) -> bool:
"""
- The MySQLdb (mysqlclient) client uses a `get_autocommit` method
- rather than an `autocommit` property to get the autocommit setting
+ Whether *autocommit* is active.
+ *mysqlclient* uses an *get_autocommit* method rather than an *autocommit*
+ property, so we need to override this to support it.
:param conn: connection to get autocommit setting from.
:return: connection autocommit setting
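What these two overrides boil down to, sketched directly against the ``mysqlclient`` connection API (connection credentials are placeholders):

```python
import MySQLdb  # provided by the "mysqlclient" package

conn = MySQLdb.connect(host="localhost", user="root", passwd="...", db="test")  # assumed credentials

# mysqlclient toggles autocommit through a method call ...
conn.autocommit(True)
# ... and reads it back through a getter, not a property:
print(conn.get_autocommit())  # True

# Contrast: PEP 249-style drivers such as mysql-connector-python expose a property:
#   conn.autocommit = True
conn.close()
```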
@@ -146,6 +150,8 @@ def _get_conn_config_mysql_connector_python(self, conn: Connection) -> Dict:

def get_conn(self) -> MySQLConnectionTypes:
"""
+ Connection to a MySQL database.
Establishes a connection to a mysql database
by extracting the connection configuration from the Airflow connection.
@@ -174,7 +180,7 @@ def get_conn(self) -> MySQLConnectionTypes:
raise ValueError('Unknown MySQL client name provided!')

def bulk_load(self, table: str, tmp_file: str) -> None:
"""Loads a tab-delimited file into a database table"""
"""Load a tab-delimited file into a database table."""
conn = self.get_conn()
cur = conn.cursor()
cur.execute(
@@ -187,7 +193,7 @@ def bulk_load(self, table: str, tmp_file: str) -> None:
conn.close()

def bulk_dump(self, table: str, tmp_file: str) -> None:
"""Dumps a database table into a tab-delimited file"""
"""Dump a database table into a tab-delimited file."""
conn = self.get_conn()
cur = conn.cursor()
cur.execute(
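The statements are truncated in both hunks; they are MySQL's file import/export commands. A hedged sketch of the SQL these two methods issue, with placeholder interpolation as the hook does it:

```python
def bulk_load(cur, table: str, tmp_file: str) -> None:
    # Tab-delimited import; mirrors the LOAD DATA statement the hook builds.
    # The hook interpolates names directly, so table/tmp_file must be trusted.
    cur.execute(f"LOAD DATA LOCAL INFILE '{tmp_file}' INTO TABLE {table}")


def bulk_dump(cur, table: str, tmp_file: str) -> None:
    # Server-side tab-delimited export; the file is written on the MySQL server.
    cur.execute(f"SELECT * INTO OUTFILE '{tmp_file}' FROM {table}")
```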
@@ -202,6 +208,8 @@ def bulk_dump(self, table: str, tmp_file: str) -> None:
@staticmethod
def _serialize_cell(cell: object, conn: Optional[Connection] = None) -> object:
"""
+ Convert argument to a literal.
The package MySQLdb converts an argument to a literal
when passing those separately to execute. Hence, this method does nothing.
@@ -214,6 +222,8 @@ def _serialize_cell(cell: object, conn: Optional[Connection] = None) -> object:

def get_iam_token(self, conn: Connection) -> Tuple[str, int]:
"""
+ Retrieve a temporary password to connect to MySQL.
Uses AWSHook to retrieve a temporary password to connect to MySQL
Port is required. If none is provided, default 3306 is used
"""
12 changes: 7 additions & 5 deletions airflow/providers/qubole/sensors/qubole.py
@@ -29,7 +29,7 @@


class QuboleSensor(BaseSensorOperator):
"""Base class for all Qubole Sensors"""
"""Base class for all Qubole Sensors."""

template_fields: Sequence[str] = ('data', 'qubole_conn_id')

@@ -68,8 +68,9 @@ def poke(self, context: 'Context') -> bool:

class QuboleFileSensor(QuboleSensor):
"""
- Wait for a file or folder to be present in cloud storage
- and check for its presence via QDS APIs
+ Wait for a file or folder to be present in cloud storage.
+ Check for file or folder presence via QDS APIs.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
@@ -92,8 +93,9 @@ def __init__(self, **kwargs) -> None:

class QubolePartitionSensor(QuboleSensor):
"""
- Wait for a Hive partition to show up in QHS (Qubole Hive Service)
- and check for its presence via QDS APIs
+ Wait for a Hive partition to show up in QHS (Qubole Hive Service).
+ Check for Hive partition presence via QDS APIs.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
15 changes: 10 additions & 5 deletions airflow/utils/process_utils.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
"""Utilities for running or stopping processes"""
"""Utilities for running or stopping processes."""
import errno
import logging
import os
@@ -56,6 +56,8 @@ def reap_process_group(
timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> Dict[int, int]:
"""
+ Send sig (SIGTERM) to the process group of pid.
Tries really hard to terminate all processes in the group (including grandchildren). Will send
sig (SIGTERM) to the process group of pid. If any process is alive after timeout
a SIGKILL will be send.
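The escalation the docstring describes (SIGTERM to the whole group, SIGKILL after a timeout) looks roughly like this sketch:

```python
import os
import signal
import time


def reap_process_group_sketch(pgid: int, timeout: int = 60) -> None:
    # Signal the whole group so grandchildren are included.
    os.killpg(pgid, signal.SIGTERM)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            os.killpg(pgid, 0)  # signal 0 only probes; raises once the group is gone
        except ProcessLookupError:
            return
        time.sleep(1)
    os.killpg(pgid, signal.SIGKILL)  # survivors get SIGKILL after the timeout
```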
@@ -158,7 +160,7 @@ def signal_procs(sig):

def execute_in_subprocess(cmd: List[str], cwd: Optional[str] = None) -> None:
"""
- Execute a process and stream output to logger
+ Execute a process and stream output to logger.
:param cmd: command and arguments to run
:param cwd: Current working directory passed to the Popen constructor
"""
@@ -167,7 +169,7 @@ def execute_in_subprocess(cmd: List[str], cwd: Optional[str] = None) -> None:

def execute_in_subprocess_with_kwargs(cmd: List[str], **kwargs) -> None:
"""
- Execute a process and stream output to logger
+ Execute a process and stream output to logger.
:param cmd: command and arguments to run
@@ -190,6 +192,8 @@ def execute_in_subprocess_with_kwargs(cmd: List[str], **kwargs) -> None:

def execute_interactive(cmd: List[str], **kwargs) -> None:
"""
+ Run the new command as a subprocess.
Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
the process is completed.
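Restoring the terminal afterwards is typically done by snapshotting and replaying the tty attributes; a POSIX-only sketch of that save/restore bracket (not the function's full body):

```python
import subprocess
import sys
import termios
from typing import List


def execute_interactive_sketch(cmd: List[str]) -> None:
    # Snapshot tty attributes so a child that disables echo or hides the
    # cursor cannot leave the terminal in a broken state.
    old_attrs = termios.tcgetattr(sys.stdin)
    try:
        subprocess.run(cmd)
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attrs)
```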
@@ -271,8 +275,9 @@ def kill_child_processes_by_pids(pids_to_kill: List[int], timeout: int = 5) -> N
@contextmanager
def patch_environ(new_env_variables: Dict[str, str]) -> Generator[None, None, None]:
"""
- Sets environment variables in context. After leaving the context, it restores its original state.
+ Set environment variables in context.
+ After leaving the context, it restores its original state.
:param new_env_variables: Environment variables to set
"""
current_env_state = {key: os.environ.get(key) for key in new_env_variables.keys()}
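A compact sketch of such a context manager, including restoration of variables that were previously unset (the saved-state dict mirrors the ``current_env_state`` line shown above):

```python
import os
from contextlib import contextmanager
from typing import Dict, Generator


@contextmanager
def patch_environ_sketch(new_env_variables: Dict[str, str]) -> Generator[None, None, None]:
    # None marks variables that did not exist before entering the context.
    saved = {key: os.environ.get(key) for key in new_env_variables}
    os.environ.update(new_env_variables)
    try:
        yield
    finally:
        for key, old in saved.items():
            if old is None:
                os.environ.pop(key, None)  # was unset: remove it again
            else:
                os.environ[key] = old
```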
@@ -316,7 +321,7 @@ def check_if_pidfile_process_is_running(pid_file: str, process_name: str):

def set_new_process_group() -> None:
"""
- Tries to set current process to a new process group
+ Try to set current process to a new process group.
That makes it easy to kill all sub-process of this at the OS-level,
rather than having to iterate the child processes.
If current process spawn by system call ``exec()`` than keep current process group
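A sketch of that guard: become leader of a new process group unless the process is already a session leader, in which case changing groups could detach it from its controlling terminal (a sketch of the idea, not necessarily the exact body):

```python
import os


def set_new_process_group_sketch() -> None:
    if os.getpid() == os.getsid(0):
        # Already a session leader (e.g. replaced via exec()): keep the
        # current process group.
        return
    os.setpgid(0, 0)  # move this process into a fresh group it leads
```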
4 changes: 2 additions & 2 deletions docs/apache-airflow/migrations-ref.rst
@@ -58,7 +58,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``1de7bc13c950`` | ``b1b348e02d07`` | ``2.3.1`` | Add index for ``event`` column in ``log`` table. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
- | ``b1b348e02d07`` | ``75d5ed6c2b43`` | ``2.3.0`` | Update dag.default_view to grid |
+ | ``b1b348e02d07`` | ``75d5ed6c2b43`` | ``2.3.0`` | Update dag.default_view to grid. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``75d5ed6c2b43`` | ``909884dea523`` | ``2.3.0`` | Add map_index to Log. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
@@ -216,7 +216,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``41f5f12752f8`` | ``03bc53e68815`` | ``1.10.2`` | Add superuser field |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
- | ``03bc53e68815`` (merge_point) | ``0a2a5b66e19d``, | ``1.10.2`` | Merge migrations Heads |
+ | ``03bc53e68815`` (merge_point) | ``0a2a5b66e19d``, | ``1.10.2`` | Merge migrations Heads. |
| | ``bf00311e1990`` | | |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``0a2a5b66e19d`` | ``9635ae0956e7`` | ``1.10.2`` | Add ``task_reschedule`` table |
