Skip to content

Commit

Permalink
Fix MyPy errors for google.cloud.transfers (#20229)
Browse files Browse the repository at this point in the history
Part of #19891
  • Loading branch information
potiuk committed Dec 14, 2021
1 parent 0b7734e commit 1570519
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/facebook/ads/hooks/ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def facebook_ads_config(self) -> Dict:

def bulk_facebook_report(
self,
params: Dict[str, Any],
params: Optional[Dict[str, Any]],
fields: List[str],
sleep_time: int = 5,
) -> Union[List[AdsInsights], Dict[str, List[AdsInsights]]]:
Expand Down
43 changes: 23 additions & 20 deletions airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ def __init__(
bucket_name: str,
object_name: str,
fields: List[str],
params: Dict[str, Any] = None,
parameters: Dict[str, Any] = None,
params: Optional[Dict[str, Any]] = None,
parameters: Optional[Dict[str, Any]] = None,
gzip: bool = False,
upload_as_account: bool = False,
api_version: Optional[str] = None,
Expand Down Expand Up @@ -200,30 +200,33 @@ def _prepare_rows_for_upload(

def _decide_and_flush(self, converted_rows_with_action: Dict[FlushAction, list]):
total_data_count = 0
if FlushAction.EXPORT_ONCE in converted_rows_with_action:
once_action = converted_rows_with_action.get(FlushAction.EXPORT_ONCE)
if once_action is not None:
self._flush_rows(
converted_rows=converted_rows_with_action.get(FlushAction.EXPORT_ONCE),
converted_rows=once_action,
object_name=self.object_name,
)
total_data_count += len(converted_rows_with_action.get(FlushAction.EXPORT_ONCE))
elif FlushAction.EXPORT_EVERY_ACCOUNT in converted_rows_with_action:
for converted_rows in converted_rows_with_action.get(FlushAction.EXPORT_EVERY_ACCOUNT):
self._flush_rows(
converted_rows=converted_rows.get("converted_rows"),
object_name=self._transform_object_name_with_account_id(
account_id=converted_rows.get("account_id")
),
)
total_data_count += len(converted_rows.get("converted_rows"))
total_data_count += len(once_action)
else:
message = (
"FlushAction not found in the data. Please check the FlushAction in the operator. Converted "
"Rows with Action: " + str(converted_rows_with_action)
)
raise AirflowException(message)
every_account_action = converted_rows_with_action.get(FlushAction.EXPORT_EVERY_ACCOUNT)
if every_account_action:
for converted_rows in every_account_action:
self._flush_rows(
converted_rows=converted_rows.get("converted_rows"),
object_name=self._transform_object_name_with_account_id(
account_id=converted_rows.get("account_id")
),
)
total_data_count += len(converted_rows.get("converted_rows"))
else:
message = (
"FlushAction not found in the data. Please check the FlushAction in "
"the operator. Converted Rows with Action: " + str(converted_rows_with_action)
)
raise AirflowException(message)
return total_data_count

def _flush_rows(self, converted_rows: list, object_name: str):
def _flush_rows(self, converted_rows: Optional[List[Any]], object_name: str):
if converted_rows:
headers = converted_rows[0].keys()
with tempfile.NamedTemporaryFile("w", suffix=".csv") as csvfile:
Expand Down

0 comments on commit 1570519

Please sign in to comment.