From b8da4e0ac07cc88db17f50a60361d3f5c817fde0 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Sat, 18 Mar 2017 06:09:20 -0400 Subject: [PATCH] bundler: use weighted semaphore to enforced byte limit The weighted semaphore class, since it's context-aware, can replace the previous complex mechanism involving channels. It is FIFO, so it also solves the starvation problem. Change-Id: If758576c6284abd8c9638db10600d9a9e6699c07 Reviewed-on: https://code-review.googlesource.com/11520 Reviewed-by: Jonathan Amsterdam --- support/bundler/bundler.go | 86 +++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go index e25d2e88d9f..c4e4c9a8c20 100644 --- a/support/bundler/bundler.go +++ b/support/bundler/bundler.go @@ -28,6 +28,7 @@ import ( "time" "golang.org/x/net/context" + "golang.org/x/sync/semaphore" ) const ( @@ -74,11 +75,11 @@ type Bundler struct { itemSliceZero reflect.Value // nil (zero value) for slice of items flushTimer *time.Timer // implements DelayThreshold - mu sync.Mutex - spaceAvailable chan struct{} // closed and replaced when space is available - bufferedSize int // total bytes buffered - curBundle bundle // incoming items added to this bundle - handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns + mu sync.Mutex + sem *semaphore.Weighted // enforces BufferedByteLimit + semOnce sync.Once + curBundle bundle // incoming items added to this bundle + handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns } type bundle struct { @@ -94,6 +95,9 @@ type bundle struct { // handler is a function that will be called on each bundle. If itemExample is // of type T, the argument to handler is of type []T. handler is always called // sequentially for each bundle, and never in parallel. +// +// Configure the Bundler by setting its thresholds and limits before calling +// any of its methods. func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler { b := &Bundler{ DelayThreshold: DefaultDelayThreshold, @@ -108,14 +112,24 @@ func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler { return b } +func (b *Bundler) sema() *semaphore.Weighted { + // Create the semaphore lazily, because the user may set BufferedByteLimit + // after NewBundler. + b.semOnce.Do(func() { + b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit)) + }) + return b.sem +} + // Add adds item to the current bundle. It marks the bundle for handling and // starts a new one if any of the thresholds or limits are exceeded. // // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then // the item can never be handled. Add returns ErrOversizedItem in this case. // -// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), -// Add returns ErrOverflow. +// If adding the item would exceed the maximum memory allowed +// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for +// memory, Add returns ErrOverflow. // // Add never blocks. func (b *Bundler) Add(item interface{}, size int) error { @@ -124,22 +138,23 @@ func (b *Bundler) Add(item interface{}, size int) error { if b.BundleByteLimit > 0 && size > b.BundleByteLimit { return ErrOversizedItem } - b.mu.Lock() - defer b.mu.Unlock() // If adding this item would exceed our allotted memory // footprint, we can't accept it. - if b.bufferedSize+size > b.BufferedByteLimit { + // (TryAcquire also returns false if anything is waiting on the semaphore, + // so calls to Add and AddWait shouldn't be mixed.) + if !b.sema().TryAcquire(int64(size)) { return ErrOverflow } - b.addLocked(item, size) + b.add(item, size) return nil } -// addLocked adds item to the current bundle. It marks the bundle for handling and +// add adds item to the current bundle. It marks the bundle for handling and // starts a new one if any of the thresholds or limits are exceeded. -// -// addLocked is called with the lock held. -func (b *Bundler) addLocked(item interface{}, size int) { +func (b *Bundler) add(item interface{}, size int) { + b.mu.Lock() + defer b.mu.Unlock() + // If adding this item to the current bundle would cause it to exceed the // maximum bundle size, close the current bundle and start a new one. if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit { @@ -148,7 +163,6 @@ func (b *Bundler) addLocked(item interface{}, size int) { // Add the item. b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item)) b.curBundle.size += size - b.bufferedSize += size // Start a timer to flush the item if one isn't already running. // startFlushLocked clears the timer and closes the bundle at the same time, @@ -177,31 +191,25 @@ func (b *Bundler) addLocked(item interface{}, size int) { // // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), // AddWait blocks until space is available or ctx is done. +// +// Calls to Add and AddWait should not be mixed on the same Bundler. func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error { // If this item exceeds the maximum size of a bundle, // we can never send it. if b.BundleByteLimit > 0 && size > b.BundleByteLimit { return ErrOversizedItem } - b.mu.Lock() - // If adding this item would exceed our allotted memory - // footprint, block until space is available. - // TODO(jba): avoid starvation of large items. - for b.bufferedSize+size > b.BufferedByteLimit { - if b.spaceAvailable == nil { - b.spaceAvailable = make(chan struct{}) - } - avail := b.spaceAvailable - b.mu.Unlock() - select { - case <-ctx.Done(): - return ctx.Err() - case <-avail: - b.mu.Lock() - } + // If adding this item would exceed our allotted memory footprint, block + // until space is available. The semaphore is FIFO, so there will be no + // starvation. + if err := b.sema().Acquire(ctx, int64(size)); err != nil { + return err } - b.addLocked(item, size) - b.mu.Unlock() + // Here, we've reserved space for item. Other goroutines can call AddWait + // and even acquire space, but no one can take away our reservation + // (assuming sem.Release is used correctly). So there is no race condition + // resulting from locking the mutex after sem.Acquire returns. + b.add(item, size) return nil } @@ -236,15 +244,7 @@ func (b *Bundler) startFlushLocked() { go func() { defer func() { - b.mu.Lock() - b.bufferedSize -= bun.size - avail := b.spaceAvailable - b.spaceAvailable = nil - b.mu.Unlock() - - if avail != nil { - close(avail) - } + b.sem.Release(int64(bun.size)) close(done) }()