[AIRFLOW-5421] Add Presto to GCS transfer operator (#7718)
mik-laj committed Mar 18, 2020
1 parent 63a3102 commit 029c84e
Showing 14 changed files with 1,016 additions and 11 deletions.
2 changes: 1 addition & 1 deletion BREEZE.rst
@@ -861,7 +861,7 @@ This is the current syntax for `./breeze <./breeze>`_:
start all integrations. Selected integrations are not saved for future execution.
One of:
- cassandra kerberos mongo openldap rabbitmq redis all
+ cassandra kerberos mongo openldap presto rabbitmq redis all
****************************************************************************************************
Manage Kind kubernetes cluster (optional)
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
@@ -361,7 +361,7 @@ apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
apache.livy http
dingding http
discord http
- google amazon,apache.cassandra,cncf.kubernetes,microsoft.azure,microsoft.mssql,mysql,postgres,sftp
+ google amazon,apache.cassandra,cncf.kubernetes,microsoft.azure,microsoft.mssql,mysql,postgres,presto,sftp
microsoft.azure oracle
microsoft.mssql odbc
mysql amazon,presto,vertica
1 change: 1 addition & 0 deletions airflow/providers/dependencies.json
@@ -35,6 +35,7 @@
"microsoft.mssql",
"mysql",
"postgres",
"presto",
"sftp"
],
"microsoft.azure": [
150 changes: 150 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
@@ -0,0 +1,150 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG using PrestoToGCSOperator.
"""
import os
import re

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator, BigQueryCreateExternalTableOperator, BigQueryDeleteDatasetOperator,
BigQueryExecuteQueryOperator,
)
from airflow.providers.google.cloud.operators.presto_to_gcs import PrestoToGCSOperator
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
GCS_BUCKET = os.environ.get("GCP_PRESTO_TO_GCS_BUCKET_NAME", "test-presto-to-gcs-bucket")
DATASET_NAME = os.environ.get("GCP_PRESTO_TO_GCS_DATASET_NAME", "test_presto_to_gcs_dataset")

SOURCE_MULTIPLE_TYPES = "memory.default.test_multiple_types"
SOURCE_CUSTOMER_TABLE = "tpch.sf1.customer"


def safe_name(s: str) -> str:
"""
Replace characters that are invalid in a filename with underscores.
"""
return re.sub("[^0-9a-zA-Z_]+", "_", s)


default_args = {"start_date": days_ago(1)}

with models.DAG(
dag_id=f"example_presto_to_gcs",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=["example"],
) as dag:

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

# [START howto_operator_presto_to_gcs_basic]
presto_to_gcs_basic = PrestoToGCSOperator(
task_id="presto_to_gcs_basic",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)
# [END howto_operator_presto_to_gcs_basic]

# [START howto_operator_presto_to_gcs_multiple_types]
presto_to_gcs_multiple_types = PrestoToGCSOperator(
task_id="presto_to_gcs_multiple_types",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
gzip=False,
)
# [END howto_operator_presto_to_gcs_multiple_types]

# [START howto_operator_create_external_table_multiple_types]
create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
task_id="create_external_table_multiple_types",
bucket=GCS_BUCKET,
source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}",
schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)
# [END howto_operator_create_external_table_multiple_types]

read_data_from_gcs_multiple_types = BigQueryExecuteQueryOperator(
task_id="read_data_from_gcs_multiple_types",
sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}`",
use_legacy_sql=False,
)

# [START howto_operator_presto_to_gcs_many_chunks]
presto_to_gcs_many_chunks = PrestoToGCSOperator(
task_id="presto_to_gcs_many_chunks",
sql=f"select * from {SOURCE_CUSTOMER_TABLE}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
approx_max_file_size_bytes=10_000_000,
gzip=False,
)
# [END howto_operator_presto_to_gcs_many_chunks]

create_external_table_many_chunks = BigQueryCreateExternalTableOperator(
task_id="create_external_table_many_chunks",
bucket=GCS_BUCKET,
source_objects=[f"{safe_name(SOURCE_CUSTOMER_TABLE)}.*.json"],
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}",
schema_object=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
)

# [START howto_operator_read_data_from_gcs_many_chunks]
read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator(
task_id="read_data_from_gcs_many_chunks",
sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}`",
use_legacy_sql=False,
)
# [END howto_operator_read_data_from_gcs_many_chunks]

# [START howto_operator_presto_to_gcs_csv]
presto_to_gcs_csv = PrestoToGCSOperator(
task_id="presto_to_gcs_csv",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
export_format="csv",
)
# [END howto_operator_presto_to_gcs_csv]

create_dataset >> presto_to_gcs_basic
create_dataset >> presto_to_gcs_multiple_types
create_dataset >> presto_to_gcs_many_chunks
create_dataset >> presto_to_gcs_csv

presto_to_gcs_multiple_types >> create_external_table_multiple_types >> read_data_from_gcs_multiple_types
presto_to_gcs_many_chunks >> create_external_table_many_chunks >> read_data_from_gcs_many_chunks

presto_to_gcs_basic >> delete_dataset
presto_to_gcs_csv >> delete_dataset
read_data_from_gcs_multiple_types >> delete_dataset
read_data_from_gcs_many_chunks >> delete_dataset
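
The filename templates in this DAG rely on two details: ``safe_name`` collapses every character that is not alphanumeric or an underscore into ``_``, and the ``{}`` placeholder left in the template is filled with a file number when the exported result set is split across chunks (the numbering itself comes from the base SQL-to-GCS operator that ``PrestoToGCSOperator`` extends). A minimal standalone sketch, separate from the files in this commit, showing the object names this is expected to produce; the loop only illustrates the pattern:

import re


def safe_name(s: str) -> str:
    # Same sanitizer as in the example DAG: collapse invalid characters to "_".
    return re.sub("[^0-9a-zA-Z_]+", "_", s)


# The table name comes from the DAG above; the chunk numbers are illustrative.
template = f"{safe_name('memory.default.test_multiple_types')}.{{}}.json"
print(template)  # memory_default_test_multiple_types.{}.json

for file_no in range(3):
    print(template.format(file_no))
# memory_default_test_multiple_types.0.json
# memory_default_test_multiple_types.1.json
# memory_default_test_multiple_types.2.json

These chunked names are what the ``source_objects=[f"{safe_name(...)}.*.json"]`` wildcard in the external-table tasks matches.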
220 changes: 220 additions & 0 deletions airflow/providers/google/cloud/operators/presto_to_gcs.py
@@ -0,0 +1,220 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, List, Tuple

from prestodb.dbapi import Cursor as PrestoCursor

from airflow.providers.google.cloud.operators.sql_to_gcs import BaseSQLToGCSOperator
from airflow.providers.presto.hooks.presto import PrestoHook
from airflow.utils.decorators import apply_defaults


class _PrestoToGCSPrestoCursorAdapter:
"""
An adapter that adds additional features to the Presto cursor.

The implementation of the cursor in the prestodb library is not sufficient,
so the following changes have been made:

* A peek mechanism for rows: you can look at the next row without consuming it.
* The ``description`` attribute is available before reading the first row, thanks to the peek mechanism.
* The iterator interface has been implemented.

A detailed description of the class methods is available in
`PEP-249 <https://www.python.org/dev/peps/pep-0249/>`__.
"""

def __init__(self, cursor: PrestoCursor):
self.cursor: PrestoCursor = cursor
self.rows: List[Any] = []
self.initialized: bool = False

@property
def description(self) -> List[Tuple]:
"""
This read-only attribute is a sequence of 7-item sequences.
Each of these sequences contains information describing one result column:
* ``name``
* ``type_code``
* ``display_size``
* ``internal_size``
* ``precision``
* ``scale``
* ``null_ok``
The first two items (``name`` and ``type_code``) are mandatory, the other
five are optional and are set to None if no meaningful values can be provided.
"""
if not self.initialized:
# Peek for first row to load description.
self.peekone()
return self.cursor.description

@property
def rowcount(self) -> int:
"""The read-only attribute specifies the number of rows"""
return self.cursor.rowcount

def close(self) -> None:
"""Close the cursor now"""
self.cursor.close()

def execute(self, *args, **kwargs):
"""Prepare and execute a database operation (query or command)."""
self.initialized = False
self.rows = []
return self.cursor.execute(*args, **kwargs)

def executemany(self, *args, **kwargs):
"""
Prepare a database operation (query or command) and then execute it against all parameter
sequences or mappings found in the sequence seq_of_parameters.
"""
self.initialized = False
self.rows = []
return self.cursor.executemany(*args, **kwargs)

def peekone(self) -> Any:
"""
Return the next row without consuming it.
"""
self.initialized = True
element = self.cursor.fetchone()
self.rows.insert(0, element)
return element

def fetchone(self) -> Any:
"""
Fetch the next row of a query result set, returning a single sequence, or
``None`` when no more data is available.
"""
if self.rows:
return self.rows.pop(0)
return self.cursor.fetchone()

def fetchmany(self, size=None) -> List[Any]:
"""
Fetch the next set of rows of a query result, returning a sequence of sequences
(e.g. a list of tuples). An empty sequence is returned when no more rows are available.
"""
if size is None:
size = self.cursor.arraysize

result = []
for _ in range(size):
row = self.fetchone()
if row is None:
break
result.append(row)

return result

def __next__(self) -> Any:
"""
Return the next row from the currently executing SQL statement using the same semantics as
``.fetchone()``. A ``StopIteration`` exception is raised when the result set is exhausted.
"""
result = self.fetchone()
if result is None:
raise StopIteration()
return result

def __iter__(self) -> "_PrestoToGCSPrestoCursorAdapter":
"""
Return self to make cursors compatible with the iteration protocol.
"""
return self


class PrestoToGCSOperator(BaseSQLToGCSOperator):
"""Copy data from PrestoDB to Google Cloud Storage in JSON or CSV format.
:param presto_conn_id: Reference to a specific Presto hook.
:type presto_conn_id: str
"""

ui_color = "#a0e08c"

type_map = {
"BOOLEAN": "BOOL",
"TINYINT": "INT64",
"SMALLINT": "INT64",
"INTEGER": "INT64",
"BIGINT": "INT64",
"REAL": "FLOAT64",
"DOUBLE": "FLOAT64",
"DECIMAL": "NUMERIC",
"VARCHAR": "STRING",
"CHAR": "STRING",
"VARBINARY": "BYTES",
"JSON": "STRING",
"DATE": "DATE",
"TIME": "TIME",
# BigQuery does not support TIME WITH TIME ZONE natively.
"TIME WITH TIME ZONE": "STRING",
"TIMESTAMP": "TIMESTAMP",
# BigQuery supports a narrow range of time zones during import.
# Use the TIMESTAMP function if you want a TIMESTAMP type.
"TIMESTAMP WITH TIME ZONE": "STRING",
"IPADDRESS": "STRING",
"UUID": "STRING",
}

@apply_defaults
def __init__(
self,
presto_conn_id: str = "presto_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.presto_conn_id = presto_conn_id

def query(self):
"""
Queries Presto and returns a cursor to the results.
"""
presto = PrestoHook(presto_conn_id=self.presto_conn_id)
conn = presto.get_conn()
cursor = conn.cursor()
self.log.info("Executing: %s", self.sql)
cursor.execute(self.sql)
return _PrestoToGCSPrestoCursorAdapter(cursor)

def field_to_bigquery(self, field):
"""Convert presto field type to BigQuery field type."""
clear_field_type = field[1].upper()
# remove type argument e.g. DECIMAL(2, 10) => DECIMAL
clear_field_type, _, _ = clear_field_type.partition("(")
new_field_type = self.type_map.get(clear_field_type, "STRING")

return {"name": field[0], "type": new_field_type}

def convert_type(self, value, schema_type):
"""
Do nothing. Presto uses JSON on the transport layer, so types are simple.
:param value: Presto column value
:type value: Any
:param schema_type: BigQuery data type
:type schema_type: str
"""
return value
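
The key design point of ``_PrestoToGCSPrestoCursorAdapter`` is the peek: reading ``description`` triggers ``peekone()``, which fetches one row, stashes it, and hands it back on the next ``fetchone()`` call, so no data is lost. A minimal sketch with a hypothetical in-memory stub cursor (the stub and its sample rows are invented for illustration; running it assumes an Airflow installation that contains this module along with the ``prestodb`` dependency it imports):

from airflow.providers.google.cloud.operators.presto_to_gcs import _PrestoToGCSPrestoCursorAdapter


class _StubCursor:
    """Mimics a Presto cursor whose description is only known after the first fetch."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.description = None  # unknown until a row has been fetched

    def fetchone(self):
        if not self._rows:
            return None
        self.description = [("id", "bigint", None, None, None, None, None)]
        return self._rows.pop(0)


cursor = _PrestoToGCSPrestoCursorAdapter(_StubCursor([(1,), (2,)]))
print(cursor.description)  # available before the caller has consumed any row
print(list(cursor))        # [(1,), (2,)] -- the peeked row is not lost

The base SQL-to-GCS operator builds its column list from ``description`` before iterating over rows, which is presumably why the adapter exposes the attribute up front.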
