Skip to content

Commit

Permalink
D205 Support - Providers - Final Pass (#33303)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi committed Aug 11, 2023
1 parent b5a4d36 commit 7e79997
Show file tree
Hide file tree
Showing 11 changed files with 18 additions and 6 deletions.
2 changes: 2 additions & 0 deletions airflow/providers/apache/beam/hooks/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def start_go_pipeline_with_binary(
class BeamAsyncHook(BeamHook):
"""
Asynchronous hook for Apache Beam.
:param runner: Runner type.
"""

Expand All @@ -411,6 +412,7 @@ async def _create_tmp_dir(prefix: str) -> str:
async def _cleanup_tmp_dir(tmp_dir: str) -> None:
"""
Helper method to delete temporary directory after finishing work with it.
Is uses `rmtree` method to recursively remove the temporary directory.
"""
shutil.rmtree(tmp_dir)
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/apache/beam/operators/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ async def execute_async(self, context: Context):
def execute_complete(self, context: Context, event: dict[str, Any]):
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def _list_pods(self, query_kwargs):
def _make_safe_label_value(self, input_value: str | datetime) -> str:
"""
Normalize a provided label to be of valid length and characters.
See airflow.providers.cncf.kubernetes.pod_generator.make_safe_label_value for more details.
"""
# airflow.providers.cncf.kubernetes is an expensive import, locally import it here to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@

def md5(__string: ReadableBuffer = b"") -> hashlib._Hash:
"""
Safely allows calling the ``hashlib.md5`` function when ``usedforsecurity`` is disabled in
the configuration.
Safely allows calling the ``hashlib.md5`` function when ``usedforsecurity`` is disabled in configuration.
:param __string: The data to hash. Default to empty str byte.
:return: The hashed value.
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/elasticsearch/log/es_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def to_dict(self):
class Hit(AttributeDict):
"""
The Hit class is used to manage and access elements in a document.
It inherits from the AttributeDict class and provides
attribute-like access to its elements, similar to a dictionary.
"""
Expand Down
5 changes: 3 additions & 2 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def __init__(
def format_url(host: str) -> str:
"""
Formats the given host string to ensure it starts with 'http'.
Checks if the host string represents a valid URL.
:params host: The host string to format and check.
Expand Down Expand Up @@ -444,6 +445,7 @@ def supports_external_link(self) -> bool:
def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]:
"""
Resolves nested hits from Elasticsearch by iteratively navigating the `_nested` field.
The result is used to fetch the appropriate document class to handle the hit.
This method can be used with nested Elasticsearch fields which are structured
Expand All @@ -468,8 +470,7 @@ def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]:

def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
"""
This method processes a hit (i.e., a result) from an Elasticsearch response and transforms it into an
appropriate class instance.
Process a hit (i.e., a result) from an Elasticsearch response and transform it into a class instance.
The transformation depends on the contents of the hit. If the document in hit contains a nested field,
the '_resolve_nested' method is used to determine the appropriate class (based on the nested path).
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/ftp/operators/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ def execute(self, context: Any) -> str | list[str] | None:

def get_openlineage_facets_on_start(self):
"""
Returns OpenLineage datasets with following naming structure:
Returns OpenLineage datasets.
Dataset will have the following structure:
input: file://hostname/path
output file://<conn.host>:<conn.port>/path.
"""
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ def _copy_single_object(self, hook, source_object, destination_object):
def get_openlineage_facets_on_complete(self, task_instance):
"""
Implementing _on_complete because execute method does preprocessing on internals.
This means we won't have to normalize self.source_object and self.source_objects,
destination bucket and so on.
"""
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/transfers/s3_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3H
def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/redis/log/redis_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
class RedisTaskHandler(FileTaskHandler, LoggingMixin):
"""
RedisTaskHandler is a Python log handler that handles and reads task instance logs.
It extends airflow FileTaskHandler and uploads to and reads from Redis.
:param base_log_folder:
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/sftp/operators/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ def execute(self, context: Any) -> str | list[str] | None:

def get_openlineage_facets_on_start(self):
"""
This returns OpenLineage datasets in format:
Returns OpenLineage datasets.
Dataset will have the following structure:
input: file://<local_host>/path
output: file://<remote_host>:<remote_port>/path.
"""
Expand Down

0 comments on commit 7e79997

Please sign in to comment.