-
Notifications
You must be signed in to change notification settings - Fork 928
chore: don't cache errors in file cache #18555
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+273
−102
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
780233b
test: unit test to exercise polluted file cache with error
Emyrk 8a6deb1
chore: purge file cache entries on error
aslilac bf6562a
Merge branch 'stevenmasley/file_cache_error' into lilac/dont-cache-er…
aslilac 610740a
proper release gating
aslilac ec53459
lint
aslilac df7acff
I win at debugging deadlocks today
aslilac 2ecb1d7
chore: purge file cache entries on error
aslilac 0fa1f6b
Merge branch 'main' into lilac/dont-cache-errors
aslilac db89836
hmm...
aslilac aab0335
at this point the calls are just serialized anyway
aslilac c0ea08a
Merge branch 'main' into lilac/dont-cache-errors
aslilac 78dedaa
concurrency be like
aslilac 94ce678
add comment
aslilac d12d121
add another concurrency test
aslilac 03f9217
think I finally figured out this test 😎
aslilac 82a3b7e
actually this doesn't need to be threaded anymore
aslilac 3764134
lint
aslilac ac52626
Update coderd/files/cache_internal_test.go
aslilac 725696f
one lock to rule them all
aslilac 9cda09f
update some comments
aslilac 98f18b1
wait I thought I committed this
aslilac c09028b
Merge branch 'main' into lilac/dont-cache-errors
aslilac 00fb0b0
lock on close
aslilac File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,60 +25,61 @@ type FileAcquirer interface { | |
|
||
// New returns a file cache that will fetch files from a database | ||
func New(registerer prometheus.Registerer, authz rbac.Authorizer) *Cache { | ||
return (&Cache{ | ||
lock: sync.Mutex{}, | ||
data: make(map[uuid.UUID]*cacheEntry), | ||
authz: authz, | ||
}).registerMetrics(registerer) | ||
return &Cache{ | ||
lock: sync.Mutex{}, | ||
data: make(map[uuid.UUID]*cacheEntry), | ||
authz: authz, | ||
cacheMetrics: newCacheMetrics(registerer), | ||
} | ||
} | ||
|
||
func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache { | ||
func newCacheMetrics(registerer prometheus.Registerer) cacheMetrics { | ||
subsystem := "file_cache" | ||
f := promauto.With(registerer) | ||
|
||
c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_size_bytes_current", | ||
Help: "The current amount of memory of all files currently open in the file cache.", | ||
}) | ||
|
||
c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_size_bytes_total", | ||
Help: "The total amount of memory ever opened in the file cache. This number never decrements.", | ||
}) | ||
|
||
c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_current", | ||
Help: "The count of unique files currently open in the file cache.", | ||
}) | ||
|
||
c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_total", | ||
Help: "The total count of unique files ever opened in the file cache.", | ||
}) | ||
|
||
c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_file_refs_current", | ||
Help: "The count of file references currently open in the file cache. Multiple references can be held for the same file.", | ||
}) | ||
|
||
c.totalOpenFileReferences = f.NewCounterVec(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_file_refs_total", | ||
Help: "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.", | ||
}, []string{"hit"}) | ||
|
||
return c | ||
return cacheMetrics{ | ||
currentCacheSize: f.NewGauge(prometheus.GaugeOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_size_bytes_current", | ||
Help: "The current amount of memory of all files currently open in the file cache.", | ||
}), | ||
|
||
totalCacheSize: f.NewCounter(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_size_bytes_total", | ||
Help: "The total amount of memory ever opened in the file cache. This number never decrements.", | ||
}), | ||
|
||
currentOpenFiles: f.NewGauge(prometheus.GaugeOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_current", | ||
Help: "The count of unique files currently open in the file cache.", | ||
}), | ||
|
||
totalOpenedFiles: f.NewCounter(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_files_total", | ||
Help: "The total count of unique files ever opened in the file cache.", | ||
}), | ||
|
||
currentOpenFileReferences: f.NewGauge(prometheus.GaugeOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_file_refs_current", | ||
Help: "The count of file references currently open in the file cache. Multiple references can be held for the same file.", | ||
}), | ||
|
||
totalOpenFileReferences: f.NewCounterVec(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: subsystem, | ||
Name: "open_file_refs_total", | ||
Help: "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.", | ||
}, []string{"hit"}), | ||
} | ||
} | ||
|
||
// Cache persists the files for template versions, and is used by dynamic | ||
|
@@ -106,18 +107,23 @@ type cacheMetrics struct { | |
totalCacheSize prometheus.Counter | ||
} | ||
|
||
type cacheEntry struct { | ||
// Safety: refCount must only be accessed while the Cache lock is held. | ||
refCount int | ||
value *lazy.ValueWithError[CacheEntryValue] | ||
|
||
// Safety: close must only be called while the Cache lock is held | ||
close func() | ||
// Safety: purge must only be called while the Cache lock is held | ||
purge func() | ||
} | ||
|
||
type CacheEntryValue struct { | ||
fs.FS | ||
Object rbac.Object | ||
Size int64 | ||
} | ||
|
||
type cacheEntry struct { | ||
// refCount must only be accessed while the Cache lock is held. | ||
refCount int | ||
value *lazy.ValueWithError[CacheEntryValue] | ||
} | ||
|
||
var _ fs.FS = (*CloseFS)(nil) | ||
|
||
// CloseFS is a wrapper around fs.FS that implements io.Closer. The Close() | ||
|
@@ -129,106 +135,141 @@ type CloseFS struct { | |
close func() | ||
} | ||
|
||
func (f *CloseFS) Close() { f.close() } | ||
func (f *CloseFS) Close() { | ||
f.close() | ||
} | ||
|
||
// Acquire will load the fs.FS for the given file. It guarantees that parallel | ||
// calls for the same fileID will only result in one fetch, and that parallel | ||
// calls for distinct fileIDs will fetch in parallel. | ||
// | ||
// Safety: Every call to Acquire that does not return an error must have a | ||
// matching call to Release. | ||
// Safety: Every call to Acquire that does not return an error must call close | ||
// on the returned value when it is done being used. | ||
func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) { | ||
// It's important that this `Load` call occurs outside `prepare`, after the | ||
// mutex has been released, or we would continue to hold the lock until the | ||
// entire file has been fetched, which may be slow, and would prevent other | ||
// files from being fetched in parallel. | ||
it, err := c.prepare(ctx, db, fileID).Load() | ||
e := c.prepare(db, fileID) | ||
ev, err := e.value.Load() | ||
if err != nil { | ||
c.release(fileID) | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
Comment on lines +156 to +157
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The 3 other early returns do not lock the cache on |
||
e.close() | ||
e.purge() | ||
return nil, err | ||
} | ||
|
||
cleanup := func() { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
e.close() | ||
} | ||
|
||
// We always run the fetch under a system context and actor, so we need to | ||
// check the caller's context (including the actor) manually before returning. | ||
|
||
// Check if the caller's context was canceled. Even though `Authorize` takes | ||
// a context, we still check it manually first because none of our mock | ||
// database implementations check for context cancellation. | ||
if err := ctx.Err(); err != nil { | ||
cleanup() | ||
return nil, err | ||
} | ||
|
||
// Check that the caller is authorized to access the file | ||
subject, ok := dbauthz.ActorFromContext(ctx) | ||
if !ok { | ||
cleanup() | ||
return nil, dbauthz.ErrNoActor | ||
} | ||
// Always check the caller can actually read the file. | ||
if err := c.authz.Authorize(ctx, subject, policy.ActionRead, it.Object); err != nil { | ||
c.release(fileID) | ||
if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil { | ||
cleanup() | ||
return nil, err | ||
} | ||
|
||
var once sync.Once | ||
var closeOnce sync.Once | ||
return &CloseFS{ | ||
FS: it.FS, | ||
FS: ev.FS, | ||
close: func() { | ||
// sync.Once makes the Close() idempotent, so we can call it | ||
// multiple times without worrying about double-releasing. | ||
once.Do(func() { c.release(fileID) }) | ||
closeOnce.Do(func() { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
e.close() | ||
}) | ||
}, | ||
}, nil | ||
} | ||
|
||
func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *lazy.ValueWithError[CacheEntryValue] { | ||
func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
|
||
hitLabel := "true" | ||
entry, ok := c.data[fileID] | ||
if !ok { | ||
value := lazy.NewWithError(func() (CacheEntryValue, error) { | ||
val, err := fetch(ctx, db, fileID) | ||
hitLabel = "false" | ||
|
||
var purgeOnce sync.Once | ||
entry = &cacheEntry{ | ||
value: lazy.NewWithError(func() (CacheEntryValue, error) { | ||
val, err := fetch(db, fileID) | ||
if err != nil { | ||
return val, err | ||
} | ||
|
||
// Always add to the cache size the bytes of the file loaded. | ||
if err == nil { | ||
// Add the size of the file to the cache size metrics. | ||
c.currentCacheSize.Add(float64(val.Size)) | ||
c.totalCacheSize.Add(float64(val.Size)) | ||
} | ||
|
||
return val, err | ||
}) | ||
return val, err | ||
}), | ||
|
||
entry = &cacheEntry{ | ||
value: value, | ||
refCount: 0, | ||
close: func() { | ||
entry.refCount-- | ||
c.currentOpenFileReferences.Dec() | ||
if entry.refCount > 0 { | ||
return | ||
} | ||
|
||
entry.purge() | ||
}, | ||
|
||
purge: func() { | ||
purgeOnce.Do(func() { | ||
c.purge(fileID) | ||
}) | ||
}, | ||
} | ||
c.data[fileID] = entry | ||
|
||
c.currentOpenFiles.Inc() | ||
c.totalOpenedFiles.Inc() | ||
hitLabel = "false" | ||
} | ||
|
||
c.currentOpenFileReferences.Inc() | ||
c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc() | ||
entry.refCount++ | ||
return entry.value | ||
return entry | ||
} | ||
|
||
// release decrements the reference count for the given fileID, and frees the | ||
// backing data if there are no further references being held. | ||
// | ||
// release should only be called after a successful call to Acquire using the Release() | ||
// method on the returned *CloseFS. | ||
func (c *Cache) release(fileID uuid.UUID) { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
|
||
// purge immediately removes an entry from the cache, even if it has open | ||
// references. | ||
// Safety: Must only be called while the Cache lock is held | ||
func (c *Cache) purge(fileID uuid.UUID) { | ||
entry, ok := c.data[fileID] | ||
if !ok { | ||
Emyrk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// If we land here, it's almost certainly because a bug already happened, | ||
// and we're freeing something that's already been freed, or we're calling | ||
// this function with an incorrect ID. Should this function return an error? | ||
return | ||
} | ||
|
||
c.currentOpenFileReferences.Dec() | ||
entry.refCount-- | ||
if entry.refCount > 0 { | ||
// If we land here, it's probably because of a fetch attempt that | ||
// resulted in an error, and got purged already. It may also be an | ||
// erroneous extra close, but we can't really distinguish between those | ||
// two cases currently. | ||
return | ||
} | ||
|
||
// Purge the file from the cache. | ||
c.currentOpenFiles.Dec() | ||
|
||
ev, err := entry.value.Load() | ||
if err == nil { | ||
c.currentCacheSize.Add(-1 * float64(ev.Size)) | ||
|
@@ -246,11 +287,18 @@ func (c *Cache) Count() int { | |
return len(c.data) | ||
} | ||
|
||
func fetch(ctx context.Context, store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { | ||
// Make sure the read does not fail due to authorization issues. | ||
// Authz is checked on the Acquire call, so this is safe. | ||
func fetch(store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { | ||
// Because many callers can be waiting on the same file fetch concurrently, we | ||
// want to prevent any failures that would cause them all to receive errors | ||
// because the caller who initiated the fetch would fail. | ||
// - We always run the fetch with an uncancelable context, and then check | ||
// context cancellation for each acquirer afterwards. | ||
// - We always run the fetch as a system user, and then check authorization | ||
// for each acquirer afterwards. | ||
// This prevents a canceled context or an unauthorized user from "holding up | ||
// the queue". | ||
aslilac marked this conversation as resolved.
Show resolved
Hide resolved
|
||
//nolint:gocritic | ||
file, err := store.GetFileByID(dbauthz.AsFileReader(ctx), fileID) | ||
file, err := store.GetFileByID(dbauthz.AsFileReader(context.Background()), fileID) | ||
if err != nil { | ||
return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err) | ||
} | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package files | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/google/uuid" | ||
|
||
"github.com/coder/coder/v2/coderd/database" | ||
) | ||
|
||
// LeakCache prevents entries from even being released to enable testing certain | ||
// behaviors. | ||
type LeakCache struct { | ||
*Cache | ||
} | ||
|
||
func (c *LeakCache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) { | ||
// We need to call prepare first to both 1. leak a reference and 2. prevent | ||
// the behavior of immediately closing on an error (as implemented in Acquire) | ||
// from freeing the file. | ||
c.prepare(db, fileID) | ||
return c.Cache.Acquire(ctx, db, fileID) | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.