Skip to content

Commit b8da4e0

Browse files
committed
bundler: use weighted semaphore to enforced byte limit
The weighted semaphore class, since it's context-aware, can replace the previous complex mechanism involving channels. It is FIFO, so it also solves the starvation problem. Change-Id: If758576c6284abd8c9638db10600d9a9e6699c07 Reviewed-on: https://code-review.googlesource.com/11520 Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent 8d00a49 commit b8da4e0

File tree

1 file changed

+43
-43
lines changed

1 file changed

+43
-43
lines changed

support/bundler/bundler.go

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"time"
2929

3030
"golang.org/x/net/context"
31+
"golang.org/x/sync/semaphore"
3132
)
3233

3334
const (
@@ -74,11 +75,11 @@ type Bundler struct {
7475
itemSliceZero reflect.Value // nil (zero value) for slice of items
7576
flushTimer *time.Timer // implements DelayThreshold
7677

77-
mu sync.Mutex
78-
spaceAvailable chan struct{} // closed and replaced when space is available
79-
bufferedSize int // total bytes buffered
80-
curBundle bundle // incoming items added to this bundle
81-
handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns
78+
mu sync.Mutex
79+
sem *semaphore.Weighted // enforces BufferedByteLimit
80+
semOnce sync.Once
81+
curBundle bundle // incoming items added to this bundle
82+
handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns
8283
}
8384

8485
type bundle struct {
@@ -94,6 +95,9 @@ type bundle struct {
9495
// handler is a function that will be called on each bundle. If itemExample is
9596
// of type T, the argument to handler is of type []T. handler is always called
9697
// sequentially for each bundle, and never in parallel.
98+
//
99+
// Configure the Bundler by setting its thresholds and limits before calling
100+
// any of its methods.
97101
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
98102
b := &Bundler{
99103
DelayThreshold: DefaultDelayThreshold,
@@ -108,14 +112,24 @@ func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
108112
return b
109113
}
110114

115+
func (b *Bundler) sema() *semaphore.Weighted {
116+
// Create the semaphore lazily, because the user may set BufferedByteLimit
117+
// after NewBundler.
118+
b.semOnce.Do(func() {
119+
b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
120+
})
121+
return b.sem
122+
}
123+
111124
// Add adds item to the current bundle. It marks the bundle for handling and
112125
// starts a new one if any of the thresholds or limits are exceeded.
113126
//
114127
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
115128
// the item can never be handled. Add returns ErrOversizedItem in this case.
116129
//
117-
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
118-
// Add returns ErrOverflow.
130+
// If adding the item would exceed the maximum memory allowed
131+
// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
132+
// memory, Add returns ErrOverflow.
119133
//
120134
// Add never blocks.
121135
func (b *Bundler) Add(item interface{}, size int) error {
@@ -124,22 +138,23 @@ func (b *Bundler) Add(item interface{}, size int) error {
124138
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
125139
return ErrOversizedItem
126140
}
127-
b.mu.Lock()
128-
defer b.mu.Unlock()
129141
// If adding this item would exceed our allotted memory
130142
// footprint, we can't accept it.
131-
if b.bufferedSize+size > b.BufferedByteLimit {
143+
// (TryAcquire also returns false if anything is waiting on the semaphore,
144+
// so calls to Add and AddWait shouldn't be mixed.)
145+
if !b.sema().TryAcquire(int64(size)) {
132146
return ErrOverflow
133147
}
134-
b.addLocked(item, size)
148+
b.add(item, size)
135149
return nil
136150
}
137151

138-
// addLocked adds item to the current bundle. It marks the bundle for handling and
152+
// add adds item to the current bundle. It marks the bundle for handling and
139153
// starts a new one if any of the thresholds or limits are exceeded.
140-
//
141-
// addLocked is called with the lock held.
142-
func (b *Bundler) addLocked(item interface{}, size int) {
154+
func (b *Bundler) add(item interface{}, size int) {
155+
b.mu.Lock()
156+
defer b.mu.Unlock()
157+
143158
// If adding this item to the current bundle would cause it to exceed the
144159
// maximum bundle size, close the current bundle and start a new one.
145160
if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
@@ -148,7 +163,6 @@ func (b *Bundler) addLocked(item interface{}, size int) {
148163
// Add the item.
149164
b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
150165
b.curBundle.size += size
151-
b.bufferedSize += size
152166

153167
// Start a timer to flush the item if one isn't already running.
154168
// startFlushLocked clears the timer and closes the bundle at the same time,
@@ -177,31 +191,25 @@ func (b *Bundler) addLocked(item interface{}, size int) {
177191
//
178192
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
179193
// AddWait blocks until space is available or ctx is done.
194+
//
195+
// Calls to Add and AddWait should not be mixed on the same Bundler.
180196
func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
181197
// If this item exceeds the maximum size of a bundle,
182198
// we can never send it.
183199
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
184200
return ErrOversizedItem
185201
}
186-
b.mu.Lock()
187-
// If adding this item would exceed our allotted memory
188-
// footprint, block until space is available.
189-
// TODO(jba): avoid starvation of large items.
190-
for b.bufferedSize+size > b.BufferedByteLimit {
191-
if b.spaceAvailable == nil {
192-
b.spaceAvailable = make(chan struct{})
193-
}
194-
avail := b.spaceAvailable
195-
b.mu.Unlock()
196-
select {
197-
case <-ctx.Done():
198-
return ctx.Err()
199-
case <-avail:
200-
b.mu.Lock()
201-
}
202+
// If adding this item would exceed our allotted memory footprint, block
203+
// until space is available. The semaphore is FIFO, so there will be no
204+
// starvation.
205+
if err := b.sema().Acquire(ctx, int64(size)); err != nil {
206+
return err
202207
}
203-
b.addLocked(item, size)
204-
b.mu.Unlock()
208+
// Here, we've reserved space for item. Other goroutines can call AddWait
209+
// and even acquire space, but no one can take away our reservation
210+
// (assuming sem.Release is used correctly). So there is no race condition
211+
// resulting from locking the mutex after sem.Acquire returns.
212+
b.add(item, size)
205213
return nil
206214
}
207215

@@ -236,15 +244,7 @@ func (b *Bundler) startFlushLocked() {
236244

237245
go func() {
238246
defer func() {
239-
b.mu.Lock()
240-
b.bufferedSize -= bun.size
241-
avail := b.spaceAvailable
242-
b.spaceAvailable = nil
243-
b.mu.Unlock()
244-
245-
if avail != nil {
246-
close(avail)
247-
}
247+
b.sem.Release(int64(bun.size))
248248
close(done)
249249
}()
250250

0 commit comments

Comments
 (0)