Skip to content

Commit

Permalink
fix: ensure datetime-related values fully compatible with MySQL and B…
Browse files Browse the repository at this point in the history
…igQuery (#15026)
  • Loading branch information
tianjianjiang committed Jun 13, 2021
1 parent 001dc22 commit b272f9c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
17 changes: 9 additions & 8 deletions airflow/providers/google/cloud/transfers/mysql_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
"""MySQL to GCS operator."""

import base64
import calendar
from datetime import date, datetime, timedelta
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from typing import Dict

Expand Down Expand Up @@ -100,10 +99,12 @@ def convert_type(self, value, schema_type: str):
Takes a value from MySQLdb, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
* Datetimes are converted to UTC seconds.
* Datetimes are converted to `str(value)` (`datetime.isoformat(' ')`)
strings.
* Times are converted to `str((datetime.min + value).time())` strings.
* Decimals are converted to floats.
* Dates are converted to ISO formatted string if given schema_type is
DATE, or UTC seconds otherwise.
* Dates are converted to ISO formatted strings if given schema_type is
DATE, or `datetime.isoformat(' ')` strings otherwise.
* Binary type fields are converted to integer if given schema_type is
INTEGER, or encoded with base64 otherwise. Imported BYTES data must
be base64-encoded according to BigQuery documentation:
Expand All @@ -117,16 +118,16 @@ def convert_type(self, value, schema_type: str):
if value is None:
return value
if isinstance(value, datetime):
value = calendar.timegm(value.timetuple())
value = str(value)
elif isinstance(value, timedelta):
value = value.total_seconds()
value = str((datetime.min + value).time())
elif isinstance(value, Decimal):
value = float(value)
elif isinstance(value, date):
if schema_type == "DATE":
value = value.isoformat()
else:
value = calendar.timegm(value.timetuple())
value = str(datetime.combine(value, time.min))
elif isinstance(value, bytes):
if schema_type == "INTEGER":
value = int.from_bytes(value, "big")
Expand Down
9 changes: 7 additions & 2 deletions tests/providers/google/cloud/transfers/test_mysql_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,14 @@ def test_init(self):
@parameterized.expand(
[
("string", None, "string"),
(datetime.date(1970, 1, 2), None, 86400),
(datetime.date(1970, 1, 2), None, "1970-01-02 00:00:00"),
(datetime.date(1000, 1, 2), None, "1000-01-02 00:00:00"),
(datetime.date(1970, 1, 2), "DATE", "1970-01-02"),
(datetime.datetime(1970, 1, 1, 1, 0), None, 3600),
(datetime.date(1000, 1, 2), "DATE", "1000-01-02"),
(datetime.datetime(1970, 1, 1, 1, 0), None, "1970-01-01 01:00:00"),
(datetime.datetime(1000, 1, 1, 1, 0), None, "1000-01-01 01:00:00"),
(datetime.timedelta(), None, "00:00:00"),
(datetime.timedelta(hours=23, minutes=59, seconds=59), None, "23:59:59"),
(decimal.Decimal(5), None, 5),
(b"bytes", "BYTES", "Ynl0ZXM="),
(b"\x00\x01", "INTEGER", 1),
Expand Down

0 comments on commit b272f9c

Please sign in to comment.