[AIRFLOW-6102] [AIP-21] Rename Dataproc operators (#7151)
michalslowikowski00 authored and turbaszek committed Jan 13, 2020
1 parent 5444bfe commit f4d3e5e
Showing 7 changed files with 260 additions and 81 deletions.
28 changes: 14 additions & 14 deletions UPDATING.md
@@ -483,19 +483,19 @@ The following table shows changes in import paths.
|airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator |airflow.gcp.operators.dataflow.DataFlowJavaOperator |
|airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator |airflow.gcp.operators.dataflow.DataFlowPythonOperator |
|airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator |airflow.gcp.operators.dataflow.DataflowTemplateOperator |
|airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator |airflow.gcp.operators.dataproc.DataProcHadoopOperator |
|airflow.contrib.operators.dataproc_operator.DataProcHiveOperator |airflow.gcp.operators.dataproc.DataProcHiveOperator |
|airflow.contrib.operators.dataproc_operator.DataProcJobBaseOperator |airflow.gcp.operators.dataproc.DataProcJobBaseOperator |
|airflow.contrib.operators.dataproc_operator.DataProcPigOperator |airflow.gcp.operators.dataproc.DataProcPigOperator |
|airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator |airflow.gcp.operators.dataproc.DataProcPySparkOperator |
|airflow.contrib.operators.dataproc_operator.DataProcSparkOperator |airflow.gcp.operators.dataproc.DataProcSparkOperator |
|airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator |airflow.gcp.operators.dataproc.DataProcSparkSqlOperator |
|airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator |airflow.gcp.operators.dataproc.DataprocClusterCreateOperator |
|airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator |airflow.gcp.operators.dataproc.DataprocClusterDeleteOperator |
|airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator |airflow.gcp.operators.dataproc.DataprocClusterScaleOperator |
|airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator |airflow.gcp.operators.dataproc.DataprocSubmitHadoopJobOperator |
|airflow.contrib.operators.dataproc_operator.DataProcHiveOperator |airflow.gcp.operators.dataproc.DataprocSubmitHiveJobOperator |
|airflow.contrib.operators.dataproc_operator.DataProcJobBaseOperator |airflow.gcp.operators.dataproc.DataprocJobBaseOperator |
|airflow.contrib.operators.dataproc_operator.DataProcPigOperator |airflow.gcp.operators.dataproc.DataprocSubmitPigJobOperator |
|airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator |airflow.gcp.operators.dataproc.DataprocSubmitPySparkJobOperator |
|airflow.contrib.operators.dataproc_operator.DataProcSparkOperator |airflow.gcp.operators.dataproc.DataprocSubmitSparkJobOperator |
|airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator |airflow.gcp.operators.dataproc.DataprocSubmitSparkSqlJobOperator |
|airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator |airflow.gcp.operators.dataproc.DataprocCreateClusterOperator |
|airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator |airflow.gcp.operators.dataproc.DataprocDeleteClusterOperator |
|airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator |airflow.gcp.operators.dataproc.DataprocScaleClusterOperator |
|airflow.contrib.operators.dataproc_operator.DataprocOperationBaseOperator |airflow.gcp.operators.dataproc.DataprocOperationBaseOperator |
|airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperator |airflow.gcp.operators.dataproc.DataprocWorkflowTemplateInstantiateInlineOperator |
|airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperator |airflow.gcp.operators.dataproc.DataprocWorkflowTemplateInstantiateOperator |
|airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperator |airflow.gcp.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator |
|airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperator |airflow.gcp.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator |
|airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator |airflow.gcp.operators.datastore.DatastoreExportOperator |
|airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator |airflow.gcp.operators.datastore.DatastoreImportOperator |
|airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator |airflow.operators.local_to_gcs.FileToGoogleCloudStorageOperator |
@@ -741,7 +741,7 @@ The Mesos Executor is removed from the code base as it was not widely used and n
It is highly recommended to have 1TB+ disk size for Dataproc to have sufficient throughput:
https://cloud.google.com/compute/docs/disks/performance

Hence, the default value for `master_disk_size` in DataprocClusterCreateOperator has been changed from 500GB to 1TB.
Hence, the default value for `master_disk_size` in DataprocCreateClusterOperator has been changed from 500GB to 1TB.

### Changes to SalesforceHook

@@ -1303,7 +1303,7 @@ Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in
`AIRFLOW_GPL_UNIDECODE=yes`. In case of the latter a GPL runtime dependency will be installed due to a
dependency (python-nvd3 -> python-slugify -> unidecode).

### Replace DataProcHook.await calls to DataProcHook.wait
### Replace DataprocHook.await calls to DataprocHook.wait

The method name was changed to be compatible with the Python 3.7 async/await keywords
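
As a small illustration of why the rename was needed, the sketch below uses only the standard library to check that `await` is a reserved keyword (true on Python 3.7 and later) and that a call spelled `.await(...)` no longer parses. The names `hook` and `operation` are hypothetical placeholders; the source strings are only compiled, never executed.

```python
import keyword


def compiles(source: str) -> bool:
    """Return True if `source` parses as valid Python on this interpreter."""
    try:
        compile(source, "<example>", "exec")
        return True
    except SyntaxError:
        return False


# On Python 3.7+ `await` is a reserved keyword, so it cannot be used
# as an attribute or method name in source code.
await_is_keyword = keyword.iskeyword("await")

# `hook` and `operation` are hypothetical names; the code is parsed only.
old_call_ok = compiles("hook.await(operation)")
new_call_ok = compiles("hook.wait(operation)")

print(await_is_keyword, old_call_ok, new_call_ok)
```

On an interpreter older than 3.7 the first call would still parse, which is why the old method name only became a problem with that release.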

10 changes: 5 additions & 5 deletions airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -20,8 +20,7 @@

import warnings

# pylint: disable=unused-import
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook # noqa
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook

warnings.warn(
"This module is deprecated. Please use `airflow.providers.google.cloud.hooks.dataproc`.",
@@ -31,13 +30,14 @@

class DataProcHook(DataprocHook):
"""
This class is deprecated. Please use `airflow.providers.google.cloud.hooks.dataproc.DataprocHook`.
This class is deprecated.
Please use `airflow.providers.google.cloud.hooks.dataproc.DataprocHook`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This class is deprecated. Please use `airflow.providers.google."
"cloud.hooks.dataproc.DataprocHook`.",
"""This class is deprecated.
Please use `airflow.providers.google.cloud.hooks.dataproc.DataprocHook`.""",
DeprecationWarning, stacklevel=2
)

193 changes: 186 additions & 7 deletions airflow/contrib/operators/dataproc_operator.py
@@ -20,17 +20,196 @@

import warnings

# pylint: disable=unused-import
from airflow.providers.google.cloud.operators.dataproc import ( # noqa
DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataprocClusterScaleOperator,
DataProcHadoopOperator, DataProcHiveOperator, DataProcJobBaseOperator, DataProcJobBuilder,
DataProcPigOperator, DataProcPySparkOperator, DataProcSparkOperator, DataProcSparkSqlOperator,
DataprocSubmitJobOperator, DataprocUpdateClusterOperator,
DataprocWorkflowTemplateInstantiateInlineOperator, DataprocWorkflowTemplateInstantiateOperator,
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator, DataprocDeleteClusterOperator,
DataprocInstantiateInlineWorkflowTemplateOperator, DataprocInstantiateWorkflowTemplateOperator,
DataprocJobBaseOperator, DataprocScaleClusterOperator, DataprocSubmitHadoopJobOperator,
DataprocSubmitHiveJobOperator, DataprocSubmitPigJobOperator, DataprocSubmitPySparkJobOperator,
DataprocSubmitSparkJobOperator, DataprocSubmitSparkSqlJobOperator,
)

warnings.warn(
    "This module is deprecated. Please use `airflow.providers.google.cloud.operators.dataproc`.",
    DeprecationWarning,
    stacklevel=2,
)


class DataprocClusterCreateOperator(DataprocCreateClusterOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocCreateClusterOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocCreateClusterOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataprocClusterDeleteOperator(DataprocDeleteClusterOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocDeleteClusterOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocDeleteClusterOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataprocClusterScaleOperator(DataprocScaleClusterOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocScaleClusterOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocScaleClusterOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataProcHadoopOperator(DataprocSubmitHadoopJobOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocSubmitHadoopJobOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocSubmitHadoopJobOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataProcHiveOperator(DataprocSubmitHiveJobOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocSubmitHiveJobOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocSubmitHiveJobOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataProcJobBaseOperator(DataprocJobBaseOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocJobBaseOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocJobBaseOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataProcPigOperator(DataprocSubmitPigJobOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocSubmitPigJobOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocSubmitPigJobOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataProcPySparkOperator(DataprocSubmitPySparkJobOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocSubmitPySparkJobOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocSubmitPySparkJobOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataProcSparkOperator(DataprocSubmitSparkJobOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocSubmitSparkJobOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocSubmitSparkJobOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataProcSparkSqlOperator(DataprocSubmitSparkSqlJobOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocSubmitSparkSqlJobOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocSubmitSparkSqlJobOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataprocWorkflowTemplateInstantiateInlineOperator(DataprocInstantiateInlineWorkflowTemplateOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


class DataprocWorkflowTemplateInstantiateOperator(DataprocInstantiateWorkflowTemplateOperator):
    """
    This class is deprecated.
    Please use `airflow.gcp.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator`.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use `airflow.gcp.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator`.""",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)
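
The deprecated aliases above all follow one pattern: subclass the renamed operator, warn in `__init__`, then delegate to the parent. The following is a minimal standalone sketch of that pattern, assuming two hypothetical stand-in classes (`NewOperator` and `OldOperator`) instead of the real Airflow operators, so that it runs without Airflow installed.

```python
import warnings


class NewOperator:
    """Hypothetical stand-in for a renamed operator."""

    def __init__(self, task_id):
        self.task_id = task_id


class OldOperator(NewOperator):
    """Hypothetical deprecated alias kept so existing code keeps working."""

    def __init__(self, *args, **kwargs):
        # stacklevel=2 attributes the warning to the caller that
        # instantiated the deprecated class, not to this __init__.
        warnings.warn(
            "This class is deprecated. Please use `NewOperator`.",
            DeprecationWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)


with warnings.catch_warnings(record=True) as caught:
    # DeprecationWarning is often filtered out by default, so enable it here.
    warnings.simplefilter("always")
    task = OldOperator(task_id="create_cluster")

deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)]
print(len(deprecations), type(task).__name__, task.task_id)
```

Because the alias is a subclass, `isinstance` checks against the new class still pass for objects built through the old name, which is what lets existing DAGs keep running while they migrate.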
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataprocSubmitJobOperator,
DataprocCreateClusterOperator, DataprocDeleteClusterOperator, DataprocSubmitJobOperator,
DataprocUpdateClusterOperator,
)
from airflow.utils.dates import days_ago
@@ -125,7 +125,7 @@
default_args={"start_date": days_ago(1)},
schedule_interval=None,
) as dag:
create_cluster = DataprocClusterCreateOperator(
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
)

@@ -166,7 +166,7 @@
task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
)

delete_cluster = DataprocClusterDeleteOperator(
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,