Enable Serde for Pydantic BaseModel and Subclasses #51059

Merged: 13 commits merged into apache:main on Jun 26, 2025

Conversation

Contributor

@sjyangkevin sjyangkevin commented May 26, 2025

Motivation

The original purpose of this PR is to resolve #50867. In the Cohere provider, since 1.4.2, the return type of CohereEmbeddingOperator changed from list[list[float]] to EmbedByTypeResponseEmbeddings. EmbedByTypeResponseEmbeddings inherits from pydantic.BaseModel, though through multiple intermediate classes. To allow embeddings to be passed through XComs, we need the capability to serialize/deserialize EmbedByTypeResponseEmbeddings. Since the base class is pydantic.BaseModel, we considered implementing this in core serde so that future use cases can also benefit from it.

Close #50867.

High-level Design Solution

First, I think the serializer should be able to identify a Pydantic model, i.e., a class that inherits from pydantic.BaseModel directly, or a class that is a subclass of another class that inherits from pydantic.BaseModel. A Pydantic model can be identified simply using isinstance(obj, pydantic.BaseModel). However, isinstance can be slow since it needs to traverse the inheritance tree, so an alternative solution is to use attributes that are specific to a Pydantic model to identify it. In this case, the attributes used are __pydantic_fields__ and __pydantic_validator__. Then, for any Pydantic model, serialization can be implemented by calling the model_dump() method on the instance.
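
A minimal sketch of that attribute-based check (the helper name _is_pydantic_model matches the rename described later in this thread; the attribute names are the ones mentioned above):

from typing import Any

def _is_pydantic_model(obj: Any) -> bool:
    # Probe for Pydantic-specific attributes instead of calling
    # isinstance(obj, pydantic.BaseModel), which has to walk the inheritance tree.
    return hasattr(obj, "__pydantic_fields__") and hasattr(obj, "__pydantic_validator__")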

However, to restore the Pydantic model, the deserializer needs to know the actual class rather than the generic pydantic.BaseModel. Therefore, we need to keep track of the actual Pydantic class, e.g., cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings. When re-creating the model, this class is used and its model_validate() method is invoked.
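
To make the round trip concrete, here is a hedged end-to-end sketch (FooModel and the payload keys are made-up illustrations, not part of the PR):

from pydantic import BaseModel

class FooModel(BaseModel):
    name: str
    score: float

original = FooModel(name="gruyere", score=0.9)

# Serialization: record the fully qualified class name plus the dumped data.
payload = {
    "classname": f"{type(original).__module__}.{type(original).__qualname__}",
    "data": original.model_dump(),
}

# Deserialization: once the actual class has been resolved from payload["classname"],
# its model_validate() method rebuilds an equivalent instance.
restored = FooModel.model_validate(payload["data"])
assert restored == original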

The current implementation uses a dynamic import to handle this. However, it has some limitations where the module cannot be resolved, for example when the Pydantic model is defined inside a Python function, or inside a task-decorated Python function.

from importlib import import_module

def _resolve_pydantic_class(qn: str):
    # split the fully qualified name into module path and class name, then import it
    module_name, class_name = qn.rsplit(".", 1)
    module = import_module(module_name)
    return getattr(module, class_name)

Test Result

The Serde can successfully serialize EmbedByTypeResponseEmbeddings.

The Serde can successfully deserialize EmbedByTypeResponseEmbeddings.

Test DAG code

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator

from pendulum import datetime

COHERE_CONN_ID = "cohere_default"

@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():
    @task
    def push_pydantic():
        from pydantic import BaseModel, Field
        from typing import Optional

        class BarModel(BaseModel):
            whatever: int
        # this Pydantic model is created within the function, in deserialization, the module will be resolved as
        # unusual_prefix_afec8360888f39af6ea3ccaccf36a7f590a25638_pydantic_serde.pydantic_serde
        # This CANNOT be handled by the deserializer
        class FooBarModel(BaseModel):
            banana: Optional[float] = 1.1
            foo: str = Field(serialization_alias='foo_alias')
            bar: BarModel

        m = FooBarModel(banana=3.14, foo='hello', bar={'whatever': 123})
        return m
    
    @task
    def get_pydantic(m):
        # it cannot handle pydantic model created within the upstream task.
        print(m.model_dump())

    @task
    def get_embeddings():
        import pydantic
        
        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])

        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))

        return embeddings

    @task
    def print_embeddings(embeddings):
        embeddings = [x[0] for x in embeddings]
        print(embeddings)

    print_embeddings(get_embeddings())
    get_pydantic(push_pydantic())

pydantic_serde()

Limitations of the implementation

During testing, I found that if a Pydantic model is not defined in the global scope, e.g., it is defined within a (test) function or an Airflow task (i.e., a task-decorated Python function), the Serde will not work due to the use of dynamic import.



@amoghrajesh amoghrajesh self-requested a review May 26, 2025 05:47
Contributor

@amoghrajesh amoghrajesh left a comment

Very good start, @sjyangkevin!

Direction looks good.

Contributor

@bolkedebruin bolkedebruin left a comment

Good job! I like it. My comments are mostly cosmetic.

And I am happy that other people are starting to take an interest in the code here :-)

Contributor

@bolkedebruin bolkedebruin left a comment

Small catch

Contributor

@bolkedebruin bolkedebruin left a comment

Bigger change required :-)

@sjyangkevin sjyangkevin force-pushed the issues/50867/cohere-serde branch from a8d4b89 to bdedaec Compare May 29, 2025 04:08
@sjyangkevin
Contributor Author

sjyangkevin commented May 29, 2025

Hi all, thank you very much again for all the constructive feedback. I've pushed some changes based on it.

Design Principle

  1. We want to prioritize the serialize() and deserialize() methods if they are defined on the object (custom serde) over the registered default serializer/deserializer.
  2. For a Pydantic model, and any of its subclasses, we want to be able to identify it. In the serializer/deserializer, we can register the generic pydantic.main.BaseModel, so all Pydantic models coming in the future can benefit from it.
  3. During deserialization, we need the capability to reconstruct the Pydantic object, so we need the actual Pydantic class (e.g., cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings). This class is encoded during serialization as classname and propagated to the deserializer during deserialization, so we don't need the magic keyword __class__ to handle it, eliminating security concerns. serde.py needs to be modified accordingly: since pydantic.main.BaseModel is registered but the actual classname (e.g., cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings) is used to look up the default deserializer, the lookup will be a miss. Therefore, we need a fallback mechanism for Pydantic models: check whether the class is a subclass of pydantic.main.BaseModel, and if so, use the hard-coded key (pydantic.main.BaseModel) to invoke the registered deserializer and propagate the serialized object to it (see the sketch below).
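
A rough sketch of that fallback (the registry and helper names below are illustrative, not the actual serde.py code):

PYDANTIC_BASEMODEL_QUALNAME = "pydantic.main.BaseModel"

# Hypothetical registry: qualified class name -> registered deserializer callable.
_deserializers: dict = {}

def _is_pydantic_model_class(cls: type) -> bool:
    # Same attribute probe as for instances, applied to the class object.
    return hasattr(cls, "__pydantic_fields__") and hasattr(cls, "__pydantic_validator__")

def _find_deserializer(classname: str, cls: type):
    # Exact match first, e.g. a deserializer registered for that specific class ...
    if classname in _deserializers:
        return _deserializers[classname]
    # ... then fall back to the generic BaseModel entry for any Pydantic subclass,
    # since serialization records the concrete class name, not the generic one.
    if _is_pydantic_model_class(cls) and PYDANTIC_BASEMODEL_QUALNAME in _deserializers:
        return _deserializers[PYDANTIC_BASEMODEL_QUALNAME]
    return None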

I still want to keep the whitelisting mechanism, to make it even safer for arbitrary subclasses that do not directly inherit from Pydantic BaseModel. You will see the following error if the class is not added to allowed_deserialization_classes.

ImportError: unusual_prefix_afec8360888f39af6ea3ccaccf36a7f590a25638_pydantic_serde.pydantic_serde.<locals>.push_pydantic.<locals>.FooBarModel was not found in allow list for deserialization imports. To allow it, add it to allowed_deserialization_classes in the configuration

For the Cohere Operator, I can have it added to allowed_deserialization_classes by setting the environment variable:

AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES=cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings

Then, I can successfully serialize/deserialize

Please let me know if you have further feedback. I am happy to discuss and make changes accordingly.

Change Summary

In serde.py

  1. I renamed _is_pydantic_basemodel() to _is_pydantic_model and added a docstring describing why isinstance is not used.
  2. I modified the order in which serializers/deserializers are used. The docstring states that the serde provided by the object is prioritized, but the implementation checked the registered serde first. I altered the order for both serialize and deserialize, so a user-provided serde is used first if defined, then the registered one, then dataclass/attr.
  3. I moved _is_pydantic_model and _is_namedtuple to the top of the file.
  4. In the condition check for a Pydantic model, I updated qn to "pydantic.main.BaseModel" instead of qualname(BaseModel), and changed classname to qualname(o), so no magic keyword (i.e., __class__) is used in pydantic's serialize/deserialize.
  5. In deserialize, I added a fallback check: since the actual classname is encoded during serialization, the lookup for the registered pydantic deserializer would miss, because the registered classname is the generic one (i.e., pydantic.main.BaseModel). The fallback checks for any Pydantic model subclass and directs it to that deserializer.

In pydantic.py

  1. I removed the use of the __class__ key to eliminate security concerns.
  2. I use import_string instead of reinventing the wheel.
  3. Serialization is simply model_dump and deserialization is simply model_validate (see the sketch below).
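
A rough sketch of the resulting serializer shape at this stage (illustrative, not the exact module; import_string and qualname are assumed to come from airflow.utils.module_loading, and the (data, classname, version, is_serialized) return convention mirrors the other serializers):

from __future__ import annotations

from typing import Any

from airflow.utils.module_loading import import_string, qualname

def serialize(o: Any) -> tuple[Any, str, int, bool]:
    # Dump the model to plain Python types and record its concrete class name,
    # so no magic "__class__" key is stored inside the data itself.
    return o.model_dump(), qualname(o), 1, True

def deserialize(classname: str, version: int, data: Any) -> Any:
    # import_string resolves the concrete model class; in later revisions of this
    # PR the class resolution moves into serde.py and this call goes away.
    cls = import_string(classname)
    return cls.model_validate(data)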

@sjyangkevin
Contributor Author

Attaching the unit test result: pytest --log-cli-level=DEBUG airflow-core/tests/unit/serialization/

@bolkedebruin
Contributor

I'm replying from a phone so expect some mistakes ;-).

I like the direction where this is going! However, I do see some challenges remaining. The biggest challenge: I would like to get rid of the "import_string" entirely. When we caught the earlier security issue I was actually thinking about adding a ruff rule that prevents its addition. It's a security issue bound to happen. In your current implementation there is no guard against loading a malicious class except for the one in serde itself. So if I'm somehow able to trick the deserialization differently, it still goes through. That shouldn't be the case. In addition, the module is now loaded twice, probably without caching due to custom loading.

So in this case I see two options

  1. move the logic for pydantic to serde.py and remove the serializer. Drawback is that a future change will always require a core release

  2. allow passing the loaded class to the deserializer. This requires refactoring of the other deserializers to accept a class.

1 is simpler; 2 is IMHO more future-proof.

Furthermore, I prefer to fix issues in one PR especially here in this context. So please do not move the code as you did here based on the comment / doc. It is unrelated and might have subtle issues. I'd rather have that separate, because it makes sense to align the code with the comment (or vice versa!). Just not here, right now.

@sjyangkevin
Contributor Author

Thanks a lot for the thoughtful feedback.

You're absolutely right about the risk of using import_string() in the pydantic serializer, or of potentially distributing this import logic across multiple places. I will remove the import logic from the serializer module, have a deeper look into your suggestion, and see how we can get rid of it while keeping the code clean and safe.

Regarding the serializer call order in serde.py: I now see that this change, while I feel it is conceptually correct, shouldn't be mixed into this PR. I will revert that part and we can discuss it more and potentially open a separate PR, where I can more clearly explain the motivation. Currently, I think registered serializers take precedence over instance-provided serialize() or deserialize() methods, which unintentionally prevents custom serialization logic from being respected in some cases. Feel free to correct me if I am wrong.

As a side note, just to confirm: my understanding is that the use of import_string inside serde.py itself is acceptable since it's gated behind allowed_deserialization_classes, correct? And potentially we can add a ruff rule to guard it further.

@sjyangkevin
Contributor Author

allow passing the loaded class to the deserializer. This requires refactoring of the other deserializers to accept a class.

@bolkedebruin , I feel like this can be the way to go. I think it's a good idea to keep Pydantic modularized, like the other serializers. I wasn't able to find alternatives better than this solution. In serde.py, we have import_string to resolve the actual class; this can then be passed into the serializers and used directly. We use serde.py as a gate to validate and load the class, and the serializers only use it. To resolve the Pydantic issue (i.e., the Cohere case), users can add the class to allowed_deserialization_classes. So, in the serializers, we can get rid of import_string entirely. Let me know if you think this is a feasible way. I can make the changes accordingly, and I would appreciate some guidance on how to properly test it after making the changes. Thanks!

@bolkedebruin
Contributor

Okay! I like the more modularized approach. Let's go that way! We might need to think of a mechanism that allows serializers to register "allowed" classes, but that's probably out of scope for now (let's not include it now).

@amoghrajesh
Contributor

I like the direction where this is going too. 2) is the way to go, with a more modularised approach as mentioned above. No objections..

@sjyangkevin shout to us when you need reviews :D

@sjyangkevin
Contributor Author

Hi, just a heads up: sorry I wasn't able to make progress these few days, I was very busy. I will try to push some changes by early next week. Thank you for your patience, and I appreciate your time in reviewing. Feel free to share any feedback; I will take it into the next update.

@amoghrajesh
Contributor

@sjyangkevin take your time. There is no urgency on this :)

We understand that everyone works on open source contributions during their free time, so no pressure at all!

@sjyangkevin sjyangkevin force-pushed the issues/50867/cohere-serde branch from ef143dd to 262d0bf Compare June 8, 2025 05:38
@sjyangkevin
Contributor Author

Hi @amoghrajesh , @bolkedebruin , I would like to follow up with some updates:

  1. I reverted the changes I made to serde.py to alter the order of serializers/deserializers.
  2. I updated the function signature of deserialize in all serializer modules by adding an optional parameter cls: Any. I found that existing serializers mostly use classname; replacing classname with cls would require some refactoring of the code, so I would like to reduce the chance of introducing subtle issues. The existing serializers can function as they are, and the pydantic one can then accept cls from serde.py (see the sketch below).
  3. I modified pydantic's serialize method as well: instead of returning pydantic.main.BaseModel as the serialized classname, I let it return qualname(o). In this way, any arbitrary pydantic model can be scanned during deserialization. It means cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings must be added to allowed_deserialization_classes so that it can be deserialized.
  4. import_string is removed from the pydantic serializer.
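
A rough sketch of how the pydantic serializer's deserialize could look with the optional cls parameter (the parameter name and error handling are illustrative; serde.py resolves the class and checks it against allowed_deserialization_classes before handing it over):

from typing import Any

def deserialize(classname: str, version: int, data: Any, cls: Any = None) -> Any:
    # cls is the concrete model class, already resolved and allow-list checked by
    # serde.py, so this module no longer needs import_string at all.
    if cls is None:
        raise TypeError("deserializing a pydantic model requires the resolved class")
    return cls.model_validate(data)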

It has passed all the unit tests, and I've updated my test DAG code as shown below.

Arbitrary pydantic models must be added to allowed_deserialization_classes

Before adding to allowed_deserialization_classes (screenshot)
After adding to allowed_deserialization_classes (screenshot)

Test DAG code

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator

from pendulum import datetime

COHERE_CONN_ID = "cohere_default"

@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():

    @task
    def get_pandas():
        import pandas as pd
        import numpy as np

        return pd.DataFrame(np.random.randn(3, 2), columns=list('AB'))
    
    @task
    def print_pandas(df):
        print(df)

    @task
    def get_numpy():
        import numpy as np

        n = np.random.rand(3,2)[0][0]
        print(type(n))
        return n
    
    @task
    def print_numpy(n):
        print(n)

    @task
    def get_embeddings():
        import pydantic
        
        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])

        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))

        return embeddings

    @task
    def print_embeddings(embeddings):
        print(embeddings)

    print_embeddings(get_embeddings())
    print_numpy(get_numpy())
    print_pandas(get_pandas())

pydantic_serde()

Contributor

@bolkedebruin bolkedebruin left a comment

We are slowly getting there. I do prefer a refactor, so that classname isn't there anymore. Otherwise we just have redundant code and tech debt.

@sjyangkevin
Contributor Author

We are slowly getting there. I do prefer a refactor, so that classname isn't there anymore. Otherwise we just have redundant code and tech debt.

Thanks for the feedback! I was a little hesitant since it will be a huge refactor. I totally agree with your point and will gradually refactor those. I will start with serde.py, and once we are good on the overall structure, I will update the submodules to follow the general structure.

@potiuk
Member

potiuk commented Jun 16, 2025

We are slowly getting there. I do prefer a refactor, so that classname isn't there anymore. Otherwise we just have redundant code and tech debt.

Thanks for the feedback! I was a little hesitant since it will be a huge refactor. I totally agree with your point and will gradually refactor those. I will start with serde.py, and once we are good on the overall structure, I will update the submodules to follow the general structure.

Also - if we are thinking about refactoring stuff: I think (and I know @bolkedebruin had another opinion on that) there was a discussion on whether it's good that we are depending on other dependencies (say pandas, deltalake, iceberg, kubernetes) as part of "core" airflow - while we also have "providers" for many of those that provide operators / hooks and other "core extensions" related to the respective "external entity".

In my view, serializers for kubernetes, should come from kubernetes provider. Deltalake -> should come from databricks (or deltalake provider if we decide to have one), iceberg should come from iceberg provider.

Again -> I know @bolkedebruin had a different view on that, but my goal is to have core as small as possible, and add anything to it as "extensions" - for which we already have "providers" as a mechanism to do so.

For me the Pydantic thing is a very similar case - it should be an "extension" that IMHO should be implemented outside of core. So maybe it's the right time to do this kind of refactoring -> implement a "discovery" mechanism in the providers manager to discover which serializers are installed (similarly to all other extensions) - and similarly a specific "pydantic" model could be provided as an "extension" - by a provider or manually.

I'd love to hear thoughts about it.

@sjyangkevin
Contributor Author

sjyangkevin commented Jun 16, 2025

Hi @potiuk , I very much appreciate the insights and would like to share some thoughts. Feel free to correct me if I am wrong on anything below.

We had a discussion in #50867 , and the issue with serializing a Pydantic model was raised in the Cohere provider. Considering that Pydantic classes may potentially be used by other providers, we thought it would be good to have this implemented in the core module so that it can be generic and reusable. In the current serialization module, I feel pandas, numpy, and datetime are similar cases: common objects that may be used by multiple providers, or by tasks to pass data through XComs. This approach may help avoid implementing similar things in different providers.

Serializers coming from providers can also provide multiple benefits: 1) we do not need a core release when updates are needed for serialization/deserialization of data created by a specific provider (iceberg should come from the iceberg provider, etc.); 2) core can be minimal, just discovering and registering serde as extensions.

I am also very interested in looking into how we can move this out of core and let the providers manager reuse common objects and register them as needed - and how we could keep it DRY and resolve security concerns while still being able to extend it easily.

@amoghrajesh
Contributor

there was a discussion on whether it's good that we are depending on other dependencies (say pandas, deltalake, iceberg, kubernetes) as part of "core" airflow - while we also have "providers" for many of those that provide operators / hooks and other "core extensions" related to the respective "external entity".

In my view, serializers for kubernetes, should come from kubernetes provider. Deltalake -> should come from databricks (or deltalake provider if we decide to have one), iceberg should come from iceberg provider.

Good pointers, I absolutely agree that kubernetes, pandas, deltalake and iceberg should not belong in core and should be safely moved to providers; I would love to hear what @bolkedebruin thinks on this. Is it the versioning that is stopping us from doing that?

For me the Pydantic thing is a very similar case - it should be an "extension" that IMHO should be implemented outside of core. So maybe it's the right time to do this kind of refactoring -> implement a "discovery" mechanism in the providers manager to discover which serializers are installed (similarly to all other extensions) - and similarly a specific "pydantic" model could be provided as an "extension" - by a provider or manually.

However, I think pydantic is more of a core thing; it doesn't essentially belong to a provider, and it can be consumed and used in core without additional dependencies. So there's nothing stopping anybody from returning a pydantic dataclass object as an XCom.

@Lee-W Lee-W self-requested a review June 19, 2025 03:43
@sjyangkevin sjyangkevin force-pushed the issues/50867/cohere-serde branch from 262d0bf to 53dadf7 Compare June 19, 2025 04:32
@sjyangkevin
Contributor Author

Hi @bolkedebruin , I've pushed the refactor for all the interfaces to use cls instead of classname. Below are the change highlights.

  1. Update the interface of deserialize to deserialize(cls: type, version: int, data: object)
  2. Group _is_namedtuple and _is_pydantic_model with other private methods
  3. Use a constant PYDANTIC_MODEL_QUALNAME for "pydantic.main.BaseModel"
  4. Remove the imports of private serde.py methods from the serializers
  5. Add more unit test cases (bignum, builtin, pydantic)

It would be great if I could get your guidance on how to better implement the following. Thank you for your time and patience in reviewing it.

  1. In deserialize, I use cls: type as the type hint; do you have any suggestions on this?

  2. Use attributes to identify a Pydantic model: I now have another private method in the pydantic serializer, which duplicates the one in serde.py. Is there anywhere we can move it to a common place and safely import it from both? I am thinking about airflow.utils. Or, we should change the way it's checked.

  3. I wasn't sure how to properly test iceberg and deltalake, and a few pendulum (e.g., v2) tests are skipped. Is there any service or setup I can run locally to execute those tests?

  4. The test case test_timezone_deserialize_zoneinfo tries to deserialize "backports.zoneinfo.ZoneInfo". However, this module does not seem to be available in the breeze environment, and it cannot be passed as backports.zoneinfo.ZoneInfo.

I've read the PR link and discussion shared by @potiuk ; I also think that is a good way to go, and I am interested in contributing to that part as a next step.

Please feel free to let me know if anything needs to be changed; I would really appreciate any feedback and am eager to make it better. I am having some issues running pre-commit locally (it's extremely slow), so if this push didn't pass the checks, I will make sure all checks pass before the next push.

@sjyangkevin sjyangkevin requested a review from bolkedebruin June 19, 2025 04:38
Contributor

@bolkedebruin bolkedebruin left a comment

We are getting there. Some small nits.

I'm fine with having type as type.

@sjyangkevin sjyangkevin force-pushed the issues/50867/cohere-serde branch from 53dadf7 to 50409c5 Compare June 19, 2025 21:28
@sjyangkevin sjyangkevin requested a review from bolkedebruin June 19, 2025 21:39
@sjyangkevin
Contributor Author

Sorry, there are still some issues with the static checks; when I run them locally, they seem to be fixed. I will try to sort it out and push again later.

@sjyangkevin sjyangkevin force-pushed the issues/50867/cohere-serde branch from 50409c5 to 4482d54 Compare June 20, 2025 01:53
@sjyangkevin
Contributor Author

I would like to attach my test DAG code, which checks most of the serializers and deserializers except for iceberg and deltalake. Hope this is helpful for review. Thanks.


from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator

from pendulum import datetime

COHERE_CONN_ID = "cohere_default"

@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():

    @task
    def get_pandas():
        import pandas as pd
        import numpy as np

        return pd.DataFrame(np.random.randn(3, 2), columns=list('AB'))
    
    @task
    def print_pandas(df):
        print("Pandas DataFrame")
        print(df)

    @task
    def get_bignum():
        import decimal
        return decimal.Decimal(1234567891011)
    
    @task
    def print_bignum(n):
        print("bignum:", n)

    @task
    def get_list():
        return [1, 2, 3, 4]
    
    @task
    def print_list(l):
        print(l)

    @task
    def get_set():
        return set([1, 2, 3, 4])
    
    @task
    def print_set(s):
        print(s)

    @task
    def get_tuple():
        return (1, 2, 3, 4)
    
    @task
    def print_tuple(t):
        print(t)

    @task
    def get_frozenset():
        return frozenset([1,2,3,4])
    
    @task
    def print_frozenset(fs):
        print(fs)

    @task
    def get_numpy():
        import numpy as np

        n = np.random.rand(3,2)[0][0]
        print(type(n))
        return n
    
    @task
    def get_datetime():
        import datetime
        return datetime.datetime.now()
    
    @task
    def print_datetime(dt):
        print(dt)

    @task
    def get_timezone():
        from zoneinfo import ZoneInfo
        from datetime import datetime

        return datetime(2020, 10, 31, 12, tzinfo=ZoneInfo("America/Toronto"))
    
    @task
    def get_pendulum_tz():
        import pendulum
        return pendulum.timezone("Europe/Paris")

    @task
    def print_pendulum_tz(tz):
        print(tz)

    @task
    def print_timezone(tz):
        print(tz)

    @task
    def get_pendulum_datetime():
        import pendulum
        return pendulum.now()
    
    @task
    def print_pendulum_datetime(dt):
        print(dt)

    @task
    def print_numpy(n):
        print("NumPy Array")
        print(n)

    @task
    def get_embeddings():
        # this uses the older provider version when embedding is returned as a pydantic model
        import pydantic
        
        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])

        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))

        return embeddings

    @task
    def print_embeddings(embeddings):
        print("Pydantic Model")
        print(embeddings)

    print_embeddings(get_embeddings())
    print_numpy(get_numpy())
    print_pandas(get_pandas())
    print_list(get_list())
    print_set(get_set())
    print_tuple(get_tuple())
    print_bignum(get_bignum())
    print_datetime(get_datetime())
    print_pendulum_datetime(get_pendulum_datetime())
    print_frozenset(get_frozenset())
    print_timezone(get_timezone())
    print_pendulum_tz(get_pendulum_tz())

pydantic_serde()

Contributor

@bolkedebruin bolkedebruin left a comment

Awesome! I think we are there

@bolkedebruin bolkedebruin merged commit a041a2a into apache:main Jun 26, 2025
53 checks passed
@sjyangkevin
Contributor Author

Awesome! I think we are there

Nice! Thank you!

@@ -522,15 +568,15 @@ def test_timezone_serialize_no_name(self):
def test_timezone_deserialize_zoneinfo(self):
from airflow.serialization.serializers.timezone import deserialize

zi = deserialize("backports.zoneinfo.ZoneInfo", 1, "Asia/Taipei")
zi = deserialize(ZoneInfo, 1, "Asia/Taipei")
Contributor

Unfortunately this change broke canary tests in https://github.com/apache/airflow/actions/runs/15910813471/job/44877861492
I am not sure whether (1) I should revert this longer and medium-complexity PR, or if the code is just broken and needs to consider "timezone.Zoneinfo", or if the pytest needs to be adjusted.

If you want to reproduce it: it only happens in Python 3.9 with a downgrade of pendulum: breeze --python 3.9 testing core-tests --test-type Serialization --downgrade-pendulum

Contributor Author

Thanks for catching it. I think it might be related more to the test case; I will have a deeper look into it and share updates here.

Member

reverting for now -> this is safer for all the incoming PRs

Member

#52312 -> reverting for now. I think @sjyangkevin should re-create the PR after this revert is merged; we will add the "full tests needed" label to it, and you will be able to reproduce and fix it in the PR.

Contributor Author

Thank you for the feedback and sorry that I wasn’t aware of this during local testing.

I would like to make sure I fully understand the process I should follow to fix the issue. I think first I should wait for the revert PR to be merged. Then, I can use the breeze command mentioned by @jscheffl to reproduce the issue locally and fix it. After that, I can re-create the PR, which will be checked with the full tests, and I can continue the fixes according to the CI outcomes.

Please correct me if I have any of the steps wrong. I am also eager to learn if there is anything I can do to prevent this from happening again. Thanks!

Contributor

@bolkedebruin you can add the "full tests needed" label to the PR and reopen it, it should run those tests.

Member

Yep. Also for the future: we have a range of labels you can use (as a maintainer) to modify PR behaviour - https://github.com/apache/airflow/blob/main/dev/breeze/doc/ci/06_debugging.md

Member

Also - when you re-open your PR @bolkedebruin @sjyangkevin, set the label, and when it fails - ping me (I might see it regardless). I want to take a look at whether we can improve selective checks to run all the "needed" tests automatically. I have a feeling that currently the "serde"-dependent tests are not as "isolated" as they should be - i.e. unrelated parts of the code implicitly depend on it. Eventually it should either be isolated or we should have a way to infer the dependency on it. This is also part of the work on Task Isolation (cc: @kaxil @ashb @amoghrajesh) - because if we depend on serde in other parts of the code, it should be explicit. For example, if we extract the serde code to a common distribution, there would be an explicit dependency on it from every other distribution that needs it, and we could infer that we should run those tests when serde changes.

For now we - unfortunately - likely need to hard-code it.

Member

One of the goals for me when we talk about splitting stuff is to make all the dependencies explicit rather than implicit and unexpected.

Contributor Author

@sjyangkevin sjyangkevin Jun 27, 2025

I re-created the PR #52360 , but found a conflict with the main branch. I will take some time to resolve this conflict, since this change looks like it breaks the test.


Successfully merging this pull request may close these issues.

CohereEmbeddingOperator cannot serialize/deserialize object of type EmbedByTypeResponseEmbeddings
6 participants