Enable Serde for Pydantic BaseModel and Subclasses #51059
Conversation
Very good start, @sjyangkevin!
Direction looks good.
Good job! I like it. My comments are mostly cosmetic.
And I am happy that other people are starting to take an interest in the code here :-)
Small catch
Bigger change required :-)
Force-pushed a8d4b89 to bdedaec
I'm replying from a phone so expect some mistakes ;-). I like the direction this is going in! However, I do see some challenges remaining. The biggest one: I would like to get rid of the `import_string` entirely. When we caught the earlier security issue I was actually thinking about adding a ruff rule that prevents its addition. It's a security issue bound to happen. In your current implementation there is no guard against loading a malicious class except for the one in serde itself. So if I'm somehow able to trigger the deserialization differently, it still goes through. That shouldn't be the case. In addition, the module is now loaded twice, probably without caching, due to the custom loading. So in this case I see two options:

1. is simpler; 2. is IMHO more future proof.

Furthermore, I prefer to fix issues in one PR, especially here in this context. So please do not move the code as you did here based on the comment / doc. It is unrelated and might have subtle issues. I'd rather have that separate, because it makes sense to align the code with the comment (or vice versa!). Just not here, right now.
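For illustration, the kind of guard being asked for here could look roughly like this. This is a hedged sketch, not Airflow's actual implementation; the pattern list and the helper name `load_allowed_class` are hypothetical:

```python
# Hypothetical sketch: gate dynamic class loading behind an explicit
# allowlist so deserialization cannot import an arbitrary class path.
import fnmatch
import importlib

# Illustrative patterns only -- not Airflow's real configuration.
ALLOWED_CLASS_PATTERNS = ["pydantic.*", "cohere.types.*"]


def load_allowed_class(classname: str):
    """Import `classname` only if it matches an allowlisted pattern."""
    if not any(fnmatch.fnmatch(classname, p) for p in ALLOWED_CLASS_PATTERNS):
        raise ImportError(f"class {classname!r} is not allowlisted for deserialization")
    module_name, _, attr = classname.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)
```

The point is that the check lives inside the single loading helper, so a caller that reaches deserialization through a different path still hits the guard.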
Thanks a lot for the thoughtful feedback. You're absolutely right about the risk of using Regarding the serializer call order in As a side note, just to confirm, my understanding is that the use of
@bolkedebruin , I feel like this can be the way to go. I think it's a good idea to keep Pydantic modularized like the other serializers. I wasn't able to find other alternatives better than this solution. In serde.py, we have the
Okay! I like the more modularized approach. Let's go that way! We might need to think of a mechanism that allows serializers to register "allowed" classes, but that's probably out of scope for now (let's not include it now).
I like the direction this is going too. 2) with a more modularised approach, as mentioned above, is the way to go. No objections. @sjyangkevin shout to us when you need reviews :D
Hi, just a heads up: sorry I wasn't able to make progress these past few days, I was very busy. I will try to push changes by early next week. Thank you for your patience, and I appreciate your time reviewing. Feel free to share any feedback; I will take it into the next update.
@sjyangkevin take your time. There is no urgency on this :) We understand that everyone works on open source contributions during their free time, so no pressure at all! |
Force-pushed ef143dd to 262d0bf
Hi @amoghrajesh , @bolkedebruin , I would like to follow up with some updates.
It has passed all the unit tests, and I've updated my test DAG code as shown below. Arbitrary Pydantic models must be added to
We are slowly getting there. I do prefer a refactor, so that classname isn't there anymore; otherwise we just have redundant code and tech debt.
Thanks for the feedback! I was a little hesitant since it will be a huge refactor. I totally agree with your point, and will gradually refactor those. I would start with the
Also - if we are thinking about refactoring stuff: I think (and I know @bolkedebruin had another opinion on that) there was a discussion on whether it's good that we are depending on other dependencies (say pandas, deltalake, iceberg, kubernetes) which are part of "core" airflow - while we also have "providers" for many of those that provide operators / hooks and other "core extensions" related to the respective "external entity".

In my view, serializers for kubernetes should come from the kubernetes provider. Deltalake -> should come from databricks (or a deltalake provider if we decide to have one), iceberg should come from the iceberg provider. Again -> I know @bolkedebruin had a different view on that, but my goal is to have core as small as possible, and add anything to it as "extensions" - for which we already have "providers" as a mechanism. For me the Pydantic thing is a very similar case - it should be an "extension" that is IMHO implemented outside of core.

So maybe it's the right time to do this kind of refactoring -> implement a "discovery" mechanism in the providers manager to discover which serializers are installed (similarly to all other extensions) - and similarly a specific "pydantic" model could be provided as an "extension" - by a provider or manually. I'd love to hear thoughts about it.
Hi @potiuk , I very much appreciate the insights and would like to share some thoughts. Feel free to correct me if I am wrong on anything below.

We had a discussion in #50867 , and the issue with serializing a Pydantic model was raised in the Cohere provider. Considering Pydantic classes may potentially be used by other providers, we think it can be good to have this implemented in the core module so that it is generic and reusable. In the current serialization module, I feel pandas, numpy, and datetime are similar cases: common objects that may be used by multiple providers, or by tasks to pass in XComs. This approach may help avoid implementing similar things in different providers.

Serialization coming from providers can also bring multiple benefits: 1.) we do not need a core release when updates are needed for serialization/deserialization of data created by a specific provider (iceberg should come from the iceberg provider, etc.); 2.) core can be minimal, just discovering and registering serde as extensions.

I am also very interested in looking into how we can move it out of core and let the providers manager reuse common objects and register those as needed; also, how we could keep it DRY and resolve security concerns, while being able to extend it easily.
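As a thought experiment, the discovery mechanism discussed here could be sketched with Python entry points. The group name `airflow.serializers` is purely hypothetical; the providers manager does not expose such a hook today:

```python
# Hypothetical sketch: discover serializer modules registered by installed
# distributions via entry points, instead of hard-coding them in core.
from importlib.metadata import entry_points


def discover_serializers(group: str = "airflow.serializers") -> dict:
    """Return {name: loaded object} for every entry point in `group`."""
    eps = entry_points()
    # entry_points() returns a selectable object on Python 3.10+,
    # and a dict of lists on older versions.
    selected = eps.select(group=group) if hasattr(eps, "select") else eps.get(group, [])
    return {ep.name: ep.load() for ep in selected}
```

A provider distribution would then advertise its serializer module in its packaging metadata, and core would only need this lookup at startup.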
"In my view, serializers for kubernetes should come from the kubernetes provider. Deltalake -> should come from databricks (or a deltalake provider if we decide to have one), iceberg should come from the iceberg provider." Good pointers. I absolutely agree that kubernetes, pandas, deltalake, and iceberg should not belong in core and should be safely moved to providers; I would love to hear what @bolkedebruin thinks on this. Is it the versioning that is stopping us from doing that?
However, I think pydantic is more of a core thing; it does not essentially belong to a provider, and it can be consumed and used in core without additional dependencies. So there's nothing stopping anybody from returning a pydantic dataclass object as an XCom.
Force-pushed 262d0bf to 53dadf7
Hi @bolkedebruin , I've pushed the refactor for all the interfaces to use
It would be great if I could get your guidance on how to better implement the following. Thank you for your time and patience in reviewing it.
I've read the PR link and discussion shared by @potiuk , and I also think that is a good way to go; I am interested in contributing to that part as a next step. Please feel free to let me know if anything needs to be changed; I would really appreciate any feedback and am eager to make it better. I am having some issues running pre-commit locally (it's extremely slow). If the push didn't pass the checks, I will make sure all checks pass before the next push.
We are getting there. Some small nits.
I'm fine with having type as type.
Force-pushed 53dadf7 to 50409c5
Sorry, there are still some issues with the static checks. When I run them locally, they seem to be fixed; I will try to sort it out and push again later.
resolve conflicts in test case
Force-pushed 50409c5 to 4482d54
I would like to attach my test DAG code, which checks most of the serializers and deserializers, except for iceberg and deltalake. Hope this can be helpful for review. Thanks.

```python
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator
from pendulum import datetime

COHERE_CONN_ID = "cohere_default"


@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():
    @task
    def get_pandas():
        import numpy as np
        import pandas as pd

        return pd.DataFrame(np.random.randn(3, 2), columns=list("AB"))

    @task
    def print_pandas(df):
        print("Pandas DataFrame")
        print(df)

    @task
    def get_bignum():
        import decimal

        return decimal.Decimal(1234567891011)

    @task
    def print_bignum(n):
        print("bignum:", n)

    @task
    def get_list():
        return [1, 2, 3, 4]

    @task
    def print_list(l):
        print(l)

    @task
    def get_set():
        return {1, 2, 3, 4}

    @task
    def print_set(s):
        print(s)

    @task
    def get_tuple():
        return (1, 2, 3, 4)

    @task
    def print_tuple(t):
        print(t)

    @task
    def get_frozenset():
        return frozenset([1, 2, 3, 4])

    @task
    def print_frozenset(fs):
        print(fs)

    @task
    def get_numpy():
        import numpy as np

        n = np.random.rand(3, 2)[0][0]
        print(type(n))
        return n

    @task
    def get_datetime():
        import datetime

        return datetime.datetime.now()

    @task
    def print_datetime(dt):
        print(dt)

    @task
    def get_timezone():
        from datetime import datetime
        from zoneinfo import ZoneInfo

        return datetime(2020, 10, 31, 12, tzinfo=ZoneInfo("America/Toronto"))

    @task
    def get_pendulum_tz():
        import pendulum

        return pendulum.timezone("Europe/Paris")

    @task
    def print_pendulum_tz(tz):
        print(tz)

    @task
    def print_timezone(tz):
        print(tz)

    @task
    def get_pendulum_datetime():
        import pendulum

        return pendulum.now()

    @task
    def print_pendulum_datetime(dt):
        print(dt)

    @task
    def print_numpy(n):
        print("NumPy Array")
        print(n)

    @task
    def get_embeddings():
        # this uses the older provider version where the embedding is returned as a pydantic model
        import pydantic

        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])
        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))
        return embeddings

    @task
    def print_embeddings(embeddings):
        print("Pydantic Model")
        print(embeddings)

    print_embeddings(get_embeddings())
    print_numpy(get_numpy())
    print_pandas(get_pandas())
    print_list(get_list())
    print_set(get_set())
    print_tuple(get_tuple())
    print_bignum(get_bignum())
    print_datetime(get_datetime())
    print_pendulum_datetime(get_pendulum_datetime())
    print_frozenset(get_frozenset())
    print_timezone(get_timezone())
    print_pendulum_tz(get_pendulum_tz())


pydantic_serde()
```
Awesome! I think we are there
Nice! Thank you!
```diff
@@ -522,15 +568,15 @@ def test_timezone_serialize_no_name(self):
     def test_timezone_deserialize_zoneinfo(self):
         from airflow.serialization.serializers.timezone import deserialize

-        zi = deserialize("backports.zoneinfo.ZoneInfo", 1, "Asia/Taipei")
+        zi = deserialize(ZoneInfo, 1, "Asia/Taipei")
```
Unfortunately this change broke canary tests in https://github.com/apache/airflow/actions/runs/15910813471/job/44877861492
I am not sure whether (1) I should revert this longer and medium-complexity PR, or if the code is just broken and needs to consider "timezone.Zoneinfo", or if the pytest needs to be adjusted.
If you want to reproduce, it only happens in Python 3.9 with a downgrade of pendulum: `breeze --python 3.9 testing core-tests --test-type Serialization --downgrade-pendulum`
Thanks for catching it. I think it might be related more to the test case; I will have a deeper look into it and share updates here.
Reverting for now -> this is safer for all the incoming PRs.
#52312 -> reverting for now -> I think @sjyangkevin should re-create the PR after this revert is merged; we will add "full tests needed" to it, and you will be able to reproduce the failure and fix it in the PR.
Thank you for the feedback, and sorry that I wasn't aware of this during local testing.
I would like to make sure I fully understand the process I should follow to fix the issue. I think first I should wait for the merge of the revert PR. Then, I can use the breeze command mentioned by @jscheffl to reproduce the issue locally and fix it. After that, I can re-create the PR; that PR will be checked with full tests, and I can continue the fixes according to the CI outcomes.
Please correct me if I have any of the steps wrong. I am also eager to learn if there is anything I can do to prevent this from happening again. Thanks!
@bolkedebruin you can add the "full tests needed" label to the PR and reopen it; it should run those tests.
Yep. Also, for the future: we have a range of labels you can use (as maintainer) to modify PR behaviour - https://github.com/apache/airflow/blob/main/dev/breeze/doc/ci/06_debugging.md
Also - when you re-open your PR @bolkedebruin @sjyangkevin and set the label, and when it fails - ping me (I might see it regardless). I want to take a look at whether we can improve selective checks to run all the "needed" tests automatically. I have a feeling that currently "serde"-dependent tests are not as "isolated" as they should be - i.e. unrelated parts of the code implicitly depend on it. Eventually it should be either isolated, or we should have a way to infer the dependency on it. This is also part of the work on Task Isolation (cc: @kaxil @ashb @amoghrajesh) - because if we depend on serde in other parts of the code, it should be explicit. For example, if we extracted the serde code to a common distribution, there would be an explicit dependency on it from every other distribution that needs it, and we could infer that we should run those tests when serde changes.
For now we - unfortunately - likely need to hard-code it.
One of the goals for me when we talk about splitting stuff is to make all the dependencies explicit rather than implicit and unexpected.
I re-created the PR #52360 , but found a conflict with the main branch. I will take some time to resolve this conflict, since this change looks like it breaks the test.
This reverts commit a041a2a.
Motivation
The original purpose of this PR is to resolve #50867. In the Cohere provider, since 1.4.2, the return type of `CohereEmbeddingOperator` is updated from `list[list[float]]` to `EmbedByTypeResponseEmbeddings`. `EmbedByTypeResponseEmbeddings` is a class that inherits from `pydantic.BaseModel`, but there are multiple intermediate classes in between. To enable embeddings to be passed through XComs, we need the capability to serialize/deserialize `EmbedByTypeResponseEmbeddings`. Since the base class is `pydantic.BaseModel`, we considered implementing this in core serde so that future use cases can also benefit from it. Closes #50867.
High-level Design Solution
First, the serializer should be able to identify a Pydantic model, e.g., a class that inherits from `pydantic.BaseModel`, or a subclass of another class that inherits from `pydantic.BaseModel`. A Pydantic model can be identified simply using `isinstance(obj, pydantic.BaseModel)`. However, `isinstance` can be slow since it needs to traverse the inheritance tree, so an alternative is to use attributes that are specific to a Pydantic model to identify it. In this case, the attributes used are `__pydantic_fields__` and `__pydantic_validator__`. Then, for any Pydantic model, serialization can be implemented by calling the `model_dump()` method on the instance.

However, to restore the Pydantic model, the deserializer needs to know the actual class rather than the generic `pydantic.BaseModel`. Therefore, there is a need to keep track of the actual Pydantic class, e.g., `cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings`. When re-creating the model, this class will be used and the `model_validate()` method will be invoked.

The current implementation uses dynamic import to handle this. However, this implementation faces some limitations where the module cannot be resolved, for example when the Pydantic model is defined inside a Python function, or in a task-decorated Python function.
Test Result
The serde can successfully serialize `EmbedByTypeResponseEmbeddings` (screenshot in the original PR).
The serde can successfully deserialize `EmbedByTypeResponseEmbeddings` (screenshot in the original PR).
Test DAG code
Limitations of the implementation
During testing, I found that if a Pydantic model is not defined in the global scope, e.g., it is defined within a (test) function or an Airflow task (i.e., a task-decorated Python function), the serde will not work, due to the usage of dynamic import.
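This limitation is not specific to Pydantic: any import-by-path lookup fails for classes defined inside a function, because their qualified name contains `<locals>`. A minimal stdlib-only illustration (`import_by_path` is a hypothetical stand-in for `import_string`):

```python
# A class defined inside a function body is not importable as
# "<module>.<qualname>", so an import_string-style lookup must fail.
import importlib


def make_local_class():
    class LocalModel:  # stands in for a function-local Pydantic model
        pass

    return LocalModel


def import_by_path(path: str):
    """Hypothetical stand-in for an import_string-style helper."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


cls = make_local_class()
print(cls.__qualname__)  # e.g. "make_local_class.<locals>.LocalModel"
```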