Skip to content

Commit

Permalink
[AIRFLOW-6610] Move software classes to providers package (#7231)
Browse files Browse the repository at this point in the history
* [AIP-21] Move contrib.hooks.mongo_hook providers.mongo.hooks.mongo

* [AIP-21] Move contrib.hooks.openfaas_hook providers.openfass.hooks.openfaas

* [AIP-21] Move contrib.hooks.redis_hook providers.redis.hooks.redis

* [AIP-21] Move contrib.operators.docker_swarm_operator providers.docker.operators.docker_swarm

* [AIP-21] Move contrib.operators.redis_publish_operator providers.redis.operators.redis_publish

* [AIP-21] Move contrib.operators.kubernetes_pod_operator providers.cncf.kubernetes.operators.kubernetes_pod

* [AIP-21] Move contrib.sensors.bash_sensor sensors.bash

* [AIP-21] Move contrib.sensors.celery_queue_sensor providers.celery.sensors.celery_queue

* [AIP-21] Move contrib.sensors.mongo_sensor providers.mongo.sensors.mongo

* [AIP-21] Move contrib.sensors.python_sensor sensors.python

* [AIP-21] Move contrib.sensors.redis_key_sensor providers.redis.sensors.redis_key

* [AIP-21] Move contrib.sensors.redis_pub_sub_sensor providers.redis.sensors.redis_pub_sub

* [AIP-21] Move hooks.docker_hook providers.docker.hooks.docker

* [AIP-21] Move hooks.mssql_hook providers.microsoft.mssql.hooks.mssql

* [AIP-21] Move hooks.mysql_hook providers.mysql.hooks.mysql

* [AIP-21] Move hooks.oracle_hook providers.oracle.hooks.oracle

* [AIP-21] Move hooks.postgres_hook providers.postgres.hooks.postgres

* [AIP-21] Move hooks.presto_hook providers.presto.hooks.presto

* [AIP-21] Move hooks.samba_hook providers.samba.hooks.samba

* [AIP-21] Move hooks.sqlite_hook providers.sqlite.hooks.sqlite

* [AIP-21] Move operators.bash_operator operators.bash

* [AIP-21] Move operators.docker_operator providers.docker.operators.docker

* [AIP-21] Move operators.mssql_operator providers.microsoft.mssql.operators.mssql

* [AIP-21] Move operators.mysql_operator providers.mssql.operators.mysql

* [AIP-21] Move operators.oracle_operator providers.oracle.operators.oracle

* [AIP-21] Move operators.papermill_operator providers.papermill.operators.papermill

* [AIP-21] Move operators.postgres_operator providers.postgres.operators.postgres

* [AIP-21] Move operators.presto_check_operator providers.presto.operators.presto_check

* [AIP-21] Move operators.python_operator operators.python

* [AIP-21] Move operators.sqlite_operator providers.sqlite.operators.sqlite

* Update docs
  • Loading branch information
mik-laj committed Jan 21, 2020
1 parent c1ede4d commit 059eda0
Show file tree
Hide file tree
Showing 249 changed files with 5,812 additions and 3,593 deletions.
Expand Up @@ -22,7 +22,7 @@
import os

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

args = {
Expand Down
Expand Up @@ -23,7 +23,7 @@

from airflow.contrib.example_dags.libs.helper import print_stuff
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
Expand Down
Expand Up @@ -28,7 +28,7 @@
try:
# Kubernetes is optional, so not available in vanilla Airflow
# pip install 'apache-airflow[kubernetes]'
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
'owner': 'airflow',
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/example_dags/example_papermill_operator.py
Expand Up @@ -25,7 +25,7 @@
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.papermill_operator import PapermillOperator
from airflow.providers.papermill.operators.papermill import PapermillOperator
from airflow.utils.dates import days_ago

default_args = {
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/example_dags/example_qubole_operator.py
Expand Up @@ -34,7 +34,7 @@
from airflow import DAG
from airflow.contrib.operators.qubole_operator import QuboleOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.dates import days_ago

default_args = {
Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/example_dags/example_twitter_dag.py
Expand Up @@ -32,8 +32,8 @@
from datetime import date, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago

Expand Down
286 changes: 8 additions & 278 deletions airflow/contrib/hooks/mongo_hook.py
Expand Up @@ -16,284 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Mongo DB"""
from ssl import CERT_NONE
"""This module is deprecated. Please use `airflow.providers.mongo.hooks.mongo`."""

from pymongo import MongoClient, ReplaceOne
import warnings

from airflow.hooks.base_hook import BaseHook
# pylint: disable=unused-import
from airflow.providers.mongo.hooks.mongo import MongoHook # noqa


class MongoHook(BaseHook):
"""
PyMongo Wrapper to Interact With Mongo Database
Mongo Connection Documentation
https://docs.mongodb.com/manual/reference/connection-string/index.html
You can specify connection string options in extra field of your connection
https://docs.mongodb.com/manual/reference/connection-string/index.html#connection-string-options
If you want use DNS seedlist, set `srv` to True.
ex.
{"srv": true, "replicaSet": "test", "ssl": true, "connectTimeoutMS": 30000}
"""
conn_type = 'mongo'

def __init__(self, conn_id='mongo_default', *args, **kwargs):

self.mongo_conn_id = conn_id
self.connection = self.get_connection(conn_id)
self.extras = self.connection.extra_dejson.copy()
self.client = None

srv = self.extras.pop('srv', False)
scheme = 'mongodb+srv' if srv else 'mongodb'

self.uri = '{scheme}://{creds}{host}{port}/{database}'.format(
scheme=scheme,
creds='{}:{}@'.format(
self.connection.login, self.connection.password
) if self.connection.login else '',

host=self.connection.host,
port='' if self.connection.port is None else ':{}'.format(self.connection.port),
database=self.connection.schema
)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.client is not None:
self.close_conn()

def get_conn(self):
"""
Fetches PyMongo Client
"""
if self.client is not None:
return self.client

# Mongo Connection Options dict that is unpacked when passed to MongoClient
options = self.extras

# If we are using SSL disable requiring certs from specific hostname
if options.get('ssl', False):
options.update({'ssl_cert_reqs': CERT_NONE})

self.client = MongoClient(self.uri, **options)

return self.client

def close_conn(self) -> None:
"""Closes connection"""
client = self.client
if client is not None:
client.close()
self.client = None

def get_collection(self, mongo_collection, mongo_db=None):
"""
Fetches a mongo collection object for querying.
Uses connection schema as DB unless specified.
"""
mongo_db = mongo_db if mongo_db is not None else self.connection.schema
mongo_conn = self.get_conn()

return mongo_conn.get_database(mongo_db).get_collection(mongo_collection)

def aggregate(self, mongo_collection, aggregate_query, mongo_db=None, **kwargs):
"""
Runs an aggregation pipeline and returns the results
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.aggregate
https://api.mongodb.com/python/current/examples/aggregation.html
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

return collection.aggregate(aggregate_query, **kwargs)

def find(self, mongo_collection, query, find_one=False, mongo_db=None, **kwargs):
"""
Runs a mongo find query and returns the results
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.find
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

if find_one:
return collection.find_one(query, **kwargs)
else:
return collection.find(query, **kwargs)

def insert_one(self, mongo_collection, doc, mongo_db=None, **kwargs):
"""
Inserts a single document into a mongo collection
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.insert_one
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

return collection.insert_one(doc, **kwargs)

def insert_many(self, mongo_collection, docs, mongo_db=None, **kwargs):
"""
Inserts many docs into a mongo collection.
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.insert_many
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

return collection.insert_many(docs, **kwargs)

def update_one(self, mongo_collection, filter_doc, update_doc,
mongo_db=None, **kwargs):
"""
Updates a single document in a mongo collection.
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.update_one
:param mongo_collection: The name of the collection to update.
:type mongo_collection: str
:param filter_doc: A query that matches the documents to update.
:type filter_doc: dict
:param update_doc: The modifications to apply.
:type update_doc: dict
:param mongo_db: The name of the database to use.
Can be omitted; then the database from the connection string is used.
:type mongo_db: str
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

return collection.update_one(filter_doc, update_doc, **kwargs)

def update_many(self, mongo_collection, filter_doc, update_doc,
mongo_db=None, **kwargs):
"""
Updates one or more documents in a mongo collection.
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.update_many
:param mongo_collection: The name of the collection to update.
:type mongo_collection: str
:param filter_doc: A query that matches the documents to update.
:type filter_doc: dict
:param update_doc: The modifications to apply.
:type update_doc: dict
:param mongo_db: The name of the database to use.
Can be omitted; then the database from the connection string is used.
:type mongo_db: str
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

return collection.update_many(filter_doc, update_doc, **kwargs)

def replace_one(self, mongo_collection, doc, filter_doc=None,
mongo_db=None, **kwargs):
"""
Replaces a single document in a mongo collection.
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.replace_one
.. note::
If no ``filter_doc`` is given, it is assumed that the replacement
document contain the ``_id`` field which is then used as filters.
:param mongo_collection: The name of the collection to update.
:type mongo_collection: str
:param doc: The new document.
:type doc: dict
:param filter_doc: A query that matches the documents to replace.
Can be omitted; then the _id field from doc will be used.
:type filter_doc: dict
:param mongo_db: The name of the database to use.
Can be omitted; then the database from the connection string is used.
:type mongo_db: str
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

if not filter_doc:
filter_doc = {'_id': doc['_id']}

return collection.replace_one(filter_doc, doc, **kwargs)

def replace_many(self, mongo_collection, docs,
filter_docs=None, mongo_db=None, upsert=False, collation=None,
**kwargs):
"""
Replaces many documents in a mongo collection.
Uses bulk_write with multiple ReplaceOne operations
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write
.. note::
If no ``filter_docs``are given, it is assumed that all
replacement documents contain the ``_id`` field which are then
used as filters.
:param mongo_collection: The name of the collection to update.
:type mongo_collection: str
:param docs: The new documents.
:type docs: list[dict]
:param filter_docs: A list of queries that match the documents to replace.
Can be omitted; then the _id fields from docs will be used.
:type filter_docs: list[dict]
:param mongo_db: The name of the database to use.
Can be omitted; then the database from the connection string is used.
:type mongo_db: str
:param upsert: If ``True``, perform an insert if no documents
match the filters for the replace operation.
:type upsert: bool
:param collation: An instance of
:class:`~pymongo.collation.Collation`. This option is only
supported on MongoDB 3.4 and above.
:type collation: pymongo.collation.Collation
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

if not filter_docs:
filter_docs = [{'_id': doc['_id']} for doc in docs]

requests = [
ReplaceOne(
filter_docs[i],
docs[i],
upsert=upsert,
collation=collation)
for i in range(len(docs))
]

return collection.bulk_write(requests, **kwargs)

def delete_one(self, mongo_collection, filter_doc, mongo_db=None, **kwargs):
"""
Deletes a single document in a mongo collection.
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.delete_one
:param mongo_collection: The name of the collection to delete from.
:type mongo_collection: str
:param filter_doc: A query that matches the document to delete.
:type filter_doc: dict
:param mongo_db: The name of the database to use.
Can be omitted; then the database from the connection string is used.
:type mongo_db: str
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

return collection.delete_one(filter_doc, **kwargs)

def delete_many(self, mongo_collection, filter_doc, mongo_db=None, **kwargs):
"""
Deletes one or more documents in a mongo collection.
https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.delete_many
:param mongo_collection: The name of the collection to delete from.
:type mongo_collection: str
:param filter_doc: A query that matches the documents to delete.
:type filter_doc: dict
:param mongo_db: The name of the database to use.
Can be omitted; then the database from the connection string is used.
:type mongo_db: str
"""
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

return collection.delete_many(filter_doc, **kwargs)
warnings.warn(
"This module is deprecated. Please use `airflow.providers.mongo.hooks.mongo`.",
DeprecationWarning, stacklevel=2
)

0 comments on commit 059eda0

Please sign in to comment.