Skip to content

Commit

Permalink
Add D202 pydocstyle check (#11032)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcandoalmeida committed Sep 22, 2020
1 parent 52fdb62 commit f3e87c5
Show file tree
Hide file tree
Showing 88 changed files with 1 addition and 215 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ repos:
name: Run pydocstyle
args:
- --convention=pep257
- --add-ignore=D100,D102,D104,D105,D107,D200,D202,D205,D400,D401
- --add-ignore=D100,D102,D104,D105,D107,D200,D205,D400,D401
exclude: ^tests/.*\.py$|^scripts/.*\.py$|^dev|^backport_packages|^kubernetes_tests
- repo: local
hooks:
Expand Down
1 change: 0 additions & 1 deletion airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def __init__(self):

def init_app(app):
"""Initializes application with kerberos"""

hostname = app.config.get('SERVER_NAME')
if not hostname:
hostname = getfqdn()
Expand Down
1 change: 0 additions & 1 deletion airflow/api/common/experimental/get_dag_run_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def get_dag_run_state(dag_id: str, execution_date: datetime) -> Dict[str, str]:
:param execution_date: execution date
:return: Dictionary storing state of the object
"""

dag = check_and_get_dag(dag_id=dag_id)

dagrun = check_and_get_dagrun(dag, execution_date)
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def get_dag_runs(
"""
Get all DAG Runs.
"""

query = session.query(DagRun)

# This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs for all DAGs.
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def get_event_logs(session, limit, offset=None):
"""
Get all log entries from event log
"""

total_entries = session.query(func.count(Log.id)).scalar()
event_logs = session.query(Log).order_by(Log.id).offset(offset).limit(limit).all()
return event_log_collection_schema.dump(
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/endpoints/import_error_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def get_import_errors(session, limit, offset=None):
"""
Get all import errors
"""

total_entries = session.query(func.count(ImportError.id)).scalar()
import_errors = session.query(ImportError).order_by(ImportError.id).offset(offset).limit(limit).all()
return import_error_collection_schema.dump(
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/endpoints/pool_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def get_pools(session, limit, offset=None):
"""
Get all pools
"""

total_entries = session.query(func.count(Pool.id)).scalar()
pools = session.query(Pool).order_by(Pool.id).offset(offset).limit(limit).all()
return pool_collection_schema.dump(PoolCollection(pools=pools, total_entries=total_entries))
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def get_xcom_entries(
"""
Get all XCom values
"""

query = session.query(XCom)
if dag_id != '~':
query = query.filter(XCom.dag_id == dag_id)
Expand Down
2 changes: 0 additions & 2 deletions airflow/api_connexion/schemas/common_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class TimeDeltaSchema(Schema):
@marshmallow.post_load
def make_time_delta(self, data, **kwargs):
"""Create time delta based on data"""

if "objectType" in data:
del data["objectType"]
return datetime.timedelta(**data)
Expand Down Expand Up @@ -74,7 +73,6 @@ class RelativeDeltaSchema(Schema):
@marshmallow.post_load
def make_relative_delta(self, data, **kwargs):
"""Create relative delta based on data"""

if "objectType" in data:
del data["objectType"]

Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class Meta:
@staticmethod
def get_owners(obj: DagModel):
"""Convert owners attribute to DAG representation"""

if not getattr(obj, 'owners', None):
return []
return obj.owners.split(",")
Expand Down
1 change: 0 additions & 1 deletion airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ def dag_list_dag_runs(args, dag=None):
@cli_utils.action_logging
def generate_pod_yaml(args):
"""Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor"""

from kubernetes.client.api_client import ApiClient

from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
Expand Down
2 changes: 0 additions & 2 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def _run_raw_task(args, ti):
@cli_utils.action_logging
def task_run(args, dag=None):
"""Runs a single task instance"""

# Load custom airflow config
if args.cfg_path:
with open(args.cfg_path, 'r') as conf_file:
Expand Down Expand Up @@ -289,7 +288,6 @@ def _guess_debugger():
* `ipdb <https://github.com/gotcha/ipdb>`__
* `pdb <https://docs.python.org/3/library/pdb.html>`__
"""

for mod in SUPPORTED_DEBUGGER_MODULES:
try:
return importlib.import_module(mod)
Expand Down
1 change: 0 additions & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ def _validate_config_dependencies(self):
Validate that config values aren't invalid given other config values
or system-level limitations and requirements.
"""

if (
self.get("core", "executor") not in ('DebugExecutor', 'SequentialExecutor') and
"sqlite" in self.get('core', 'sql_alchemy_conn')):
Expand Down
2 changes: 0 additions & 2 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ def _check_for_stalled_adopted_tasks(self):

def update_all_task_states(self) -> None:
"""Updates states of the tasks."""

self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
state_and_info_by_celery_task_id = self.bulk_state_fetcher.get_many(self.tasks.values())

Expand Down Expand Up @@ -401,7 +400,6 @@ def fetch_celery_task_state(async_result: AsyncResult) -> \
of the task
:rtype: tuple[str, str, str]
"""

try:
with timeout(seconds=OPERATION_TIMEOUT):
# Accessing state property of celery task will make actual network request
Expand Down
2 changes: 0 additions & 2 deletions airflow/hooks/dbapi_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ def get_autocommit(self, conn):
:return: connection autocommit setting.
:rtype: bool
"""

return getattr(conn, 'autocommit', False) and self.supports_autocommit

def get_cursor(self):
Expand Down Expand Up @@ -322,7 +321,6 @@ def _serialize_cell(cell, conn=None): # pylint: disable=unused-argument
:return: The serialized cell
:rtype: str
"""

if cell is None:
return None
if isinstance(cell, datetime):
Expand Down
1 change: 0 additions & 1 deletion airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,6 @@ def _process_backfill_task_instances(self, # pylint: disable=too-many-statement
:return: the list of execution_dates for the finished dag runs
:rtype: list
"""

executed_run_dates = []

while ((len(ti_status.to_run) > 0 or len(ti_status.running) > 0) and
Expand Down
1 change: 0 additions & 1 deletion airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def on_kill(self):
@provide_session
def heartbeat_callback(self, session=None):
"""Self destruct task if state has been moved away from running externally"""

if self.terminating:
# ensure termination if processes are created later
self.task_runner.terminate()
Expand Down
1 change: 0 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,6 @@ def _process_task_instances(
active DAG runs and adding task instances that should run to the
queue.
"""

# update the state of the previously active dag runs
active_dag_runs = 0
task_instances_list = []
Expand Down
1 change: 0 additions & 1 deletion airflow/kubernetes/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def get_kube_client(in_cluster: bool = conf.getboolean('kubernetes', 'in_cluster
:return kubernetes client
:rtype client.CoreV1Api
"""

if not has_kubernetes:
raise _import_err

Expand Down
1 change: 0 additions & 1 deletion airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
@staticmethod
def from_obj(obj) -> Optional[Union[dict, k8s.V1Pod]]:
"""Converts to pod from obj"""

if obj is None:
return None

Expand Down
1 change: 0 additions & 1 deletion airflow/macros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def ds_add(ds, days):
>>> ds_add('2015-01-06', -5)
'2015-01-01'
"""

ds = datetime.strptime(ds, '%Y-%m-%d')
if days:
ds = ds + timedelta(days)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def downgrade():
"""
Make TaskInstance.pool field nullable.
"""

conn = op.get_bind()
if conn.dialect.name == "mssql":
op.drop_index('ti_pool', table_name='task_instance')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

def upgrade():
"""Apply Set conn_type as non-nullable"""

Base = declarative_base()

class Connection(Base):
Expand Down
4 changes: 0 additions & 4 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,6 @@ def priority_weight_total(self) -> int:
@cached_property
def operator_extra_link_dict(self) -> Dict[str, Any]:
"""Returns dictionary of all extra links for the operator"""

op_extra_links_from_plugin: Dict[str, Any] = {}
from airflow import plugins_manager
plugins_manager.initialize_extra_operators_links_plugins()
Expand Down Expand Up @@ -831,7 +830,6 @@ def render_template_fields(self, context: Dict, jinja_env: Optional[jinja2.Envir
:param jinja_env: Jinja environment
:type jinja_env: jinja2.Environment
"""

if not jinja_env:
jinja_env = self.get_template_env()

Expand Down Expand Up @@ -866,7 +864,6 @@ def render_template( # pylint: disable=too-many-return-statements
:type seen_oids: set
:return: Templated content
"""

if not jinja_env:
jinja_env = self.get_template_env()

Expand Down Expand Up @@ -1034,7 +1031,6 @@ def get_flat_relative_ids(self,
"""
Get a flat set of relatives' ids, either upstream or downstream.
"""

if not self._dag:
return set()

Expand Down
2 changes: 0 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,6 @@ def resolve_template_files(self):

def get_template_env(self) -> jinja2.Environment:
"""Build a Jinja2 environment."""

# Collect directories to search for template files
searchpath = [self.folder]
if self.template_searchpath:
Expand Down Expand Up @@ -1683,7 +1682,6 @@ def deactivate_unknown_dags(active_dag_ids, session=None):
:type active_dag_ids: list[unicode]
:return: None
"""

if len(active_dag_ids) == 0:
return
for dag in session.query(
Expand Down
1 change: 0 additions & 1 deletion airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ def bag_dag(self, dag, root_dag):
Adds the DAG into the bag, recurses into sub dags.
Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags
"""

test_cycle(dag) # throws if a task cycle is found

dag.resolve_template_files()
Expand Down
2 changes: 0 additions & 2 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ def get_dag(self):
@provide_session
def get_previous_dagrun(self, state: Optional[str] = None, session: Session = None) -> Optional['DagRun']:
"""The previous DagRun, if there is one"""

filters = [
DagRun.dag_id == self.dag_id,
DagRun.execution_date < self.execution_date,
Expand Down Expand Up @@ -312,7 +311,6 @@ def update_state(self, session: Session = None) -> List[TI]:
:return: ready_tis: the tis that can be scheduled in the current loop
:rtype ready_tis: list[airflow.models.TaskInstance]
"""

dag = self.get_dag()
ready_tis: List[TI] = []
tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,)))
Expand Down
5 changes: 0 additions & 5 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,6 @@ def check_and_change_state_before_execution( # pylint: disable=too-many-argum
:return: whether the state was changed to running or not
:rtype: bool
"""

task = self.task
self.refresh_from_task(task, pool_override=pool)
self.test_mode = test_mode
Expand Down Expand Up @@ -1061,7 +1060,6 @@ def _run_raw_task(
:param session: SQLAlchemy ORM Session
:type session: Session
"""

task = self.task
self.test_mode = test_mode
self.refresh_from_task(task, pool_override=pool)
Expand Down Expand Up @@ -1719,7 +1717,6 @@ def xcom_push(
task on a future date without it being immediately visible.
:type execution_date: datetime
"""

if execution_date and execution_date < self.execution_date:
raise ValueError(
'execution_date can not be in the past (current '
Expand Down Expand Up @@ -1768,7 +1765,6 @@ def xcom_pull( # pylint: disable=inconsistent-return-statements
are returned as well.
:type include_prior_dates: bool
"""

if dag_id is None:
dag_id = self.dag_id

Expand Down Expand Up @@ -1922,7 +1918,6 @@ def construct_task_instance(self, session=None, lock_for_update=False) -> TaskIn
session is committed.
:return: the task instance constructed
"""

qry = session.query(TaskInstance).filter(
TaskInstance.dag_id == self._dag_id,
TaskInstance.task_id == self._task_id,
Expand Down
1 change: 0 additions & 1 deletion airflow/models/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def set(
:param serialize_json: Serialize the value to a JSON string
:param session: SQL Alchemy Sessions
"""

if serialize_json:
stored_value = json.dumps(value, indent=2)
else:
Expand Down
1 change: 0 additions & 1 deletion airflow/operators/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ def push(self, meta_data):
Optional: Send data check info and metadata to an external database.
Default functionality will log metadata.
"""

info = "\n".join([f"""{key}: {item}""" for key, item in meta_data.items()])
self.log.info("Log from %s:\n%s", self.dag_id, info)

Expand Down
3 changes: 0 additions & 3 deletions airflow/providers/amazon/aws/hooks/cloud_formation.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def get_stack_status(self, stack_name):
"""
Get stack status from CloudFormation.
"""

self.log.info('Poking for stack %s', stack_name)

try:
Expand All @@ -63,7 +62,6 @@ def create_stack(self, stack_name, params):
:param params: parameters to be passed to CloudFormation.
:type params: dict
"""

if 'StackName' not in params:
params['StackName'] = stack_name
self.get_conn().create_stack(**params)
Expand All @@ -77,7 +75,6 @@ def delete_stack(self, stack_name, params=None):
:param params: parameters to be passed to CloudFormation (optional).
:type params: dict
"""

params = params or {}
if 'StackName' not in params:
params['StackName'] = stack_name
Expand Down
2 changes: 0 additions & 2 deletions airflow/providers/amazon/aws/hooks/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: List[str
:type cluster_states: list
:return: id of the EMR cluster
"""

response = self.get_conn().list_clusters(ClusterStates=cluster_states)

matching_clusters = list(
Expand All @@ -73,7 +72,6 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
run_job_flow method.
Overrides for this config may be passed as the job_flow_overrides.
"""

if not self.emr_conn_id:
raise AirflowException('emr_conn_id must be present to use create_job_flow')

Expand Down

0 comments on commit f3e87c5

Please sign in to comment.