
Added kafka consumer logic #296

Open
wants to merge 3 commits into base: main

Conversation

@Poovaya Poovaya commented Apr 17, 2025

No description provided.

Member

@tylerharter tylerharter left a comment

Good first draft!

var lambdaMgr *lambda.LambdaMgr
var once sync.Once

// LambdaManager is now a singleton, one per worker. This is because lambda manager
Member

I agree that we can't create multiple instances in the same process right now because they would collide in terms of directories created. However, is it better to make it a singleton, or to eventually modify the manager so that we can have multiple ones? I think this is a longer discussion, and it shouldn't be part of this PR because it will slow things down.

Author

For this PR, what would you recommend? If the worker is supposed to support both Kafka and HTTP/Cron triggers at the same time, this change or something similar would be needed for even bare-minimum functionality.
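
For illustration, a per-worker singleton built on the two variables already declared in this file could look roughly like the sketch below. The constructor name `lambda.NewLambdaMgr` is an assumption for the sketch, not something taken from this PR; the worker's real initialization path may differ.

```go
// Sketch only: lazily create one LambdaMgr per worker process.
// getLambdaMgr and lambda.NewLambdaMgr are illustrative names.
var initErr error

func getLambdaMgr() (*lambda.LambdaMgr, error) {
	once.Do(func() {
		// hypothetical constructor; substitute the worker's real init code
		lambdaMgr, initErr = lambda.NewLambdaMgr()
	})
	return lambdaMgr, initErr
}
```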

@@ -149,3 +149,20 @@ Invoke your lambda with `curl` (the result should be the same as the POST body):
```
curl -X POST localhost:5000/run/echo -d '{"hello": "world"}'
```

Member

Thanks for documenting it! Kafka isn't really a quick-start kind of topic though -- this is better as its own markdown file.

"github.com/twmb/franz-go/pkg/kgo"
)

type KafkaServer struct {
Member

I think things will get messy if we eventually have a bunch of different Server types wrapping the same lambdaMgr. Better to have a top-level struct (maybe this? https://github.com/open-lambda/open-lambda/blob/main/src/worker/event/lambdaServer.go#L16) that has the lambdaMgr and references to other types, like a KafkaServer and HTTPServer for now.
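
One possible shape, sketched under the assumption that the server types stay roughly as they are in this PR: a single worker-level struct owns the LambdaMgr, and the protocol-specific servers (Kafka, HTTP) only hold a reference to it. All names other than `lambda.LambdaMgr` and `kgo.Client` are illustrative.

```go
// Sketch: one top-level owner of the manager; per-protocol servers borrow it.
type WorkerServer struct {
	lambdaMgr   *lambda.LambdaMgr // single shared manager
	httpServer  *HttpServer       // HTTP/Cron triggers
	kafkaServer *KafkaServer      // Kafka triggers
}

type KafkaServer struct {
	lambdaMgr *lambda.LambdaMgr // borrowed from WorkerServer, not owned
	client    *kgo.Client
}
```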

}

var data map[string]string
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
Member

Let's not do a string => string map for the body type. We might eventually want non-string types. Let's have a JSON-serializable struct.

Member

On the boss side, Mandy is going to be parsing lambda config, from here: https://github.com/open-lambda/open-lambda/blob/main/src/common/lambdaConfig.go. So it probably makes sense if the information the boss passes to the worker uses the same structs that the boss reads from the lambda config.
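
As a sketch of what a struct-based body might look like (the field names below are placeholders; in practice they would mirror the structs the boss already parses from src/common/lambdaConfig.go):

```go
// KafkaInitRequest is a placeholder payload type for what the boss sends to
// the worker; the real fields should come from the lambda-config structs.
type KafkaInitRequest struct {
	FunctionName string   `json:"function_name"`
	Brokers      []string `json:"brokers"`
	Topics       []string `json:"topics"`
	GroupID      string   `json:"group_id,omitempty"`
}

// parseKafkaInit decodes the request body into the struct instead of a
// map[string]string, so non-string fields are possible later.
func parseKafkaInit(r *http.Request) (*KafkaInitRequest, error) {
	var req KafkaInitRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		return nil, fmt.Errorf("bad kafka init body: %w", err)
	}
	return &req, nil
}
```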

panic(err)
}
w := httptest.NewRecorder()
kafkaServer.lambdaMgr.Get(functionName).Invoke(w, r)
Member

To minimize overhead, can we send the lambda function a whole batch at once?
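
A rough sketch of one way to batch, assuming the lambda is written to accept a JSON array of record values in a single POST body. The `Get(...).Invoke(w, r)` call mirrors the pattern already in this PR; the rest is illustrative and assumes `bytes` and `log` are imported.

```go
// invokeBatch forwards one poll's worth of records to the lambda as a single
// JSON array. It assumes each record value is itself valid JSON.
func (ks *KafkaServer) invokeBatch(functionName string, fetches kgo.Fetches) {
	var batch []json.RawMessage
	fetches.EachRecord(func(rec *kgo.Record) {
		batch = append(batch, json.RawMessage(rec.Value))
	})
	if len(batch) == 0 {
		return
	}
	body, err := json.Marshal(batch)
	if err != nil {
		log.Printf("marshal kafka batch: %v", err)
		return
	}
	r := httptest.NewRequest("POST", "/run/"+functionName, bytes.NewReader(body))
	w := httptest.NewRecorder()
	ks.lambdaMgr.Get(functionName).Invoke(w, r)
}
```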

return req, nil
}

func KafkaInit(w http.ResponseWriter, r *http.Request) {
Member

The boss might ask a worker to start or stop consuming. So maybe we can make the API like that? I think it's fine to have a TODO comment about what you will do for stopping in this PR (and leave the implementation to another one).
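
In terms of API shape, that could be two boss-facing endpoints, sketched below with the stop side left as a TODO. Handler names and routes are made up for illustration, and `parseKafkaInit`/`consume` refer to the helper sketches elsewhere in this review.

```go
// Hypothetical boss-facing handlers; routes and names are illustrative.
func (ks *KafkaServer) HandleStartConsuming(w http.ResponseWriter, r *http.Request) {
	req, err := parseKafkaInit(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	go ks.consume(req) // run the poll loop in the background
	w.WriteHeader(http.StatusOK)
}

func (ks *KafkaServer) HandleStopConsuming(w http.ResponseWriter, r *http.Request) {
	// TODO: cancel the consumer's context / close its kgo.Client so the
	// poll loop exits cleanly; implementation deferred to a follow-up PR.
	http.Error(w, "stop not implemented yet", http.StatusNotImplemented)
}
```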

if errs := fetches.Errors(); len(errs) > 0 {
// All errors are retried internally when fetching, but non-retriable errors are
// returned from polls so that users can notice and take action.
panic(fmt.Sprint(errs))
Member

I don't think we want to panic. If one user's function has a typo in the topic name, they might get an error here, and then the whole server would go down, taking all the other users' functions with it.

I do agree it's hard to know how to surface this. Perhaps we should make a call to a user's lambda function with error info so they can do what they want about it?
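
One way the poll loop's error branch could look without panicking (a sketch that replaces the `panic` above; `notifyLambdaOfError` is a hypothetical helper, and `log` is assumed to be imported):

```go
if errs := fetches.Errors(); len(errs) > 0 {
	for _, e := range errs {
		// Log instead of panicking so one bad topic doesn't take down the
		// whole worker and every other user's functions with it.
		log.Printf("kafka fetch error: topic=%s partition=%d err=%v",
			e.Topic, e.Partition, e.Err)
		// Optionally surface the error to the owning lambda, e.g. by invoking
		// it with an error payload:
		// ks.notifyLambdaOfError(functionName, e) // hypothetical helper
	}
	continue // keep polling for the healthy topics
}
```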

return req, nil
}

func KafkaInit(w http.ResponseWriter, r *http.Request) {
Member

Can we refactor this function into two? One for the HTTP side of things, and one for actually starting the goroutine and consuming messages.

Also, it probably makes sense for these to be methods on KafkaServer, not functions that use a global.
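
Concretely, the non-HTTP half could be a method like the one below, called from the start handler sketched earlier, so `KafkaInit` no longer mixes request parsing with the poll loop. The kgo options shown are the standard broker/topic/group ones; everything else is illustrative.

```go
// consume runs the long-lived poll loop for one function's topics; it is the
// non-HTTP half of what KafkaInit does today.
func (ks *KafkaServer) consume(req *KafkaInitRequest) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers(req.Brokers...),
		kgo.ConsumeTopics(req.Topics...),
		kgo.ConsumerGroup(req.GroupID),
	)
	if err != nil {
		log.Printf("kafka client for %s: %v", req.FunctionName, err)
		return
	}
	defer client.Close()

	for {
		fetches := client.PollFetches(context.Background())
		if fetches.IsClientClosed() {
			return
		}
		// error handling and batching as sketched in the comments above
		ks.invokeBatch(req.FunctionName, fetches)
	}
}
```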
