Skip to content

Commit

Permalink
bundler: use weighted semaphore to enforced byte limit
Browse files Browse the repository at this point in the history
The weighted semaphore class, since it's context-aware, can replace
the previous complex mechanism involving channels.

It is FIFO, so it also solves the starvation problem.

Change-Id: If758576c6284abd8c9638db10600d9a9e6699c07
Reviewed-on: https://code-review.googlesource.com/11520
Reviewed-by: Jonathan Amsterdam <[email protected]>
  • Loading branch information
jba committed Mar 20, 2017
1 parent 8d00a49 commit b8da4e0
Showing 1 changed file with 43 additions and 43 deletions.
86 changes: 43 additions & 43 deletions support/bundler/bundler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)

const (
Expand Down Expand Up @@ -74,11 +75,11 @@ type Bundler struct {
itemSliceZero reflect.Value // nil (zero value) for slice of items
flushTimer *time.Timer // implements DelayThreshold

mu sync.Mutex
spaceAvailable chan struct{} // closed and replaced when space is available
bufferedSize int // total bytes buffered
curBundle bundle // incoming items added to this bundle
handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns
mu sync.Mutex
sem *semaphore.Weighted // enforces BufferedByteLimit
semOnce sync.Once
curBundle bundle // incoming items added to this bundle
handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns
}

type bundle struct {
Expand All @@ -94,6 +95,9 @@ type bundle struct {
// handler is a function that will be called on each bundle. If itemExample is
// of type T, the argument to handler is of type []T. handler is always called
// sequentially for each bundle, and never in parallel.
//
// Configure the Bundler by setting its thresholds and limits before calling
// any of its methods.
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
b := &Bundler{
DelayThreshold: DefaultDelayThreshold,
Expand All @@ -108,14 +112,24 @@ func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
return b
}

func (b *Bundler) sema() *semaphore.Weighted {
// Create the semaphore lazily, because the user may set BufferedByteLimit
// after NewBundler.
b.semOnce.Do(func() {
b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
})
return b.sem
}

// Add adds item to the current bundle. It marks the bundle for handling and
// starts a new one if any of the thresholds or limits are exceeded.
//
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
// the item can never be handled. Add returns ErrOversizedItem in this case.
//
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
// Add returns ErrOverflow.
// If adding the item would exceed the maximum memory allowed
// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
// memory, Add returns ErrOverflow.
//
// Add never blocks.
func (b *Bundler) Add(item interface{}, size int) error {
Expand All @@ -124,22 +138,23 @@ func (b *Bundler) Add(item interface{}, size int) error {
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
return ErrOversizedItem
}
b.mu.Lock()
defer b.mu.Unlock()
// If adding this item would exceed our allotted memory
// footprint, we can't accept it.
if b.bufferedSize+size > b.BufferedByteLimit {
// (TryAcquire also returns false if anything is waiting on the semaphore,
// so calls to Add and AddWait shouldn't be mixed.)
if !b.sema().TryAcquire(int64(size)) {
return ErrOverflow
}
b.addLocked(item, size)
b.add(item, size)
return nil
}

// addLocked adds item to the current bundle. It marks the bundle for handling and
// add adds item to the current bundle. It marks the bundle for handling and
// starts a new one if any of the thresholds or limits are exceeded.
//
// addLocked is called with the lock held.
func (b *Bundler) addLocked(item interface{}, size int) {
func (b *Bundler) add(item interface{}, size int) {
b.mu.Lock()
defer b.mu.Unlock()

// If adding this item to the current bundle would cause it to exceed the
// maximum bundle size, close the current bundle and start a new one.
if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
Expand All @@ -148,7 +163,6 @@ func (b *Bundler) addLocked(item interface{}, size int) {
// Add the item.
b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
b.curBundle.size += size
b.bufferedSize += size

// Start a timer to flush the item if one isn't already running.
// startFlushLocked clears the timer and closes the bundle at the same time,
Expand Down Expand Up @@ -177,31 +191,25 @@ func (b *Bundler) addLocked(item interface{}, size int) {
//
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
// AddWait blocks until space is available or ctx is done.
//
// Calls to Add and AddWait should not be mixed on the same Bundler.
func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
// If this item exceeds the maximum size of a bundle,
// we can never send it.
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
return ErrOversizedItem
}
b.mu.Lock()
// If adding this item would exceed our allotted memory
// footprint, block until space is available.
// TODO(jba): avoid starvation of large items.
for b.bufferedSize+size > b.BufferedByteLimit {
if b.spaceAvailable == nil {
b.spaceAvailable = make(chan struct{})
}
avail := b.spaceAvailable
b.mu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-avail:
b.mu.Lock()
}
// If adding this item would exceed our allotted memory footprint, block
// until space is available. The semaphore is FIFO, so there will be no
// starvation.
if err := b.sema().Acquire(ctx, int64(size)); err != nil {
return err
}
b.addLocked(item, size)
b.mu.Unlock()
// Here, we've reserved space for item. Other goroutines can call AddWait
// and even acquire space, but no one can take away our reservation
// (assuming sem.Release is used correctly). So there is no race condition
// resulting from locking the mutex after sem.Acquire returns.
b.add(item, size)
return nil
}

Expand Down Expand Up @@ -236,15 +244,7 @@ func (b *Bundler) startFlushLocked() {

go func() {
defer func() {
b.mu.Lock()
b.bufferedSize -= bun.size
avail := b.spaceAvailable
b.spaceAvailable = nil
b.mu.Unlock()

if avail != nil {
close(avail)
}
b.sem.Release(int64(bun.size))
close(done)
}()

Expand Down

0 comments on commit b8da4e0

Please sign in to comment.