Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix watch event loss #17555

Merged
merged 1 commit into from
Mar 16, 2024
Merged

Fix watch event loss #17555

merged 1 commit into from
Mar 16, 2024

Conversation

chaochn47
Copy link
Member

@chaochn47
Copy link
Member Author

chaochn47 commented Mar 8, 2024

Filed flaky test report #17556, provided root cause and how to fix.

Is there any way to re-test this specific failed test E2E / test (linux-386)? @jmhbnz

@jmhbnz
Copy link
Member

jmhbnz commented Mar 8, 2024

/retest

(Will only retest workflows that have failed by default)

Copy link
Contributor

@fuweid fuweid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

The change is small and it's easy to backport. Thanks

tests/e2e/watch_delay_test.go Outdated Show resolved Hide resolved
@ahrtr
Copy link
Member

ahrtr commented Mar 8, 2024

Thanks for the fix.

The fix is simple and makes sense to me, but the 230+ lines of e2e test is a little over complicated to me. Are you able to create a simple unit test to simulate the case that w.ch is full instead?

select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
w.compacted = true
wg.delete(w)
default:
// retry next time
}

Also when I read the source code, I found there maybe another slightly related issue. As the code showed above, when w.minRev < compactRev, etcd will send out a WatchResponse with CompactRevision, and also remove the watcher wg.delete(w), but when the watcher count > maxWatchersPerSync (512), it only removes the watcher from a temporary watcherGroup (see below). So it doesn't really remove the watcher? Could you double confirm this, e.g by writing a unit test? Thanks.

ret := newWatcherGroup()
for w := range wg.watchers {
if maxWatchers <= 0 {
break
}
maxWatchers--
ret.add(w)
}
return &ret, ret.chooseAll(curRev, compactRev)

@chaochn47
Copy link
Member Author

chaochn47 commented Mar 9, 2024

@ahrtr @serathius

it only removes the watcher from a temporary watcherGroup (see below). So it doesn't really remove the watcher? Could you double confirm this, e.g by writing a unit test? Thanks.

Because the .choose function is written badly. It sometimes mutates the original watchGroup, sometimes returns the same watch group, sometimes a copy. It's not great.

Yeah, I have confirmed it would cause compacted watcher still exists in the original watcher group after wg.choose with the following unit test.

Also I observed that slow watchers metrics is 829 even if I only opened 801 watchers. Haven't yet dig into that but it could be related.

func TestWatchGroupUpdate(t *testing.T) {
	ch := make(chan WatchResponse, chanBufLen)
	compactRev := int64(5)
	curRev := int64(12)
	wg := newWatcherGroup()
	wg.add(&watcher{
		key:    []byte("foo/"),
		end:    []byte("foo0"),
		minRev: 2,
		id:     0,
		ch:     ch,
	})
	wg.add(&watcher{
		key:    []byte("foo/"),
		end:    []byte("foo0"),
		minRev: 3,
		id:     1,
		ch:     ch,
	})

	wg.choose(1 /* maxWatchers */, curRev, compactRev)
	// we would expect whatever the picked compacted watcher should be deleted from the watch group
	for w := range wg.watchers {
		t.Logf("compactRev is: %d; watcher %d with minRev %d still in the watcher group", compactRev, w.id, w.minRev)
	}
	require.Equal(t, 1, len(wg.watchers))
}
dev-dsk-chaochn-2c-a26acd76 % GOWORK=off go test -v -run TestWatchGroupUpdate
=== RUN   TestWatchGroupUpdate
    watcher_group_test.go:32: compactRev is: 5; watcher 0 with minRev 2 still in the watcher group
    watcher_group_test.go:32: compactRev is: 5; watcher 1 with minRev 3 still in the watcher group
    watcher_group_test.go:34:
        	Error Trace:	/home/chaochn/workplace/EKS-etcd/src/EKS-etcd/server/storage/mvcc/watcher_group_test.go:34
        	Error:      	Not equal:
        	            	expected: 1
        	            	actual  : 2
        	Test:       	TestWatchGroupUpdate
