Skip to content

Commit

Permalink
Add cli cmd to list the provider trigger info (#30822)
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Apr 24, 2023
1 parent 6e80bcd commit 9409446
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 33 deletions.
6 changes: 6 additions & 0 deletions airflow/cli/cli_config.py
Expand Up @@ -1754,6 +1754,12 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.provider_command.hooks_list"),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="triggers",
help="List registered provider triggers",
func=lazy_load_command("airflow.cli.commands.provider_command.triggers_list"),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="behaviours",
help="Get information about registered connection types with custom behaviours",
Expand Down
13 changes: 13 additions & 0 deletions airflow/cli/commands/provider_command.py
Expand Up @@ -80,6 +80,19 @@ def hooks_list(args):
)


@suppress_logs_and_warning
def triggers_list(args):
    """Lists all registered provider triggers at the command line."""
    AirflowConsole().print_as(
        # ProvidersManager().trigger yields TriggerInfo tuples sorted by package name.
        data=ProvidersManager().trigger,
        output=args.output,
        # Flatten each TriggerInfo into the columns shown in the CLI table.
        mapper=lambda x: {
            "package_name": x.package_name,
            "class": x.trigger_class_name,
            "integration_name": x.integration_name,
        },
    )


@suppress_logs_and_warning
def connection_form_widget_list(args):
"""Lists all custom connection form fields at the command line."""
Expand Down
6 changes: 3 additions & 3 deletions airflow/provider.yaml.schema.json
Expand Up @@ -208,8 +208,8 @@
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-modules": {
"description": "List of python modules containing the triggers.",
"class-names": {
"description": "List of the trigger class names that implement the trigger.",
"type": "array",
"items": {
"type": "string"
Expand All @@ -219,7 +219,7 @@
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
"class-names"
]
}
},
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/apache/kafka/provider.yaml
Expand Up @@ -57,8 +57,8 @@ sensors:

triggers:
- integration-name: Apache Kafka
python-modules:
- airflow.providers.apache.kafka.triggers.await_message
class-names:
- airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger

connection-types:
- hook-class-name: airflow.providers.apache.kafka.hooks.base.KafkaBaseHook
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/apache/livy/provider.yaml
Expand Up @@ -69,8 +69,8 @@ hooks:

triggers:
- integration-name: Apache Livy
python-modules:
- airflow.providers.apache.livy.triggers.livy
class-names:
- airflow.providers.apache.livy.triggers.livy.LivyTrigger


connection-types:
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/cncf/kubernetes/provider.yaml
Expand Up @@ -108,8 +108,8 @@ hooks:

triggers:
- integration-name: Kubernetes
python-modules:
- airflow.providers.cncf.kubernetes.triggers.pod
class-names:
- airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger


connection-types:
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/databricks/provider.yaml
Expand Up @@ -97,8 +97,8 @@ hooks:

triggers:
- integration-name: Databricks
python-modules:
- airflow.providers.databricks.triggers.databricks
class-names:
- airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger

sensors:
- integration-name: Databricks
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/dbt/cloud/provider.yaml
Expand Up @@ -66,8 +66,8 @@ hooks:

triggers:
- integration-name: dbt Cloud
python-modules:
- airflow.providers.dbt.cloud.triggers.dbt
class-names:
- airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger


connection-types:
Expand Down
52 changes: 32 additions & 20 deletions airflow/providers/google/provider.yaml
Expand Up @@ -845,35 +845,47 @@ hooks:

