Skip to content

Commit f696ff4

Browse files
committed
Add queue util
1 parent ea55f84 commit f696ff4

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed

cli/cliutil/queue.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package cliutil
2+
3+
import (
4+
"sync"
5+
6+
"golang.org/x/xerrors"
7+
)
8+
9+
// Queue is a FIFO queue with a fixed size. If the size is exceeded, the first
10+
// item is dropped.
11+
type Queue[T any] struct {
12+
cond *sync.Cond
13+
items []T
14+
mu sync.Mutex
15+
size int
16+
closed bool
17+
}
18+
19+
// NewQueue creates a queue with the given size.
20+
func NewQueue[T any](size int) *Queue[T] {
21+
q := &Queue[T]{
22+
items: make([]T, 0, size),
23+
size: size,
24+
}
25+
q.cond = sync.NewCond(&q.mu)
26+
return q
27+
}
28+
29+
// Close aborts any pending pops and makes future pushes error.
30+
func (q *Queue[T]) Close() {
31+
q.mu.Lock()
32+
defer q.mu.Unlock()
33+
q.closed = true
34+
q.cond.Broadcast()
35+
}
36+
37+
// Push adds an item to the queue. If closed, returns an error.
38+
func (q *Queue[T]) Push(x T) error {
39+
q.mu.Lock()
40+
defer q.mu.Unlock()
41+
if q.closed {
42+
return xerrors.New("queue has been closed")
43+
}
44+
if len(q.items) >= q.size {
45+
q.items = q.items[1:]
46+
}
47+
q.items = append(q.items, x)
48+
q.cond.Broadcast()
49+
return nil
50+
}
51+
52+
// Pop removes and returns the first item from the queue, waiting until there is
53+
// something to pop if necessary. If closed, returns false.
54+
func (q *Queue[T]) Pop() (T, bool) {
55+
var head T
56+
q.mu.Lock()
57+
defer q.mu.Unlock()
58+
for len(q.items) == 0 && !q.closed {
59+
q.cond.Wait()
60+
}
61+
if q.closed {
62+
return head, false
63+
}
64+
head, q.items = q.items[0], q.items[1:]
65+
return head, true
66+
}
67+
68+
func (q *Queue[T]) Len() int {
69+
q.mu.Lock()
70+
defer q.mu.Unlock()
71+
return len(q.items)
72+
}

cli/cliutil/queue_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package cliutil_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/coder/coder/v2/cli/cliutil"
10+
)
11+
12+
func TestQueue(t *testing.T) {
13+
t.Parallel()
14+
15+
t.Run("DropsFirst", func(t *testing.T) {
16+
t.Parallel()
17+
18+
q := cliutil.NewQueue[int](10)
19+
require.Equal(t, 0, q.Len())
20+
21+
for i := 0; i < 20; i++ {
22+
err := q.Push(i)
23+
require.NoError(t, err)
24+
if i < 10 {
25+
require.Equal(t, i+1, q.Len())
26+
} else {
27+
require.Equal(t, 10, q.Len())
28+
}
29+
}
30+
31+
val, ok := q.Pop()
32+
require.True(t, ok)
33+
require.Equal(t, 10, val)
34+
require.Equal(t, 9, q.Len())
35+
})
36+
37+
t.Run("Pop", func(t *testing.T) {
38+
t.Parallel()
39+
40+
q := cliutil.NewQueue[int](10)
41+
for i := 0; i < 5; i++ {
42+
err := q.Push(i)
43+
require.NoError(t, err)
44+
}
45+
46+
// No blocking, should pop immediately.
47+
for i := 0; i < 5; i++ {
48+
val, ok := q.Pop()
49+
require.True(t, ok)
50+
require.Equal(t, i, val)
51+
}
52+
53+
// Pop should block until the next push.
54+
go func() {
55+
err := q.Push(55)
56+
assert.NoError(t, err)
57+
}()
58+
59+
item, ok := q.Pop()
60+
require.True(t, ok)
61+
require.Equal(t, 55, item)
62+
})
63+
64+
t.Run("Close", func(t *testing.T) {
65+
t.Parallel()
66+
67+
q := cliutil.NewQueue[int](10)
68+
69+
done := make(chan bool)
70+
go func() {
71+
_, ok := q.Pop()
72+
done <- ok
73+
}()
74+
75+
q.Close()
76+
77+
require.False(t, <-done)
78+
79+
_, ok := q.Pop()
80+
require.False(t, ok)
81+
82+
err := q.Push(10)
83+
require.Error(t, err)
84+
})
85+
}

0 commit comments

Comments
 (0)