Skip to content

Commit

Permalink
[AIRFLOW-7117] Honor self.schema in sql_to_gcs as schema to upload (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
whynick1 committed Apr 3, 2020
1 parent cc9b1bc commit 7ef75d2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
16 changes: 13 additions & 3 deletions airflow/providers/google/cloud/operators/sql_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,28 @@ def _get_col_type_dict(self):
def _write_local_schema_file(self, cursor):
"""
Takes a cursor, and writes the BigQuery schema for the results to a
local file system.
local file system. Schema for database will be read from cursor if
not specified.
:return: A dictionary where key is a filename to be used as an object
name in GCS, and values are file handles to local files that
contains the BigQuery schema fields in .json format.
"""
schema = [self.field_to_bigquery(field) for field in cursor.description]
if self.schema:
self.log.info("Using user schema")
schema = self.schema
else:
self.log.info("Starts generating schema")
schema = [self.field_to_bigquery(field) for field in cursor.description]

if isinstance(schema, list):
schema = json.dumps(schema, sort_keys=True)

self.log.info('Using schema for %s', self.schema_filename)
self.log.debug("Current schema: %s", schema)

tmp_schema_file_handle = NamedTemporaryFile(delete=True)
tmp_schema_file_handle.write(json.dumps(schema, sort_keys=True).encode('utf-8'))
tmp_schema_file_handle.write(schema.encode('utf-8'))
schema_file_to_upload = {
'file_name': self.schema_filename,
'file_handle': tmp_schema_file_handle,
Expand Down
38 changes: 38 additions & 0 deletions tests/providers/google/cloud/operators/test_mysql_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
BUCKET = 'gs://test'
JSON_FILENAME = 'test_{}.ndjson'
CSV_FILENAME = 'test_{}.csv'
# User-supplied BigQuery schema passed to the operator's `schema` argument.
# Note `some_num` is typed TIMESTAMP here, unlike the cursor-derived schema
# used by the other tests (which maps it to STRING) — this difference is what
# lets the test below verify the custom schema was honored.
SCHEMA = [
{'mode': 'REQUIRED', 'name': 'some_str', 'type': 'FLOAT'},
{'mode': 'REQUIRED', 'name': 'some_num', 'type': 'TIMESTAMP'}
]

ROWS = [
('mock_row_content_1', 42),
Expand Down Expand Up @@ -65,6 +69,10 @@
b'[{"mode": "REQUIRED", "name": "some_str", "type": "FLOAT"}, ',
b'{"mode": "REQUIRED", "name": "some_num", "type": "STRING"}]'
]
# Expected on-disk serialization of SCHEMA (json.dumps with sort_keys=True),
# split into byte chunks; joined with b''.join(...) when compared against the
# uploaded schema file's contents.
CUSTOM_SCHEMA_JSON = [
b'[{"mode": "REQUIRED", "name": "some_str", "type": "FLOAT"}, ',
b'{"mode": "REQUIRED", "name": "some_num", "type": "TIMESTAMP"}]'
]


class TestMySqlToGoogleCloudStorageOperator(unittest.TestCase):
Expand Down Expand Up @@ -293,6 +301,36 @@ def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip): # pylint: disab
# once for the file and once for the schema
self.assertEqual(2, gcs_hook_mock.upload.call_count)

@mock.patch('airflow.providers.google.cloud.operators.mysql_to_gcs.MySqlHook')
@mock.patch('airflow.providers.google.cloud.operators.sql_to_gcs.GCSHook')
def test_schema_file_with_custom_schema(self, gcs_hook_cls, mysql_hook_cls):
    """Verify that a user-provided ``schema`` is written to the schema file."""
    # Wire the MySQL hook mock so the operator iterates our canned rows and
    # sees the canned cursor description.
    db_hook = mysql_hook_cls.return_value
    db_hook.get_conn().cursor().__iter__.return_value = iter(ROWS)
    db_hook.get_conn().cursor().description = CURSOR_DESCRIPTION

    storage_hook = gcs_hook_cls.return_value

    def _check_schema_upload(bucket, obj, tmp_filename, mime_type, gzip):  # pylint: disable=unused-argument
        # Only the schema object is inspected; the data-file upload passes through.
        if obj != SCHEMA_FILENAME:
            return
        self.assertFalse(gzip)
        with open(tmp_filename, 'rb') as schema_file:
            self.assertEqual(b''.join(CUSTOM_SCHEMA_JSON), schema_file.read())

    storage_hook.upload.side_effect = _check_schema_upload

    operator = MySQLToGCSOperator(
        task_id=TASK_ID,
        sql=SQL,
        bucket=BUCKET,
        filename=JSON_FILENAME,
        schema_filename=SCHEMA_FILENAME,
        schema=SCHEMA)
    operator.execute(None)

    # One upload for the data file and one for the schema file.
    self.assertEqual(2, storage_hook.upload.call_count)

@mock.patch('airflow.providers.google.cloud.operators.mysql_to_gcs.MySqlHook')
@mock.patch('airflow.providers.google.cloud.operators.sql_to_gcs.GCSHook')
def test_query_with_error(self, mock_gcs_hook, mock_mysql_hook):
Expand Down

0 comments on commit 7ef75d2

Please sign in to comment.