@@ -28,6 +28,7 @@ import (
28
28
"time"
29
29
30
30
"golang.org/x/net/context"
31
+ "golang.org/x/sync/semaphore"
31
32
)
32
33
33
34
const (
@@ -74,11 +75,11 @@ type Bundler struct {
74
75
itemSliceZero reflect.Value // nil (zero value) for slice of items
75
76
flushTimer * time.Timer // implements DelayThreshold
76
77
77
- mu sync.Mutex
78
- spaceAvailable chan struct {} // closed and replaced when space is available
79
- bufferedSize int // total bytes buffered
80
- curBundle bundle // incoming items added to this bundle
81
- handlingc <- chan struct {} // set to non-nil while a handler is running; closed when it returns
78
+ mu sync.Mutex
79
+ sem * semaphore. Weighted // enforces BufferedByteLimit
80
+ semOnce sync. Once
81
+ curBundle bundle // incoming items added to this bundle
82
+ handlingc <- chan struct {} // set to non-nil while a handler is running; closed when it returns
82
83
}
83
84
84
85
type bundle struct {
@@ -94,6 +95,9 @@ type bundle struct {
94
95
// handler is a function that will be called on each bundle. If itemExample is
95
96
// of type T, the argument to handler is of type []T. handler is always called
96
97
// sequentially for each bundle, and never in parallel.
98
+ //
99
+ // Configure the Bundler by setting its thresholds and limits before calling
100
+ // any of its methods.
97
101
func NewBundler (itemExample interface {}, handler func (interface {})) * Bundler {
98
102
b := & Bundler {
99
103
DelayThreshold : DefaultDelayThreshold ,
@@ -108,14 +112,24 @@ func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
108
112
return b
109
113
}
110
114
115
+ func (b * Bundler ) sema () * semaphore.Weighted {
116
+ // Create the semaphore lazily, because the user may set BufferedByteLimit
117
+ // after NewBundler.
118
+ b .semOnce .Do (func () {
119
+ b .sem = semaphore .NewWeighted (int64 (b .BufferedByteLimit ))
120
+ })
121
+ return b .sem
122
+ }
123
+
111
124
// Add adds item to the current bundle. It marks the bundle for handling and
112
125
// starts a new one if any of the thresholds or limits are exceeded.
113
126
//
114
127
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
115
128
// the item can never be handled. Add returns ErrOversizedItem in this case.
116
129
//
117
- // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
118
- // Add returns ErrOverflow.
130
+ // If adding the item would exceed the maximum memory allowed
131
+ // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
132
+ // memory, Add returns ErrOverflow.
119
133
//
120
134
// Add never blocks.
121
135
func (b * Bundler ) Add (item interface {}, size int ) error {
@@ -124,22 +138,23 @@ func (b *Bundler) Add(item interface{}, size int) error {
124
138
if b .BundleByteLimit > 0 && size > b .BundleByteLimit {
125
139
return ErrOversizedItem
126
140
}
127
- b .mu .Lock ()
128
- defer b .mu .Unlock ()
129
141
// If adding this item would exceed our allotted memory
130
142
// footprint, we can't accept it.
131
- if b .bufferedSize + size > b .BufferedByteLimit {
143
+ // (TryAcquire also returns false if anything is waiting on the semaphore,
144
+ // so calls to Add and AddWait shouldn't be mixed.)
145
+ if ! b .sema ().TryAcquire (int64 (size )) {
132
146
return ErrOverflow
133
147
}
134
- b .addLocked (item , size )
148
+ b .add (item , size )
135
149
return nil
136
150
}
137
151
138
- // addLocked adds item to the current bundle. It marks the bundle for handling and
152
+ // add adds item to the current bundle. It marks the bundle for handling and
139
153
// starts a new one if any of the thresholds or limits are exceeded.
140
- //
141
- // addLocked is called with the lock held.
142
- func (b * Bundler ) addLocked (item interface {}, size int ) {
154
+ func (b * Bundler ) add (item interface {}, size int ) {
155
+ b .mu .Lock ()
156
+ defer b .mu .Unlock ()
157
+
143
158
// If adding this item to the current bundle would cause it to exceed the
144
159
// maximum bundle size, close the current bundle and start a new one.
145
160
if b .BundleByteLimit > 0 && b .curBundle .size + size > b .BundleByteLimit {
@@ -148,7 +163,6 @@ func (b *Bundler) addLocked(item interface{}, size int) {
148
163
// Add the item.
149
164
b .curBundle .items = reflect .Append (b .curBundle .items , reflect .ValueOf (item ))
150
165
b .curBundle .size += size
151
- b .bufferedSize += size
152
166
153
167
// Start a timer to flush the item if one isn't already running.
154
168
// startFlushLocked clears the timer and closes the bundle at the same time,
@@ -177,31 +191,25 @@ func (b *Bundler) addLocked(item interface{}, size int) {
177
191
//
178
192
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
179
193
// AddWait blocks until space is available or ctx is done.
194
+ //
195
+ // Calls to Add and AddWait should not be mixed on the same Bundler.
180
196
func (b * Bundler ) AddWait (ctx context.Context , item interface {}, size int ) error {
181
197
// If this item exceeds the maximum size of a bundle,
182
198
// we can never send it.
183
199
if b .BundleByteLimit > 0 && size > b .BundleByteLimit {
184
200
return ErrOversizedItem
185
201
}
186
- b .mu .Lock ()
187
- // If adding this item would exceed our allotted memory
188
- // footprint, block until space is available.
189
- // TODO(jba): avoid starvation of large items.
190
- for b .bufferedSize + size > b .BufferedByteLimit {
191
- if b .spaceAvailable == nil {
192
- b .spaceAvailable = make (chan struct {})
193
- }
194
- avail := b .spaceAvailable
195
- b .mu .Unlock ()
196
- select {
197
- case <- ctx .Done ():
198
- return ctx .Err ()
199
- case <- avail :
200
- b .mu .Lock ()
201
- }
202
+ // If adding this item would exceed our allotted memory footprint, block
203
+ // until space is available. The semaphore is FIFO, so there will be no
204
+ // starvation.
205
+ if err := b .sema ().Acquire (ctx , int64 (size )); err != nil {
206
+ return err
202
207
}
203
- b .addLocked (item , size )
204
- b .mu .Unlock ()
208
+ // Here, we've reserved space for item. Other goroutines can call AddWait
209
+ // and even acquire space, but no one can take away our reservation
210
+ // (assuming sem.Release is used correctly). So there is no race condition
211
+ // resulting from locking the mutex after sem.Acquire returns.
212
+ b .add (item , size )
205
213
return nil
206
214
}
207
215
@@ -236,15 +244,7 @@ func (b *Bundler) startFlushLocked() {
236
244
237
245
go func () {
238
246
defer func () {
239
- b .mu .Lock ()
240
- b .bufferedSize -= bun .size
241
- avail := b .spaceAvailable
242
- b .spaceAvailable = nil
243
- b .mu .Unlock ()
244
-
245
- if avail != nil {
246
- close (avail )
247
- }
247
+ b .sem .Release (int64 (bun .size ))
248
248
close (done )
249
249
}()
250
250
0 commit comments