Skip to content

Commit

Permalink
Update Oracle library to latest version (#24311)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmeerwood committed Jun 14, 2022
1 parent 7d8a17b commit 2a084ee
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 119 deletions.
20 changes: 10 additions & 10 deletions airflow/providers/google/cloud/transfers/oracle_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from decimal import Decimal
from typing import Dict

import cx_Oracle
import oracledb

from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
from airflow.providers.oracle.hooks.oracle import OracleHook
Expand All @@ -45,15 +45,15 @@ class OracleToGCSOperator(BaseSQLToGCSOperator):
ui_color = '#a0e08c'

type_map = {
cx_Oracle.DB_TYPE_BINARY_DOUBLE: 'DECIMAL',
cx_Oracle.DB_TYPE_BINARY_FLOAT: 'DECIMAL',
cx_Oracle.DB_TYPE_BINARY_INTEGER: 'INTEGER',
cx_Oracle.DB_TYPE_BOOLEAN: 'BOOLEAN',
cx_Oracle.DB_TYPE_DATE: 'TIMESTAMP',
cx_Oracle.DB_TYPE_NUMBER: 'NUMERIC',
cx_Oracle.DB_TYPE_TIMESTAMP: 'TIMESTAMP',
cx_Oracle.DB_TYPE_TIMESTAMP_LTZ: 'TIMESTAMP',
cx_Oracle.DB_TYPE_TIMESTAMP_TZ: 'TIMESTAMP',
oracledb.DB_TYPE_BINARY_DOUBLE: 'DECIMAL', # type: ignore
oracledb.DB_TYPE_BINARY_FLOAT: 'DECIMAL', # type: ignore
oracledb.DB_TYPE_BINARY_INTEGER: 'INTEGER', # type: ignore
oracledb.DB_TYPE_BOOLEAN: 'BOOLEAN', # type: ignore
oracledb.DB_TYPE_DATE: 'TIMESTAMP', # type: ignore
oracledb.DB_TYPE_NUMBER: 'NUMERIC', # type: ignore
oracledb.DB_TYPE_TIMESTAMP: 'TIMESTAMP', # type: ignore
oracledb.DB_TYPE_TIMESTAMP_LTZ: 'TIMESTAMP', # type: ignore
oracledb.DB_TYPE_TIMESTAMP_TZ: 'TIMESTAMP', # type: ignore
}

def __init__(self, *, oracle_conn_id='oracle_default', ensure_utc=False, **kwargs):
Expand Down
53 changes: 21 additions & 32 deletions airflow/providers/oracle/hooks/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from datetime import datetime
from typing import Dict, List, Optional, Union

import cx_Oracle
import oracledb

try:
import numpy
Expand Down Expand Up @@ -57,7 +57,7 @@ class OracleHook(DbApiHook):

supports_autocommit = True

def get_conn(self) -> 'OracleHook':
def get_conn(self) -> oracledb.Connection:
"""
Returns an Oracle connection object
Optional parameters for using a custom DSN connection
Expand All @@ -84,8 +84,8 @@ def get_conn(self) -> 'OracleHook':
)
}
see more param detail in
`cx_Oracle.connect <https://cx-oracle.readthedocs.io/en/latest/module.html#cx_Oracle.connect>`_
see more param detail in `oracledb.connect
<https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.connect>`_
"""
Expand All @@ -98,9 +98,9 @@ def get_conn(self) -> 'OracleHook':
service_name = conn.extra_dejson.get('service_name')
port = conn.port if conn.port else 1521
if conn.host and sid and not service_name:
conn_config['dsn'] = cx_Oracle.makedsn(conn.host, port, sid)
conn_config['dsn'] = oracledb.makedsn(conn.host, port, sid)
elif conn.host and service_name and not sid:
conn_config['dsn'] = cx_Oracle.makedsn(conn.host, port, service_name=service_name)
conn_config['dsn'] = oracledb.makedsn(conn.host, port, service_name=service_name)
else:
dsn = conn.extra_dejson.get('dsn')
if dsn is None:
Expand All @@ -119,51 +119,40 @@ def get_conn(self) -> 'OracleHook':
dsn += "/" + conn.schema
conn_config['dsn'] = dsn

if 'encoding' in conn.extra_dejson:
conn_config['encoding'] = conn.extra_dejson.get('encoding')
# if `encoding` is specific but `nencoding` is not
# `nencoding` should use same values as `encoding` to set encoding, inspired by
# https://github.com/oracle/python-cx_Oracle/issues/157#issuecomment-371877993
if 'nencoding' not in conn.extra_dejson:
conn_config['nencoding'] = conn.extra_dejson.get('encoding')
if 'nencoding' in conn.extra_dejson:
conn_config['nencoding'] = conn.extra_dejson.get('nencoding')
if 'threaded' in conn.extra_dejson:
conn_config['threaded'] = conn.extra_dejson.get('threaded')
if 'events' in conn.extra_dejson:
conn_config['events'] = conn.extra_dejson.get('events')

mode = conn.extra_dejson.get('mode', '').lower()
if mode == 'sysdba':
conn_config['mode'] = cx_Oracle.SYSDBA
conn_config['mode'] = oracledb.AUTH_MODE_SYSDBA
elif mode == 'sysasm':
conn_config['mode'] = cx_Oracle.SYSASM
conn_config['mode'] = oracledb.AUTH_MODE_SYSASM
elif mode == 'sysoper':
conn_config['mode'] = cx_Oracle.SYSOPER
conn_config['mode'] = oracledb.AUTH_MODE_SYSOPER
elif mode == 'sysbkp':
conn_config['mode'] = cx_Oracle.SYSBKP
conn_config['mode'] = oracledb.AUTH_MODE_SYSBKP
elif mode == 'sysdgd':
conn_config['mode'] = cx_Oracle.SYSDGD
conn_config['mode'] = oracledb.AUTH_MODE_SYSDGD
elif mode == 'syskmt':
conn_config['mode'] = cx_Oracle.SYSKMT
conn_config['mode'] = oracledb.AUTH_MODE_SYSKMT
elif mode == 'sysrac':
conn_config['mode'] = cx_Oracle.SYSRAC
conn_config['mode'] = oracledb.AUTH_MODE_SYSRAC

purity = conn.extra_dejson.get('purity', '').lower()
if purity == 'new':
conn_config['purity'] = cx_Oracle.ATTR_PURITY_NEW
conn_config['purity'] = oracledb.PURITY_NEW
elif purity == 'self':
conn_config['purity'] = cx_Oracle.ATTR_PURITY_SELF
conn_config['purity'] = oracledb.PURITY_SELF
elif purity == 'default':
conn_config['purity'] = cx_Oracle.ATTR_PURITY_DEFAULT
conn_config['purity'] = oracledb.PURITY_DEFAULT

conn = cx_Oracle.connect(**conn_config)
conn = oracledb.connect(**conn_config)
if mod is not None:
conn.module = mod

# if Connection.schema is defined, set schema after connecting successfully
# cannot be part of conn_config
# https://cx-oracle.readthedocs.io/en/latest/api_manual/connection.html?highlight=schema#Connection.current_schema
# https://python-oracledb.readthedocs.io/en/latest/api_manual/connection.html?highlight=schema#Connection.current_schema
# Only set schema when not using conn.schema as Service Name
if schema and service_name:
conn.current_schema = schema
Expand All @@ -184,7 +173,7 @@ def insert_rows(
the whole set of inserts is treated as one transaction
Changes from standard DbApiHook implementation:
- Oracle SQL queries in cx_Oracle can not be terminated with a semicolon (`;`)
- Oracle SQL queries in oracledb cannot be terminated with a semicolon (`;`)
- Replace NaN values with NULL using `numpy.nan_to_num` (not using
`is_nan()` because of input types error for strings)
- Coerce datetime cells to Oracle DATETIME format during insert
Expand Down Expand Up @@ -245,7 +234,7 @@ def bulk_insert_rows(
commit_every: int = 5000,
):
"""
A performant bulk insert for cx_Oracle
A performant bulk insert for oracledb
that uses prepared statements via `executemany()`.
For best performance, pass in `rows` as an iterator.
Expand Down Expand Up @@ -307,7 +296,7 @@ def callproc(
provided `parameters` argument.
See
https://cx-oracle.readthedocs.io/en/latest/api_manual/cursor.html#Cursor.var
https://python-oracledb.readthedocs.io/en/latest/api_manual/cursor.html#Cursor.var
for further reference.
"""
if parameters is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ Reference
---------

For further information, look at:
* `cx_Oracle Documentation <https://cx-oracle.readthedocs.io/en/latest/>`__
* `oracledb Documentation <https://python-oracledb.readthedocs.io/en/latest/>`__
* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-oracle/connections/oracle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ Extra (optional)
Schema = "orcl"
More details on all Oracle connect parameters supported can be found in `cx_Oracle documentation
<https://cx-oracle.readthedocs.io/en/latest/api_manual/module.html#cx_Oracle.connect>`_.
More details on all Oracle connect parameters supported can be found in `oracledb documentation
<https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.connect>`_.

Information on creating an Oracle Connection through the web user interface can be found in Airflow's :doc:`Managing Connections Documentation <apache-airflow:howto/connection>`.

Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-oracle/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Requirements
PIP package Version required
================== ==================
``apache-airflow`` ``>=2.2.0``
``cx_Oracle`` ``>=5.1.2``
``oracledb`` ``>=1.0.0``
================== ==================

.. include:: ../../airflow/providers/oracle/CHANGELOG.rst
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,6 @@ def _get_params(root_schema: dict, prefix: str = "", default_section: str = "")
'celery',
'cloudant',
'cryptography',
'cx_Oracle',
'datadog',
'distributed',
'docker',
Expand All @@ -567,6 +566,7 @@ def _get_params(root_schema: dict, prefix: str = "", default_section: str = "")
'kubernetes',
'msrestazure',
'oss2',
'oracledb',
'pandas',
'pandas_gbq',
'paramiko',
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
'opsgenie-sdk>=2.1.5',
]
oracle = [
'cx_Oracle>=5.1.2',
'oracledb>=1.0.0',
]
pagerduty = [
'pdpyras>=4.1.2',
Expand Down
6 changes: 3 additions & 3 deletions tests/providers/google/cloud/transfers/test_oracle_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import unittest
from unittest import mock

import cx_Oracle
import oracledb

from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator

Expand All @@ -32,8 +32,8 @@

ROWS = [('mock_row_content_1', 42), ('mock_row_content_2', 43), ('mock_row_content_3', 44)]
CURSOR_DESCRIPTION = (
('some_str', cx_Oracle.DB_TYPE_VARCHAR, None, None, None, None, None),
('some_num', cx_Oracle.DB_TYPE_NUMBER, None, None, None, None, None),
('some_str', oracledb.DB_TYPE_VARCHAR, None, None, None, None, None), # type: ignore
('some_num', oracledb.DB_TYPE_NUMBER, None, None, None, None, None), # type: ignore
)
NDJSON_LINES = [
b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def test_write_temp_file(self):
delimiter = '|'
encoding = 'utf-8'
cursor_description = [
('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
('description', "<class 'cx_Oracle.STRING'>", 60, 240, None, None, 1),
('id', "<class 'oracledb.NUMBER'>", 39, None, 38, 0, 0),
('description', "<class 'oracledb.STRING'>", 60, 240, None, None, 1),
]
cursor_rows = [[1, 'description 1'], [2, 'description 2']]
mock_cursor = MagicMock()
Expand Down Expand Up @@ -95,8 +95,8 @@ def test_execute(self, mock_data_lake_hook, mock_oracle_hook):
delimiter = '|'
encoding = 'latin-1'
cursor_description = [
('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
('description', "<class 'cx_Oracle.STRING'>", 60, 240, None, None, 1),
('id', "<class 'oracledb.NUMBER'>", 39, None, 38, 0, 0),
('description', "<class 'oracledb.STRING'>", 60, 240, None, None, 1),
]
cursor_rows = [[1, 'description 1'], [2, 'description 2']]
cursor_mock = MagicMock()
Expand Down

0 comments on commit 2a084ee

Please sign in to comment.