Skip to content

Commit

Permalink
MSSQLToGCSOperator fails: datetime is not JSON Serializable (#22882)
Browse files Browse the repository at this point in the history
* Handle date and time convert_type for MSSQLToGCSOperator
  • Loading branch information
pierrejeambrun committed Apr 11, 2022
1 parent 3f63e9d commit 03e1c9b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
5 changes: 4 additions & 1 deletion airflow/providers/google/cloud/transfers/mssql_to_gcs.py
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""MsSQL to GCS operator."""

import datetime
import decimal
from typing import Dict

Expand Down Expand Up @@ -84,7 +84,10 @@ def convert_type(cls, value, schema_type):
"""
Takes a value from MSSQL, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
Datetime, Date and Time are converted to ISO formatted strings.
"""
if isinstance(value, decimal.Decimal):
return float(value)
if isinstance(value, (datetime.date, datetime.time)):
return value.isoformat()
return value
25 changes: 25 additions & 0 deletions tests/providers/google/cloud/transfers/test_mssql_to_gcs.py
Expand Up @@ -16,9 +16,12 @@
# specific language governing permissions and limitations
# under the License.

import datetime
import unittest
from unittest import mock

from parameterized import parameterized

from airflow import PY38

if not PY38:
Expand Down Expand Up @@ -50,6 +53,28 @@

@unittest.skipIf(PY38, "Mssql package not available when Python >= 3.8.")
class TestMsSqlToGoogleCloudStorageOperator(unittest.TestCase):
@parameterized.expand(
[
("string", "string"),
(32.9, 32.9),
(-2, -2),
(datetime.date(1970, 1, 2), "1970-01-02"),
(datetime.date(1000, 1, 2), "1000-01-02"),
(datetime.datetime(1970, 1, 1, 1, 0), "1970-01-01T01:00:00"),
(datetime.time(hour=0, minute=0, second=0), "00:00:00"),
(datetime.time(hour=23, minute=59, second=59), "23:59:59"),
]
)
def test_convert_type(self, value, expected):
op = MSSQLToGCSOperator(
task_id=TASK_ID,
mssql_conn_id=MSSQL_CONN_ID,
sql=SQL,
bucket=BUCKET,
filename=JSON_FILENAME,
)
assert op.convert_type(value, None) == expected

def test_init(self):
"""Test MySqlToGoogleCloudStorageOperator instance is properly initialized."""
op = MSSQLToGCSOperator(task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME)
Expand Down

0 comments on commit 03e1c9b

Please sign in to comment.