From 780233b882f3ac217808f4c65ba357348b49586d Mon Sep 17 00:00:00 2001 From: Steven Masley Date: Sun, 22 Jun 2025 22:56:09 -0500 Subject: [PATCH 01/19] test: unit test to excercise polluted file cache with error --- coderd/files/cache_test.go | 60 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/coderd/files/cache_test.go b/coderd/files/cache_test.go index 5efb4ba19be28..7c270f9e90347 100644 --- a/coderd/files/cache_test.go +++ b/coderd/files/cache_test.go @@ -2,6 +2,7 @@ package files_test import ( "context" + "sync" "sync/atomic" "testing" "time" @@ -9,7 +10,9 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "golang.org/x/sync/errgroup" "cdr.dev/slog/sloggers/slogtest" @@ -18,6 +21,7 @@ import ( "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbgen" + "github.com/coder/coder/v2/coderd/database/dbmock" "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/files" "github.com/coder/coder/v2/coderd/rbac" @@ -25,6 +29,62 @@ import ( "github.com/coder/coder/v2/testutil" ) +// TestCancelledFetch runs 2 Acquire calls. The first fails with a ctx.Canceled +// error. The second call should ignore the first error and try to fetch the file +// again, which should succeed. +func TestCancelledFetch(t *testing.T) { + t.Parallel() + + fileID := uuid.New() + rdy := make(chan struct{}) + dbM := dbmock.NewMockStore(gomock.NewController(t)) + + // First call should fail + dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { + // Wait long enough for the second call to be queued up. 
+ <-rdy + return database.File{}, context.Canceled + }) + + // Second call should succeed + dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { + return database.File{ + ID: fileID, + Data: make([]byte, 100), + }, nil + }) + + //nolint:gocritic // Unit testing + ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) + cache := files.NewFromStore(dbM, prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) + + var wg sync.WaitGroup + wg.Add(2) + + // First call that will fail + go func() { + _, err := cache.Acquire(ctx, fileID) + assert.ErrorIs(t, err, context.Canceled) + wg.Done() + }() + + // Second call, that should succeed + go func() { + fs, err := cache.Acquire(ctx, fileID) + assert.NoError(t, err) + if fs != nil { + fs.Close() + } + wg.Done() + }() + + // We need that second Acquire call to be queued up + time.Sleep(testutil.IntervalFast) + + close(rdy) + wg.Wait() +} + // nolint:paralleltest,tparallel // Serially testing is easier func TestCacheRBAC(t *testing.T) { t.Parallel() From 8a6deb18b033828460782a5703fe3179d4b4074c Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 24 Jun 2025 22:38:41 +0000 Subject: [PATCH 02/19] chore: purge file cache entries on error --- coderd/files/cache.go | 226 ++++++++++++++++++++++++------------------ 1 file changed, 130 insertions(+), 96 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 3698aac9286c8..d139c15117c94 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -25,60 +25,61 @@ type FileAcquirer interface { // New returns a file cache that will fetch files from a database func New(registerer prometheus.Registerer, authz rbac.Authorizer) *Cache { - return (&Cache{ - lock: sync.Mutex{}, - data: make(map[uuid.UUID]*cacheEntry), - authz: authz, - }).registerMetrics(registerer) + return &Cache{ + lock: sync.Mutex{}, + data: make(map[uuid.UUID]*cacheEntry), + authz: authz, + cacheMetrics: newCacheMetrics(registerer), + } } -func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache { +func newCacheMetrics(registerer prometheus.Registerer) cacheMetrics { subsystem := "file_cache" f := promauto.With(registerer) - c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_size_bytes_current", - Help: "The current amount of memory of all files currently open in the file cache.", - }) - - c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_size_bytes_total", - Help: "The total amount of memory ever opened in the file cache. This number never decrements.", - }) - - c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_current", - Help: "The count of unique files currently open in the file cache.", - }) - - c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_total", - Help: "The total count of unique files ever opened in the file cache.", - }) - - c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_file_refs_current", - Help: "The count of file references currently open in the file cache. 
Multiple references can be held for the same file.", - }) - - c.totalOpenFileReferences = f.NewCounterVec(prometheus.CounterOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_file_refs_total", - Help: "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.", - }, []string{"hit"}) - - return c + return cacheMetrics{ + currentCacheSize: f.NewGauge(prometheus.GaugeOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_size_bytes_current", + Help: "The current amount of memory of all files currently open in the file cache.", + }), + + totalCacheSize: f.NewCounter(prometheus.CounterOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_size_bytes_total", + Help: "The total amount of memory ever opened in the file cache. This number never decrements.", + }), + + currentOpenFiles: f.NewGauge(prometheus.GaugeOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_current", + Help: "The count of unique files currently open in the file cache.", + }), + + totalOpenedFiles: f.NewCounter(prometheus.CounterOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_total", + Help: "The total count of unique files ever opened in the file cache.", + }), + + currentOpenFileReferences: f.NewGauge(prometheus.GaugeOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_file_refs_current", + Help: "The count of file references currently open in the file cache. Multiple references can be held for the same file.", + }), + + totalOpenFileReferences: f.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_file_refs_total", + Help: "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.", + }, []string{"hit"}), + } } // Cache persists the files for template versions, and is used by dynamic @@ -106,18 +107,21 @@ type cacheMetrics struct { totalCacheSize prometheus.Counter } +type cacheEntry struct { + // refCount must only be accessed while the cacheEntry lock is held. + lock sync.Mutex + refCount int + value *lazy.ValueWithError[CacheEntryValue] + + close func() +} + type CacheEntryValue struct { fs.FS Object rbac.Object Size int64 } -type cacheEntry struct { - // refCount must only be accessed while the Cache lock is held. - refCount int - value *lazy.ValueWithError[CacheEntryValue] -} - var _ fs.FS = (*CloseFS)(nil) // CloseFS is a wrapper around fs.FS that implements io.Closer. The Close() @@ -142,93 +146,116 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID // mutex has been released, or we would continue to hold the lock until the // entire file has been fetched, which may be slow, and would prevent other // files from being fetched in parallel. - it, err := c.prepare(ctx, db, fileID).Load() + e := c.prepare(ctx, db, fileID) + ev, err := e.value.Load() if err != nil { - c.release(fileID) + c.purge(fileID) + return nil, err + } + + // We always run the fetch under a system context and actor, so we need to check the caller's + // context manually before returning. + + // Check if the caller's context was canceled + if err := ctx.Err(); err != nil { return nil, err } + // Check that the caller is authorized to access the file subject, ok := dbauthz.ActorFromContext(ctx) if !ok { return nil, dbauthz.ErrNoActor } - // Always check the caller can actually read the file. 
- if err := c.authz.Authorize(ctx, subject, policy.ActionRead, it.Object); err != nil { - c.release(fileID) + if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil { + e.close() return nil, err } - var once sync.Once + var closeOnce sync.Once return &CloseFS{ - FS: it.FS, + FS: ev.FS, close: func() { // sync.Once makes the Close() idempotent, so we can call it // multiple times without worrying about double-releasing. - once.Do(func() { c.release(fileID) }) + closeOnce.Do(func() { + e.close() + }) }, }, nil } -func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *lazy.ValueWithError[CacheEntryValue] { +func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *cacheEntry { c.lock.Lock() defer c.lock.Unlock() hitLabel := "true" entry, ok := c.data[fileID] if !ok { - value := lazy.NewWithError(func() (CacheEntryValue, error) { - val, err := fetch(ctx, db, fileID) + hitLabel = "false" - // Always add to the cache size the bytes of the file loaded. - if err == nil { + var releaseOnce sync.Once + entry = &cacheEntry{ + refCount: 0, + value: lazy.NewWithError(func() (CacheEntryValue, error) { + val, err := fetch(db, fileID) + if err != nil { + // Force future calls to Acquire to trigger a new fetch as soon as + // a fetch has failed, even if references are still held. + delete(c.data, fileID) + return val, err + } + + // Add the size of the file to the cache size metrics. c.currentCacheSize.Add(float64(val.Size)) c.totalCacheSize.Add(float64(val.Size)) - } - return val, err - }) + return val, err + }), - entry = &cacheEntry{ - value: value, - refCount: 0, + close: func() { + entry.lock.Lock() + defer entry.lock.Unlock() + + entry.refCount-- + c.currentOpenFileReferences.Dec() + + if entry.refCount == 0 { + releaseOnce.Do(func() { + c.purge(fileID) + }) + } + }, } c.data[fileID] = entry + c.currentOpenFiles.Inc() c.totalOpenedFiles.Inc() - hitLabel = "false" } + entry.lock.Lock() + defer entry.lock.Unlock() c.currentOpenFileReferences.Inc() c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc() entry.refCount++ - return entry.value + return entry } -// release decrements the reference count for the given fileID, and frees the -// backing data if there are no further references being held. -// -// release should only be called after a successful call to Acquire using the Release() -// method on the returned *CloseFS. -func (c *Cache) release(fileID uuid.UUID) { +// purge immediately removes an entry from the cache. It should be called +func (c *Cache) purge(fileID uuid.UUID) { c.lock.Lock() defer c.lock.Unlock() entry, ok := c.data[fileID] if !ok { - // If we land here, it's almost certainly because a bug already happened, - // and we're freeing something that's already been freed, or we're calling - // this function with an incorrect ID. Should this function return an error? - return - } - - c.currentOpenFileReferences.Dec() - entry.refCount-- - if entry.refCount > 0 { + // If we land here, it's probably because of a fetch attempt that + // resulted in an error, and got purged already. It may also be an + // erroneous extra close, but we can't really distinguish between those + // two cases currently. return } + // Purge the file from the cache. 
c.currentOpenFiles.Dec() - ev, err := entry.value.Load() if err == nil { c.currentCacheSize.Add(-1 * float64(ev.Size)) @@ -246,11 +273,18 @@ func (c *Cache) Count() int { return len(c.data) } -func fetch(ctx context.Context, store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { - // Make sure the read does not fail due to authorization issues. - // Authz is checked on the Acquire call, so this is safe. +func fetch(store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { + // Because many callers can be waiting on the same file fetch concurrently, we + // want to prevent any failures that would cause them all to receive errors + // because the caller who initiated the fetch would fail. + // - We always run the fetch with an uncancelable context, and then check + // context cancellation for each acquirer afterwards. + // - We always run the fetch as a system user, and then check authorization + // for each acquirer afterwards. + // This prevents a canceled context or an unauthorized user from "holding up + // the queue". //nolint:gocritic - file, err := store.GetFileByID(dbauthz.AsFileReader(ctx), fileID) + file, err := store.GetFileByID(dbauthz.AsFileReader(context.Background()), fileID) if err != nil { return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err) } From 610740a330de7649728da317ef2e7492bb8d375e Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 24 Jun 2025 22:48:45 +0000 Subject: [PATCH 03/19] proper release gating --- coderd/files/cache.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index d139c15117c94..44f914d2ed596 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -194,6 +194,11 @@ func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID hitLabel = "false" var releaseOnce sync.Once + release := func() { + releaseOnce.Do(func() { + c.purge(fileID) + }) + } entry = &cacheEntry{ refCount: 0, value: lazy.NewWithError(func() (CacheEntryValue, error) { @@ -201,7 +206,8 @@ func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID if err != nil { // Force future calls to Acquire to trigger a new fetch as soon as // a fetch has failed, even if references are still held. - delete(c.data, fileID) + entry.close() + release() return val, err } @@ -218,12 +224,11 @@ func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID entry.refCount-- c.currentOpenFileReferences.Dec() - - if entry.refCount == 0 { - releaseOnce.Do(func() { - c.purge(fileID) - }) + if entry.refCount > 0 { + return } + + release() }, } c.data[fileID] = entry From ec534596493b0de45e93a1619b3f30b8bfc2b7d2 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 24 Jun 2025 23:11:37 +0000 Subject: [PATCH 04/19] lint --- coderd/files/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 44f914d2ed596..4084ec77a34b6 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -146,7 +146,7 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID // mutex has been released, or we would continue to hold the lock until the // entire file has been fetched, which may be slow, and would prevent other // files from being fetched in parallel. 
- e := c.prepare(ctx, db, fileID) + e := c.prepare(db, fileID) ev, err := e.value.Load() if err != nil { c.purge(fileID) @@ -184,7 +184,7 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID }, nil } -func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *cacheEntry { +func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { c.lock.Lock() defer c.lock.Unlock() From df7acff875ed99a2bfb39acde732f7b3b3f55908 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 24 Jun 2025 23:45:10 +0000 Subject: [PATCH 05/19] I win at debugging deadlocks today --- coderd/files/cache.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 4084ec77a34b6..a0272c7055b49 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -114,6 +114,7 @@ type cacheEntry struct { value *lazy.ValueWithError[CacheEntryValue] close func() + purge func() } type CacheEntryValue struct { @@ -149,7 +150,8 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID e := c.prepare(db, fileID) ev, err := e.value.Load() if err != nil { - c.purge(fileID) + e.close() + e.purge() return nil, err } @@ -193,9 +195,9 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { if !ok { hitLabel = "false" - var releaseOnce sync.Once - release := func() { - releaseOnce.Do(func() { + var purgeOnce sync.Once + purge := func() { + purgeOnce.Do(func() { c.purge(fileID) }) } @@ -204,10 +206,6 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { value: lazy.NewWithError(func() (CacheEntryValue, error) { val, err := fetch(db, fileID) if err != nil { - // Force future calls to Acquire to trigger a new fetch as soon as - // a fetch has failed, even if references are still held. - entry.close() - release() return val, err } @@ -228,8 +226,10 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { return } - release() + purge() }, + + purge: purge, } c.data[fileID] = entry @@ -245,7 +245,8 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { return entry } -// purge immediately removes an entry from the cache. It should be called +// purge immediately removes an entry from the cache, even if it has open references. +// It should only be called from the `close` function in a `cacheEntry`. 
func (c *Cache) purge(fileID uuid.UUID) { c.lock.Lock() defer c.lock.Unlock() From 2ecb1d74dca3bc29c97d58586033fbc84ebc45ea Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 24 Jun 2025 22:38:41 +0000 Subject: [PATCH 06/19] chore: purge file cache entries on error --- coderd/files/cache.go | 226 ++++++++++++++++++++++++------------------ 1 file changed, 130 insertions(+), 96 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 3698aac9286c8..d139c15117c94 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -25,60 +25,61 @@ type FileAcquirer interface { // New returns a file cache that will fetch files from a database func New(registerer prometheus.Registerer, authz rbac.Authorizer) *Cache { - return (&Cache{ - lock: sync.Mutex{}, - data: make(map[uuid.UUID]*cacheEntry), - authz: authz, - }).registerMetrics(registerer) + return &Cache{ + lock: sync.Mutex{}, + data: make(map[uuid.UUID]*cacheEntry), + authz: authz, + cacheMetrics: newCacheMetrics(registerer), + } } -func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache { +func newCacheMetrics(registerer prometheus.Registerer) cacheMetrics { subsystem := "file_cache" f := promauto.With(registerer) - c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_size_bytes_current", - Help: "The current amount of memory of all files currently open in the file cache.", - }) - - c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_size_bytes_total", - Help: "The total amount of memory ever opened in the file cache. This number never decrements.", - }) - - c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_current", - Help: "The count of unique files currently open in the file cache.", - }) - - c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_files_total", - Help: "The total count of unique files ever opened in the file cache.", - }) - - c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_file_refs_current", - Help: "The count of file references currently open in the file cache. Multiple references can be held for the same file.", - }) - - c.totalOpenFileReferences = f.NewCounterVec(prometheus.CounterOpts{ - Namespace: "coderd", - Subsystem: subsystem, - Name: "open_file_refs_total", - Help: "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.", - }, []string{"hit"}) - - return c + return cacheMetrics{ + currentCacheSize: f.NewGauge(prometheus.GaugeOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_size_bytes_current", + Help: "The current amount of memory of all files currently open in the file cache.", + }), + + totalCacheSize: f.NewCounter(prometheus.CounterOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_size_bytes_total", + Help: "The total amount of memory ever opened in the file cache. 
This number never decrements.", + }), + + currentOpenFiles: f.NewGauge(prometheus.GaugeOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_current", + Help: "The count of unique files currently open in the file cache.", + }), + + totalOpenedFiles: f.NewCounter(prometheus.CounterOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_files_total", + Help: "The total count of unique files ever opened in the file cache.", + }), + + currentOpenFileReferences: f.NewGauge(prometheus.GaugeOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_file_refs_current", + Help: "The count of file references currently open in the file cache. Multiple references can be held for the same file.", + }), + + totalOpenFileReferences: f.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coderd", + Subsystem: subsystem, + Name: "open_file_refs_total", + Help: "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.", + }, []string{"hit"}), + } } // Cache persists the files for template versions, and is used by dynamic @@ -106,18 +107,21 @@ type cacheMetrics struct { totalCacheSize prometheus.Counter } +type cacheEntry struct { + // refCount must only be accessed while the cacheEntry lock is held. + lock sync.Mutex + refCount int + value *lazy.ValueWithError[CacheEntryValue] + + close func() +} + type CacheEntryValue struct { fs.FS Object rbac.Object Size int64 } -type cacheEntry struct { - // refCount must only be accessed while the Cache lock is held. - refCount int - value *lazy.ValueWithError[CacheEntryValue] -} - var _ fs.FS = (*CloseFS)(nil) // CloseFS is a wrapper around fs.FS that implements io.Closer. The Close() @@ -142,93 +146,116 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID // mutex has been released, or we would continue to hold the lock until the // entire file has been fetched, which may be slow, and would prevent other // files from being fetched in parallel. - it, err := c.prepare(ctx, db, fileID).Load() + e := c.prepare(ctx, db, fileID) + ev, err := e.value.Load() if err != nil { - c.release(fileID) + c.purge(fileID) + return nil, err + } + + // We always run the fetch under a system context and actor, so we need to check the caller's + // context manually before returning. + + // Check if the caller's context was canceled + if err := ctx.Err(); err != nil { return nil, err } + // Check that the caller is authorized to access the file subject, ok := dbauthz.ActorFromContext(ctx) if !ok { return nil, dbauthz.ErrNoActor } - // Always check the caller can actually read the file. - if err := c.authz.Authorize(ctx, subject, policy.ActionRead, it.Object); err != nil { - c.release(fileID) + if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil { + e.close() return nil, err } - var once sync.Once + var closeOnce sync.Once return &CloseFS{ - FS: it.FS, + FS: ev.FS, close: func() { // sync.Once makes the Close() idempotent, so we can call it // multiple times without worrying about double-releasing. 
- once.Do(func() { c.release(fileID) }) + closeOnce.Do(func() { + e.close() + }) }, }, nil } -func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *lazy.ValueWithError[CacheEntryValue] { +func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *cacheEntry { c.lock.Lock() defer c.lock.Unlock() hitLabel := "true" entry, ok := c.data[fileID] if !ok { - value := lazy.NewWithError(func() (CacheEntryValue, error) { - val, err := fetch(ctx, db, fileID) + hitLabel = "false" - // Always add to the cache size the bytes of the file loaded. - if err == nil { + var releaseOnce sync.Once + entry = &cacheEntry{ + refCount: 0, + value: lazy.NewWithError(func() (CacheEntryValue, error) { + val, err := fetch(db, fileID) + if err != nil { + // Force future calls to Acquire to trigger a new fetch as soon as + // a fetch has failed, even if references are still held. + delete(c.data, fileID) + return val, err + } + + // Add the size of the file to the cache size metrics. c.currentCacheSize.Add(float64(val.Size)) c.totalCacheSize.Add(float64(val.Size)) - } - return val, err - }) + return val, err + }), - entry = &cacheEntry{ - value: value, - refCount: 0, + close: func() { + entry.lock.Lock() + defer entry.lock.Unlock() + + entry.refCount-- + c.currentOpenFileReferences.Dec() + + if entry.refCount == 0 { + releaseOnce.Do(func() { + c.purge(fileID) + }) + } + }, } c.data[fileID] = entry + c.currentOpenFiles.Inc() c.totalOpenedFiles.Inc() - hitLabel = "false" } + entry.lock.Lock() + defer entry.lock.Unlock() c.currentOpenFileReferences.Inc() c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc() entry.refCount++ - return entry.value + return entry } -// release decrements the reference count for the given fileID, and frees the -// backing data if there are no further references being held. -// -// release should only be called after a successful call to Acquire using the Release() -// method on the returned *CloseFS. -func (c *Cache) release(fileID uuid.UUID) { +// purge immediately removes an entry from the cache. It should be called +func (c *Cache) purge(fileID uuid.UUID) { c.lock.Lock() defer c.lock.Unlock() entry, ok := c.data[fileID] if !ok { - // If we land here, it's almost certainly because a bug already happened, - // and we're freeing something that's already been freed, or we're calling - // this function with an incorrect ID. Should this function return an error? - return - } - - c.currentOpenFileReferences.Dec() - entry.refCount-- - if entry.refCount > 0 { + // If we land here, it's probably because of a fetch attempt that + // resulted in an error, and got purged already. It may also be an + // erroneous extra close, but we can't really distinguish between those + // two cases currently. return } + // Purge the file from the cache. c.currentOpenFiles.Dec() - ev, err := entry.value.Load() if err == nil { c.currentCacheSize.Add(-1 * float64(ev.Size)) @@ -246,11 +273,18 @@ func (c *Cache) Count() int { return len(c.data) } -func fetch(ctx context.Context, store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { - // Make sure the read does not fail due to authorization issues. - // Authz is checked on the Acquire call, so this is safe. +func fetch(store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { + // Because many callers can be waiting on the same file fetch concurrently, we + // want to prevent any failures that would cause them all to receive errors + // because the caller who initiated the fetch would fail. 
+ // - We always run the fetch with an uncancelable context, and then check + // context cancellation for each acquirer afterwards. + // - We always run the fetch as a system user, and then check authorization + // for each acquirer afterwards. + // This prevents a canceled context or an unauthorized user from "holding up + // the queue". //nolint:gocritic - file, err := store.GetFileByID(dbauthz.AsFileReader(ctx), fileID) + file, err := store.GetFileByID(dbauthz.AsFileReader(context.Background()), fileID) if err != nil { return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err) } From db89836190f4d4e26e778285b299be59e2e7d9c4 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Wed, 25 Jun 2025 18:49:54 +0000 Subject: [PATCH 07/19] hmm... --- coderd/files/cache.go | 15 ++++++++------- coderd/files/cache_test.go | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index a0272c7055b49..b803c010a9f58 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -160,12 +160,14 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID // Check if the caller's context was canceled if err := ctx.Err(); err != nil { + e.close() return nil, err } // Check that the caller is authorized to access the file subject, ok := dbauthz.ActorFromContext(ctx) if !ok { + e.close() return nil, dbauthz.ErrNoActor } if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil { @@ -196,11 +198,6 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { hitLabel = "false" var purgeOnce sync.Once - purge := func() { - purgeOnce.Do(func() { - c.purge(fileID) - }) - } entry = &cacheEntry{ refCount: 0, value: lazy.NewWithError(func() (CacheEntryValue, error) { @@ -226,10 +223,14 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { return } - purge() + entry.purge() }, - purge: purge, + purge: func() { + purgeOnce.Do(func() { + c.purge(fileID) + }) + }, } c.data[fileID] = entry diff --git a/coderd/files/cache_test.go b/coderd/files/cache_test.go index 696dd06ce0c65..ce25d664cbc60 100644 --- a/coderd/files/cache_test.go +++ b/coderd/files/cache_test.go @@ -61,8 +61,8 @@ func TestCancelledFetch(t *testing.T) { // First call that will fail wg.Add(1) go func() { - close(rdy) _, err := cache.Acquire(ctx, dbM, fileID) + close(rdy) assert.ErrorIs(t, err, context.Canceled) wg.Done() }() From aab03354ffa96e8c59e4d19f99fdae002baedaf4 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Wed, 25 Jun 2025 19:30:33 +0000 Subject: [PATCH 08/19] at this point the calls are just serialized anyway --- coderd/files/cache_test.go | 34 ++++++++-------------------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/coderd/files/cache_test.go b/coderd/files/cache_test.go index ce25d664cbc60..5c63d9e4c12a2 100644 --- a/coderd/files/cache_test.go +++ b/coderd/files/cache_test.go @@ -2,7 +2,6 @@ package files_test import ( "context" - "sync" "sync/atomic" "testing" "time" @@ -35,7 +34,6 @@ func TestCancelledFetch(t *testing.T) { t.Parallel() fileID := uuid.New() - rdy := make(chan struct{}) dbM := dbmock.NewMockStore(gomock.NewController(t)) // First call should fail @@ -56,33 +54,17 @@ func TestCancelledFetch(t *testing.T) { ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) - var wg sync.WaitGroup - // First call that will fail - wg.Add(1) - go 
func() { - _, err := cache.Acquire(ctx, dbM, fileID) - close(rdy) - assert.ErrorIs(t, err, context.Canceled) - wg.Done() - }() + _, err := cache.Acquire(ctx, dbM, fileID) + assert.ErrorIs(t, err, context.Canceled) // Second call, that should succeed - wg.Add(1) - go func() { - // Wait until the first goroutine has started - <-rdy - fs, err := cache.Acquire(ctx, dbM, fileID) - assert.NoError(t, err) - if fs != nil { - fs.Close() - } - wg.Done() - }() - - // We need that second Acquire call to be queued up - time.Sleep(testutil.IntervalFast) - wg.Wait() + // Wait until the first goroutine has started + fs, err := cache.Acquire(ctx, dbM, fileID) + assert.NoError(t, err) + if fs != nil { + fs.Close() + } } // nolint:paralleltest,tparallel // Serially testing is easier From 78dedaaf0912ccd7feb1525998fb036ead7fbe1c Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Fri, 27 Jun 2025 23:01:57 +0000 Subject: [PATCH 09/19] concurrency be like --- coderd/files/cache.go | 26 +++++------ coderd/files/cache_test.go | 93 ++++++++++++++++++++++++++------------ 2 files changed, 76 insertions(+), 43 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index b803c010a9f58..2c56a4a804005 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -5,6 +5,7 @@ import ( "context" "io/fs" "sync" + "sync/atomic" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" @@ -108,9 +109,7 @@ type cacheMetrics struct { } type cacheEntry struct { - // refCount must only be accessed while the cacheEntry lock is held. - lock sync.Mutex - refCount int + refCount atomic.Int32 value *lazy.ValueWithError[CacheEntryValue] close func() @@ -199,7 +198,6 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { var purgeOnce sync.Once entry = &cacheEntry{ - refCount: 0, value: lazy.NewWithError(func() (CacheEntryValue, error) { val, err := fetch(db, fileID) if err != nil { @@ -214,12 +212,14 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { }), close: func() { - entry.lock.Lock() - defer entry.lock.Unlock() - - entry.refCount-- + entry.refCount.Add(-1) c.currentOpenFileReferences.Dec() - if entry.refCount > 0 { + // Safety: Another thread could grab a reference to this value between + // this check and entering `purge`, which will grab the cache lock. This + // is annoying, and may lead to temporary duplication of the file in + // memory, but is better than the deadlocking potential of other + // approaches we tried to solve this. + if entry.refCount.Load() > 0 { return } @@ -238,16 +238,14 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { c.totalOpenedFiles.Inc() } - entry.lock.Lock() - defer entry.lock.Unlock() c.currentOpenFileReferences.Inc() c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc() - entry.refCount++ + entry.refCount.Add(1) return entry } -// purge immediately removes an entry from the cache, even if it has open references. -// It should only be called from the `close` function in a `cacheEntry`. +// purge immediately removes an entry from the cache, even if it has open +// references. 
func (c *Cache) purge(fileID uuid.UUID) { c.lock.Lock() defer c.lock.Unlock() diff --git a/coderd/files/cache_test.go b/coderd/files/cache_test.go index 5c63d9e4c12a2..fca4aea12c2d3 100644 --- a/coderd/files/cache_test.go +++ b/coderd/files/cache_test.go @@ -2,6 +2,7 @@ package files_test import ( "context" + "sync" "sync/atomic" "testing" "time" @@ -33,38 +34,72 @@ import ( func TestCancelledFetch(t *testing.T) { t.Parallel() - fileID := uuid.New() - dbM := dbmock.NewMockStore(gomock.NewController(t)) - - // First call should fail - dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { - // Wait long enough for the second call to be queued up. - return database.File{}, context.Canceled + t.Run("canceled gets canceled", func(t *testing.T) { + fileID := uuid.New() + dbM := dbmock.NewMockStore(gomock.NewController(t)) + + // The file fetch should succeed. + dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { + return database.File{ + ID: fileID, + Data: make([]byte, 100), + }, nil + }) + + //nolint:gocritic // Unit testing + cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) + + // Cancel the context for the first call; should fail. + ctx, cancel := context.WithCancel(dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort))) + cancel() + _, err := cache.Acquire(ctx, dbM, fileID) + assert.ErrorIs(t, err, context.Canceled) }) - // Second call should succeed - dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { - return database.File{ - ID: fileID, - Data: make([]byte, 100), - }, nil - }) + t.Run("cancelation doesn't hold up the queue", func(t *testing.T) { + t.Skip() + fileID := uuid.New() + dbM := dbmock.NewMockStore(gomock.NewController(t)) + + // The file fetch should succeed. + dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { + return database.File{ + ID: fileID, + Data: make([]byte, 100), + }, nil + }) + + //nolint:gocritic // Unit testing + cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) + + var wg sync.WaitGroup + + // Cancel the context for the first call; should fail. + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithCancel(dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort))) + cancel() + _, err := cache.Acquire(ctx, dbM, fileID) + assert.ErrorIs(t, err, context.Canceled) + }() + + // Second call, that should succeed without fetching from the database again + // since the cache should be populated by the fetch the first request started + // even if it doesn't wait for completion. 
+ wg.Add(1) + go func() { + defer wg.Done() + ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) + fs, err := cache.Acquire(ctx, dbM, fileID) + assert.NoError(t, err) + if fs != nil { + fs.Close() + } + }() - //nolint:gocritic // Unit testing - ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) - cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) - - // First call that will fail - _, err := cache.Acquire(ctx, dbM, fileID) - assert.ErrorIs(t, err, context.Canceled) - - // Second call, that should succeed - // Wait until the first goroutine has started - fs, err := cache.Acquire(ctx, dbM, fileID) - assert.NoError(t, err) - if fs != nil { - fs.Close() - } + wg.Wait() + }) } // nolint:paralleltest,tparallel // Serially testing is easier From 94ce6786672bd7f5c2c80185b951d269d6a7193b Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Mon, 30 Jun 2025 15:59:31 +0000 Subject: [PATCH 10/19] add comment --- coderd/files/cache.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 2c56a4a804005..9d3d23386d114 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -154,10 +154,12 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID return nil, err } - // We always run the fetch under a system context and actor, so we need to check the caller's - // context manually before returning. + // We always run the fetch under a system context and actor, so we need to + // check the caller's context (including the actor) manually before returning. - // Check if the caller's context was canceled + // Check if the caller's context was canceled. Even though `Authorize` takes + // a context, we still check it manually first because none of our mock + // database implementations check for context cancellation. if err := ctx.Err(); err != nil { e.close() return nil, err From d12d121dc9d35e8f1cb8d95a7d4a20fedaa346f9 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Mon, 30 Jun 2025 16:44:15 +0000 Subject: [PATCH 11/19] add another concurrency test --- coderd/files/cache_test.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/coderd/files/cache_test.go b/coderd/files/cache_test.go index fca4aea12c2d3..eca3d8f4aee6f 100644 --- a/coderd/files/cache_test.go +++ b/coderd/files/cache_test.go @@ -102,6 +102,43 @@ func TestCancelledFetch(t *testing.T) { }) } +func TestConcurrentFetch(t *testing.T) { + t.Parallel() + + fileID := uuid.New() + + // Only allow one call, which should succeed + dbM := dbmock.NewMockStore(gomock.NewController(t)) + dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { + return database.File{ID: fileID}, nil + }) + + cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) + //nolint:gocritic // Unit testing + ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) + + // Expect 2 calls to Acquire before we continue the test + var hold sync.WaitGroup + hold.Add(2) + + var wg sync.WaitGroup + wg.Add(2) + + for range 2 { + go func() { + hold.Done() + hold.Wait() + _, err := cache.Acquire(ctx, dbM, fileID) + require.NoError(t, err) + wg.Done() + }() + } + + // Wait for both go routines to assert their errors and finish. 
+ wg.Wait() + require.Equal(t, 1, cache.Count()) +} + // nolint:paralleltest,tparallel // Serially testing is easier func TestCacheRBAC(t *testing.T) { t.Parallel() From 03f9217202f599e3b0e544c5ff3a3b67eea51b1b Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Mon, 30 Jun 2025 17:54:29 +0000 Subject: [PATCH 12/19] =?UTF-8?q?think=20I=20finally=20figured=20out=20thi?= =?UTF-8?q?s=20test=20=F0=9F=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- coderd/files/cache_internal_test.go | 22 +++++ coderd/files/cache_test.go | 141 +++++++++++++++------------- 2 files changed, 98 insertions(+), 65 deletions(-) create mode 100644 coderd/files/cache_internal_test.go diff --git a/coderd/files/cache_internal_test.go b/coderd/files/cache_internal_test.go new file mode 100644 index 0000000000000..9741c36c3c202 --- /dev/null +++ b/coderd/files/cache_internal_test.go @@ -0,0 +1,22 @@ +package files + +import ( + "context" + + "github.com/coder/coder/v2/coderd/database" + "github.com/google/uuid" +) + +// LeakCache prevents entries from even being released to enable testing certain +// behaviors. +type LeakCache struct { + *Cache +} + +func (c *LeakCache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) { + // We need to call prepare first to both 1. leak a reference and 2. prevent + // the behavior of immediately closing on an error (as implented in Acquire) + // from freeing the file. + c.prepare(db, fileID) + return c.Cache.Acquire(ctx, db, fileID) +} diff --git a/coderd/files/cache_test.go b/coderd/files/cache_test.go index eca3d8f4aee6f..1b8f038b5ff42 100644 --- a/coderd/files/cache_test.go +++ b/coderd/files/cache_test.go @@ -28,78 +28,87 @@ import ( "github.com/coder/coder/v2/testutil" ) -// TestCancelledFetch runs 2 Acquire calls. The first fails with a ctx.Canceled -// error. The second call should ignore the first error and try to fetch the file -// again, which should succeed. func TestCancelledFetch(t *testing.T) { t.Parallel() - t.Run("canceled gets canceled", func(t *testing.T) { - fileID := uuid.New() - dbM := dbmock.NewMockStore(gomock.NewController(t)) + fileID := uuid.New() + dbM := dbmock.NewMockStore(gomock.NewController(t)) - // The file fetch should succeed. - dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { - return database.File{ - ID: fileID, - Data: make([]byte, 100), - }, nil - }) + // The file fetch should succeed. + dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { + return database.File{ + ID: fileID, + Data: make([]byte, 100), + }, nil + }) - //nolint:gocritic // Unit testing - cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) + cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) - // Cancel the context for the first call; should fail. - ctx, cancel := context.WithCancel(dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort))) - cancel() - _, err := cache.Acquire(ctx, dbM, fileID) - assert.ErrorIs(t, err, context.Canceled) - }) + // Cancel the context for the first call; should fail. 
+ //nolint:gocritic // Unit testing + ctx, cancel := context.WithCancel(dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort))) + cancel() + _, err := cache.Acquire(ctx, dbM, fileID) + assert.ErrorIs(t, err, context.Canceled) +} - t.Run("cancelation doesn't hold up the queue", func(t *testing.T) { - t.Skip() - fileID := uuid.New() - dbM := dbmock.NewMockStore(gomock.NewController(t)) +// TestCancelledConcurrentFetch runs 2 Acquire calls. The first has a canceled +// context and will get a ctx.Canceled error. The second call should get a warmfirst error and try to fetch the file +// again, which should succeed. +func TestCancelledConcurrentFetch(t *testing.T) { + t.Parallel() - // The file fetch should succeed. - dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { - return database.File{ - ID: fileID, - Data: make([]byte, 100), - }, nil - }) + fileID := uuid.New() + dbM := dbmock.NewMockStore(gomock.NewController(t)) - //nolint:gocritic // Unit testing - cache := files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{}) + // The file fetch should succeed. + dbM.EXPECT().GetFileByID(gomock.Any(), gomock.Any()).DoAndReturn(func(mTx context.Context, fileID uuid.UUID) (database.File, error) { + return database.File{ + ID: fileID, + Data: make([]byte, 100), + }, nil + }) - var wg sync.WaitGroup + cache := files.LeakCache{Cache: files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{})} - // Cancel the context for the first call; should fail. - wg.Add(1) - go func() { - defer wg.Done() - ctx, cancel := context.WithCancel(dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort))) - cancel() - _, err := cache.Acquire(ctx, dbM, fileID) - assert.ErrorIs(t, err, context.Canceled) - }() + // Expect 2 calls to Acquire before we continue the test + var ( + hold sync.WaitGroup + wg sync.WaitGroup + ) - // Second call, that should succeed without fetching from the database again - // since the cache should be populated by the fetch the first request started - // even if it doesn't wait for completion. - wg.Add(1) - go func() { - defer wg.Done() - ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) - fs, err := cache.Acquire(ctx, dbM, fileID) - assert.NoError(t, err) - if fs != nil { - fs.Close() - } - }() + //nolint:gocritic // Unit testing + ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) - wg.Wait() - }) + // Cancel the context for the first call; should fail. + hold.Add(1) + // TODO: wg.Go in Go 1.25 + wg.Add(1) + go func() { + hold.Done() + hold.Wait() + defer wg.Done() + ctx, cancel := context.WithCancel(ctx) + cancel() + _, err := cache.Acquire(ctx, dbM, fileID) + require.ErrorIs(t, err, context.Canceled) + }() + + // Second call, that should succeed without fetching from the database again + // since the cache should be populated by the fetch the first request started + // even if it doesn't wait for completion. 
+ hold.Add(1) + // TODO: wg.Go in Go 1.25 + wg.Add(1) + go func() { + hold.Done() + hold.Wait() + defer wg.Done() + _, err := cache.Acquire(ctx, dbM, fileID) + require.NoError(t, err) + }() + + wg.Wait() } func TestConcurrentFetch(t *testing.T) { @@ -118,19 +127,21 @@ func TestConcurrentFetch(t *testing.T) { ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) // Expect 2 calls to Acquire before we continue the test - var hold sync.WaitGroup - hold.Add(2) - - var wg sync.WaitGroup - wg.Add(2) + var ( + hold sync.WaitGroup + wg sync.WaitGroup + ) for range 2 { + hold.Add(1) + // TODO: wg.Go in Go 1.25 + wg.Add(1) go func() { + defer wg.Done() hold.Done() hold.Wait() _, err := cache.Acquire(ctx, dbM, fileID) require.NoError(t, err) - wg.Done() }() } From 82a3b7e266e5ce250b6e4ec2b7cba5dd22aa1006 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Mon, 30 Jun 2025 18:00:33 +0000 Subject: [PATCH 13/19] actually this doesn't need to be threaded anymore --- coderd/files/cache_test.go | 36 ++++++------------------------------ 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/coderd/files/cache_test.go b/coderd/files/cache_test.go index 1b8f038b5ff42..6f8f74e74fe8e 100644 --- a/coderd/files/cache_test.go +++ b/coderd/files/cache_test.go @@ -71,44 +71,20 @@ func TestCancelledConcurrentFetch(t *testing.T) { cache := files.LeakCache{Cache: files.New(prometheus.NewRegistry(), &coderdtest.FakeAuthorizer{})} - // Expect 2 calls to Acquire before we continue the test - var ( - hold sync.WaitGroup - wg sync.WaitGroup - ) - //nolint:gocritic // Unit testing ctx := dbauthz.AsFileReader(testutil.Context(t, testutil.WaitShort)) // Cancel the context for the first call; should fail. - hold.Add(1) - // TODO: wg.Go in Go 1.25 - wg.Add(1) - go func() { - hold.Done() - hold.Wait() - defer wg.Done() - ctx, cancel := context.WithCancel(ctx) - cancel() - _, err := cache.Acquire(ctx, dbM, fileID) - require.ErrorIs(t, err, context.Canceled) - }() + canceledCtx, cancel := context.WithCancel(ctx) + cancel() + _, err := cache.Acquire(canceledCtx, dbM, fileID) + require.ErrorIs(t, err, context.Canceled) // Second call, that should succeed without fetching from the database again // since the cache should be populated by the fetch the first request started // even if it doesn't wait for completion. 
- hold.Add(1) - // TODO: wg.Go in Go 1.25 - wg.Add(1) - go func() { - hold.Done() - hold.Wait() - defer wg.Done() - _, err := cache.Acquire(ctx, dbM, fileID) - require.NoError(t, err) - }() - - wg.Wait() + _, err = cache.Acquire(ctx, dbM, fileID) + require.NoError(t, err) } func TestConcurrentFetch(t *testing.T) { From 3764134ac5fa4b0d5e2a979e7916a5b0e34b55d4 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Mon, 30 Jun 2025 18:28:18 +0000 Subject: [PATCH 14/19] lint --- coderd/files/cache_internal_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/coderd/files/cache_internal_test.go b/coderd/files/cache_internal_test.go index 9741c36c3c202..ab203b385e72c 100644 --- a/coderd/files/cache_internal_test.go +++ b/coderd/files/cache_internal_test.go @@ -3,8 +3,9 @@ package files import ( "context" - "github.com/coder/coder/v2/coderd/database" "github.com/google/uuid" + + "github.com/coder/coder/v2/coderd/database" ) // LeakCache prevents entries from even being released to enable testing certain From ac52626b4960acb72c2eea651c02f46aba2b13a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=82=B1=E3=82=A4=E3=83=A9?= Date: Tue, 1 Jul 2025 10:26:41 -0600 Subject: [PATCH 15/19] Update coderd/files/cache_internal_test.go Co-authored-by: Steven Masley --- coderd/files/cache_internal_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/files/cache_internal_test.go b/coderd/files/cache_internal_test.go index ab203b385e72c..89348c65a2f20 100644 --- a/coderd/files/cache_internal_test.go +++ b/coderd/files/cache_internal_test.go @@ -16,7 +16,7 @@ type LeakCache struct { func (c *LeakCache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) { // We need to call prepare first to both 1. leak a reference and 2. prevent - // the behavior of immediately closing on an error (as implented in Acquire) + // the behavior of immediately closing on an error (as implemented in Acquire) // from freeing the file. c.prepare(db, fileID) return c.Cache.Acquire(ctx, db, fileID) From 725696f1421b8050c7cc2a2876707316d9ea21f2 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 1 Jul 2025 16:48:46 +0000 Subject: [PATCH 16/19] one lock to rule them all --- coderd/files/cache.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 9d3d23386d114..240eab183f9d5 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -5,7 +5,6 @@ import ( "context" "io/fs" "sync" - "sync/atomic" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" @@ -109,10 +108,12 @@ type cacheMetrics struct { } type cacheEntry struct { - refCount atomic.Int32 + refCount int value *lazy.ValueWithError[CacheEntryValue] + // Safety: Must only be called while the Cache lock is held close func() + // Safety: Must only be called while the Cache lock is held purge func() } @@ -133,7 +134,9 @@ type CloseFS struct { close func() } -func (f *CloseFS) Close() { f.close() } +func (f *CloseFS) Close() { + f.close() +} // Acquire will load the fs.FS for the given file. 
It guarantees that parallel // calls for the same fileID will only result in one fetch, and that parallel @@ -149,6 +152,8 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID e := c.prepare(db, fileID) ev, err := e.value.Load() if err != nil { + c.lock.Lock() + defer c.lock.Unlock() e.close() e.purge() return nil, err @@ -183,6 +188,8 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID // sync.Once makes the Close() idempotent, so we can call it // multiple times without worrying about double-releasing. closeOnce.Do(func() { + c.lock.Lock() + defer c.lock.Unlock() e.close() }) }, @@ -214,14 +221,14 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { }), close: func() { - entry.refCount.Add(-1) + entry.refCount-- c.currentOpenFileReferences.Dec() // Safety: Another thread could grab a reference to this value between // this check and entering `purge`, which will grab the cache lock. This // is annoying, and may lead to temporary duplication of the file in // memory, but is better than the deadlocking potential of other // approaches we tried to solve this. - if entry.refCount.Load() > 0 { + if entry.refCount > 0 { return } @@ -242,16 +249,14 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { c.currentOpenFileReferences.Inc() c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc() - entry.refCount.Add(1) + entry.refCount++ return entry } // purge immediately removes an entry from the cache, even if it has open // references. +// Safety: Must only be called while the Cache lock is held func (c *Cache) purge(fileID uuid.UUID) { - c.lock.Lock() - defer c.lock.Unlock() - entry, ok := c.data[fileID] if !ok { // If we land here, it's probably because of a fetch attempt that From 9cda09f4592c6bf19e2275e8fdcda05133612c60 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 1 Jul 2025 17:26:34 +0000 Subject: [PATCH 17/19] update some comments --- coderd/files/cache.go | 1 + 1 file changed, 1 insertion(+) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 240eab183f9d5..6fc8c537e5d28 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -108,6 +108,7 @@ type cacheMetrics struct { } type cacheEntry struct { + // refCount must only be accessed while the Cache lock is held. refCount int value *lazy.ValueWithError[CacheEntryValue] From 98f18b1c69236c4259938217551ce92f293ef10f Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 1 Jul 2025 18:59:49 +0000 Subject: [PATCH 18/19] wait I thought I committed this --- coderd/files/cache.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index 6fc8c537e5d28..e7bf4a1d987cb 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -108,13 +108,13 @@ type cacheMetrics struct { } type cacheEntry struct { - // refCount must only be accessed while the Cache lock is held. + // Safety: refCount must only be accessed while the Cache lock is held. 
refCount int value *lazy.ValueWithError[CacheEntryValue] - // Safety: Must only be called while the Cache lock is held + // Safety: close must only be called while the Cache lock is held close func() - // Safety: Must only be called while the Cache lock is held + // Safety: purge must only be called while the Cache lock is held purge func() } @@ -143,8 +143,8 @@ func (f *CloseFS) Close() { // calls for the same fileID will only result in one fetch, and that parallel // calls for distinct fileIDs will fetch in parallel. // -// Safety: Every call to Acquire that does not return an error must have a -// matching call to Release. +// Safety: Every call to Acquire that does not return an error must call close +// on the returned value when it is done being used. func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) { // It's important that this `Load` call occurs outside `prepare`, after the // mutex has been released, or we would continue to hold the lock until the @@ -224,11 +224,6 @@ func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { close: func() { entry.refCount-- c.currentOpenFileReferences.Dec() - // Safety: Another thread could grab a reference to this value between - // this check and entering `purge`, which will grab the cache lock. This - // is annoying, and may lead to temporary duplication of the file in - // memory, but is better than the deadlocking potential of other - // approaches we tried to solve this. if entry.refCount > 0 { return } From 00fb0b0d805f03459ee647ce25668f77513f5534 Mon Sep 17 00:00:00 2001 From: McKayla Washburn Date: Tue, 1 Jul 2025 19:23:03 +0000 Subject: [PATCH 19/19] lock on close --- coderd/files/cache.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/coderd/files/cache.go b/coderd/files/cache.go index e7bf4a1d987cb..159f1b8aee053 100644 --- a/coderd/files/cache.go +++ b/coderd/files/cache.go @@ -160,6 +160,12 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID return nil, err } + cleanup := func() { + c.lock.Lock() + defer c.lock.Unlock() + e.close() + } + // We always run the fetch under a system context and actor, so we need to // check the caller's context (including the actor) manually before returning. @@ -167,18 +173,18 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID // a context, we still check it manually first because none of our mock // database implementations check for context cancellation. if err := ctx.Err(); err != nil { - e.close() + cleanup() return nil, err } // Check that the caller is authorized to access the file subject, ok := dbauthz.ActorFromContext(ctx) if !ok { - e.close() + cleanup() return nil, dbauthz.ErrNoActor } if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil { - e.close() + cleanup() return nil, err }
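
For reference, a minimal caller-side sketch of the API this series converges on. It is illustrative only and not part of the patches: files.New, Cache.Acquire, CloseFS.Close, and the purge-on-error behavior are taken from the diffs above, while the package name, the helper functions, and the "main.tf" entry name are hypothetical.

// Illustrative sketch only (not part of the patch series).
package filesexample

import (
	"context"
	"io/fs"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/files"
	"github.com/coder/coder/v2/coderd/rbac"
)

// newCache wires the cache the same way the tests above do: a Prometheus
// registry for the file_cache metrics and an rbac.Authorizer that Acquire
// uses to re-check the caller after the system-context fetch.
func newCache(reg *prometheus.Registry, authz rbac.Authorizer) *files.Cache {
	return files.New(reg, authz)
}

// readTemplateFile shows the acquire/close contract. The caller's ctx must
// carry an authorized actor (Acquire checks dbauthz.ActorFromContext); the
// fetch itself runs under an uncancelable system context, and a failed fetch
// purges the entry so a later Acquire retries instead of reusing the
// polluted entry.
func readTemplateFile(ctx context.Context, cache *files.Cache, db database.Store, fileID uuid.UUID) ([]byte, error) {
	f, err := cache.Acquire(ctx, db, fileID)
	if err != nil {
		return nil, err
	}
	// Close is idempotent (guarded by sync.Once) and decrements the entry's
	// reference count so the entry can be purged once unused.
	defer f.Close()

	// CloseFS embeds fs.FS, so it can be read with the standard io/fs helpers.
	// "main.tf" is a hypothetical entry name inside the cached archive.
	return fs.ReadFile(f, "main.tf")
}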