feature: Added support for Google cloud log sink management via google provider #52001


Open
wants to merge 7 commits into main from feature/add-cloud-logging-export-operator

Conversation

arvindp25
Contributor

@arvindp25 arvindp25 commented Jun 21, 2025


Issues/feature request link: #51929
This pull request introduces a new operator in the google provider.
With the help of this operator, Airflow users can now create Google Cloud log sinks via Airflow.
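A rough usage sketch of the proposed operator in a DAG (the import path is an assumption for illustration, and the final signature may differ; sink_config as a single dict follows the design discussed in the review below):

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_logging_sink import (  # assumed module path
    CloudLoggingCreateSinkOperator,
)

with DAG(dag_id="google_cloud_logging_sink", schedule=None) as dag:
    create_sink = CloudLoggingCreateSinkOperator(
        task_id="create_log_sink",
        project_id="my-gcp-project",
        # The whole sink definition is supplied as one dict.
        sink_config={
            "name": "my-airflow-test-sink",
            "destination": "storage.googleapis.com/my-logs-bucket",
            "filter": "severity>=ERROR",
        },
    )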

### Checklist

- [x] Unit tests added
- [x] System test added
- [x] Documentation updated (docstrings + relevant .rst files if applicable)
- [x] PR description includes motivation and usage example
- [x] Follows Airflow’s [contribution guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.


boring-cyborg bot commented Jun 21, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@boring-cyborg boring-cyborg bot added area:providers kind:documentation provider:google Google (including GCP) related issues labels Jun 21, 2025
@arvindp25 arvindp25 changed the title added support for Google cloud log sink management via google provider Added support for Google cloud log sink management via google provider Jun 21, 2025
@arvindp25 arvindp25 changed the title Added support for Google cloud log sink management via google provider feature: Added support for Google cloud log sink management via google provider Jun 21, 2025
Contributor

@shahar1 shahar1 left a comment


Looks promising overall! Small comment regarding validation of templated fields.
@VladaZakharova - I'll be happy for additional feedback :)

@arvindp25 arvindp25 requested a review from shahar1 June 22, 2025 15:31
@arvindp25 arvindp25 force-pushed the feature/add-cloud-logging-export-operator branch from 5889af3 to ee8bc9d Compare June 22, 2025 16:34
@VladaZakharova
Contributor

Hi, can you please show system test results with screenshots? Thanks

@arvindp25
Contributor Author

arvindp25 commented Jun 23, 2025

Hi, can you please show system test results with screenshots? Thanks

@VladaZakharova
For the system test, I have recorded a video; please fast-forward as needed.

video.mp4

Below I have added some additional screenshots from manual DAG testing.

Creating multiple log sinks with different destinations
sink_create_multiple

Updating a log sink
update_sink

Listing sinks
list_sink

Deleting sinks
delete sink

Please let me know if I need to do anything else as proof of work.
Thanks,

Contributor

@shahar1 shahar1 left a comment


LGTM, well done!
Waiting for @VladaZakharova 's approval to merge :)

Comment on lines 151 to 160
if self.filter_:
    sink_config["filter"] = self.filter_
if self.description:
    sink_config["description"] = self.description
if self.exclusion_filter:
    sink_config["exclusions"] = _handle_excluison_filter(self.exclusion_filter)
if self.bigquery_options:
    if isinstance(self.bigquery_options, dict):
        bigquery_options = logging_v2.types.BigQueryOptions(**self.bigquery_options)
        sink_config["bigquery_options"] = bigquery_options
Contributor

Thank you for the changes.
Here I think it will be better to ask the user to pass the configuration itself in dict format, instead of constructing this config in the operator - if some field gets deprecated on the API side, we would need to change the operator every time.
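A minimal sketch of that direction (illustrative names, not the exact code in this PR): the operator forwards the user-supplied dict to the client instead of assembling the sink field by field, so a deprecated API field only affects the user's config, not the operator.

from google.cloud import logging_v2


def create_sink_from_config(client: logging_v2.ConfigServiceV2Client, project_id: str, sink_config: dict):
    # The dict maps directly onto the LogSink message; field deprecations are
    # then a concern of the user's config and the API client, not the operator.
    return client.create_sink(
        request={
            "parent": f"projects/{project_id}",
            "sink": logging_v2.types.LogSink(sink_config),
        }
    )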

Contributor Author

@arvindp25 arvindp25 Jun 25, 2025

Interesting. One doubt: if the operator accepts sink_config as a dict, is it possible to render it as a native object, or should I not allow it as a template field?

def test_template_rendering(self):
    operator = CloudLoggingCreateSinkOperator(
        task_id=TASK_ID,
        sink_config="{{ var.value.sink_config }}",
        project_id="{{ var.value.project_id }}",
    )

    context = {
        "var": {"value": {"project_id": PROJECT_ID, "sink_config": sink_config_bq}},
    }

    operator.render_template_fields(context)

    assert operator.sink_config == sink_config_bq

I was trying to run this test case, but it fails due to a type mismatch (str != dict), and we cannot pass render_as_native_object=True at the operator level.
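For what it's worth, a sketch of how the str-vs-dict mismatch is usually handled (assuming the operator and fixtures from the snippet above): native rendering is opted into at the DAG level via render_template_as_native_obj, not on the operator.

from airflow import DAG

with DAG(
    dag_id="render_sink_config_natively",
    schedule=None,
    # With the native Jinja environment, "{{ var.value.sink_config }}" renders
    # back to a dict instead of its string representation.
    render_template_as_native_obj=True,
) as dag:
    create_sink = CloudLoggingCreateSinkOperator(  # operator from the test snippet above
        task_id="create_sink",
        project_id="{{ var.value.project_id }}",
        sink_config="{{ var.value.sink_config }}",
    )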

Contributor

I think we have an example in example_kubernetes.py from the cncf provider.
Still, the user passes the object itself, not every field separately (because in my experience passing fields separately adds more problems in the future :) )


response = client.create_sink(
    request={
        "parent": parent,
Contributor

Usually we define and pass parent in the hook itself, as well as constructing the request body.
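A minimal sketch of the kind of hook being asked for here (method names are illustrative): the client, the parent path, and the request body all live in the hook, so the operator only calls hook.create_sink(...).

from google.cloud import logging_v2

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook


class CloudLoggingHook(GoogleBaseHook):
    def get_conn(self) -> logging_v2.ConfigServiceV2Client:
        return logging_v2.ConfigServiceV2Client(credentials=self.get_credentials())

    @GoogleBaseHook.fallback_to_default_project_id
    def create_sink(self, sink, project_id: str):
        # parent is derived inside the hook, not in the operator
        return self.get_conn().create_sink(
            request={"parent": f"projects/{project_id}", "sink": sink}
        )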

hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
parent = f"projects/{self.project_id}"
Contributor

It shouldn't be here, can you please move it to the hook?

self._validate_inputs()
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
Contributor

This one should be in the hook as well.

hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
sink_path = f"projects/{self.project_id}/sinks/{self.sink_name}"
Contributor

Can we move it to the hook?

Collaborator

Just for my own knowledge: what are you looking to move to the hook?

Contributor

Creating the client and constructing sink_path :)
Please check the other operators we have in the google provider for better examples.

self._validate_inputs()
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
Contributor

Same comment, can you please move it to the hook?

hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
sink_path = f"projects/{self.project_id}/sinks/{self.sink_name}"
Contributor

It should be moved to the hook as well.

Comment on lines 364 to 379
if self.destination is not None:
    current_sink.destination = self.destination
    update_mask.append("destination")

if self.filter_ is not None:
    current_sink.filter = self.filter_
    update_mask.append("filter")

if self.description is not None:
    current_sink.description = self.description
    update_mask.append("description")

if self.disabled is not None:
    current_sink.disabled = self.disabled
    update_mask.append("disabled")

Contributor

Here the same comment as for the Create operator: we don't usually pass all the parameters we want to update to the operator. Instead we use update_mask and the body of the NEW object that will be used to update the existing one.
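A sketch of that update pattern (illustrative names): the caller supplies the new sink body plus an update_mask, and the call forwards both to the API instead of diffing individual fields.

from google.cloud import logging_v2
from google.protobuf import field_mask_pb2


def update_sink(client, project_id: str, sink_name: str, sink_config: dict, update_mask: list[str]):
    # The user decides which fields to touch via update_mask; the operator/hook
    # never needs to know the full field list of LogSink.
    return client.update_sink(
        request={
            "sink_name": f"projects/{project_id}/sinks/{sink_name}",
            "sink": logging_v2.types.LogSink(sink_config),
            "update_mask": field_mask_pb2.FieldMask(paths=update_mask),
        }
    )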

Contributor Author

So here I can ask the user to provide sink_config and update_mask params and pass them to the hook, instead of building the mask myself?

self.log.info("Updating fields: %s", ", ".join(update_mask))

response = client.update_sink(
    request={
Contributor

The same here about the request, can you move it to the hook?

]

with DAG(
dag_id="example_google_cloud_logging_sink",
Contributor

we removed "example_" from dags names. can you please change it to only "google_cloud_logging_sink"

@arvindp25 arvindp25 force-pushed the feature/add-cloud-logging-export-operator branch 2 times, most recently from 8455f95 to 83e661b Compare June 28, 2025 12:17
@arvindp25
Contributor Author

arvindp25 commented Jun 28, 2025

System test:
video
https://github.com/user-attachments/assets/01fce6e2-cc4f-4d11-b8c7-b1d06b61a47e

Unit tests (hook):
image

Unit tests (operator):
image

Manual DAG testing:
Creating sinks ✅

test_bq_sink and test_pubsub_sink got created.
image

update_sink ✅
my-airflow-test-sink updated successfully.
image

list sinks ✅
The total number of sinks in the project (5) can be matched against the log output.
image

delete_sink ✅
All three sinks got deleted.
image

@potiuk potiuk force-pushed the feature/add-cloud-logging-export-operator branch from 83e661b to 2e67e19 Compare June 28, 2025 14:28
@arvindp25
Contributor Author

@VladaZakharova I have made the requested changes. Could you please re-review?

Changes:
1. Moved connection logic to the hook.
2. Used sink_config for configuration, supporting both dict and protobuf objects (see the sketch below).
3. Removed example_ from DAG names.

Added:
- Protobuf support.
- System test with a protobuf object.
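For reference, a sketch of how dict-or-protobuf input is typically normalized before the API call (an illustrative helper, not necessarily the exact code in this PR):

from google.cloud import logging_v2


def to_log_sink(sink_config) -> logging_v2.types.LogSink:
    # Accept either an already-built LogSink protobuf message or a plain dict.
    if isinstance(sink_config, logging_v2.types.LogSink):
        return sink_config
    return logging_v2.types.LogSink(sink_config)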

@arvindp25 arvindp25 requested a review from VladaZakharova June 28, 2025 16:37
@arvindp25 arvindp25 force-pushed the feature/add-cloud-logging-export-operator branch from b88400b to e8e6689 Compare June 29, 2025 10:59
Contributor

@VladaZakharova VladaZakharova left a comment

Looks good, small comments left 😺

hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

try:
    sink_to_delete = hook.get_sink(sink_name=self.sink_name, project_id=self.project_id)
Contributor

I am not sure we need this call here to retrieve it; maybe if we call delete() by passing the name only, it's okay to just call delete() alone? Also, I hope delete() can throw NotFound if the resource doesn't exist.

Contributor Author

@arvindp25 arvindp25 Jun 30, 2025

I can do that, but then I will not be able to return the log sink config; the delete API doesn't return anything in the response:
https://cloud.google.com/logging/docs/reference/v2/rest/v2/projects.sinks/delete
I hope that is OK?
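A sketch of the flow being discussed (hook method names are illustrative): fetch the sink before deleting it so its configuration can still be returned, since sinks.delete responds with an empty body.

def delete_sink_and_return_config(hook, sink_name: str, project_id: str) -> dict:
    # get_sink raises NotFound before we even attempt deletion if the sink is missing.
    sink = hook.get_sink(sink_name=sink_name, project_id=project_id)
    # sinks.delete returns nothing, so the pre-fetched sink is the only
    # configuration we can hand back (e.g. via XCom).
    hook.delete_sink(sink_name=sink_name, project_id=project_id)
    return type(sink).to_dict(sink)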

hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

try:
    current_sink = hook.get_sink(sink_name=self.sink_name, project_id=self.project_id)
Contributor

Nice way here to show the configuration before updating, good job!

@@ -0,0 +1,121 @@
# Licensed to the Apache Software Foundation (ASF) under one
Contributor

I think we can have only one system test; no need to add two almost identical ones. All the information can just be listed in the docs, and that will be enough :)

Comment on lines 96 to 98
You can also provide the ``sink_config`` as a native ``google.cloud.logging_v2.types.LogSink`` Protobuf object,
and the ``update_mask`` as a ``google.protobuf.field_mask_pb2.FieldMask``.

Contributor

This will be enough, tbh; we don't need separate system tests to show that. Maybe you can add protobuf-typed input to one of the operators in the system test.
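For reference, an illustrative example of the protobuf form described in the quoted docs above (values are made up):

from google.cloud.logging_v2.types import LogSink
from google.protobuf import field_mask_pb2

sink_config = LogSink(
    name="my-airflow-test-sink",
    destination="bigquery.googleapis.com/projects/my-project/datasets/log_export",
    filter="severity>=WARNING",
)
update_mask = field_mask_pb2.FieldMask(paths=["destination", "filter"])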

@arvindp25 arvindp25 requested a review from VladaZakharova June 30, 2025 14:01
@arvindp25
Contributor Author

arvindp25 commented Jun 30, 2025

@VladaZakharova I made the requested changes. Please check.

Labels
area:providers kind:documentation provider:google Google (including GCP) related issues

5 participants