Upgrade black to 20.8b1 (#10818)
kaxil committed Sep 9, 2020
1 parent 004e1d8 commit 9549274
Showing 334 changed files with 4,868 additions and 1,358 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -150,7 +150,7 @@ repos:
hooks:
- id: check-hooks-apply
- repo: https://github.com/psf/black
rev: 19.10b0
rev: 20.8b1
hooks:
- id: black
files: api_connexion/.*\.py|.*providers.*\.py
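
Most of the 334 changed files below follow from a single behaviour change: black 20.8b1 introduces the "magic trailing comma", so a call or literal that already ends with a trailing comma is exploded onto one element per line even when it would fit within the line length. A minimal sketch of the effect (build_payload is a hypothetical helper, not taken from the Airflow codebase):

def build_payload(name, value):
    """Hypothetical helper used only to illustrate the formatting change."""
    return {name: value}

# Left alone by black 19.10b0:
result = build_payload(name="region", value="us-east-1",)

# Rewritten by black 20.8b1 -- the pre-existing trailing comma forces
# one argument per line:
result = build_payload(
    name="region",
    value="us-east-1",
)

With the rev bumped as above, running pre-commit run black --all-files should reproduce this reformatting for every file matched by the files pattern.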
5 changes: 4 additions & 1 deletion airflow/api_connexion/endpoints/health_endpoint.py
@@ -40,7 +40,10 @@ def get_health():

payload = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {"status": scheduler_status, "latest_scheduler_heartbeat": latest_scheduler_heartbeat,},
"scheduler": {
"status": scheduler_status,
"latest_scheduler_heartbeat": latest_scheduler_heartbeat,
},
}

return health_schema.dump(payload)
7 changes: 6 additions & 1 deletion airflow/api_connexion/endpoints/variable_endpoint.py
@@ -64,7 +64,12 @@ def get_variables(session, limit: Optional[int], offset: Optional[int] = None) -
if limit:
query = query.limit(limit)
variables = query.all()
return variable_collection_schema.dump({"variables": variables, "total_entries": total_entries,})
return variable_collection_schema.dump(
{
"variables": variables,
"total_entries": total_entries,
}
)


@security.requires_authentication
4 changes: 3 additions & 1 deletion airflow/api_connexion/schemas/error_schema.py
@@ -33,7 +33,9 @@ class Meta:
import_error_id = auto_field("id", dump_only=True)
timestamp = auto_field(format="iso")
filename = auto_field()
stack_trace = auto_field("stacktrace",)
stack_trace = auto_field(
"stacktrace",
)


class ImportErrorCollection(NamedTuple):
@@ -57,7 +57,9 @@
# [END howto_operator_datasync_1_1]

with models.DAG(
"example_datasync_1_2", start_date=days_ago(1), schedule_interval=None, # Override to match your needs
"example_datasync_1_2",
start_date=days_ago(1),
schedule_interval=None, # Override to match your needs
) as dag:
# [START howto_operator_datasync_1_2]
datasync_task_2 = AWSDataSyncOperator(
@@ -56,7 +56,12 @@
task_definition="hello-world",
launch_type="FARGATE",
overrides={
"containerOverrides": [{"name": "hello-world-container", "command": ["echo", "hello", "world"],},],
"containerOverrides": [
{
"name": "hello-world-container",
"command": ["echo", "hello", "world"],
},
],
},
network_configuration={
"awsvpcConfiguration": {
12 changes: 9 additions & 3 deletions airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -31,7 +31,9 @@ def upload_keys():
s3_hook = S3Hook()
for i in range(0, 3):
s3_hook.load_string(
string_data="input", key=f"path/data{i}", bucket_name=BUCKET_NAME,
string_data="input",
key=f"path/data{i}",
bucket_name=BUCKET_NAME,
)


@@ -44,15 +46,19 @@ def upload_keys():
) as dag:

create_bucket = S3CreateBucketOperator(
task_id='s3_bucket_dag_create', bucket_name=BUCKET_NAME, region_name='us-east-1',
task_id='s3_bucket_dag_create',
bucket_name=BUCKET_NAME,
region_name='us-east-1',
)

add_keys_to_bucket = PythonOperator(
task_id="s3_bucket_dag_add_keys_to_bucket", python_callable=upload_keys
)

delete_bucket = S3DeleteBucketOperator(
task_id='s3_bucket_dag_delete', bucket_name=BUCKET_NAME, force_delete=True,
task_id='s3_bucket_dag_delete',
bucket_name=BUCKET_NAME,
force_delete=True,
)

create_bucket >> add_keys_to_bucket >> delete_bucket
11 changes: 8 additions & 3 deletions airflow/providers/amazon/aws/hooks/base_aws.py
@@ -71,7 +71,9 @@ def _create_basic_session(self, session_kwargs: Dict[str, Any]) -> boto3.session
self.log.info("Retrieving region_name from Connection.extra_config['region_name']")
region_name = self.extra_config["region_name"]
self.log.info(
"Creating session with aws_access_key_id=%s region_name=%s", aws_access_key_id, region_name,
"Creating session with aws_access_key_id=%s region_name=%s",
aws_access_key_id,
region_name,
)

return boto3.session.Session(
@@ -161,7 +163,9 @@ def _assume_role(
assume_role_kwargs["ExternalId"] = self.extra_config.get("external_id")
role_session_name = f"Airflow_{self.conn.conn_id}"
self.log.info(
"Doing sts_client.assume_role to role_arn=%s (role_session_name=%s)", role_arn, role_session_name,
"Doing sts_client.assume_role to role_arn=%s (role_session_name=%s)",
role_arn,
role_session_name,
)
return sts_client.assume_role(
RoleArn=role_arn, RoleSessionName=role_session_name, **assume_role_kwargs
@@ -317,7 +321,8 @@ def _get_credentials(self, region_name):
# http://boto3.readthedocs.io/en/latest/guide/configuration.html

self.log.info(
"Creating session using boto3 credential strategy region_name=%s", region_name,
"Creating session using boto3 credential strategy region_name=%s",
region_name,
)
session = boto3.session.Session(region_name=region_name)
return session, None
5 changes: 4 additions & 1 deletion airflow/providers/amazon/aws/hooks/batch_client.py
@@ -342,7 +342,10 @@ def poll_job_status(self, job_id: str, match_status: List[str]) -> bool:
job = self.get_job_description(job_id)
job_status = job.get("status")
self.log.info(
"AWS Batch job (%s) check status (%s) in %s", job_id, job_status, match_status,
"AWS Batch job (%s) check status (%s) in %s",
job_id,
job_status,
match_status,
)

if job_status in match_status:
3 changes: 2 additions & 1 deletion airflow/providers/amazon/aws/hooks/redshift.py
@@ -116,6 +116,7 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
:type cluster_identifier: str
"""
response = self.get_conn().create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier,
SnapshotIdentifier=snapshot_identifier,
ClusterIdentifier=cluster_identifier,
)
return response['Snapshot'] if response['Snapshot'] else None
4 changes: 3 additions & 1 deletion airflow/providers/amazon/aws/operators/batch.py
@@ -152,7 +152,9 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
:raises: AirflowException
"""
self.log.info(
"Running AWS Batch job - job definition: %s - on queue %s", self.job_definition, self.job_queue,
"Running AWS Batch job - job definition: %s - on queue %s",
self.job_definition,
self.job_queue,
)
self.log.info("AWS Batch job - container overrides: %s", self.overrides)

6 changes: 4 additions & 2 deletions airflow/providers/amazon/aws/operators/datasync.py
@@ -182,7 +182,8 @@ def get_hook(self):
"""
if not self.hook:
self.hook = AWSDataSyncHook(
aws_conn_id=self.aws_conn_id, wait_interval_seconds=self.wait_interval_seconds,
aws_conn_id=self.aws_conn_id,
wait_interval_seconds=self.wait_interval_seconds,
)
return self.hook

@@ -239,7 +240,8 @@ def _get_tasks_and_locations(self):

self.log.info("Finding DataSync TaskArns that have these LocationArns")
self.candidate_task_arns = hook.get_task_arns_for_location_arns(
self.candidate_source_location_arns, self.candidate_destination_location_arns,
self.candidate_source_location_arns,
self.candidate_destination_location_arns,
)
self.log.info("Found candidate DataSync TaskArns %s", self.candidate_task_arns)

4 changes: 3 additions & 1 deletion airflow/providers/amazon/aws/operators/ec2_start_instance.py
@@ -65,5 +65,7 @@ def execute(self, context):
instance = ec2_hook.get_instance(instance_id=self.instance_id)
instance.start()
ec2_hook.wait_for_state(
instance_id=self.instance_id, target_state="running", check_interval=self.check_interval,
instance_id=self.instance_id,
target_state="running",
check_interval=self.check_interval,
)
4 changes: 3 additions & 1 deletion airflow/providers/amazon/aws/operators/ec2_stop_instance.py
@@ -65,5 +65,7 @@ def execute(self, context):
instance = ec2_hook.get_instance(instance_id=self.instance_id)
instance.stop()
ec2_hook.wait_for_state(
instance_id=self.instance_id, target_state="stopped", check_interval=self.check_interval,
instance_id=self.instance_id,
target_state="stopped",
check_interval=self.check_interval,
)
8 changes: 6 additions & 2 deletions airflow/providers/amazon/aws/secrets/secrets_manager.py
@@ -85,7 +85,9 @@ def client(self):
"""
Create a Secrets Manager client
"""
session = boto3.session.Session(profile_name=self.profile_name,)
session = boto3.session.Session(
profile_name=self.profile_name,
)
return session.client(service_name="secretsmanager", **self.kwargs)

def get_conn_uri(self, conn_id: str) -> Optional[str]:
@@ -126,7 +128,9 @@ def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]:
"""
secrets_path = self.build_path(path_prefix, secret_id, self.sep)
try:
response = self.client.get_secret_value(SecretId=secrets_path,)
response = self.client.get_secret_value(
SecretId=secrets_path,
)
return response.get('SecretString')
except self.client.exceptions.ResourceNotFoundException:
self.log.debug(
4 changes: 3 additions & 1 deletion airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -42,7 +42,9 @@ def _upload_file_to_s3(file_obj, bucket_name, s3_key_prefix):
s3_client = S3Hook().get_conn()
file_obj.seek(0)
s3_client.upload_file(
Filename=file_obj.name, Bucket=bucket_name, Key=s3_key_prefix + str(uuid4()),
Filename=file_obj.name,
Bucket=bucket_name,
Key=s3_key_prefix + str(uuid4()),
)


3 changes: 1 addition & 2 deletions airflow/providers/amazon/aws/transfers/s3_to_sftp.py
@@ -61,8 +61,7 @@ def __init__(

@staticmethod
def get_s3_key(s3_key):
"""This parses the correct format for S3 keys regardless of how the S3 url is passed.
"""
"""This parses the correct format for S3 keys regardless of how the S3 url is passed."""

parsed_s3_key = urlparse(s3_key)
return parsed_s3_key.path.lstrip('/')
5 changes: 4 additions & 1 deletion airflow/providers/apache/druid/transfers/hive_to_druid.py
@@ -152,7 +152,10 @@ def execute(self, context: Dict[str, Any]) -> None:
druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id)

try:
index_spec = self.construct_ingest_query(static_path=static_path, columns=columns,)
index_spec = self.construct_ingest_query(
static_path=static_path,
columns=columns,
)

self.log.info("Inserting rows into Druid, hdfs path: %s", static_path)

10 changes: 9 additions & 1 deletion airflow/providers/apache/hive/operators/hive_stats.py
@@ -181,5 +181,13 @@ def execute(self, context: Optional[Dict[str, Any]] = None) -> None:
mysql.insert_rows(
table='hive_stats',
rows=rows,
target_fields=['ds', 'dttm', 'table_name', 'partition_repr', 'col', 'metric', 'value',],
target_fields=[
'ds',
'dttm',
'table_name',
'partition_repr',
'col',
'metric',
'value',
],
)
9 changes: 7 additions & 2 deletions airflow/providers/apache/livy/example_dags/example_livy.py
@@ -28,7 +28,10 @@
args = {'owner': 'airflow', 'email': ['airflow@example.com'], 'depends_on_past': False}

with DAG(
dag_id='example_livy_operator', default_args=args, schedule_interval='@daily', start_date=days_ago(5),
dag_id='example_livy_operator',
default_args=args,
schedule_interval='@daily',
start_date=days_ago(5),
) as dag:

livy_java_task = LivyOperator(
@@ -38,7 +41,9 @@
file='/spark-examples.jar',
args=[10],
num_executors=1,
conf={'spark.shuffle.compress': 'false',},
conf={
'spark.shuffle.compress': 'false',
},
class_name='org.apache.spark.examples.SparkPi',
)

7 changes: 6 additions & 1 deletion airflow/providers/apache/pig/example_dags/example_pig.py
@@ -34,4 +34,9 @@
tags=['example'],
)

run_this = PigOperator(task_id="run_example_pig_script", pig="ls /;", pig_opts="-x local", dag=dag,)
run_this = PigOperator(
task_id="run_example_pig_script",
pig="ls /;",
pig_opts="-x local",
dag=dag,
)
10 changes: 8 additions & 2 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -133,7 +133,10 @@ def get_namespace(self):
return namespace

def get_pod_log_stream(
self, pod_name: str, container: Optional[str] = "", namespace: Optional[str] = None,
self,
pod_name: str,
container: Optional[str] = "",
namespace: Optional[str] = None,
) -> Tuple[watch.Watch, Generator[str, None, None]]:
"""
Retrieves a log stream for a container in a kubernetes pod.
@@ -159,7 +162,10 @@ def get_pod_log_stream(
)

def get_pod_logs(
self, pod_name: str, container: Optional[str] = "", namespace: Optional[str] = None,
self,
pod_name: str,
container: Optional[str] = "",
namespace: Optional[str] = None,
):
"""
Retrieves a container's log from the specified pod.
@@ -57,7 +57,9 @@

notebook_task_params = {
'new_cluster': new_cluster,
'notebook_task': {'notebook_path': '/Users/airflow@example.com/PrepareData',},
'notebook_task': {
'notebook_path': '/Users/airflow@example.com/PrepareData',
},
}
# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(task_id='notebook_task', json=notebook_task_params)
8 changes: 7 additions & 1 deletion airflow/providers/databricks/operators/databricks.py
@@ -43,7 +43,13 @@ def _deep_string_coerce(content, json_path='json'):
coerce = _deep_string_coerce
if isinstance(content, str):
return content
elif isinstance(content, (int, float,)):
elif isinstance(
content,
(
int,
float,
),
):
# Databricks can tolerate either numeric or string types in the API backend.
return str(content)
elif isinstance(content, (list, tuple)):
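
The multi-line isinstance check above is the same magic-trailing-comma rule applied to a tuple literal: the comma after float forces the tuple onto one element per line, and this commit keeps the comma rather than fight the formatter. Purely as an illustration (not a change made in this commit, and _coerce_number is a hypothetical helper), dropping that redundant comma would let black 20.8b1 keep the check on a single line:

def _coerce_number(content):
    """Hypothetical helper, only to show the one-line form black accepts."""
    # No trailing comma inside the tuple, so black 20.8b1 leaves this line as-is.
    if isinstance(content, (int, float)):
        return str(content)
    return content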
11 changes: 9 additions & 2 deletions airflow/providers/facebook/ads/hooks/ads.py
@@ -59,7 +59,11 @@ class FacebookAdsReportingHook(BaseHook):
"""

def __init__(self, facebook_conn_id: str = "facebook_default", api_version: str = "v6.0",) -> None:
def __init__(
self,
facebook_conn_id: str = "facebook_default",
api_version: str = "v6.0",
) -> None:
super().__init__()
self.facebook_conn_id = facebook_conn_id
self.api_version = api_version
@@ -92,7 +96,10 @@ def facebook_ads_config(self) -> Dict:
return config

def bulk_facebook_report(
self, params: Dict[str, Any], fields: List[str], sleep_time: int = 5,
self,
params: Dict[str, Any],
fields: List[str],
sleep_time: int = 5,
) -> List[AdsInsights]:
"""
Pulls data from the Facebook Ads API
5 changes: 4 additions & 1 deletion airflow/providers/google/ads/transfers/ads_to_gcs.py
@@ -124,6 +124,9 @@ def execute(self, context: Dict):

hook = GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
hook.upload(
bucket_name=self.bucket, object_name=self.obj, filename=csvfile.name, gzip=self.gzip,
bucket_name=self.bucket,
object_name=self.obj,
filename=csvfile.name,
gzip=self.gzip,
)
self.log.info("%s uploaded to GCS", self.obj)
