From e75271172881a9f0349862bbecb5e74f2552ee20 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 24 Nov 2021 15:34:15 -0600 Subject: [PATCH 01/26] feat: Add context helpers to package. This change adds new methods .AddTo() and .From() to assist with threading clocks through code paths via context. Enables simple fake clock usage in test where needed. Closes #32 --- clockwork.go | 8 ++++++++ context.go | 25 +++++++++++++++++++++++++ context_test.go | 26 ++++++++++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 context.go create mode 100644 context_test.go diff --git a/clockwork.go b/clockwork.go index 1018051..b421528 100644 --- a/clockwork.go +++ b/clockwork.go @@ -1,6 +1,7 @@ package clockwork import ( + "context" "sync" "time" ) @@ -25,6 +26,8 @@ type FakeClock interface { // BlockUntil will block until the FakeClock has the given number of // sleepers (callers of Sleep or After) BlockUntil(n int) + // AddTo creates a derived context that references the clock. + AddTo(ctx context.Context) context.Context } // NewRealClock returns a Clock which simply delegates calls to the actual time @@ -193,3 +196,8 @@ func (fc *fakeClock) BlockUntil(n int) { fc.l.Unlock() <-b.ch } + +// AddTo creates a derived context that references the clock. +func (fc *fakeClock) AddTo(ctx context.Context) context.Context { + return AddTo(ctx, fc) +} diff --git a/context.go b/context.go new file mode 100644 index 0000000..ebf950a --- /dev/null +++ b/context.go @@ -0,0 +1,25 @@ +package clockwork + +import ( + "context" +) + +// contextKey is private to this package so we can ensure uniqueness here. This +// type identifies context values provided by this package. +type contextKey string + +// keyClock provides a clock for injecting during tests. If absent, a real clock should be used. +var keyClock = contextKey("clock") // clockwork.Clock + +// AddTo creates a derived context that references the specified clock. +func AddTo(ctx context.Context, clock Clock) context.Context { + return context.WithValue(ctx, keyClock, clock) +} + +// From extracts a clock from the context. If not present, a real clock is returned. +func From(ctx context.Context) Clock { + if clock, ok := ctx.Value(keyClock).(Clock); ok { + return clock + } + return NewRealClock() +} diff --git a/context_test.go b/context_test.go new file mode 100644 index 0000000..8bb9677 --- /dev/null +++ b/context_test.go @@ -0,0 +1,26 @@ +package clockwork + +import ( + "context" + "reflect" + "testing" +) + +func TestContextOps(t *testing.T) { + ctx := context.Background() + assertIsType(t, NewRealClock(), From(ctx)) + + ctx = AddTo(ctx, NewFakeClock()) + assertIsType(t, NewFakeClock(), From(ctx)) + + ctx = AddTo(ctx, NewRealClock()) + assertIsType(t, NewRealClock(), From(ctx)) +} + +func assertIsType(t *testing.T, expectedType interface{}, object interface{}) { + t.Helper() + + if reflect.TypeOf(object) != reflect.TypeOf(expectedType) { + t.Fatalf("Object expected to be of type %v, but was %v", reflect.TypeOf(expectedType), reflect.TypeOf(object)) + } +} From 169a38169ed5644ea47501b3465a8bce294f163d Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Sat, 27 Nov 2021 14:59:22 +0100 Subject: [PATCH 02/26] ci: rename workflow file Signed-off-by: Mark Sagi-Kazar --- .github/workflows/{ci.yml => ci.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .github/workflows/{ci.yml => ci.yaml} (100%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yaml similarity index 100% rename from .github/workflows/ci.yml rename to .github/workflows/ci.yaml From 84f3272a33aae738f115d0f329411d9f2f47cb2e Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Sat, 27 Nov 2021 15:00:17 +0100 Subject: [PATCH 03/26] ci: add more Go versions Signed-off-by: Mark Sagi-Kazar --- .github/workflows/ci.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e45455c..1207d96 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,9 +12,8 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ['1.11', '1.12', '1.13', '1.14'] + go: ['1.11', '1.12', '1.13', '1.14', '1.15', '1.16', '1.17'] env: - VERBOSE: 1 GOFLAGS: -mod=readonly steps: From 82d7ea2a273f85d8bdee999731c25822501a7f3e Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Mon, 29 Nov 2021 09:24:39 -0700 Subject: [PATCH 04/26] Remove AddTo convenience from fakeclock. AddTo -> AddToContext From -> FromContext --- clockwork.go | 8 -------- context.go | 8 ++++---- context_test.go | 10 +++++----- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/clockwork.go b/clockwork.go index b421528..1018051 100644 --- a/clockwork.go +++ b/clockwork.go @@ -1,7 +1,6 @@ package clockwork import ( - "context" "sync" "time" ) @@ -26,8 +25,6 @@ type FakeClock interface { // BlockUntil will block until the FakeClock has the given number of // sleepers (callers of Sleep or After) BlockUntil(n int) - // AddTo creates a derived context that references the clock. - AddTo(ctx context.Context) context.Context } // NewRealClock returns a Clock which simply delegates calls to the actual time @@ -196,8 +193,3 @@ func (fc *fakeClock) BlockUntil(n int) { fc.l.Unlock() <-b.ch } - -// AddTo creates a derived context that references the clock. -func (fc *fakeClock) AddTo(ctx context.Context) context.Context { - return AddTo(ctx, fc) -} diff --git a/context.go b/context.go index ebf950a..edbb368 100644 --- a/context.go +++ b/context.go @@ -11,13 +11,13 @@ type contextKey string // keyClock provides a clock for injecting during tests. If absent, a real clock should be used. var keyClock = contextKey("clock") // clockwork.Clock -// AddTo creates a derived context that references the specified clock. -func AddTo(ctx context.Context, clock Clock) context.Context { +// AddToContext creates a derived context that references the specified clock. +func AddToContext(ctx context.Context, clock Clock) context.Context { return context.WithValue(ctx, keyClock, clock) } -// From extracts a clock from the context. If not present, a real clock is returned. -func From(ctx context.Context) Clock { +// FromContext extracts a clock from the context. If not present, a real clock is returned. +func FromContext(ctx context.Context) Clock { if clock, ok := ctx.Value(keyClock).(Clock); ok { return clock } diff --git a/context_test.go b/context_test.go index 8bb9677..73d74e7 100644 --- a/context_test.go +++ b/context_test.go @@ -8,13 +8,13 @@ import ( func TestContextOps(t *testing.T) { ctx := context.Background() - assertIsType(t, NewRealClock(), From(ctx)) + assertIsType(t, NewRealClock(), FromContext(ctx)) - ctx = AddTo(ctx, NewFakeClock()) - assertIsType(t, NewFakeClock(), From(ctx)) + ctx = AddToContext(ctx, NewFakeClock()) + assertIsType(t, NewFakeClock(), FromContext(ctx)) - ctx = AddTo(ctx, NewRealClock()) - assertIsType(t, NewRealClock(), From(ctx)) + ctx = AddToContext(ctx, NewRealClock()) + assertIsType(t, NewRealClock(), FromContext(ctx)) } func assertIsType(t *testing.T, expectedType interface{}, object interface{}) { From 556fee8b4e70a645dbede2d83b7ac21a20ca4eda Mon Sep 17 00:00:00 2001 From: Nick Santos Date: Wed, 23 Mar 2022 16:48:05 -0400 Subject: [PATCH 05/26] fix: fixes a deadlock in BlockUntil fixes https://github.com/jonboulle/clockwork/issues/35 --- clockwork.go | 14 +++++++------- clockwork_test.go | 32 ++++++++++++++++++++++++++++---- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/clockwork.go b/clockwork.go index 1018051..8c0e257 100644 --- a/clockwork.go +++ b/clockwork.go @@ -113,12 +113,12 @@ func (fc *fakeClock) After(d time.Duration) <-chan time.Time { return done } -// notifyBlockers notifies all the blockers waiting until the -// given number of sleepers are waiting on the fakeClock. It -// returns an updated slice of blockers (i.e. those still waiting) +// notifyBlockers notifies all the blockers waiting until the at least the given +// number of sleepers are waiting on the fakeClock. It returns an updated slice +// of blockers (i.e. those still waiting) func notifyBlockers(blockers []*blocker, count int) (newBlockers []*blocker) { for _, b := range blockers { - if b.count == count { + if b.count <= count { close(b.ch) } else { newBlockers = append(newBlockers, b) @@ -179,12 +179,12 @@ func (fc *fakeClock) Advance(d time.Duration) { // (callers of Sleep or After) func (fc *fakeClock) BlockUntil(n int) { fc.l.Lock() - // Fast path: current number of sleepers is what we're looking for - if len(fc.sleepers) == n { + // Fast path: we already have >= n sleepers. + if len(fc.sleepers) >= n { fc.l.Unlock() return } - // Otherwise, set up a new blocker + // Otherwise, we have < n sleepers. Set up a new blocker to wait for more. b := &blocker{ count: n, ch: make(chan struct{}), diff --git a/clockwork_test.go b/clockwork_test.go index 6b8b5cf..e5dcf72 100644 --- a/clockwork_test.go +++ b/clockwork_test.go @@ -89,8 +89,13 @@ func TestNotifyBlockers(t *testing.T) { b5 := &blocker{10, make(chan struct{})} bs := []*blocker{b1, b2, b3, b4, b5} bs1 := notifyBlockers(bs, 2) - if n := len(bs1); n != 4 { - t.Fatalf("got %d blockers, want %d", n, 4) + if n := len(bs1); n != 3 { + t.Fatalf("got %d blockers, want %d", n, 3) + } + select { + case <-b1.ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for channel close!") } select { case <-b2.ch: @@ -98,8 +103,13 @@ func TestNotifyBlockers(t *testing.T) { t.Fatalf("timed out waiting for channel close!") } bs2 := notifyBlockers(bs1, 10) - if n := len(bs2); n != 2 { - t.Fatalf("got %d blockers, want %d", n, 2) + if n := len(bs2); n != 0 { + t.Fatalf("got %d blockers, want %d", n, 0) + } + select { + case <-b3.ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for channel close!") } select { case <-b4.ch: @@ -144,3 +154,17 @@ func TestFakeClockSince(t *testing.T) { t.Fatalf("fakeClock.Since() returned unexpected duration, got: %d, want: %d", fc.Since(now), elapsedTime) } } + +// This used to result in a deadlock. +// https://github.com/jonboulle/clockwork/issues/35 +func TestTwoBlockersOneBlock(t *testing.T) { + fc := &fakeClock{} + + ft1 := fc.NewTicker(time.Second) + ft2 := fc.NewTicker(time.Second) + + fc.BlockUntil(1) + fc.BlockUntil(2) + ft1.Stop() + ft2.Stop() +} From 2e53091e7ba9360d2820ab496e71a46e0aed4872 Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Thu, 24 Mar 2022 09:15:23 +0100 Subject: [PATCH 06/26] ci: run tests with Go 1.18 Signed-off-by: Mark Sagi-Kazar --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1207d96..206c185 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ['1.11', '1.12', '1.13', '1.14', '1.15', '1.16', '1.17'] + go: ['1.11', '1.12', '1.13', '1.14', '1.15', '1.16', '1.17', '1.18'] env: GOFLAGS: -mod=readonly From a2eb5578ef081df8a4c5d4ad2422db3fa838edb7 Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Sat, 9 Apr 2022 09:52:49 -0400 Subject: [PATCH 07/26] Add Timers Implement fake timers in a manner similar to fake tickers --- clockwork.go | 28 ++++++++++- timer.go | 97 ++++++++++++++++++++++++++++++++++++ timer_test.go | 135 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 259 insertions(+), 1 deletion(-) create mode 100644 timer.go create mode 100644 timer_test.go diff --git a/clockwork.go b/clockwork.go index 8c0e257..9e1c814 100644 --- a/clockwork.go +++ b/clockwork.go @@ -13,6 +13,7 @@ type Clock interface { Now() time.Time Since(t time.Time) time.Duration NewTicker(d time.Duration) Ticker + NewTimer(d time.Duration) Timer } // FakeClock provides an interface for a clock which can be @@ -70,6 +71,10 @@ func (rc *realClock) NewTicker(d time.Duration) Ticker { return &realTicker{time.NewTicker(d)} } +func (rc *realClock) NewTimer(d time.Duration) Timer { + return &realTimer{time.NewTimer(d)} +} + type fakeClock struct { sleepers []*sleeper blockers []*blocker @@ -132,7 +137,7 @@ func (fc *fakeClock) Sleep(d time.Duration) { <-fc.After(d) } -// Time returns the current time of the fakeClock +// Now returns the current time of the fakeClock func (fc *fakeClock) Now() time.Time { fc.l.RLock() t := fc.time @@ -145,6 +150,8 @@ func (fc *fakeClock) Since(t time.Time) time.Duration { return fc.Now().Sub(t) } +// NewTicker returns a ticker that will expire only after calls to fakeClock +// Advance have moved the clock passed the given duration func (fc *fakeClock) NewTicker(d time.Duration) Ticker { ft := &fakeTicker{ c: make(chan time.Time, 1), @@ -156,6 +163,25 @@ func (fc *fakeClock) NewTicker(d time.Duration) Ticker { return ft } +// NewTimer returns a timer that will fire only after calls to fakeClock +// Advance have moved the clock passed the given duration +func (fc *fakeClock) NewTimer(d time.Duration) Timer { + stopped := uint32(0) + if d <= 0 { + stopped = 1 + } + ft := &fakeTimer{ + c: make(chan time.Time, 1), + stop: make(chan struct{}, 1), + reset: make(chan reset, 1), + clock: fc, + stopped: stopped, + } + + ft.run(d) + return ft +} + // Advance advances fakeClock to a new point in time, ensuring channels from any // previous invocations of After are notified appropriately before returning func (fc *fakeClock) Advance(d time.Duration) { diff --git a/timer.go b/timer.go new file mode 100644 index 0000000..d41d961 --- /dev/null +++ b/timer.go @@ -0,0 +1,97 @@ +package clockwork + +import ( + "sync/atomic" + "time" +) + +// Timer provides an interface which can be used instead of directly +// using the timer within the time module. The real-time timer t +// provides events through t.C which becomes now t.Chan() to make +// this channel requirement definable in this interface. +type Timer interface { + Chan() <-chan time.Time + Reset(d time.Duration) bool + Stop() bool +} + +type realTimer struct { + *time.Timer +} + +func (r realTimer) Chan() <-chan time.Time { + return r.C +} + +type fakeTimer struct { + c chan time.Time + clock FakeClock + stop chan struct{} + reset chan reset + stopped uint32 +} + +func (f *fakeTimer) Chan() <-chan time.Time { + return f.c +} + +func (f *fakeTimer) Reset(d time.Duration) bool { + stopped := f.Stop() + + f.reset <- reset{t: f.clock.Now().Add(d), next: f.clock.After(d)} + if d > 0 { + atomic.StoreUint32(&f.stopped, 0) + } + + return stopped +} + +func (f *fakeTimer) Stop() bool { + if atomic.CompareAndSwapUint32(&f.stopped, 0, 1) { + f.stop <- struct{}{} + return true + } + return false +} + +type reset struct { + t time.Time + next <-chan time.Time +} + +// run initializes a background goroutine to send the timer event to the timer channel +// after the period. Events are discarded if the underlying ticker channel does not have +// enough capacity. +func (f *fakeTimer) run(initialDuration time.Duration) { + nextTick := f.clock.Now().Add(initialDuration) + next := f.clock.After(initialDuration) + + waitForReset := func() (time.Time, <-chan time.Time) { + for { + select { + case <-f.stop: + continue + case r := <-f.reset: + return r.t, r.next + } + } + } + + go func() { + for { + select { + case <-f.stop: + case <-next: + atomic.StoreUint32(&f.stopped, 1) + select { + case f.c <- nextTick: + default: + } + + next = nil + } + + nextTick, next = waitForReset() + } + }() +} diff --git a/timer_test.go b/timer_test.go new file mode 100644 index 0000000..14e1ff4 --- /dev/null +++ b/timer_test.go @@ -0,0 +1,135 @@ +package clockwork + +import ( + "testing" + "time" +) + +func TestFakeClockTimerStop(t *testing.T) { + fc := &fakeClock{} + + ft := fc.NewTimer(1) + ft.Stop() + select { + case <-ft.Chan(): + t.Errorf("received unexpected tick!") + default: + } +} + +func TestFakeClockTimers(t *testing.T) { + fc := &fakeClock{} + + zero := fc.NewTimer(0) + + if zero.Stop() { + t.Errorf("zero timer could be stopped") + } + + timeout := time.NewTimer(500 * time.Millisecond) + defer timeout.Stop() + + select { + case <-zero.Chan(): + case <-timeout.C: + t.Errorf("zero timer didn't emit time") + } + + one := fc.NewTimer(1) + + select { + case <-one.Chan(): + t.Errorf("non-zero timer did emit time") + default: + } + if !one.Stop() { + t.Errorf("non-zero timer couldn't be stopped") + } + + fc.Advance(5) + + select { + case <-one.Chan(): + t.Errorf("stopped timer did emit time") + default: + } + + if one.Reset(1) { + t.Errorf("resetting stopped timer didn't return false") + } + if !one.Reset(1) { + t.Errorf("resetting active timer didn't return true") + } + + fc.Advance(1) + + select { + case <-time.After(500 * time.Millisecond): + } + + if one.Stop() { + t.Errorf("triggered timer could be stopped") + } + + timeout2 := time.NewTimer(500 * time.Millisecond) + defer timeout2.Stop() + + select { + case <-one.Chan(): + case <-timeout2.C: + t.Errorf("triggered timer didn't emit time") + } + + fc.Advance(1) + + select { + case <-one.Chan(): + t.Errorf("triggered timer emitted time more than once") + default: + } + + one.Reset(0) + + if one.Stop() { + t.Errorf("reset to zero timer could be stopped") + } + + timeout3 := time.NewTimer(500 * time.Millisecond) + defer timeout3.Stop() + + select { + case <-one.Chan(): + case <-timeout3.C: + t.Errorf("reset to zero timer didn't emit time") + } +} + +func TestFakeClockTimer_Race(t *testing.T) { + fc := NewFakeClock() + + timer := fc.NewTimer(1 * time.Millisecond) + defer timer.Stop() + + fc.Advance(1 * time.Millisecond) + + timeout := time.NewTimer(500 * time.Millisecond) + defer timeout.Stop() + + select { + case <-timer.Chan(): + // Pass + case <-timeout.C: + t.Fatalf("Timer didn't detect the clock advance!") + } +} + +func TestFakeClockTimer_Race2(t *testing.T) { + fc := NewFakeClock() + timer := fc.NewTimer(5 * time.Second) + for i := 0; i < 100; i++ { + fc.Advance(5 * time.Second) + <-timer.Chan() + timer.Reset(5 * time.Second) + } + timer.Stop() +} From 9610f832f3132cbafea219ebf3d88339bd8c705c Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Thu, 21 Apr 2022 12:12:38 +0200 Subject: [PATCH 08/26] chore: tweak CI config Signed-off-by: Mark Sagi-Kazar --- .github/.editorconfig | 2 ++ .github/dependabot.yaml | 9 +++++++++ .github/release.yml | 29 +++++++++++++++++++++++++++++ .github/workflows/.editorconfig | 2 -- 4 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 .github/.editorconfig create mode 100644 .github/dependabot.yaml create mode 100644 .github/release.yml delete mode 100644 .github/workflows/.editorconfig diff --git a/.github/.editorconfig b/.github/.editorconfig new file mode 100644 index 0000000..0902c6a --- /dev/null +++ b/.github/.editorconfig @@ -0,0 +1,2 @@ +[{*.yml,*.yaml}] +indent_size = 2 diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml new file mode 100644 index 0000000..f80132b --- /dev/null +++ b/.github/dependabot.yaml @@ -0,0 +1,9 @@ +version: 2 + +updates: + - package-ecosystem: github-actions + directory: / + labels: + - dependencies + schedule: + interval: daily diff --git a/.github/release.yml b/.github/release.yml new file mode 100644 index 0000000..ff093dc --- /dev/null +++ b/.github/release.yml @@ -0,0 +1,29 @@ +changelog: + exclude: + labels: + - release-note/ignore + categories: + - title: Exciting New Features 🎉 + labels: + - release-note/new-feature + - title: Enhancements 🚀 + labels: + - enhancement + - release-note/enhancement + - title: Bug Fixes 🐛 + labels: + - bug + - release-note/bug-fix + - title: Breaking Changes 🛠 + labels: + - release-note/breaking-change + - title: Deprecations ❌ + labels: + - release-note/deprecation + - title: Dependency Updates ⬆️ + labels: + - dependencies + - release-note/dependency-update + - title: Other Changes + labels: + - "*" diff --git a/.github/workflows/.editorconfig b/.github/workflows/.editorconfig deleted file mode 100644 index 7bd3346..0000000 --- a/.github/workflows/.editorconfig +++ /dev/null @@ -1,2 +0,0 @@ -[*.yml] -indent_size = 2 From 6881e71052b8a9f940019911a6d1db363d51a435 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 21 Apr 2022 10:35:54 +0000 Subject: [PATCH 09/26] build(deps): bump actions/checkout from 2 to 3 Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v3) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 206c185..a5479b3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -23,7 +23,7 @@ jobs: go-version: ${{ matrix.go }} - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Run tests run: go test -v -race From 9ac2057e6f940fa0df2ad2c34b6ba2ab201ed9d5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 21 Apr 2022 10:35:56 +0000 Subject: [PATCH 10/26] build(deps): bump actions/setup-go from 2 to 3 Bumps [actions/setup-go](https://github.com/actions/setup-go) from 2 to 3. - [Release notes](https://github.com/actions/setup-go/releases) - [Commits](https://github.com/actions/setup-go/compare/v2...v3) --- updated-dependencies: - dependency-name: actions/setup-go dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 206c185..2dd0874 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -18,7 +18,7 @@ jobs: steps: - name: Set up Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: go-version: ${{ matrix.go }} From e2cf8ea5ba19372e0538e3abf512a4db66d46e00 Mon Sep 17 00:00:00 2001 From: Martin von Gagern Date: Wed, 22 Jun 2022 15:03:14 +0100 Subject: [PATCH 11/26] Expose race condition for multiple Reset events This has been reported in https://github.com/jonboulle/clockwork/issues/42. A subsequent commit should address the problem. --- timer_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/timer_test.go b/timer_test.go index 14e1ff4..d691cd8 100644 --- a/timer_test.go +++ b/timer_test.go @@ -133,3 +133,45 @@ func TestFakeClockTimer_Race2(t *testing.T) { } timer.Stop() } + +func TestFakeClockTimer_ResetRace(t *testing.T) { + t.Parallel() + fc := NewFakeClock() + d := 5 * time.Second + var times []time.Time + timer := fc.NewTimer(d) + done := make(chan struct{}) + go func() { + for { + select { + case <-done: + break + case now := <-timer.Chan(): + times = append(times, now) + } + } + }() + for i := 0; i < 100; i++ { + for j := 0; j < 10; j++ { + timer.Reset(d) + } + fc.Advance(d) + } + timer.Stop() + close(done) + for i := 1; i < len(times); i++ { + if times[i-1] == times[i] { + t.Fatalf("Timer repeatedly reported the same time.") + } + } +} + +func TestFakeClockTimer_ZeroResetDoesNotBlock(t *testing.T) { + t.Parallel() + fc := NewFakeClock() + timer := fc.NewTimer(0) + for i := 0; i < 10; i++ { + timer.Reset(0) + } + <-timer.Chan() +} From 74de232f2f1842c73dc50b39b661dd13f8afd8bd Mon Sep 17 00:00:00 2001 From: Martin von Gagern Date: Mon, 27 Jun 2022 10:32:40 +0100 Subject: [PATCH 12/26] Sort sleepers to ensure delivery in the correct order --- clockwork.go | 12 ++++++++++-- clockwork_test.go | 20 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/clockwork.go b/clockwork.go index 9e1c814..5e3235b 100644 --- a/clockwork.go +++ b/clockwork.go @@ -1,6 +1,7 @@ package clockwork import ( + "sort" "sync" "time" ) @@ -76,7 +77,7 @@ func (rc *realClock) NewTimer(d time.Duration) Timer { } type fakeClock struct { - sleepers []*sleeper + sleepers sleepers blockers []*blocker time time.Time @@ -95,6 +96,12 @@ type blocker struct { ch chan struct{} } +type sleepers []*sleeper + +func (s sleepers) Len() int { return len(s) } +func (s sleepers) Less(i, j int) bool { return s[i].until.Before(s[j].until) } +func (s sleepers) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + // After mimics time.After; it waits for the given duration to elapse on the // fakeClock, then sends the current time on the returned channel. func (fc *fakeClock) After(d time.Duration) <-chan time.Time { @@ -112,6 +119,7 @@ func (fc *fakeClock) After(d time.Duration) <-chan time.Time { done: done, } fc.sleepers = append(fc.sleepers, s) + sort.Sort(fc.sleepers) // and notify any blockers fc.blockers = notifyBlockers(fc.blockers, len(fc.sleepers)) } @@ -188,7 +196,7 @@ func (fc *fakeClock) Advance(d time.Duration) { fc.l.Lock() defer fc.l.Unlock() end := fc.time.Add(d) - var newSleepers []*sleeper + var newSleepers sleepers for _, s := range fc.sleepers { if end.Sub(s.until) >= 0 { s.done <- end diff --git a/clockwork_test.go b/clockwork_test.go index e5dcf72..0eecce4 100644 --- a/clockwork_test.go +++ b/clockwork_test.go @@ -168,3 +168,23 @@ func TestTwoBlockersOneBlock(t *testing.T) { ft1.Stop() ft2.Stop() } + +func TestAfterDeliveryInOrder(t *testing.T) { + fc := &fakeClock{} + for i := 0; i < 1000; i++ { + three := fc.After(3 * time.Second) + for j := 0; j < 100; j++ { + fc.After(1 * time.Second) + } + two := fc.After(2 * time.Second) + go func() { + fc.Advance(5 * time.Second) + }() + <-three + select { + case <-two: + default: + t.Fatalf("Signals from After delivered out of order") + } + } +} From 5087041d1ebfdab5936b36b92985aa7b2fec4957 Mon Sep 17 00:00:00 2001 From: Martin von Gagern Date: Mon, 27 Jun 2022 11:28:21 +0100 Subject: [PATCH 13/26] Merge the concepts of sleeper and timer A timer with an event in the future is essentially the same thing as a sleeper. So this commit merges the two, using the `fakeTimer` structure where `sleeper` had been used before. Since management of sleepers is performed while holding the lock of the fake clock, this means that the basic timer operations are now synchronized using these locks as well, which should go a long way to making all operations easier to reason about without messing around with atomics. Fixes https://github.com/jonboulle/clockwork/issues/42. --- clockwork.go | 57 ++++++++-------------------- timer.go | 103 +++++++++++++++++++++++---------------------------- 2 files changed, 63 insertions(+), 97 deletions(-) diff --git a/clockwork.go b/clockwork.go index 5e3235b..b989615 100644 --- a/clockwork.go +++ b/clockwork.go @@ -1,7 +1,6 @@ package clockwork import ( - "sort" "sync" "time" ) @@ -84,19 +83,13 @@ type fakeClock struct { l sync.RWMutex } -// sleeper represents a caller of After or Sleep -type sleeper struct { - until time.Time - done chan time.Time -} - // blocker represents a caller of BlockUntil type blocker struct { count int ch chan struct{} } -type sleepers []*sleeper +type sleepers []*fakeTimer func (s sleepers) Len() int { return len(s) } func (s sleepers) Less(i, j int) bool { return s[i].until.Before(s[j].until) } @@ -105,25 +98,7 @@ func (s sleepers) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // After mimics time.After; it waits for the given duration to elapse on the // fakeClock, then sends the current time on the returned channel. func (fc *fakeClock) After(d time.Duration) <-chan time.Time { - fc.l.Lock() - defer fc.l.Unlock() - now := fc.time - done := make(chan time.Time, 1) - if d.Nanoseconds() <= 0 { - // special case - trigger immediately - done <- now - } else { - // otherwise, add to the set of sleepers - s := &sleeper{ - until: now.Add(d), - done: done, - } - fc.sleepers = append(fc.sleepers, s) - sort.Sort(fc.sleepers) - // and notify any blockers - fc.blockers = notifyBlockers(fc.blockers, len(fc.sleepers)) - } - return done + return fc.NewTimer(d).Chan() } // notifyBlockers notifies all the blockers waiting until the at least the given @@ -174,19 +149,18 @@ func (fc *fakeClock) NewTicker(d time.Duration) Ticker { // NewTimer returns a timer that will fire only after calls to fakeClock // Advance have moved the clock passed the given duration func (fc *fakeClock) NewTimer(d time.Duration) Timer { - stopped := uint32(0) - if d <= 0 { - stopped = 1 - } + done := make(chan time.Time, 1) ft := &fakeTimer{ - c: make(chan time.Time, 1), - stop: make(chan struct{}, 1), - reset: make(chan reset, 1), - clock: fc, - stopped: stopped, + c: done, + clock: fc, + callback: func(now time.Time) { + select { + case done <- now: + default: + } + }, } - - ft.run(d) + ft.Reset(d) return ft } @@ -198,10 +172,11 @@ func (fc *fakeClock) Advance(d time.Duration) { end := fc.time.Add(d) var newSleepers sleepers for _, s := range fc.sleepers { - if end.Sub(s.until) >= 0 { - s.done <- end - } else { + if end.Before(s.until) { + // Not due yet. newSleepers = append(newSleepers, s) + } else { + s.callback(s.until) } } fc.sleepers = newSleepers diff --git a/timer.go b/timer.go index d41d961..f3dd82a 100644 --- a/timer.go +++ b/timer.go @@ -1,7 +1,7 @@ package clockwork import ( - "sync/atomic" + "sort" "time" ) @@ -24,74 +24,65 @@ func (r realTimer) Chan() <-chan time.Time { } type fakeTimer struct { - c chan time.Time - clock FakeClock - stop chan struct{} - reset chan reset - stopped uint32 + // The channel associated with this timer. Only relevant for timers that + // originate from NewTimer or similar, i.e. not for AfterFunc or other + // internal usage. + c chan time.Time + + // The fake clock driving events for this timer. + clock *fakeClock + + // The time when the timer expires. Only meaningful if the timer is currently + // one of the fake clock's sleepers. + until time.Time + + // callback will get called synchronously with the lock of the clock being + // held. It receives the time at which the timer expired. + callback func(time.Time) } func (f *fakeTimer) Chan() <-chan time.Time { return f.c } -func (f *fakeTimer) Reset(d time.Duration) bool { - stopped := f.Stop() - - f.reset <- reset{t: f.clock.Now().Add(d), next: f.clock.After(d)} - if d > 0 { - atomic.StoreUint32(&f.stopped, 0) - } - - return stopped +func (f *fakeTimer) Stop() bool { + f.clock.l.Lock() + defer f.clock.l.Unlock() + return f.stopImpl() } -func (f *fakeTimer) Stop() bool { - if atomic.CompareAndSwapUint32(&f.stopped, 0, 1) { - f.stop <- struct{}{} - return true +func (f *fakeTimer) stopImpl() bool { + for i, t := range f.clock.sleepers { + if t == f { + // Remove element, maintaining order. + copy(f.clock.sleepers[i:], f.clock.sleepers[i+1:]) + f.clock.sleepers[len(f.clock.sleepers)-1] = nil + f.clock.sleepers = f.clock.sleepers[:len(f.clock.sleepers)-1] + return true + } } return false } -type reset struct { - t time.Time - next <-chan time.Time +func (f *fakeTimer) Reset(d time.Duration) bool { + f.clock.l.Lock() + defer f.clock.l.Unlock() + stopped := f.stopImpl() + f.resetImpl(d) + return stopped } -// run initializes a background goroutine to send the timer event to the timer channel -// after the period. Events are discarded if the underlying ticker channel does not have -// enough capacity. -func (f *fakeTimer) run(initialDuration time.Duration) { - nextTick := f.clock.Now().Add(initialDuration) - next := f.clock.After(initialDuration) - - waitForReset := func() (time.Time, <-chan time.Time) { - for { - select { - case <-f.stop: - continue - case r := <-f.reset: - return r.t, r.next - } - } +func (f *fakeTimer) resetImpl(d time.Duration) { + now := f.clock.time + if d.Nanoseconds() <= 0 { + // special case - trigger immediately + f.callback(now) + } else { + // otherwise, add to the set of sleepers + f.until = f.clock.time.Add(d) + f.clock.sleepers = append(f.clock.sleepers, f) + sort.Sort(f.clock.sleepers) + // and notify any blockers + f.clock.blockers = notifyBlockers(f.clock.blockers, len(f.clock.sleepers)) } - - go func() { - for { - select { - case <-f.stop: - case <-next: - atomic.StoreUint32(&f.stopped, 1) - select { - case f.c <- nextTick: - default: - } - - next = nil - } - - nextTick, next = waitForReset() - } - }() } From d574a97c1e79cc70d6ee2a5e6e690a6f1be6be3b Mon Sep 17 00:00:00 2001 From: Martin von Gagern Date: Mon, 27 Jun 2022 11:44:10 +0100 Subject: [PATCH 14/26] Add AfterFunc implementation Since the timer is internally using a callback now, switching from a channel send to go routine invocation is straight forward. --- clockwork.go | 26 +++++++++++++++++++++----- timer_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/clockwork.go b/clockwork.go index b989615..6947ed1 100644 --- a/clockwork.go +++ b/clockwork.go @@ -14,6 +14,7 @@ type Clock interface { Since(t time.Time) time.Duration NewTicker(d time.Duration) Ticker NewTimer(d time.Duration) Timer + AfterFunc(d time.Duration, f func()) Timer } // FakeClock provides an interface for a clock which can be @@ -72,7 +73,11 @@ func (rc *realClock) NewTicker(d time.Duration) Ticker { } func (rc *realClock) NewTimer(d time.Duration) Timer { - return &realTimer{time.NewTimer(d)} + return realTimer{time.NewTimer(d)} +} + +func (rc *realClock) AfterFunc(d time.Duration, f func()) Timer { + return realTimer{time.AfterFunc(d, f)} } type fakeClock struct { @@ -133,8 +138,8 @@ func (fc *fakeClock) Since(t time.Time) time.Duration { return fc.Now().Sub(t) } -// NewTicker returns a ticker that will expire only after calls to fakeClock -// Advance have moved the clock passed the given duration +// NewTicker returns a ticker that will expire only after calls to FakeClock +// Advance have moved the clock past the given duration. func (fc *fakeClock) NewTicker(d time.Duration) Ticker { ft := &fakeTicker{ c: make(chan time.Time, 1), @@ -146,8 +151,8 @@ func (fc *fakeClock) NewTicker(d time.Duration) Ticker { return ft } -// NewTimer returns a timer that will fire only after calls to fakeClock -// Advance have moved the clock passed the given duration +// NewTimer returns a timer that will fire only after calls to FakeClock Advance +// have moved the clock past the given duration. func (fc *fakeClock) NewTimer(d time.Duration) Timer { done := make(chan time.Time, 1) ft := &fakeTimer{ @@ -164,6 +169,17 @@ func (fc *fakeClock) NewTimer(d time.Duration) Timer { return ft } +// AfterFunc returns a timer that will invoke the given function only after +// calls to fakeClock Advance have moved the clock passed the given duration. +func (fc *fakeClock) AfterFunc(d time.Duration, f func()) Timer { + ft := &fakeTimer{ + clock: fc, + callback: func(_ time.Time) { go f() }, + } + ft.Reset(d) + return ft +} + // Advance advances fakeClock to a new point in time, ensuring channels from any // previous invocations of After are notified appropriately before returning func (fc *fakeClock) Advance(d time.Duration) { diff --git a/timer_test.go b/timer_test.go index d691cd8..d39af7f 100644 --- a/timer_test.go +++ b/timer_test.go @@ -175,3 +175,51 @@ func TestFakeClockTimer_ZeroResetDoesNotBlock(t *testing.T) { } <-timer.Chan() } + +func TestAfterFunc_Concurrent(t *testing.T) { + timeout := time.NewTimer(500 * time.Millisecond) + defer timeout.Stop() + + fc := NewFakeClock() + blocker := make(chan struct{}) + ch := make(chan int) + // AfterFunc should start goroutines, so each should be able to make progress + // independent of the others. + fc.AfterFunc(2*time.Second, func() { + <-blocker + ch <- 222 + }) + fc.AfterFunc(2*time.Second, func() { + ch <- 111 + }) + fc.AfterFunc(2*time.Second, func() { + <-blocker + ch <- 222 + }) + fc.Advance(2 * time.Second) + select { + case a := <-ch: + if a != 111 { + t.Fatalf("Expected 111, got %d", a) + } + case <-timeout.C: + t.Fatalf("Expected signal hasn't arrived") + } + close(blocker) + select { + case a := <-ch: + if a != 222 { + t.Fatalf("Expected 222, got %d", a) + } + case <-timeout.C: + t.Fatalf("Expected signal hasn't arrived") + } + select { + case a := <-ch: + if a != 222 { + t.Fatalf("Expected 222, got %d", a) + } + case <-timeout.C: + t.Fatalf("Expected signal hasn't arrived") + } +} From 44b242d7350a0e064b3237d1553bee0d8878104e Mon Sep 17 00:00:00 2001 From: Martin von Gagern Date: Mon, 27 Jun 2022 17:06:15 +0100 Subject: [PATCH 15/26] Switch ticker implementation to timers as a basis This avoids the separate goroutine and several levels of indirection. It replaces the previous approach of skipping a number of events with an approach that at least conceptually delivers each tick to the channel if possible without blocking. That is easier to reason about and will more clearly ensure correct ordering of events, as demonstrated by the new test case which had a high probability to fail with the previous implementation. --- clockwork.go | 47 ++++++++++++++++++++++++++-------------------- ticker.go | 51 ++++---------------------------------------------- ticker_test.go | 31 ++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 67 deletions(-) diff --git a/clockwork.go b/clockwork.go index 6947ed1..7b9af79 100644 --- a/clockwork.go +++ b/clockwork.go @@ -69,7 +69,7 @@ func (rc *realClock) Since(t time.Time) time.Duration { } func (rc *realClock) NewTicker(d time.Duration) Ticker { - return &realTicker{time.NewTicker(d)} + return realTicker{time.NewTicker(d)} } func (rc *realClock) NewTimer(d time.Duration) Timer { @@ -141,26 +141,35 @@ func (fc *fakeClock) Since(t time.Time) time.Duration { // NewTicker returns a ticker that will expire only after calls to FakeClock // Advance have moved the clock past the given duration. func (fc *fakeClock) NewTicker(d time.Duration) Ticker { - ft := &fakeTicker{ - c: make(chan time.Time, 1), - stop: make(chan bool, 1), - clock: fc, - period: d, + c := make(chan time.Time, 1) + var ft *fakeTicker + ft = &fakeTicker{ + fakeTimer: fakeTimer{ + c: c, + clock: fc, + callback: func(now time.Time) { + ft.resetImpl(d) + select { + case c <- now: + default: + } + }, + }, } - ft.runTickThread() + ft.Reset(d) return ft } // NewTimer returns a timer that will fire only after calls to FakeClock Advance // have moved the clock past the given duration. func (fc *fakeClock) NewTimer(d time.Duration) Timer { - done := make(chan time.Time, 1) + c := make(chan time.Time, 1) ft := &fakeTimer{ - c: done, + c: c, clock: fc, callback: func(now time.Time) { select { - case done <- now: + case c <- now: default: } }, @@ -186,17 +195,15 @@ func (fc *fakeClock) Advance(d time.Duration) { fc.l.Lock() defer fc.l.Unlock() end := fc.time.Add(d) - var newSleepers sleepers - for _, s := range fc.sleepers { - if end.Before(s.until) { - // Not due yet. - newSleepers = append(newSleepers, s) - } else { - s.callback(s.until) - } + // While first sleeper is ready to wake, wake it. We don't iterate because the + // callback of the sleeper might register a new sleeper, so the list of + // sleepers might change as we execute this. + for len(fc.sleepers) > 0 && !end.Before(fc.sleepers[0].until) { + first := fc.sleepers[0] + fc.sleepers = fc.sleepers[1:] + fc.time = first.until + first.callback(first.until) } - fc.sleepers = newSleepers - fc.blockers = notifyBlockers(fc.blockers, len(fc.sleepers)) fc.time = end } diff --git a/ticker.go b/ticker.go index 32b5d01..d7b6676 100644 --- a/ticker.go +++ b/ticker.go @@ -15,58 +15,15 @@ type Ticker interface { type realTicker struct{ *time.Ticker } -func (rt *realTicker) Chan() <-chan time.Time { +func (rt realTicker) Chan() <-chan time.Time { return rt.C } type fakeTicker struct { - c chan time.Time - stop chan bool - clock FakeClock - period time.Duration -} - -func (ft *fakeTicker) Chan() <-chan time.Time { - return ft.c + fakeTimer } func (ft *fakeTicker) Stop() { - ft.stop <- true -} - -// runTickThread initializes a background goroutine to send the tick time to the ticker channel -// after every period. Tick events are discarded if the underlying ticker channel does not have -// enough capacity. -func (ft *fakeTicker) runTickThread() { - nextTick := ft.clock.Now().Add(ft.period) - next := ft.clock.After(ft.period) - go func() { - for { - select { - case <-ft.stop: - return - case <-next: - // We send the time that the tick was supposed to occur at. - tick := nextTick - // Before sending the tick, we'll compute the next tick time and star the clock.After call. - now := ft.clock.Now() - // First, figure out how many periods there have been between "now" and the time we were - // supposed to have trigged, then advance over all of those. - skipTicks := (now.Sub(tick) + ft.period - 1) / ft.period - nextTick = nextTick.Add(skipTicks * ft.period) - // Now, keep advancing until we are past now. This should happen at most once. - for !nextTick.After(now) { - nextTick = nextTick.Add(ft.period) - } - // Figure out how long between now and the next scheduled tick, then wait that long. - remaining := nextTick.Sub(now) - next = ft.clock.After(remaining) - // Finally, we can actually send the tick. - select { - case ft.c <- tick: - default: - } - } - } - }() + // Ignore returned bool to make signature match. + ft.fakeTimer.Stop() } diff --git a/ticker_test.go b/ticker_test.go index 1f34036..c89d50d 100644 --- a/ticker_test.go +++ b/ticker_test.go @@ -87,3 +87,34 @@ func TestFakeTicker_Race2(t *testing.T) { } ft.Stop() } + +func TestFakeTicker_DeliveryOrder(t *testing.T) { + for i := 0; i < 1000; i++ { + timeout := time.NewTimer(500 * time.Millisecond) + defer timeout.Stop() + fc := NewFakeClock() + ticker := fc.NewTicker(2 * time.Second).Chan() + timer := fc.NewTimer(5 * time.Second).Chan() + go func() { + for j := 0; j < 10; j++ { + fc.BlockUntil(1) + fc.Advance(1 * time.Second) + } + }() + <-ticker + a := <-timer + // Only perform ordering check if ticker channel is drained at first. + select { + case <-ticker: + default: + select { + case b := <-ticker: + if a.After(b) { + t.Fatalf("Expected timer before ticker, got timer %v after %v", a, b) + } + case <-timeout.C: + t.Fatalf("Expected ticker event didn't arrive!") + } + } + } +} From 71412a5ef1eb7fa3c7671ac4e596fb13735473bb Mon Sep 17 00:00:00 2001 From: Martin von Gagern Date: Mon, 27 Jun 2022 17:09:16 +0100 Subject: [PATCH 16/26] Make notifyBlockers a method Repeating the same invocation in multiple places is harly idiomatic, so instead of making the invocation complicated for the sake of test cases, this makes the test case slightly more complicated for the sake of a concise method notation outside tests. --- clockwork.go | 15 ++++++++------- clockwork_test.go | 16 +++++++++++----- timer.go | 2 +- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/clockwork.go b/clockwork.go index 7b9af79..f892ea0 100644 --- a/clockwork.go +++ b/clockwork.go @@ -106,18 +106,19 @@ func (fc *fakeClock) After(d time.Duration) <-chan time.Time { return fc.NewTimer(d).Chan() } -// notifyBlockers notifies all the blockers waiting until the at least the given -// number of sleepers are waiting on the fakeClock. It returns an updated slice -// of blockers (i.e. those still waiting) -func notifyBlockers(blockers []*blocker, count int) (newBlockers []*blocker) { - for _, b := range blockers { +// notifyBlockers notifies all the blockers waiting for the current number of +// sleepers or fewer. +func (fc *fakeClock) notifyBlockers() { + var stillWaiting []*blocker + count := len(fc.sleepers) + for _, b := range fc.blockers { if b.count <= count { close(b.ch) } else { - newBlockers = append(newBlockers, b) + stillWaiting = append(stillWaiting, b) } } - return + fc.blockers = stillWaiting } // Sleep blocks until the given duration has passed on the fakeClock diff --git a/clockwork_test.go b/clockwork_test.go index 0eecce4..4b84ec6 100644 --- a/clockwork_test.go +++ b/clockwork_test.go @@ -87,9 +87,12 @@ func TestNotifyBlockers(t *testing.T) { b3 := &blocker{5, make(chan struct{})} b4 := &blocker{10, make(chan struct{})} b5 := &blocker{10, make(chan struct{})} - bs := []*blocker{b1, b2, b3, b4, b5} - bs1 := notifyBlockers(bs, 2) - if n := len(bs1); n != 3 { + fc := fakeClock{ + blockers: []*blocker{b1, b2, b3, b4, b5}, + sleepers: sleepers{nil, nil}, + } + fc.notifyBlockers() + if n := len(fc.blockers); n != 3 { t.Fatalf("got %d blockers, want %d", n, 3) } select { @@ -102,8 +105,11 @@ func TestNotifyBlockers(t *testing.T) { case <-time.After(time.Second): t.Fatalf("timed out waiting for channel close!") } - bs2 := notifyBlockers(bs1, 10) - if n := len(bs2); n != 0 { + for len(fc.sleepers) < 10 { + fc.sleepers = append(fc.sleepers, nil) + } + fc.notifyBlockers() + if n := len(fc.blockers); n != 0 { t.Fatalf("got %d blockers, want %d", n, 0) } select { diff --git a/timer.go b/timer.go index f3dd82a..e3a9df4 100644 --- a/timer.go +++ b/timer.go @@ -83,6 +83,6 @@ func (f *fakeTimer) resetImpl(d time.Duration) { f.clock.sleepers = append(f.clock.sleepers, f) sort.Sort(f.clock.sleepers) // and notify any blockers - f.clock.blockers = notifyBlockers(f.clock.blockers, len(f.clock.sleepers)) + f.clock.notifyBlockers() } } From 138fb988ed265a6134bbee6499c7860e9e548fdf Mon Sep 17 00:00:00 2001 From: Darren Jacques Date: Thu, 29 Dec 2022 20:52:24 -0800 Subject: [PATCH 17/26] Fix clockwork tests. - Only run TestFakeTicker_DeliveryOrder once per invocation. - Use channels to avoid race condition in TestFakeClockTimer_ResetRace. - Run tests in parallel. Tested: - Run 10,000 times with 2 timeout flakes. - Ran with `-race`, no race conditions reported. --- clockwork_test.go | 24 +++++++++++++++ context_test.go | 4 +-- example_test.go | 17 +++++------ ticker_test.go | 77 +++++++++++++++++++++++++++-------------------- timer_test.go | 59 ++++++++++++++++++------------------ 5 files changed, 108 insertions(+), 73 deletions(-) diff --git a/clockwork_test.go b/clockwork_test.go index 4b84ec6..fc89339 100644 --- a/clockwork_test.go +++ b/clockwork_test.go @@ -6,7 +6,13 @@ import ( "time" ) +// Use a consistent timeout across tests that block on channels. Keeps test +// timeouts limited while being able to easily extend it to allow the test +// process to get killed, providing a stack trace. +const timeout = time.Minute + func TestFakeClockAfter(t *testing.T) { + t.Parallel() fc := &fakeClock{} neg := fc.After(-1) @@ -82,6 +88,7 @@ func TestFakeClockAfter(t *testing.T) { } func TestNotifyBlockers(t *testing.T) { + t.Parallel() b1 := &blocker{1, make(chan struct{})} b2 := &blocker{2, make(chan struct{})} b3 := &blocker{5, make(chan struct{})} @@ -130,6 +137,7 @@ func TestNotifyBlockers(t *testing.T) { } func TestNewFakeClock(t *testing.T) { + t.Parallel() fc := NewFakeClock() now := fc.Now() if now.IsZero() { @@ -143,6 +151,7 @@ func TestNewFakeClock(t *testing.T) { } func TestNewFakeClockAt(t *testing.T) { + t.Parallel() t1 := time.Date(1999, time.February, 3, 4, 5, 6, 7, time.UTC) fc := NewFakeClockAt(t1) now := fc.Now() @@ -152,6 +161,7 @@ func TestNewFakeClockAt(t *testing.T) { } func TestFakeClockSince(t *testing.T) { + t.Parallel() fc := NewFakeClock() now := fc.Now() elapsedTime := time.Second @@ -164,6 +174,7 @@ func TestFakeClockSince(t *testing.T) { // This used to result in a deadlock. // https://github.com/jonboulle/clockwork/issues/35 func TestTwoBlockersOneBlock(t *testing.T) { + t.Parallel() fc := &fakeClock{} ft1 := fc.NewTicker(time.Second) @@ -176,6 +187,7 @@ func TestTwoBlockersOneBlock(t *testing.T) { } func TestAfterDeliveryInOrder(t *testing.T) { + t.Parallel() fc := &fakeClock{} for i := 0; i < 1000; i++ { three := fc.After(3 * time.Second) @@ -194,3 +206,15 @@ func TestAfterDeliveryInOrder(t *testing.T) { } } } + +// TestFakeClockRace detects data races in fakeClock when invoked with run using `go -race ...`. +// There are no failure conditions when invoked without the -race flag. +func TestFakeClockRace(t *testing.T) { + t.Parallel() + fc := &fakeClock{} + d := time.Second + go func() { fc.Advance(d) }() + go func() { fc.NewTicker(d) }() + go func() { fc.NewTimer(d) }() + go func() { fc.Sleep(d) }() +} diff --git a/context_test.go b/context_test.go index 73d74e7..ee10d5b 100644 --- a/context_test.go +++ b/context_test.go @@ -7,6 +7,7 @@ import ( ) func TestContextOps(t *testing.T) { + t.Parallel() ctx := context.Background() assertIsType(t, NewRealClock(), FromContext(ctx)) @@ -17,9 +18,8 @@ func TestContextOps(t *testing.T) { assertIsType(t, NewRealClock(), FromContext(ctx)) } -func assertIsType(t *testing.T, expectedType interface{}, object interface{}) { +func assertIsType(t *testing.T, expectedType, object interface{}) { t.Helper() - if reflect.TypeOf(object) != reflect.TypeOf(expectedType) { t.Fatalf("Object expected to be of type %v, but was %v", reflect.TypeOf(expectedType), reflect.TypeOf(object)) } diff --git a/example_test.go b/example_test.go index 3d5f291..2631c5a 100644 --- a/example_test.go +++ b/example_test.go @@ -6,21 +6,20 @@ import ( "time" ) -// myFunc is an example of a time-dependent function, using an -// injected clock +// myFunc is an example of a time-dependent function, using an injected clock. func myFunc(clock Clock, i *int) { clock.Sleep(3 * time.Second) *i += 1 } -// assertState is an example of a state assertion in a test +// assertState is an example of a state assertion in a test. func assertState(t *testing.T, i, j int) { if i != j { t.Fatalf("i %d, j %d", i, j) } } -// TestMyFunc tests myFunc's behaviour with a FakeClock +// TestMyFunc tests myFunc's behaviour with a FakeClock. func TestMyFunc(t *testing.T) { var i int c := NewFakeClock() @@ -32,18 +31,18 @@ func TestMyFunc(t *testing.T) { wg.Done() }() - // Wait until myFunc is actually sleeping on the clock + // Wait until myFunc is actually sleeping on the clock. c.BlockUntil(1) - // Assert the initial state + // Assert the initial state. assertState(t, i, 0) - // Now advance the clock forward in time + // Now advance the clock forward in time. c.Advance(1 * time.Hour) - // Wait until the function completes + // Wait until the function completes. wg.Wait() - // Assert the final state + // Assert the final state. assertState(t, i, 1) } diff --git a/ticker_test.go b/ticker_test.go index c89d50d..61a7b53 100644 --- a/ticker_test.go +++ b/ticker_test.go @@ -1,11 +1,13 @@ package clockwork import ( + "context" "testing" "time" ) func TestFakeTickerStop(t *testing.T) { + t.Parallel() fc := &fakeClock{} ft := fc.NewTicker(1) @@ -18,6 +20,10 @@ func TestFakeTickerStop(t *testing.T) { } func TestFakeTickerTick(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + fc := &fakeClock{} now := fc.Now() @@ -39,7 +45,7 @@ func TestFakeTickerTick(t *testing.T) { if tick != first { t.Errorf("wrong tick time, got: %v, want: %v", tick, first) } - case <-time.After(time.Millisecond): + case <-ctx.Done(): t.Errorf("expected tick!") } @@ -52,13 +58,17 @@ func TestFakeTickerTick(t *testing.T) { if tick != second { t.Errorf("wrong tick time, got: %v, want: %v", tick, second) } - case <-time.After(time.Millisecond): + case <-ctx.Done(): t.Errorf("expected tick!") } ft.Stop() } func TestFakeTicker_Race(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + fc := NewFakeClock() tickTime := 1 * time.Millisecond @@ -67,54 +77,57 @@ func TestFakeTicker_Race(t *testing.T) { fc.Advance(tickTime) - timeout := time.NewTimer(500 * time.Millisecond) - defer timeout.Stop() - select { case <-ticker.Chan(): - // Pass - case <-timeout.C: + case <-ctx.Done(): t.Fatalf("Ticker didn't detect the clock advance!") } } func TestFakeTicker_Race2(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() fc := NewFakeClock() ft := fc.NewTicker(5 * time.Second) for i := 0; i < 100; i++ { fc.Advance(5 * time.Second) - <-ft.Chan() + select { + case <-ft.Chan(): + case <-ctx.Done(): + t.Fatalf("Ticker didn't detect the clock advance!") + } + } ft.Stop() } func TestFakeTicker_DeliveryOrder(t *testing.T) { - for i := 0; i < 1000; i++ { - timeout := time.NewTimer(500 * time.Millisecond) - defer timeout.Stop() - fc := NewFakeClock() - ticker := fc.NewTicker(2 * time.Second).Chan() - timer := fc.NewTimer(5 * time.Second).Chan() - go func() { - for j := 0; j < 10; j++ { - fc.BlockUntil(1) - fc.Advance(1 * time.Second) - } - }() - <-ticker - a := <-timer - // Only perform ordering check if ticker channel is drained at first. + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + fc := NewFakeClock() + ticker := fc.NewTicker(2 * time.Second).Chan() + timer := fc.NewTimer(5 * time.Second).Chan() + go func() { + for j := 0; j < 10; j++ { + fc.BlockUntil(1) + fc.Advance(1 * time.Second) + } + }() + <-ticker + a := <-timer + // Only perform ordering check if ticker channel is drained at first. + select { + case <-ticker: + default: select { - case <-ticker: - default: - select { - case b := <-ticker: - if a.After(b) { - t.Fatalf("Expected timer before ticker, got timer %v after %v", a, b) - } - case <-timeout.C: - t.Fatalf("Expected ticker event didn't arrive!") + case b := <-ticker: + if a.After(b) { + t.Fatalf("Expected timer before ticker, got timer %v after %v", a, b) } + case <-ctx.Done(): + t.Fatalf("Expected ticker event didn't arrive!") } } } diff --git a/timer_test.go b/timer_test.go index d39af7f..90776af 100644 --- a/timer_test.go +++ b/timer_test.go @@ -1,11 +1,13 @@ package clockwork import ( + "context" "testing" "time" ) func TestFakeClockTimerStop(t *testing.T) { + t.Parallel() fc := &fakeClock{} ft := fc.NewTimer(1) @@ -18,6 +20,10 @@ func TestFakeClockTimerStop(t *testing.T) { } func TestFakeClockTimers(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + fc := &fakeClock{} zero := fc.NewTimer(0) @@ -26,12 +32,9 @@ func TestFakeClockTimers(t *testing.T) { t.Errorf("zero timer could be stopped") } - timeout := time.NewTimer(500 * time.Millisecond) - defer timeout.Stop() - select { case <-zero.Chan(): - case <-timeout.C: + case <-ctx.Done(): t.Errorf("zero timer didn't emit time") } @@ -71,12 +74,9 @@ func TestFakeClockTimers(t *testing.T) { t.Errorf("triggered timer could be stopped") } - timeout2 := time.NewTimer(500 * time.Millisecond) - defer timeout2.Stop() - select { case <-one.Chan(): - case <-timeout2.C: + case <-ctx.Done(): t.Errorf("triggered timer didn't emit time") } @@ -94,36 +94,32 @@ func TestFakeClockTimers(t *testing.T) { t.Errorf("reset to zero timer could be stopped") } - timeout3 := time.NewTimer(500 * time.Millisecond) - defer timeout3.Stop() - select { case <-one.Chan(): - case <-timeout3.C: + case <-ctx.Done(): t.Errorf("reset to zero timer didn't emit time") } } func TestFakeClockTimer_Race(t *testing.T) { - fc := NewFakeClock() + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + fc := NewFakeClock() timer := fc.NewTimer(1 * time.Millisecond) defer timer.Stop() - fc.Advance(1 * time.Millisecond) - timeout := time.NewTimer(500 * time.Millisecond) - defer timeout.Stop() - select { case <-timer.Chan(): - // Pass - case <-timeout.C: + case <-ctx.Done(): t.Fatalf("Timer didn't detect the clock advance!") } } func TestFakeClockTimer_Race2(t *testing.T) { + t.Parallel() fc := NewFakeClock() timer := fc.NewTimer(5 * time.Second) for i := 0; i < 100; i++ { @@ -140,12 +136,14 @@ func TestFakeClockTimer_ResetRace(t *testing.T) { d := 5 * time.Second var times []time.Time timer := fc.NewTimer(d) - done := make(chan struct{}) + timerStopped := make(chan struct{}) + doneAddingTimes := make(chan struct{}) go func() { + defer close(doneAddingTimes) for { select { - case <-done: - break + case <-timerStopped: + return case now := <-timer.Chan(): times = append(times, now) } @@ -158,9 +156,10 @@ func TestFakeClockTimer_ResetRace(t *testing.T) { fc.Advance(d) } timer.Stop() - close(done) + close(timerStopped) + <-doneAddingTimes // Prevent race condition on times. for i := 1; i < len(times); i++ { - if times[i-1] == times[i] { + if times[i-1].Equal(times[i]) { t.Fatalf("Timer repeatedly reported the same time.") } } @@ -177,9 +176,9 @@ func TestFakeClockTimer_ZeroResetDoesNotBlock(t *testing.T) { } func TestAfterFunc_Concurrent(t *testing.T) { - timeout := time.NewTimer(500 * time.Millisecond) - defer timeout.Stop() - + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() fc := NewFakeClock() blocker := make(chan struct{}) ch := make(chan int) @@ -202,7 +201,7 @@ func TestAfterFunc_Concurrent(t *testing.T) { if a != 111 { t.Fatalf("Expected 111, got %d", a) } - case <-timeout.C: + case <-ctx.Done(): t.Fatalf("Expected signal hasn't arrived") } close(blocker) @@ -211,7 +210,7 @@ func TestAfterFunc_Concurrent(t *testing.T) { if a != 222 { t.Fatalf("Expected 222, got %d", a) } - case <-timeout.C: + case <-ctx.Done(): t.Fatalf("Expected signal hasn't arrived") } select { @@ -219,7 +218,7 @@ func TestAfterFunc_Concurrent(t *testing.T) { if a != 222 { t.Fatalf("Expected 222, got %d", a) } - case <-timeout.C: + case <-ctx.Done(): t.Fatalf("Expected signal hasn't arrived") } } From e9b4e2cce7b34d2597c6bed3fdcc47377f91b9e4 Mon Sep 17 00:00:00 2001 From: Darren Jacques Date: Fri, 30 Dec 2022 10:12:51 -0800 Subject: [PATCH 18/26] Move logic to fakeClock to reduce Ticker and Timer code complexity. - Minimize the custom logic of Tickers and Timers. - Don't allow Tickers and Timers access to the lock of the fakeClock that controls them. Use closures instead. - Use a common `expirer` interface for Timers and Tickers, rather than resuing Timers for both. - Create a `firer` struct to handle commonalities between Tickers and Timers. - Use a single function & code path for Timers, regardless of if they are made with NewTimer or AfterFunc. - Use the mutex hat pattern and add documentation on what the mutex protects. - Change field names from "sleeper" to "waiter" and add documentation. - Various documentation updates. --- clockwork.go | 261 +++++++++++++++++++++++++++++++--------------- clockwork_test.go | 6 +- ticker.go | 47 ++++++--- timer.go | 85 +++++---------- 4 files changed, 239 insertions(+), 160 deletions(-) diff --git a/clockwork.go b/clockwork.go index f892ea0..e235227 100644 --- a/clockwork.go +++ b/clockwork.go @@ -1,12 +1,13 @@ package clockwork import ( + "sort" "sync" "time" ) -// Clock provides an interface that packages can use instead of directly -// using the time module, so that chronology-related behavior can be tested +// Clock provides an interface that packages can use instead of directly using +// the [time] module, so that chronology-related behavior can be tested. type Clock interface { After(d time.Duration) <-chan time.Time Sleep(d time.Duration) @@ -17,16 +18,20 @@ type Clock interface { AfterFunc(d time.Duration, f func()) Timer } -// FakeClock provides an interface for a clock which can be -// manually advanced through time +// FakeClock provides an interface for a clock which can be manually advanced +// through time. +// +// FakeClock maintains a list of "waiters," which consists of all callers +// waiting on the underlying clock (i.e. Tickers and Timers including callers of +// Sleep or After). Users can call BlockUntil to block until the clock has an +// expected number of waiters. type FakeClock interface { Clock // Advance advances the FakeClock to a new point in time, ensuring any existing - // sleepers are notified appropriately before returning + // waiters are notified appropriately before returning. Advance(d time.Duration) - // BlockUntil will block until the FakeClock has the given number of - // sleepers (callers of Sleep or After) - BlockUntil(n int) + // BlockUntil blocks until the FakeClock has the given number of waiters. + BlockUntil(waiters int) } // NewRealClock returns a Clock which simply delegates calls to the actual time @@ -39,8 +44,8 @@ func NewRealClock() Clock { // manually advanced through time for testing. The initial time of the // FakeClock will be an arbitrary non-zero time. func NewFakeClock() FakeClock { - // use a fixture that does not fulfill Time.IsZero() - return NewFakeClockAt(time.Date(1984, time.April, 4, 0, 0, 0, 0, time.UTC)) + // Use the standard layout time to avoid fulfilling Time.IsZero(). + return NewFakeClockAt(time.Date(2006, time.January, 2, 15, 4, 5, 0, time.UTC)) } // NewFakeClockAt returns a FakeClock initialised at the given time.Time. @@ -81,47 +86,55 @@ func (rc *realClock) AfterFunc(d time.Duration, f func()) Timer { } type fakeClock struct { - sleepers sleepers + // l protects all attributes of the clock, including all attributes of all + // waiters and blockers. + l sync.RWMutex + waiters []expirer blockers []*blocker time time.Time - - l sync.RWMutex } -// blocker represents a caller of BlockUntil +// blocker is a caller of BlockUntil. type blocker struct { count int - ch chan struct{} + + // ch is closed when the underlying clock has the specificed number of blockers. + ch chan struct{} } -type sleepers []*fakeTimer +// expirer is a timer or ticker that expires at some point in the future. +type expirer interface { + // expire the expirer at the given time, returning the desired duration until + // the next expiration, if any. + expire(now time.Time) (next *time.Duration) -func (s sleepers) Len() int { return len(s) } -func (s sleepers) Less(i, j int) bool { return s[i].until.Before(s[j].until) } -func (s sleepers) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + // Get and set the expiration time. + expiry() time.Time + setExpiry(time.Time) +} -// After mimics time.After; it waits for the given duration to elapse on the +// After mimics [time.After]; it waits for the given duration to elapse on the // fakeClock, then sends the current time on the returned channel. func (fc *fakeClock) After(d time.Duration) <-chan time.Time { return fc.NewTimer(d).Chan() } -// notifyBlockers notifies all the blockers waiting for the current number of -// sleepers or fewer. +// notifyBlockers closes the receive channel for all blockers waiting for the +// current number of waiters (or fewer). func (fc *fakeClock) notifyBlockers() { - var stillWaiting []*blocker - count := len(fc.sleepers) + var blocked []*blocker + count := len(fc.waiters) for _, b := range fc.blockers { if b.count <= count { close(b.ch) - } else { - stillWaiting = append(stillWaiting, b) + continue } + blocked = append(blocked, b) } - fc.blockers = stillWaiting + fc.blockers = blocked } -// Sleep blocks until the given duration has passed on the fakeClock +// Sleep blocks until the given duration has past on the fakeClock. func (fc *fakeClock) Sleep(d time.Duration) { <-fc.After(d) } @@ -129,95 +142,99 @@ func (fc *fakeClock) Sleep(d time.Duration) { // Now returns the current time of the fakeClock func (fc *fakeClock) Now() time.Time { fc.l.RLock() - t := fc.time - fc.l.RUnlock() - return t + defer fc.l.RUnlock() + return fc.time } -// Since returns the duration that has passed since the given time on the fakeClock +// Since returns the duration that has past since the given time on the +// fakeClock. func (fc *fakeClock) Since(t time.Time) time.Duration { return fc.Now().Sub(t) } -// NewTicker returns a ticker that will expire only after calls to FakeClock -// Advance have moved the clock past the given duration. +// NewTicker returns a Ticker that will expire only after calls to +// fakeClock.Advance() have moved the clock past the given duration. func (fc *fakeClock) NewTicker(d time.Duration) Ticker { - c := make(chan time.Time, 1) var ft *fakeTicker ft = &fakeTicker{ - fakeTimer: fakeTimer{ - c: c, - clock: fc, - callback: func(now time.Time) { - ft.resetImpl(d) - select { - case c <- now: - default: - } - }, - }, + firer: newFirer(), + d: d, + reset: func(d time.Duration) { fc.set(ft, d) }, + stop: func() { fc.stop(ft) }, } - ft.Reset(d) + fc.set(ft, d) return ft } -// NewTimer returns a timer that will fire only after calls to FakeClock Advance -// have moved the clock past the given duration. +// NewTimer returns a Timer that will fire only after calls to +// fakeClock.Advance() have moved the clock past the given duration. func (fc *fakeClock) NewTimer(d time.Duration) Timer { - c := make(chan time.Time, 1) - ft := &fakeTimer{ - c: c, - clock: fc, - callback: func(now time.Time) { - select { - case c <- now: - default: - } - }, - } - ft.Reset(d) - return ft + return fc.newTimer(d, nil) } -// AfterFunc returns a timer that will invoke the given function only after -// calls to fakeClock Advance have moved the clock passed the given duration. +// AfterFunc mimics [time.AfterFunc]; it returns a Timer that will invoke the +// given function only after calls to fakeClock.Advance() have moved the clock +// past the given duration. func (fc *fakeClock) AfterFunc(d time.Duration, f func()) Timer { - ft := &fakeTimer{ - clock: fc, - callback: func(_ time.Time) { go f() }, + return fc.newTimer(d, f) +} + +// newTimer returns a new timer, using an optional afterFunc. +func (fc *fakeClock) newTimer(d time.Duration, afterfunc func()) *fakeTimer { + var ft *fakeTimer + ft = &fakeTimer{ + firer: newFirer(), + reset: func(d time.Duration) bool { + fc.l.Lock() + defer fc.l.Unlock() + // fc.l must be held across the calls to stopExpirer & setExpirer. + stopped := fc.stopExpirer(ft) + fc.setExpirer(ft, d) + return stopped + }, + stop: func() bool { return fc.stop(ft) }, + + afterFunc: afterfunc, } - ft.Reset(d) + fc.set(ft, d) return ft } -// Advance advances fakeClock to a new point in time, ensuring channels from any -// previous invocations of After are notified appropriately before returning +// Advance advances fakeClock to a new point in time, ensuring waiters and +// blockers are notified appropriately before returning. func (fc *fakeClock) Advance(d time.Duration) { fc.l.Lock() defer fc.l.Unlock() end := fc.time.Add(d) - // While first sleeper is ready to wake, wake it. We don't iterate because the - // callback of the sleeper might register a new sleeper, so the list of - // sleepers might change as we execute this. - for len(fc.sleepers) > 0 && !end.Before(fc.sleepers[0].until) { - first := fc.sleepers[0] - fc.sleepers = fc.sleepers[1:] - fc.time = first.until - first.callback(first.until) + // Expire the earliest waiter until the earliest waiter's expiration is after + // end. + // + // We don't iterate because the callback of the waiter might register a new + // waiter, so the list of waiters might change as we execute this. + for len(fc.waiters) > 0 && !end.Before(fc.waiters[0].expiry()) { + w := fc.waiters[0] + fc.waiters = fc.waiters[1:] + + // Use the waiter's expriation as the current time for this expiration. + now := w.expiry() + fc.time = now + if d := w.expire(now); d != nil { + // Set the new exipration if needed. + fc.setExpirer(w, *d) + } } fc.time = end } -// BlockUntil will block until the fakeClock has the given number of sleepers -// (callers of Sleep or After) +// BlockUntil blocks until the fakeClock has the given number of waiters. func (fc *fakeClock) BlockUntil(n int) { fc.l.Lock() - // Fast path: we already have >= n sleepers. - if len(fc.sleepers) >= n { + // Fast path: we already have >= n waiters. + if len(fc.waiters) >= n { fc.l.Unlock() return } - // Otherwise, we have < n sleepers. Set up a new blocker to wait for more. + // Set up a new blocker to wait for more waiters. b := &blocker{ count: n, ch: make(chan struct{}), @@ -226,3 +243,81 @@ func (fc *fakeClock) BlockUntil(n int) { fc.l.Unlock() <-b.ch } + +// stop stops an expirer, returning true if the expirer was stopped. +func (fc *fakeClock) stop(e expirer) bool { + fc.l.Lock() + defer fc.l.Unlock() + return fc.stopExpirer(e) +} + +// stopExpirer stops an expirer, returning true if the expirer was stopped. +// +// The caller must hold fc.l. +func (fc *fakeClock) stopExpirer(e expirer) bool { + for i, t := range fc.waiters { + if t == e { + // Remove element, maintaining order. + copy(fc.waiters[i:], fc.waiters[i+1:]) + fc.waiters[len(fc.waiters)-1] = nil + fc.waiters = fc.waiters[:len(fc.waiters)-1] + return true + } + } + return false +} + +// set sets an expirer to expire at a future point in time. +func (fc *fakeClock) set(e expirer, d time.Duration) { + fc.l.Lock() + defer fc.l.Unlock() + fc.setExpirer(e, d) +} + +// setExpirer sets an expirer to expire at a future point in time. +// +// The caller must hold fc.l. +func (fc *fakeClock) setExpirer(e expirer, d time.Duration) { + if d.Nanoseconds() <= 0 { + // special case - trigger immediately, never reset. + // + // TODO: Explain what cases this covers. + e.expire(fc.time) + return + } + // Add the expirer to the set of waiters and notify any blockers. + e.setExpiry(fc.time.Add(d)) + fc.waiters = append(fc.waiters, e) + sort.Slice(fc.waiters, func(i int, j int) bool { + return fc.waiters[i].expiry().Before(fc.waiters[j].expiry()) + }) + fc.notifyBlockers() +} + +// firer is used by fakeTimer and fakeTicker used to help implement expirer. +type firer struct { + // The channel associated with the firer, used to send expriation times. + c chan time.Time + + // The time when the firer expires. Only meaningful if the firer is currently + // one of a fakeClock's waiters. + exp time.Time +} + +func newFirer() firer { + return firer{c: make(chan time.Time, 1)} +} + +func (f *firer) Chan() <-chan time.Time { + return f.c +} + +// expiry implements expirer. +func (f *firer) expiry() time.Time { + return f.exp +} + +// setExpiry implements expirer. +func (f *firer) setExpiry(t time.Time) { + f.exp = t +} diff --git a/clockwork_test.go b/clockwork_test.go index 4b84ec6..a0b9f10 100644 --- a/clockwork_test.go +++ b/clockwork_test.go @@ -89,7 +89,7 @@ func TestNotifyBlockers(t *testing.T) { b5 := &blocker{10, make(chan struct{})} fc := fakeClock{ blockers: []*blocker{b1, b2, b3, b4, b5}, - sleepers: sleepers{nil, nil}, + waiters: []expirer{nil, nil}, } fc.notifyBlockers() if n := len(fc.blockers); n != 3 { @@ -105,8 +105,8 @@ func TestNotifyBlockers(t *testing.T) { case <-time.After(time.Second): t.Fatalf("timed out waiting for channel close!") } - for len(fc.sleepers) < 10 { - fc.sleepers = append(fc.sleepers, nil) + for len(fc.waiters) < 10 { + fc.waiters = append(fc.waiters, nil) } fc.notifyBlockers() if n := len(fc.blockers); n != 0 { diff --git a/ticker.go b/ticker.go index d7b6676..f4a1932 100644 --- a/ticker.go +++ b/ticker.go @@ -1,29 +1,48 @@ package clockwork -import ( - "time" -) - -// Ticker provides an interface which can be used instead of directly -// using the ticker within the time module. The real-time ticker t -// provides ticks through t.C which becomes now t.Chan() to make -// this channel requirement definable in this interface. +import "time" + +// Ticker provides an interface which can be used instead of directly using +// [time.Ticker]. The real-time ticker t provides ticks through t.C which +// becomes t.Chan() to make this channel requirement definable in this +// interface. type Ticker interface { Chan() <-chan time.Time + Reset(d time.Duration) Stop() } type realTicker struct{ *time.Ticker } -func (rt realTicker) Chan() <-chan time.Time { - return rt.C +func (r realTicker) Chan() <-chan time.Time { + return r.C } type fakeTicker struct { - fakeTimer + firer + + // reset and stop provide the implmenetation of the respective exported + // functions. + reset func(d time.Duration) + stop func() + + // The duration of the ticker. + d time.Duration +} + +func (f *fakeTicker) Reset(d time.Duration) { + f.reset(d) +} + +func (f *fakeTicker) Stop() { + f.stop() } -func (ft *fakeTicker) Stop() { - // Ignore returned bool to make signature match. - ft.fakeTimer.Stop() +func (f *fakeTicker) expire(now time.Time) *time.Duration { + // Never block on expiration. + select { + case f.c <- now: + default: + } + return &f.d } diff --git a/timer.go b/timer.go index e3a9df4..6f928b3 100644 --- a/timer.go +++ b/timer.go @@ -1,88 +1,53 @@ package clockwork -import ( - "sort" - "time" -) +import "time" -// Timer provides an interface which can be used instead of directly -// using the timer within the time module. The real-time timer t -// provides events through t.C which becomes now t.Chan() to make -// this channel requirement definable in this interface. +// Timer provides an interface which can be used instead of directly using +// [time.Timer]. The real-time timer t provides events through t.C which becomes +// t.Chan() to make this channel requirement definable in this interface. type Timer interface { Chan() <-chan time.Time Reset(d time.Duration) bool Stop() bool } -type realTimer struct { - *time.Timer -} +type realTimer struct{ *time.Timer } func (r realTimer) Chan() <-chan time.Time { return r.C } type fakeTimer struct { - // The channel associated with this timer. Only relevant for timers that - // originate from NewTimer or similar, i.e. not for AfterFunc or other - // internal usage. - c chan time.Time - - // The fake clock driving events for this timer. - clock *fakeClock + firer - // The time when the timer expires. Only meaningful if the timer is currently - // one of the fake clock's sleepers. - until time.Time + // reset and stop provide the implmenetation of the respective exported + // functions. + reset func(d time.Duration) bool + stop func() bool - // callback will get called synchronously with the lock of the clock being - // held. It receives the time at which the timer expired. - callback func(time.Time) + // If present when the timer fires, the timer calls afterFunc in its own + // goroutine rather than sending the time on Chan(). + afterFunc func() } -func (f *fakeTimer) Chan() <-chan time.Time { - return f.c +func (f *fakeTimer) Reset(d time.Duration) bool { + return f.reset(d) } func (f *fakeTimer) Stop() bool { - f.clock.l.Lock() - defer f.clock.l.Unlock() - return f.stopImpl() + return f.stop() } -func (f *fakeTimer) stopImpl() bool { - for i, t := range f.clock.sleepers { - if t == f { - // Remove element, maintaining order. - copy(f.clock.sleepers[i:], f.clock.sleepers[i+1:]) - f.clock.sleepers[len(f.clock.sleepers)-1] = nil - f.clock.sleepers = f.clock.sleepers[:len(f.clock.sleepers)-1] - return true - } +func (f *fakeTimer) expire(now time.Time) *time.Duration { + if f.afterFunc != nil { + go f.afterFunc() + return nil } - return false -} - -func (f *fakeTimer) Reset(d time.Duration) bool { - f.clock.l.Lock() - defer f.clock.l.Unlock() - stopped := f.stopImpl() - f.resetImpl(d) - return stopped -} -func (f *fakeTimer) resetImpl(d time.Duration) { - now := f.clock.time - if d.Nanoseconds() <= 0 { - // special case - trigger immediately - f.callback(now) - } else { - // otherwise, add to the set of sleepers - f.until = f.clock.time.Add(d) - f.clock.sleepers = append(f.clock.sleepers, f) - sort.Sort(f.clock.sleepers) - // and notify any blockers - f.clock.notifyBlockers() + // Never block on expiration. + select { + case f.c <- now: + default: } + return nil } From 27067ce12c06a07851ab452a90265274778e02bc Mon Sep 17 00:00:00 2001 From: Darren Jacques Date: Fri, 30 Dec 2022 10:12:51 -0800 Subject: [PATCH 19/26] Move logic to fakeClock to reduce Ticker and Timer code complexity. - Minimize the custom logic of Tickers and Timers. - Don't allow Tickers and Timers access to the lock of the fakeClock that controls them. Use closures instead. - Use a common `expirer` interface for Timers and Tickers, rather than resuing Timers for both. - Create a `firer` struct to handle commonalities between Tickers and Timers. - Use a single function & code path for Timers, regardless of if they are made with NewTimer or AfterFunc. - Use the mutex hat pattern and add documentation on what the mutex protects. - Change field names from "sleeper" to "waiter" and add documentation. - Various documentation updates. - Update go.mod to reflect requriement for go 1.15. --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 4f4bb16..507295d 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/jonboulle/clockwork -go 1.13 +go 1.15 From d2cf3dd0d364ab6dfe84ecb551fc5ff8269ff514 Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Mon, 16 Jan 2023 09:20:08 +0100 Subject: [PATCH 20/26] chore: bump minimum Go version to 1.15 Signed-off-by: Mark Sagi-Kazar --- .github/workflows/ci.yaml | 4 +--- README.md | 4 ++-- go.mod | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 993c0d0..1b80cdb 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,9 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ['1.11', '1.12', '1.13', '1.14', '1.15', '1.16', '1.17', '1.18'] - env: - GOFLAGS: -mod=readonly + go: ['1.15', '1.16', '1.17', '1.18', '1.19'] steps: - name: Set up Go diff --git a/README.md b/README.md index cad6083..42970da 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge-flat.svg)](https://github.com/avelino/awesome-go#utilities) -[![GitHub Workflow Status](https://img.shields.io/github/workflow/status/jonboulle/clockwork/CI?style=flat-square)](https://github.com/jonboulle/clockwork/actions?query=workflow%3ACI) +[![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/jonboulle/clockwork/ci.yaml?style=flat-square)](https://github.com/jonboulle/clockwork/actions?query=workflow%3ACI) [![Go Report Card](https://goreportcard.com/badge/github.com/jonboulle/clockwork?style=flat-square)](https://goreportcard.com/report/github.com/jonboulle/clockwork) -![Go Version](https://img.shields.io/badge/go%20version-%3E=1.11-61CFDD.svg?style=flat-square) +![Go Version](https://img.shields.io/badge/go%20version-%3E=1.15-61CFDD.svg?style=flat-square) [![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/mod/github.com/jonboulle/clockwork) **A simple fake clock for Go.** diff --git a/go.mod b/go.mod index 4f4bb16..507295d 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/jonboulle/clockwork -go 1.13 +go 1.15 From b06dd11a7ee730f615b6b12990d0d92c03b20a72 Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Mon, 16 Jan 2023 09:28:04 +0100 Subject: [PATCH 21/26] ci: disable fail fast on matrix build Signed-off-by: Mark Sagi-Kazar --- .github/workflows/ci.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1b80cdb..08aaf08 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -11,6 +11,7 @@ jobs: name: Test runs-on: ubuntu-latest strategy: + fail-fast: false matrix: go: ['1.15', '1.16', '1.17', '1.18', '1.19'] From 8fdc183e99a0d3bda3dcd0f9a93283881be34d6c Mon Sep 17 00:00:00 2001 From: Luwei Ge Date: Tue, 7 Mar 2023 17:41:01 +0000 Subject: [PATCH 22/26] fix typo --- ticker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ticker.go b/ticker.go index f4a1932..b68e4d7 100644 --- a/ticker.go +++ b/ticker.go @@ -21,7 +21,7 @@ func (r realTicker) Chan() <-chan time.Time { type fakeTicker struct { firer - // reset and stop provide the implmenetation of the respective exported + // reset and stop provide the implementation of the respective exported // functions. reset func(d time.Duration) stop func() From 3c735640565e0ed78321abbd650be4cbec488026 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 15 Mar 2023 16:00:56 +0000 Subject: [PATCH 23/26] build(deps): bump actions/setup-go from 3 to 4 Bumps [actions/setup-go](https://github.com/actions/setup-go) from 3 to 4. - [Release notes](https://github.com/actions/setup-go/releases) - [Commits](https://github.com/actions/setup-go/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/setup-go dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 08aaf08..8c8b5a6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,7 +17,7 @@ jobs: steps: - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v4 with: go-version: ${{ matrix.go }} From bd7c02c3881070f1896909eeb2ea6509688085bd Mon Sep 17 00:00:00 2001 From: Darren Jacques Date: Sat, 28 Jan 2023 14:11:17 -0800 Subject: [PATCH 24/26] Add BlockUntilContext, which respects context cancellation. BlockUntil is easy to misjudge and when callers get that wrong, the test blocks forever and eventually times out. Also deletes notifyBlockers and its test, inlining this function at its only call point. --- clockwork.go | 69 ++++++++++++++++++++++++++++------------- clockwork_test.go | 79 ++++++++++++++++++----------------------------- 2 files changed, 77 insertions(+), 71 deletions(-) diff --git a/clockwork.go b/clockwork.go index e235227..d80395c 100644 --- a/clockwork.go +++ b/clockwork.go @@ -1,6 +1,7 @@ package clockwork import ( + "context" "sort" "sync" "time" @@ -119,22 +120,7 @@ func (fc *fakeClock) After(d time.Duration) <-chan time.Time { return fc.NewTimer(d).Chan() } -// notifyBlockers closes the receive channel for all blockers waiting for the -// current number of waiters (or fewer). -func (fc *fakeClock) notifyBlockers() { - var blocked []*blocker - count := len(fc.waiters) - for _, b := range fc.blockers { - if b.count <= count { - close(b.ch) - continue - } - blocked = append(blocked, b) - } - fc.blockers = blocked -} - -// Sleep blocks until the given duration has past on the fakeClock. +// Sleep blocks until the given duration has passed on the fakeClock. func (fc *fakeClock) Sleep(d time.Duration) { <-fc.After(d) } @@ -146,7 +132,7 @@ func (fc *fakeClock) Now() time.Time { return fc.time } -// Since returns the duration that has past since the given time on the +// Since returns the duration that has passed since the given time on the // fakeClock. func (fc *fakeClock) Since(t time.Time) time.Duration { return fc.Now().Sub(t) @@ -227,12 +213,41 @@ func (fc *fakeClock) Advance(d time.Duration) { } // BlockUntil blocks until the fakeClock has the given number of waiters. +// +// Prefer BlockUntilContext, which offers context cancellation to prevent +// deadlock. +// +// Deprecation warning: This function might be deprecated in later versions. func (fc *fakeClock) BlockUntil(n int) { + b := fc.newBlocker(n) + if b == nil { + return + } + <-b.ch +} + +// BlockUntilContext blocks until the fakeClock has the given number of waiters +// or the context is cancelled. +func (fc *fakeClock) BlockUntilContext(ctx context.Context, n int) error { + b := fc.newBlocker(n) + if b == nil { + return nil + } + + select { + case <-b.ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (fc *fakeClock) newBlocker(n int) *blocker { fc.l.Lock() + defer fc.l.Unlock() // Fast path: we already have >= n waiters. if len(fc.waiters) >= n { - fc.l.Unlock() - return + return nil } // Set up a new blocker to wait for more waiters. b := &blocker{ @@ -240,8 +255,7 @@ func (fc *fakeClock) BlockUntil(n int) { ch: make(chan struct{}), } fc.blockers = append(fc.blockers, b) - fc.l.Unlock() - <-b.ch + return b } // stop stops an expirer, returning true if the expirer was stopped. @@ -291,7 +305,18 @@ func (fc *fakeClock) setExpirer(e expirer, d time.Duration) { sort.Slice(fc.waiters, func(i int, j int) bool { return fc.waiters[i].expiry().Before(fc.waiters[j].expiry()) }) - fc.notifyBlockers() + + // Notify blockers of our new waiter. + var blocked []*blocker + count := len(fc.waiters) + for _, b := range fc.blockers { + if b.count <= count { + close(b.ch) + continue + } + blocked = append(blocked, b) + } + fc.blockers = blocked } // firer is used by fakeTimer and fakeTicker used to help implement expirer. diff --git a/clockwork_test.go b/clockwork_test.go index 51d6ae0..d76b1d3 100644 --- a/clockwork_test.go +++ b/clockwork_test.go @@ -1,6 +1,8 @@ package clockwork import ( + "context" + "errors" "reflect" "testing" "time" @@ -87,55 +89,6 @@ func TestFakeClockAfter(t *testing.T) { } } -func TestNotifyBlockers(t *testing.T) { - t.Parallel() - b1 := &blocker{1, make(chan struct{})} - b2 := &blocker{2, make(chan struct{})} - b3 := &blocker{5, make(chan struct{})} - b4 := &blocker{10, make(chan struct{})} - b5 := &blocker{10, make(chan struct{})} - fc := fakeClock{ - blockers: []*blocker{b1, b2, b3, b4, b5}, - waiters: []expirer{nil, nil}, - } - fc.notifyBlockers() - if n := len(fc.blockers); n != 3 { - t.Fatalf("got %d blockers, want %d", n, 3) - } - select { - case <-b1.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for channel close!") - } - select { - case <-b2.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for channel close!") - } - for len(fc.waiters) < 10 { - fc.waiters = append(fc.waiters, nil) - } - fc.notifyBlockers() - if n := len(fc.blockers); n != 0 { - t.Fatalf("got %d blockers, want %d", n, 0) - } - select { - case <-b3.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for channel close!") - } - select { - case <-b4.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for channel close!") - } - select { - case <-b5.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for channel close!") - } -} - func TestNewFakeClock(t *testing.T) { t.Parallel() fc := NewFakeClock() @@ -186,6 +139,34 @@ func TestTwoBlockersOneBlock(t *testing.T) { ft2.Stop() } +func TestBlockUntilContext(t *testing.T) { + t.Parallel() + fc := &fakeClock{} + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + blockCtx, cancelBlock := context.WithCancel(ctx) + errCh := make(chan error) + + go func() { + select { + case errCh <- fc.BlockUntilContext(blockCtx, 2): + case <-ctx.Done(): // Error case, captured below. + } + }() + cancelBlock() + + select { + case err := <-errCh: + if !errors.Is(err, context.Canceled) { + t.Errorf("BlockUntilContext returned %v, want context.Canceled.", err) + } + case <-ctx.Done(): + t.Errorf("Never receved error on context cancellation.") + } +} + func TestAfterDeliveryInOrder(t *testing.T) { t.Parallel() fc := &fakeClock{} From 1dd6ef9296f1eea351758381aa7acaaff6d24c13 Mon Sep 17 00:00:00 2001 From: Darren Jacques Date: Sat, 28 Jan 2023 13:41:18 -0800 Subject: [PATCH 25/26] Use current time for NewFakeClock's initial value This prevents callers from building tests that rely on implementation details. Typically by expecting a string containing a static, default value of NewFakeClock(). This philosophy of preventing users from relying on implementation internals is similar to Go's random map iteration behavior. For users who depend on the initial clock time, NewFakeClockAt provides the behavior they need, and makes their expectations clear in tests. --- clockwork.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clockwork.go b/clockwork.go index d80395c..3206b36 100644 --- a/clockwork.go +++ b/clockwork.go @@ -43,10 +43,11 @@ func NewRealClock() Clock { // NewFakeClock returns a FakeClock implementation which can be // manually advanced through time for testing. The initial time of the -// FakeClock will be an arbitrary non-zero time. +// FakeClock will be the current system time. +// +// Tests that require a deterministic time must use NewFakeClockAt. func NewFakeClock() FakeClock { - // Use the standard layout time to avoid fulfilling Time.IsZero(). - return NewFakeClockAt(time.Date(2006, time.January, 2, 15, 4, 5, 0, time.UTC)) + return NewFakeClockAt(time.Now()) } // NewFakeClockAt returns a FakeClock initialised at the given time.Time. From 2c5b10631ecfea63fc202bc8409d7dc5d4713ee7 Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Sat, 1 Apr 2023 20:03:33 +0200 Subject: [PATCH 26/26] ci: add Go 1.20 as a build target Signed-off-by: Mark Sagi-Kazar --- .github/workflows/ci.yaml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8c8b5a6..9c2ec25 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -2,8 +2,7 @@ name: CI on: push: - branches: - - master + branches: [master] pull_request: jobs: @@ -13,7 +12,7 @@ jobs: strategy: fail-fast: false matrix: - go: ['1.15', '1.16', '1.17', '1.18', '1.19'] + go: ['1.15', '1.16', '1.17', '1.18', '1.19', '1.20'] steps: - name: Set up Go