feature: Added support for Google cloud log sink management via google provider #52001
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Looks promising overall! Small comment regarding validation of templated fields.
@VladaZakharova - I'll be happy for additional feedback :)
providers/google/src/airflow/providers/google/cloud/operators/cloud_logging_sink.py
5889af3 to ee8bc9d
Hi,
video.mp4
Further below I have added some additional screenshots from manual DAG testing, creating multiple log sinks with different destinations. Please let me know if I need to do anything else as proof of work.
LGTM, well done!
Waiting for @VladaZakharova 's approval to merge :)
if self.filter_:
    sink_config["filter"] = self.filter_
if self.description:
    sink_config["description"] = self.description
if self.exclusion_filter:
    sink_config["exclusions"] = _handle_excluison_filter(self.exclusion_filter)
if self.bigquery_options:
    if isinstance(self.bigquery_options, dict):
        bigquery_options = logging_v2.types.BigQueryOptions(**self.bigquery_options)
        sink_config["bigquery_options"] = bigquery_options
Thank you for the changes.
Here I think it would be better to ask the user to pass the configuration itself in dict format instead of constructing the config in the operator. If some field is deprecated on the API side, we would otherwise need to change the operator every time.
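A minimal sketch of the suggestion above, in plain Python with no Google client imports. `build_sink_body` is a hypothetical helper, not part of the provider, and the example values are placeholders. The point is only that the operator forwards whatever dict the user supplies, so a field added or deprecated on the API side needs no operator change.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


def build_sink_body(sink_config: dict[str, Any]) -> dict[str, Any]:
    """Return the user-supplied sink config unchanged (hypothetical helper).

    The operator does not enumerate fields such as ``filter`` or
    ``bigquery_options``; it forwards the dict as-is, so the accepted keys
    are decided by the API rather than by the operator.
    """
    if not isinstance(sink_config, dict):
        raise TypeError(f"sink_config must be a dict, got {type(sink_config).__name__}")
    # Copy so later mutation by the caller cannot change the request body.
    return deepcopy(sink_config)


# Placeholder values for illustration only.
user_config = {
    "name": "my-sink",
    "destination": "bigquery.googleapis.com/projects/my-project/datasets/my_dataset",
    "filter": "severity>=ERROR",
    "bigquery_options": {"use_partitioned_tables": True},
}
body = build_sink_body(user_config)
```

With this shape, per-field branches like `if self.description:` are no longer needed in the operator.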
Interesting. One doubt: if the operator accepts sink_config as a dict, is it possible to render it as a native object?
Or should I not allow it as a template field?
```python
def test_template_rendering(self):
    operator = CloudLoggingCreateSinkOperator(
        task_id=TASK_ID,
        sink_config="{{ var.value.sink_config }}",
        project_id="{{ var.value.project_id }}",
    )
    context = {
        "var": {"value": {"project_id": PROJECT_ID, "sink_config": sink_config_bq}},
    }
    operator.render_template_fields(context)
    assert operator.sink_config == sink_config_bq
```
I was trying to run this test case, but it fails due to a type mismatch (str != dict), and we cannot pass render_template_as_native_obj=True at the operator level.
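The mismatch can be reproduced without Airflow. By default a Jinja template renders to a string, so a dict passed through `{{ ... }}` comes back as its `str()` form. The sketch below imitates that with plain `str()` and shows one possible workaround: parsing the rendered string with `ast.literal_eval`. Whether the operator should do this is an assumption here, not something decided in this thread, and `normalize_sink_config` is a hypothetical helper.

```python
from __future__ import annotations

import ast
from typing import Any


def normalize_sink_config(value: Any) -> dict[str, Any]:
    """Accept a dict, or the string form of a dict left behind by rendering."""
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        # literal_eval only parses Python literals; it does not execute code.
        parsed = ast.literal_eval(value)
        if isinstance(parsed, dict):
            return parsed
    raise TypeError(f"sink_config must be a dict, got {type(value).__name__}")


# Placeholder config standing in for sink_config_bq from the test above.
sink_config_bq = {"name": "bq-sink", "filter": "severity>=ERROR"}

# Non-native template rendering effectively yields str(value).
rendered = str(sink_config_bq)
restored = normalize_sink_config(rendered)
```

Airflow also offers `render_template_as_native_obj=True`, but that is set on the DAG, not on an individual operator.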
I think we have an example in example_kubernetes.py from the cncf provider.
Still, the user passes the object itself, not every field separately (because in my experience that adds more problems in the future :) )
response = client.create_sink(
    request={
        "parent": parent,
Usually we define and pass parent in the hook itself, and construct the request body there as well.
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
parent = f"projects/{self.project_id}"
It shouldn't be here; can you please move it to the hook?
self._validate_inputs()
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
This one should be in the hook as well.
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
sink_path = f"projects/{self.project_id}/sinks/{self.sink_name}"
Can we move it to the hook?
Just for my own knowledge; what are you looking to move to the hook?
Creating the client and constructing sink_path :)
Please check the other operators we have in the google provider for better examples.
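A rough sketch of the split being requested, using a stand-in client so it runs without Google libraries. `FakeConfigClient` and `SketchLoggingHook` are hypothetical names. The `projects/{project_id}` and `projects/{project_id}/sinks/{sink_name}` path formats come from the snippets in this review; the `sink` and `sink_name` request keys are assumptions about the request shape.

```python
from __future__ import annotations

from typing import Any


class FakeConfigClient:
    """Stand-in for the real Logging config client; it only records requests."""

    def __init__(self) -> None:
        self.requests: list[tuple[str, dict[str, Any]]] = []

    def create_sink(self, request: dict[str, Any]) -> dict[str, Any]:
        self.requests.append(("create_sink", request))
        return request["sink"]

    def get_sink(self, request: dict[str, Any]) -> dict[str, Any]:
        self.requests.append(("get_sink", request))
        return {"name": request["sink_name"].rsplit("/", 1)[-1]}


class SketchLoggingHook:
    """Owns client creation, resource paths, and request bodies."""

    def __init__(self, client: FakeConfigClient | None = None) -> None:
        self._client = client

    def get_conn(self) -> FakeConfigClient:
        # Create the client lazily and reuse it across calls.
        if self._client is None:
            self._client = FakeConfigClient()
        return self._client

    def create_sink(self, sink: dict[str, Any], project_id: str) -> dict[str, Any]:
        parent = f"projects/{project_id}"
        return self.get_conn().create_sink(request={"parent": parent, "sink": sink})

    def get_sink(self, sink_name: str, project_id: str) -> dict[str, Any]:
        sink_path = f"projects/{project_id}/sinks/{sink_name}"
        return self.get_conn().get_sink(request={"sink_name": sink_path})


# The operator's execute() would then reduce to calls like these.
hook = SketchLoggingHook()
created = hook.create_sink(sink={"name": "my-sink"}, project_id="my-project")
fetched = hook.get_sink(sink_name="my-sink", project_id="my-project")
```

The operator never touches the client or builds a path, so a change in path format or request shape stays inside the hook.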
self._validate_inputs()
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
Same comment: can you please move it to the hook?
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

client = hook.get_conn()
sink_path = f"projects/{self.project_id}/sinks/{self.sink_name}"
It should be moved to the hook as well.
if self.destination is not None:
    current_sink.destination = self.destination
    update_mask.append("destination")

if self.filter_ is not None:
    current_sink.filter = self.filter_
    update_mask.append("filter")

if self.description is not None:
    current_sink.description = self.description
    update_mask.append("description")

if self.disabled is not None:
    current_sink.disabled = self.disabled
    update_mask.append("disabled")
The same comment as for the Create operator applies here: we don't usually pass every parameter we want to update to the operator. Instead we use update_mask and the body of the NEW object that will be used to update the existing one.
So here I can ask the user to provide the sink_config and update_mask params and pass them to the hook, instead of creating the mask on my own?
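A sketch of what that could look like, assuming the user supplies both `sink_config` and `update_mask` and the hook only assembles the request. `build_update_sink_request` is a hypothetical helper. The mask is shown as a plain dict with a `paths` list, mirroring the `paths` field of `google.protobuf.field_mask_pb2.FieldMask`; the `sink_name`, `sink`, and `update_mask` keys are an assumption about the update request shape.

```python
from __future__ import annotations

from typing import Any


def build_update_sink_request(
    project_id: str,
    sink_name: str,
    sink_config: dict[str, Any],
    update_mask: dict[str, list[str]],
) -> dict[str, Any]:
    """Assemble an update request from user-supplied config and mask.

    No per-field branching: the caller decides which fields change by
    listing them in ``update_mask["paths"]``.
    """
    return {
        "sink_name": f"projects/{project_id}/sinks/{sink_name}",
        "sink": sink_config,
        "update_mask": update_mask,
    }


# Placeholder values for illustration only.
request = build_update_sink_request(
    project_id="my-project",
    sink_name="my-sink",
    sink_config={"description": "errors only", "disabled": True},
    update_mask={"paths": ["description", "disabled"]},
)
```

Compared with the `if self.destination is not None:` chain, adding a new updatable field needs no code change here.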
self.log.info("Updating fields: %s", ", ".join(update_mask))

response = client.update_sink(
    request={
The same here about the request: can you move it to the hook?
]

with DAG(
    dag_id="example_google_cloud_logging_sink",
We removed "example_" from DAG names. Can you please change it to just "google_cloud_logging_sink"?
8455f95 to 83e661b
System Test:
83e661b to 2e67e19
@VladaZakharova I have made the requested changes. Could you please re-review? Added:
b88400b to e8e6689
Looks good, small comments left 😺
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

try:
    sink_to_delete = hook.get_sink(sink_name=self.sink_name, project_id=self.project_id)
I am not sure we need this call to retrieve the sink. If delete() only needs the name, maybe it's okay to call delete() alone? Also, I hope delete() throws NotFound if the resource doesn't exist.
I can do that, but then I will not be able to return the log sink config, because the delete API doesn't return anything:
https://cloud.google.com/logging/docs/reference/v2/rest/v2/projects.sinks/delete
I hope that is OK?
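For comparison, a sketch of the simpler flow suggested in the review, with stand-ins so it runs without Google libraries. `SinkNotFound` plays the role of a not-found error from the API client, and `FakeSinkStore` is a hypothetical in-memory backend. The trade-off mentioned above is visible: the delete call returns nothing, so the operator has no sink config to return.

```python
from __future__ import annotations

from typing import Any


class SinkNotFound(Exception):
    """Stand-in for the client's not-found error (hypothetical)."""


class FakeSinkStore:
    """In-memory backend that mimics a delete call with an empty response."""

    def __init__(self, sinks: dict[str, dict[str, Any]]) -> None:
        self._sinks = dict(sinks)

    def delete_sink(self, sink_path: str) -> None:
        if sink_path not in self._sinks:
            raise SinkNotFound(sink_path)
        del self._sinks[sink_path]


def delete_without_lookup(store: FakeSinkStore, project_id: str, sink_name: str) -> None:
    """Delete by name only; no prior get_sink call, so nothing to return."""
    sink_path = f"projects/{project_id}/sinks/{sink_name}"
    store.delete_sink(sink_path)


store = FakeSinkStore({"projects/my-project/sinks/my-sink": {"name": "my-sink"}})
first_result = delete_without_lookup(store, "my-project", "my-sink")

# A second delete of the same sink surfaces the not-found error directly.
try:
    delete_without_lookup(store, "my-project", "my-sink")
    second_raised = False
except SinkNotFound:
    second_raised = True
```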
hook = CloudLoggingHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

try:
    current_sink = hook.get_sink(sink_name=self.sink_name, project_id=self.project_id)
Nice way here to show the configuration before updating, good job!
@@ -0,0 +1,121 @@
# Licensed to the Apache Software Foundation (ASF) under one
I think we can have only one system test; there is no need to add two almost identical ones. All the information can just be listed in the docs, and that will be enough :)
You can also provide the ``sink_config`` as a native ``google.cloud.logging_v2.types.LogSink`` Protobuf object,
and the ``update_mask`` as a ``google.protobuf.field_mask_pb2.FieldMask``.
This will be enough, tbh; we don't need separate system tests to show that. Maybe you can add protobuf-type input to one of the operators in the system tests.
@VladaZakharova I made the requested changes. Please check.
Issues/feature request link: #51929
This pull request introduces a new operator in the google provider.
With the help of the new operator, Airflow users can now create Google Cloud log sinks via Airflow.