@@ -25,60 +25,61 @@ type FileAcquirer interface {
25
25
26
26
// New returns a file cache that will fetch files from a database
27
27
func New (registerer prometheus.Registerer , authz rbac.Authorizer ) * Cache {
28
- return (& Cache {
29
- lock : sync.Mutex {},
30
- data : make (map [uuid.UUID ]* cacheEntry ),
31
- authz : authz ,
32
- }).registerMetrics (registerer )
28
+ return & Cache {
29
+ lock : sync.Mutex {},
30
+ data : make (map [uuid.UUID ]* cacheEntry ),
31
+ authz : authz ,
32
+ cacheMetrics : newCacheMetrics (registerer ),
33
+ }
33
34
}
34
35
35
- func ( c * Cache ) registerMetrics ( registerer prometheus.Registerer ) * Cache {
36
+ func newCacheMetrics ( registerer prometheus.Registerer ) cacheMetrics {
36
37
subsystem := "file_cache"
37
38
f := promauto .With (registerer )
38
39
39
- c . currentCacheSize = f . NewGauge (prometheus. GaugeOpts {
40
- Namespace : "coderd" ,
41
- Subsystem : subsystem ,
42
- Name : "open_files_size_bytes_current" ,
43
- Help : "The current amount of memory of all files currently open in the file cache. " ,
44
- })
45
-
46
- c . totalCacheSize = f . NewCounter (prometheus. CounterOpts {
47
- Namespace : "coderd" ,
48
- Subsystem : subsystem ,
49
- Name : "open_files_size_bytes_total" ,
50
- Help : "The total amount of memory ever opened in the file cache. This number never decrements. " ,
51
- })
52
-
53
- c . currentOpenFiles = f . NewGauge (prometheus. GaugeOpts {
54
- Namespace : "coderd" ,
55
- Subsystem : subsystem ,
56
- Name : "open_files_current" ,
57
- Help : "The count of unique files currently open in the file cache. " ,
58
- })
59
-
60
- c . totalOpenedFiles = f . NewCounter (prometheus. CounterOpts {
61
- Namespace : "coderd" ,
62
- Subsystem : subsystem ,
63
- Name : "open_files_total" ,
64
- Help : "The total count of unique files ever opened in the file cache. " ,
65
- })
66
-
67
- c . currentOpenFileReferences = f . NewGauge (prometheus. GaugeOpts {
68
- Namespace : "coderd" ,
69
- Subsystem : subsystem ,
70
- Name : "open_file_refs_current" ,
71
- Help : "The count of file references currently open in the file cache. Multiple references can be held for the same file. " ,
72
- })
73
-
74
- c . totalOpenFileReferences = f . NewCounterVec (prometheus. CounterOpts {
75
- Namespace : "coderd" ,
76
- Subsystem : subsystem ,
77
- Name : "open_file_refs_total" ,
78
- Help : "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache. " ,
79
- }, [] string { " hit" })
80
-
81
- return c
40
+ return cacheMetrics {
41
+ currentCacheSize : f . NewGauge (prometheus. GaugeOpts {
42
+ Namespace : "coderd" ,
43
+ Subsystem : subsystem ,
44
+ Name : "open_files_size_bytes_current " ,
45
+ Help : "The current amount of memory of all files currently open in the file cache." ,
46
+ }),
47
+
48
+ totalCacheSize : f . NewCounter (prometheus. CounterOpts {
49
+ Namespace : "coderd" ,
50
+ Subsystem : subsystem ,
51
+ Name : "open_files_size_bytes_total " ,
52
+ Help : "The total amount of memory ever opened in the file cache. This number never decrements." ,
53
+ }),
54
+
55
+ currentOpenFiles : f . NewGauge (prometheus. GaugeOpts {
56
+ Namespace : "coderd" ,
57
+ Subsystem : subsystem ,
58
+ Name : "open_files_current " ,
59
+ Help : "The count of unique files currently open in the file cache." ,
60
+ }),
61
+
62
+ totalOpenedFiles : f . NewCounter (prometheus. CounterOpts {
63
+ Namespace : "coderd" ,
64
+ Subsystem : subsystem ,
65
+ Name : "open_files_total " ,
66
+ Help : "The total count of unique files ever opened in the file cache." ,
67
+ }),
68
+
69
+ currentOpenFileReferences : f . NewGauge (prometheus. GaugeOpts {
70
+ Namespace : "coderd" ,
71
+ Subsystem : subsystem ,
72
+ Name : "open_file_refs_current " ,
73
+ Help : "The count of file references currently open in the file cache. Multiple references can be held for the same file." ,
74
+ }),
75
+
76
+ totalOpenFileReferences : f . NewCounterVec (prometheus. CounterOpts {
77
+ Namespace : "coderd" ,
78
+ Subsystem : subsystem ,
79
+ Name : "open_file_refs_total " ,
80
+ Help : "The total number of file references ever opened in the file cache. The ' hit' label indicates if the file was loaded from the cache." ,
81
+ }, [] string { "hit" }),
82
+ }
82
83
}
83
84
84
85
// Cache persists the files for template versions, and is used by dynamic
@@ -106,18 +107,23 @@ type cacheMetrics struct {
106
107
totalCacheSize prometheus.Counter
107
108
}
108
109
110
// cacheEntry tracks a single cached file: the lazily-fetched value plus the
// number of outstanding references held by callers of Acquire.
type cacheEntry struct {
	// Safety: refCount must only be accessed while the Cache lock is held.
	refCount int
	// value performs the (at most one) database fetch for this file and
	// memoizes the result, so concurrent acquirers share a single fetch.
	value *lazy.ValueWithError[CacheEntryValue]

	// Safety: close must only be called while the Cache lock is held
	close func()
	// Safety: purge must only be called while the Cache lock is held
	purge func()
}
120
+
109
121
// CacheEntryValue is the payload stored for a cached file: the file's
// filesystem contents, the RBAC object used to authorize each acquirer, and
// the file's size in bytes (fed into the cache-size metrics).
type CacheEntryValue struct {
	fs.FS
	Object rbac.Object
	Size   int64
}
114
126
115
- type cacheEntry struct {
116
- // refCount must only be accessed while the Cache lock is held.
117
- refCount int
118
- value * lazy.ValueWithError [CacheEntryValue ]
119
- }
120
-
121
127
var _ fs.FS = (* CloseFS )(nil )
122
128
123
129
// CloseFS is a wrapper around fs.FS that implements io.Closer. The Close()
@@ -129,106 +135,141 @@ type CloseFS struct {
129
135
close func ()
130
136
}
131
137
132
- func (f * CloseFS ) Close () { f .close () }
138
+ func (f * CloseFS ) Close () {
139
+ f .close ()
140
+ }
133
141
134
142
// Acquire will load the fs.FS for the given file. It guarantees that parallel
// calls for the same fileID will only result in one fetch, and that parallel
// calls for distinct fileIDs will fetch in parallel.
//
// Safety: Every call to Acquire that does not return an error must call close
// on the returned value when it is done being used.
func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) {
	// It's important that this `Load` call occurs outside `prepare`, after the
	// mutex has been released, or we would continue to hold the lock until the
	// entire file has been fetched, which may be slow, and would prevent other
	// files from being fetched in parallel.
	e := c.prepare(db, fileID)
	ev, err := e.value.Load()
	if err != nil {
		// The fetch itself failed: drop this caller's reference and evict the
		// failed entry so the next Acquire retries the fetch instead of
		// re-serving the cached error.
		c.lock.Lock()
		defer c.lock.Unlock()
		e.close()
		e.purge()
		return nil, err
	}

	// cleanup releases this caller's reference; used on every error path
	// below (the entry itself stays cached for other holders).
	cleanup := func() {
		c.lock.Lock()
		defer c.lock.Unlock()
		e.close()
	}

	// We always run the fetch under a system context and actor, so we need to
	// check the caller's context (including the actor) manually before returning.

	// Check if the caller's context was canceled. Even though `Authorize` takes
	// a context, we still check it manually first because none of our mock
	// database implementations check for context cancellation.
	if err := ctx.Err(); err != nil {
		cleanup()
		return nil, err
	}

	// Check that the caller is authorized to access the file
	subject, ok := dbauthz.ActorFromContext(ctx)
	if !ok {
		cleanup()
		return nil, dbauthz.ErrNoActor
	}
	if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil {
		cleanup()
		return nil, err
	}

	var closeOnce sync.Once
	return &CloseFS{
		FS: ev.FS,
		close: func() {
			// sync.Once makes the Close() idempotent, so we can call it
			// multiple times without worrying about double-releasing.
			closeOnce.Do(func() {
				c.lock.Lock()
				defer c.lock.Unlock()
				e.close()
			})
		},
	}, nil
}
171
205
172
- func (c * Cache ) prepare (ctx context. Context , db database.Store , fileID uuid.UUID ) * lazy. ValueWithError [ CacheEntryValue ] {
206
// prepare returns the cache entry for fileID, creating it (and arranging the
// lazy database fetch) on a miss. It increments the entry's reference count
// before returning, so every caller must eventually release that reference
// via the entry's close func (while holding the Cache lock).
func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry {
	c.lock.Lock()
	defer c.lock.Unlock()

	hitLabel := "true"
	entry, ok := c.data[fileID]
	if !ok {
		hitLabel = "false"

		// purgeOnce guards against double-eviction: purge may be reached both
		// via close (refcount hitting zero) and via Acquire's fetch-error path.
		var purgeOnce sync.Once
		entry = &cacheEntry{
			value: lazy.NewWithError(func() (CacheEntryValue, error) {
				val, err := fetch(db, fileID)
				if err != nil {
					return val, err
				}

				// Add the size of the file to the cache size metrics.
				c.currentCacheSize.Add(float64(val.Size))
				c.totalCacheSize.Add(float64(val.Size))

				return val, err
			}),

			close: func() {
				// Drop one reference; evict the entry once the last
				// reference is gone.
				entry.refCount--
				c.currentOpenFileReferences.Dec()
				if entry.refCount > 0 {
					return
				}

				entry.purge()
			},

			purge: func() {
				purgeOnce.Do(func() {
					c.purge(fileID)
				})
			},
		}
		c.data[fileID] = entry

		c.currentOpenFiles.Inc()
		c.totalOpenedFiles.Inc()
	}

	c.currentOpenFileReferences.Inc()
	c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc()
	entry.refCount++
	return entry
}
206
257
207
- // release decrements the reference count for the given fileID, and frees the
208
- // backing data if there are no further references being held.
209
- //
210
- // release should only be called after a successful call to Acquire using the Release()
211
- // method on the returned *CloseFS.
212
- func (c * Cache ) release (fileID uuid.UUID ) {
213
- c .lock .Lock ()
214
- defer c .lock .Unlock ()
215
-
258
+ // purge immediately removes an entry from the cache, even if it has open
259
+ // references.
260
+ // Safety: Must only be called while the Cache lock is held
261
+ func (c * Cache ) purge (fileID uuid.UUID ) {
216
262
entry , ok := c .data [fileID ]
217
263
if ! ok {
218
- // If we land here, it's almost certainly because a bug already happened,
219
- // and we're freeing something that's already been freed, or we're calling
220
- // this function with an incorrect ID. Should this function return an error?
221
- return
222
- }
223
-
224
- c .currentOpenFileReferences .Dec ()
225
- entry .refCount --
226
- if entry .refCount > 0 {
264
+ // If we land here, it's probably because of a fetch attempt that
265
+ // resulted in an error, and got purged already. It may also be an
266
+ // erroneous extra close, but we can't really distinguish between those
267
+ // two cases currently.
227
268
return
228
269
}
229
270
271
+ // Purge the file from the cache.
230
272
c .currentOpenFiles .Dec ()
231
-
232
273
ev , err := entry .value .Load ()
233
274
if err == nil {
234
275
c .currentCacheSize .Add (- 1 * float64 (ev .Size ))
@@ -246,11 +287,18 @@ func (c *Cache) Count() int {
246
287
return len (c .data )
247
288
}
248
289
249
- func fetch (ctx context.Context , store database.Store , fileID uuid.UUID ) (CacheEntryValue , error ) {
250
- // Make sure the read does not fail due to authorization issues.
251
- // Authz is checked on the Acquire call, so this is safe.
290
+ func fetch (store database.Store , fileID uuid.UUID ) (CacheEntryValue , error ) {
291
+ // Because many callers can be waiting on the same file fetch concurrently, we
292
+ // want to prevent any failures that would cause them all to receive errors
293
+ // because the caller who initiated the fetch would fail.
294
+ // - We always run the fetch with an uncancelable context, and then check
295
+ // context cancellation for each acquirer afterwards.
296
+ // - We always run the fetch as a system user, and then check authorization
297
+ // for each acquirer afterwards.
298
+ // This prevents a canceled context or an unauthorized user from "holding up
299
+ // the queue".
252
300
//nolint:gocritic
253
- file , err := store .GetFileByID (dbauthz .AsFileReader (ctx ), fileID )
301
+ file , err := store .GetFileByID (dbauthz .AsFileReader (context . Background () ), fileID )
254
302
if err != nil {
255
303
return CacheEntryValue {}, xerrors .Errorf ("failed to read file from database: %w" , err )
256
304
}
0 commit comments