triggers:
- integration-name: Google BigQuery Data Transfer Service
python-modules:
- airflow.providers.google.cloud.triggers.bigquery_dts
class-names:
- airflow.providers.google.cloud.triggers.bigquery_dts.BigQueryDataTransferRunTrigger
- integration-name: Google BigQuery
python-modules:
- airflow.providers.google.cloud.triggers.bigquery
class-names:
- airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger
- airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger
- airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger
- airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger
- airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger
- airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger
- airflow.providers.google.cloud.triggers.bigquery.BigQueryTablePartitionExistenceTrigger
- integration-name: Google Cloud Build
python-modules:
- airflow.providers.google.cloud.triggers.cloud_build
class-names:
- airflow.providers.google.cloud.triggers.cloud_build.CloudBuildCreateBuildTrigger
- integration-name: Google Cloud Composer
python-modules:
- airflow.providers.google.cloud.triggers.cloud_composer
class-names:
- airflow.providers.google.cloud.triggers.cloud_composer.CloudComposerExecutionTrigger
- integration-name: Google Dataflow
python-modules:
- airflow.providers.google.cloud.triggers.dataflow
class-names:
- airflow.providers.google.cloud.triggers.dataflow.TemplateJobStartTrigger
- integration-name: Google Data Fusion
python-modules:
- airflow.providers.google.cloud.triggers.datafusion
class-names:
- airflow.providers.google.cloud.triggers.datafusion.DataFusionStartPipelineTrigger
- integration-name: Google Dataproc
python-modules:
- airflow.providers.google.cloud.triggers.dataproc
class-names:
- airflow.providers.google.cloud.triggers.dataproc.DataprocBaseTrigger
- airflow.providers.google.cloud.triggers.dataproc.DataprocSubmitTrigger
- airflow.providers.google.cloud.triggers.dataproc.DataprocClusterTrigger
- airflow.providers.google.cloud.triggers.dataproc.DataprocBatchTrigger
- airflow.providers.google.cloud.triggers.dataproc.DataprocDeleteClusterTrigger
- airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger
- integration-name: Google Cloud Storage (GCS)
python-modules:
- airflow.providers.google.cloud.triggers.gcs
class-names:
- airflow.providers.google.cloud.triggers.gcs.GCSBlobTrigger
- integration-name: Google Kubernetes Engine
python-modules:
- airflow.providers.google.cloud.triggers.kubernetes_engine
class-names:
- airflow.providers.google.cloud.triggers.kubernetes_engine.GKEStartPodTrigger
- airflow.providers.google.cloud.triggers.kubernetes_engine.GKEOperationTrigger
- integration-name: Google Machine Learning Engine
python-modules:
- airflow.providers.google.cloud.triggers.mlengine
class-names:
- airflow.providers.google.cloud.triggers.mlengine.MLEngineStartTrainingJobTrigger

transfers:
- source-integration-name: Presto
Expand Down
30 changes: 30 additions & 0 deletions airflow/providers_manager.py
Expand Up @@ -197,6 +197,14 @@ class HookClassProvider(NamedTuple):
package_name: str


class TriggerInfo(NamedTuple):
    """Trigger class and provider it comes from."""

    # Fully qualified dotted path of the trigger class, e.g.
    # "airflow.providers.apache.livy.triggers.livy.LivyTrigger".
    trigger_class_name: str
    # Name of the provider package that ships the trigger.
    package_name: str
    # Matching "integration-name" entry from the provider's metadata; may be "".
    integration_name: str


class HookInfo(NamedTuple):
"""Hook information."""

Expand Down Expand Up @@ -387,6 +395,7 @@ def __init__(self):
self._logging_class_name_set: set[str] = set()
self._secrets_backend_class_name_set: set[str] = set()
self._api_auth_backend_module_names: set[str] = set()
self._trigger_info_set: set[TriggerInfo] = set()
self._provider_schema_validator = _create_provider_info_schema_validator()
self._customized_form_fields_schema_validator = (
_create_customized_form_field_behaviours_schema_validator()
Expand Down Expand Up @@ -941,6 +950,27 @@ def _discover_auth_backends(self) -> None:
if _sanity_check(provider_package, auth_backend_module_name + ".init_app", provider):
self._api_auth_backend_module_names.add(auth_backend_module_name)

@provider_info_cache("triggers")
def initialize_providers_triggers(self):
    """Discover triggers advertised by installed providers.

    Populates ``self._trigger_info_set`` with one :class:`TriggerInfo` per
    class listed under the ``triggers`` section of each provider's metadata.
    """
    self.initialize_providers_list()
    for provider_package, provider in self._provider_dict.items():
        for trigger in provider.data.get("triggers", []):
            # Default to an empty list: provider packages built before the
            # "class-names" key replaced "python-modules" would otherwise
            # make .get() return None and raise TypeError in the loop below.
            for trigger_class_name in trigger.get("class-names", []):
                self._trigger_info_set.add(
                    TriggerInfo(
                        package_name=provider_package,
                        trigger_class_name=trigger_class_name,
                        integration_name=trigger.get("integration-name", ""),
                    )
                )

@property
def trigger(self) -> list[TriggerInfo]:
    """Return the discovered provider trigger classes, ordered by package name."""
    self.initialize_providers_triggers()
    triggers = list(self._trigger_info_set)
    triggers.sort(key=lambda info: info.package_name)
    return triggers

@property
def providers(self) -> dict[str, ProviderInfo]:
"""Returns information about available providers."""
Expand Down

0 comments on commit 9409446

Please sign in to comment.