--- FAIL: TestWatchGroupUpdate (0.00s)
FAIL
exit status 1
FAIL	go.etcd.io/etcd/server/v3/storage/mvcc	0.011s

I can submit another separate PR to fix it.

@chaochn47
Copy link
Member Author

Are you able to create a simple unit test to simulate the case that w.ch is full instead?

I would like to keep the existing e2e test to ensure no regression of etcd watch behavior after the fix gets in. I would simplify it by removing unnecessary code paths.

Unit test is good to have and I am working on it.

@ahrtr
Copy link
Member

ahrtr commented Mar 10, 2024

Please ping me back when you resolve all comments.

@ahrtr
Copy link
Member

ahrtr commented Mar 10, 2024

I would like to keep the existing e2e test to ensure no regression of etcd watch behavior after the fix gets in. I would simplify it by removing unnecessary code paths.

Unit test is good to have and I am working on it.

OK

@serathius
Copy link
Member

I think we might need to do more changes in the watch code soon.

Let's go with a minimal fix proposed by the original PR to address the issue.

@chaochn47 can you simplify the e2e test? I should be able to help with if needed.

tests/e2e/watch_delay_test.go Outdated Show resolved Hide resolved
tests/e2e/watch_delay_test.go Outdated Show resolved Hide resolved
@chaochn47
Copy link
Member Author

Ping @ahrtr @serathius

@serathius
Copy link
Member

Have you considered using a gofailpoint to simulate watch stream being clogged?

I managed to reproduce the issue using a following test:

func TestV3NoEventsLostOnCompact(t *testing.T) {
	if integration.ThroughProxy {
		t.Skip("grpc proxy currently does not support requesting progress notifications")
	}
	integration.BeforeTest(t)

	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	client := clus.RandClient()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	writeCount := mvcc.WatchStreamResponseBufferLen * 11 / 10

	wch := client.Watch(ctx, "foo")
	require.NoError(t, gofail.Enable("watchResponseSend", `sleep(1000)`))
	var rev int64 = 0
	for i := 0; i < writeCount; i++ {
		resp, err := client.Put(ctx, "foo", "bar")
		require.NoError(t, err)
		rev = resp.Header.Revision
	}
	_, err := client.Compact(ctx, rev)
	require.NoError(t, err)
	time.Sleep(time.Second)
	require.NoError(t, gofail.Disable("watchResponseSend"))

	event_count := 0
	compacted := false
	for resp := range wch {
		err = resp.Err()
		if err != nil {
			if !strings.Contains(err.Error(), "required revision has been compacted") {
				t.Fatal(err)
			}
			compacted = true
			break
		}
		event_count += len(resp.Events)
		if event_count == writeCount {
			break
		}
	}
	assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", event_count, writeCount)
}

Where watchResponseSend is a new failpoint before grpc.Send

@serathius serathius added the priority/critical-urgent Highest priority. Must be actively worked on as someone's top priority right now. label Mar 13, 2024
@chaochn47
Copy link
Member Author

chaochn47 commented Mar 14, 2024

Have you considered using a gofailpoint to simulate watch stream being clogged?

I managed to reproduce the issue using a following test:

Thanks! I have updated the integration test based on the one you provided!

Added unit test, integration test with failpoint and e2e test together and let you guys pick which one is the best.

@chaochn47 chaochn47 force-pushed the fix-watch-event-loss branch 3 times, most recently from 1712f23 to 12514b9 Compare March 14, 2024 06:42
server/storage/mvcc/watchable_store.go Outdated Show resolved Hide resolved
server/storage/mvcc/watchable_store.go Outdated Show resolved Hide resolved
server/etcdserver/api/v3rpc/watch.go Outdated Show resolved Hide resolved
tests/robustness/makefile.mk Show resolved Hide resolved
server/storage/mvcc/watchable_store_test.go Outdated Show resolved Hide resolved
tests/e2e/watch_delay_test.go Outdated Show resolved Hide resolved
@ahrtr
Copy link
Member

