Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): fix out of order issue when exactly once is enabled #9472

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
add ordering test and fix race condition with resource cleanup
  • Loading branch information
hongalex committed Feb 26, 2024
commit 74887f6412bdbc0cb6dfac52bf7244090269e749
124 changes: 106 additions & 18 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,29 +1177,32 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) {
}

received := make(chan string, numItems)
ctx, cancel := context.WithCancel(ctx)
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Errorf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Errorf("timed out after 30s waiting for item %d", i)
cancel()
}
}
cancel()
}()

for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Fatalf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Fatalf("timed out after 30s waiting for item %d", i)
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}
}
Expand Down Expand Up @@ -1445,6 +1448,91 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
}
}

func TestIntegration_OrderingWithExactlyOnce(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
var sub *Subscription
if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
EnableExactlyOnceDelivery: true,
}); err != nil {
t.Fatal(err)
}
defer sub.Delete(ctx)
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
}

topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true

orderingKey := "some-ordering-key"
numItems := 10
for i := 0; i < numItems; i++ {
r := topic.Publish(ctx, &Message{
ID: fmt.Sprintf("id-%d", i),
Data: []byte(fmt.Sprintf("item-%d", i)),
OrderingKey: orderingKey,
})
go func() {
if _, err := r.Get(ctx); err != nil {
t.Error(err)
}
}()
}

received := make(chan string, numItems)
ctx, cancel := context.WithCancel(ctx)
go func() {
for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Errorf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Errorf("timed out after 30s waiting for item %d", i)
cancel()
}
}
cancel()
}()

if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}

}

func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down