ahrtr commented Mar 14, 2024

The e2e test is still a little over complicated to me. It will definitely be painful for other contributors to update/maintain such e2e test in future. I think it should be part of the robustness test, which already supports generating traffic, watching, compaction and verifying results (including watchResponses), why do you have to write similar complex test in e2e test suite? But I won't insist on that.

@chaochn47
Copy link
Member Author

chaochn47 commented Mar 14, 2024

The e2e test is still a little over complicated to me. It will definitely be painful for other contributors to update/maintain such e2e test in future. I think it should be part of the robustness test, which already supports generating traffic, watching, compaction and verifying results (including watchResponses), why do you have to write similar complex test in e2e test suite? But I won't insist on that.

Okay. This e2e test is removed from the PR and can be added in robustness or performance test suite later. This test was created originally to simulate kubernetes traffic. With unit test and integration test, it's enough to capture regression.

@chaochn47
Copy link
Member Author

/retest

Copy link
Member

@ahrtr ahrtr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with two very minor comments, which can be resolved separately.

We can let this PR in for now if there is no other comment. cc @serathius

server/storage/mvcc/watchable_store_test.go Outdated Show resolved Hide resolved
server/storage/mvcc/watchable_store_test.go Outdated Show resolved Hide resolved
@@ -250,6 +251,63 @@ func TestWatchCompacted(t *testing.T) {
}
}

func TestWatchNoEventLossOnCompact(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how relevant is maxWatchersPerSync to the issue and this test as len(watchers) < 4.

What about the case where len(watchers) > maxWatchersPerSync as pointed out in #17555 (comment) ? I haven't verified it, but I expect that unremoved watcher from s.unsynced will cause syncWatchers to return != 0, causing function to be called earlier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how relevant is maxWatchersPerSync to the issue and this test as len(watchers) < 4.

It seems to be true. Confirmed that it always runs into the if branch (see below), and confirmed that it has no any impact on the test case.

But it isn't a big deal, and also from another prospective it should be OK to explicitly set a value to ensure len(wg.watchers) < maxWatchers although it's already true by default.

if len(wg.watchers) < maxWatchers {
return wg, wg.chooseAll(curRev, compactRev)
}

What about the case where len(watchers) > maxWatchersPerSync as pointed out in #17555 (comment) ?

Suggest to discuss & fix it separately. We may want to do minor local code refactor. @chaochn47 are you able to continue to work on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confirmed that it has no any impact on the test case.

I mean that the even I don't change maxWatchersPerSync in the test case , the test case could also reproduce the issue without the fix and the issue disappeared after applying the patch in this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

// if its next revision of events are compacted and no lost events sent to client.
func TestV3NoEventsLostOnCompact(t *testing.T) {
if integration.ThroughProxy {
t.Skip("grpc proxy currently does not support requesting progress notifications")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think this comment is relevant.

@ahrtr
Copy link
Member

ahrtr commented Mar 18, 2024

@chaochn47 do you have bandwidth to backport this to 3.5 and 3.4? We may need to release a 3.4 and 3.5 patch soon.

@jmhbnz jmhbnz mentioned this pull request Mar 18, 2024
2 tasks
chaochn47 added a commit to chaochn47/etcd that referenced this pull request Mar 19, 2024
chaochn47 added a commit to chaochn47/etcd that referenced this pull request Mar 19, 2024
@chaochn47
Copy link
Member Author

@chaochn47 do you have bandwidth to backport this to 3.5 and 3.4? We may need to release a 3.4 and 3.5 patch soon.

@ahrtr @jmhbnz sure. Just provided backports PRs. Could you please take a look?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport/v3.4 backport/v3.5 priority/critical-urgent Highest priority. Must be actively worked on as someone's top priority right now.
Development

Successfully merging this pull request may close these issues.

etcd watch events starvation / lost multiplexed on a single watch stream
5 participants