Skip to content

Commit 89b1757

Browse files
committed
chore: wire up usage tracking for managed agents
1 parent e376311 commit 89b1757

File tree

10 files changed

+190
-53
lines changed

10 files changed

+190
-53
lines changed

coderd/coderd.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/coder/coder/v2/coderd/oauth2provider"
2323
"github.com/coder/coder/v2/coderd/prebuilds"
24+
"github.com/coder/coder/v2/coderd/usage"
2425
"github.com/coder/coder/v2/coderd/wsbuilder"
2526

2627
"github.com/andybalholm/brotli"
@@ -198,6 +199,7 @@ type Options struct {
198199
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
199200
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
200201
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
202+
UsageCollector *atomic.Pointer[usage.Collector]
201203
// CoordinatorResumeTokenProvider is used to provide and validate resume
202204
// tokens issued by and passed to the coordinator DRPC API.
203205
CoordinatorResumeTokenProvider tailnet.ResumeTokenProvider
@@ -426,6 +428,13 @@ func New(options *Options) *API {
426428
v := schedule.NewAGPLUserQuietHoursScheduleStore()
427429
options.UserQuietHoursScheduleStore.Store(&v)
428430
}
431+
if options.UsageCollector == nil {
432+
options.UsageCollector = &atomic.Pointer[usage.Collector]{}
433+
}
434+
if options.UsageCollector.Load() == nil {
435+
collector := usage.NewAGPLCollector()
436+
options.UsageCollector.Store(&collector)
437+
}
429438
if options.OneTimePasscodeValidityPeriod == 0 {
430439
options.OneTimePasscodeValidityPeriod = 20 * time.Minute
431440
}
@@ -588,6 +597,7 @@ func New(options *Options) *API {
588597
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
589598
AccessControlStore: options.AccessControlStore,
590599
BuildUsageChecker: &buildUsageChecker,
600+
UsageCollector: options.UsageCollector,
591601
FileCache: files.New(options.PrometheusRegistry, options.Authorizer),
592602
Experiments: experiments,
593603
WebpushDispatcher: options.WebPushDispatcher,
@@ -1662,6 +1672,9 @@ type API struct {
16621672
// BuildUsageChecker is a pointer as it's passed around to multiple
16631673
// components.
16641674
BuildUsageChecker *atomic.Pointer[wsbuilder.UsageChecker]
1675+
// UsageCollector is a pointer to an atomic pointer because it is passed to
1676+
// multiple components.
1677+
UsageCollector *atomic.Pointer[usage.Collector]
16651678

16661679
UpdatesProvider tailnet.WorkspaceUpdatesProvider
16671680

@@ -1877,6 +1890,7 @@ func (api *API) CreateInMemoryTaggedProvisionerDaemon(dialCtx context.Context, n
18771890
&api.Auditor,
18781891
api.TemplateScheduleStore,
18791892
api.UserQuietHoursScheduleStore,
1893+
api.UsageCollector,
18801894
api.DeploymentValues,
18811895
provisionerdserver.Options{
18821896
OIDCConfig: api.OIDCConfig,

coderd/provisionerdserver/provisionerdserver.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"cdr.dev/slog"
3131

32+
"github.com/coder/coder/v2/coderd/usage"
3233
"github.com/coder/coder/v2/coderd/util/slice"
3334

3435
"github.com/coder/coder/v2/codersdk/drpcsdk"
@@ -121,6 +122,7 @@ type server struct {
121122
DeploymentValues *codersdk.DeploymentValues
122123
NotificationsEnqueuer notifications.Enqueuer
123124
PrebuildsOrchestrator *atomic.Pointer[prebuilds.ReconciliationOrchestrator]
125+
UsageCollector *atomic.Pointer[usage.Collector]
124126

125127
OIDCConfig promoauth.OAuth2Config
126128

@@ -174,6 +176,7 @@ func NewServer(
174176
auditor *atomic.Pointer[audit.Auditor],
175177
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore],
176178
userQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore],
179+
usageCollector *atomic.Pointer[usage.Collector],
177180
deploymentValues *codersdk.DeploymentValues,
178181
options Options,
179182
enqueuer notifications.Enqueuer,
@@ -195,6 +198,9 @@ func NewServer(
195198
if userQuietHoursScheduleStore == nil {
196199
return nil, xerrors.New("userQuietHoursScheduleStore is nil")
197200
}
201+
if usageCollector == nil {
202+
return nil, xerrors.New("usageCollector is nil")
203+
}
198204
if deploymentValues == nil {
199205
return nil, xerrors.New("deploymentValues is nil")
200206
}
@@ -244,6 +250,7 @@ func NewServer(
244250
heartbeatInterval: options.HeartbeatInterval,
245251
heartbeatFn: options.HeartbeatFn,
246252
PrebuildsOrchestrator: prebuildsOrchestrator,
253+
UsageCollector: usageCollector,
247254
}
248255

249256
if s.heartbeatFn == nil {
@@ -1892,6 +1899,18 @@ func (s *server) completeWorkspaceBuildJob(ctx context.Context, job database.Pro
18921899
}
18931900

18941901
sidebarAppID = uuid.NullUUID{UUID: id, Valid: true}
1902+
1903+
// Collect usage event for managed agents.
1904+
usageCollector := s.UsageCollector.Load()
1905+
if usageCollector != nil {
1906+
event := usage.DCManagedAgentsV1{
1907+
Count: 1,
1908+
}
1909+
err = (*usageCollector).CollectDiscreteUsageEvent(ctx, db, event)
1910+
if err != nil {
1911+
return xerrors.Errorf("collect %q event: %w", event.EventType(), err)
1912+
}
1913+
}
18951914
}
18961915

18971916
// Regardless of whether there is an AI task or not, update the field to indicate one way or the other since it

coderd/provisionerdserver/provisionerdserver_test.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/coder/coder/v2/coderd/schedule"
4545
"github.com/coder/coder/v2/coderd/schedule/cron"
4646
"github.com/coder/coder/v2/coderd/telemetry"
47+
"github.com/coder/coder/v2/coderd/usage"
4748
"github.com/coder/coder/v2/coderd/wspubsub"
4849
"github.com/coder/coder/v2/codersdk"
4950
"github.com/coder/coder/v2/codersdk/agentsdk"
@@ -67,6 +68,13 @@ func testUserQuietHoursScheduleStore() *atomic.Pointer[schedule.UserQuietHoursSc
6768
return ptr
6869
}
6970

71+
func testUsageCollector() *atomic.Pointer[usage.Collector] {
72+
ptr := &atomic.Pointer[usage.Collector]{}
73+
collector := usage.NewAGPLCollector()
74+
ptr.Store(&collector)
75+
return ptr
76+
}
77+
7078
func TestAcquireJob_LongPoll(t *testing.T) {
7179
t.Parallel()
7280
//nolint:dogsled
@@ -2469,7 +2477,10 @@ func TestCompleteJob(t *testing.T) {
24692477
t.Run(tc.name, func(t *testing.T) {
24702478
t.Parallel()
24712479

2472-
srv, db, _, pd := setup(t, false, &overrides{})
2480+
fakeUsageCollector, usageCollectorPtr := newFakeUsageCollector()
2481+
srv, db, _, pd := setup(t, false, &overrides{
2482+
usageCollector: usageCollectorPtr,
2483+
})
24732484

24742485
importJobID := uuid.New()
24752486
tvID := uuid.New()
@@ -2535,6 +2546,10 @@ func TestCompleteJob(t *testing.T) {
25352546
require.NoError(t, err)
25362547
require.True(t, version.HasAITask.Valid) // We ALWAYS expect a value to be set, therefore not nil, i.e. valid = true.
25372548
require.Equal(t, tc.expected, version.HasAITask.Bool)
2549+
2550+
// We never expect a usage event to be collected for
2551+
// template imports.
2552+
require.Empty(t, fakeUsageCollector.collectedEvents)
25382553
})
25392554
}
25402555
})
@@ -2576,7 +2591,10 @@ func TestCompleteJob(t *testing.T) {
25762591
t.Run(tc.name, func(t *testing.T) {
25772592
t.Parallel()
25782593

2579-
srv, db, _, pd := setup(t, false, &overrides{})
2594+
fakeUsageCollector, usageCollectorPtr := newFakeUsageCollector()
2595+
srv, db, _, pd := setup(t, false, &overrides{
2596+
usageCollector: usageCollectorPtr,
2597+
})
25802598

25812599
importJobID := uuid.New()
25822600
tvID := uuid.New()
@@ -2657,6 +2675,15 @@ func TestCompleteJob(t *testing.T) {
26572675

26582676
if tc.expected {
26592677
require.Equal(t, sidebarAppID, build.AITaskSidebarAppID.UUID.String())
2678+
2679+
// Check that a usage event was collected.
2680+
require.Len(t, fakeUsageCollector.collectedEvents, 1)
2681+
require.Equal(t, usage.DCManagedAgentsV1{
2682+
Count: 1,
2683+
}, fakeUsageCollector.collectedEvents[0])
2684+
} else {
2685+
// Check that no usage event was collected.
2686+
require.Empty(t, fakeUsageCollector.collectedEvents)
26602687
}
26612688
})
26622689
}
@@ -3582,6 +3609,7 @@ type overrides struct {
35823609
externalAuthConfigs []*externalauth.Config
35833610
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
35843611
userQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
3612+
usageCollector *atomic.Pointer[usage.Collector]
35853613
clock *quartz.Mock
35863614
acquireJobLongPollDuration time.Duration
35873615
heartbeatFn func(ctx context.Context) error
@@ -3603,6 +3631,7 @@ func setup(t *testing.T, ignoreLogErrors bool, ov *overrides) (proto.DRPCProvisi
36033631
var externalAuthConfigs []*externalauth.Config
36043632
tss := testTemplateScheduleStore()
36053633
uqhss := testUserQuietHoursScheduleStore()
3634+
usageCollector := testUsageCollector()
36063635
clock := quartz.NewReal()
36073636
pollDur := time.Duration(0)
36083637
if ov == nil {
@@ -3640,6 +3669,15 @@ func setup(t *testing.T, ignoreLogErrors bool, ov *overrides) (proto.DRPCProvisi
36403669
require.True(t, swapped)
36413670
}
36423671
}
3672+
if ov.usageCollector != nil {
3673+
tusageCollector := usageCollector.Load()
3674+
// keep the initial test value if the override hasn't set the atomic pointer.
3675+
usageCollector = ov.usageCollector
3676+
if usageCollector.Load() == nil {
3677+
swapped := usageCollector.CompareAndSwap(nil, tusageCollector)
3678+
require.True(t, swapped)
3679+
}
3680+
}
36433681
if ov.clock != nil {
36443682
clock = ov.clock
36453683
}
@@ -3695,6 +3733,7 @@ func setup(t *testing.T, ignoreLogErrors bool, ov *overrides) (proto.DRPCProvisi
36953733
auditPtr,
36963734
tss,
36973735
uqhss,
3736+
usageCollector,
36983737
deploymentValues,
36993738
provisionerdserver.Options{
37003739
ExternalAuthConfigs: externalAuthConfigs,
@@ -3809,3 +3848,22 @@ func (s *fakeStream) cancel() {
38093848
s.canceled = true
38103849
s.c.Broadcast()
38113850
}
3851+
3852+
type fakeUsageCollector struct {
3853+
collectedEvents []usage.Event
3854+
}
3855+
3856+
var _ usage.Collector = &fakeUsageCollector{}
3857+
3858+
func newFakeUsageCollector() (*fakeUsageCollector, *atomic.Pointer[usage.Collector]) {
3859+
ptr := &atomic.Pointer[usage.Collector]{}
3860+
fake := &fakeUsageCollector{}
3861+
var collector usage.Collector = fake
3862+
ptr.Store(&collector)
3863+
return fake, ptr
3864+
}
3865+
3866+
func (f *fakeUsageCollector) CollectDiscreteUsageEvent(_ context.Context, _ database.Store, event usage.DiscreteEvent) error {
3867+
f.collectedEvents = append(f.collectedEvents, event)
3868+
return nil
3869+
}

enterprise/cli/server.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/coder/coder/v2/enterprise/audit/backends"
2121
"github.com/coder/coder/v2/enterprise/coderd"
2222
"github.com/coder/coder/v2/enterprise/coderd/dormancy"
23+
"github.com/coder/coder/v2/enterprise/coderd/usage"
2324
"github.com/coder/coder/v2/enterprise/dbcrypt"
2425
"github.com/coder/coder/v2/enterprise/trialer"
2526
"github.com/coder/coder/v2/tailnet"
@@ -116,15 +117,57 @@ func (r *RootCmd) Server(_ func()) *serpent.Command {
116117
o.ExternalTokenEncryption = cs
117118
}
118119

120+
if o.LicenseKeys == nil {
121+
o.LicenseKeys = coderd.Keys
122+
}
123+
124+
multiCloser := &multiCloser{}
125+
126+
// Create the enterprise API.
119127
api, err := coderd.New(ctx, o)
120128
if err != nil {
121129
return nil, nil, err
122130
}
123-
return api.AGPL, api, nil
131+
multiCloser.Add(api)
132+
133+
// Start the enterprise usage publisher routine. This won't do anything
134+
// unless the deployment is licensed and one of the licenses has usage
135+
// publishing enabled.
136+
publisher := usage.NewTallymanPublisher(ctx, options.Logger, options.Database, o.LicenseKeys,
137+
usage.PublisherWithHTTPClient(api.HTTPClient),
138+
)
139+
err = publisher.Start()
140+
if err != nil {
141+
_ = multiCloser.Close()
142+
return nil, nil, xerrors.Errorf("start usage publisher: %w", err)
143+
}
144+
multiCloser.Add(publisher)
145+
146+
return api.AGPL, multiCloser, nil
124147
})
125148

126149
cmd.AddSubcommands(
127150
r.dbcryptCmd(),
128151
)
129152
return cmd
130153
}
154+
155+
type multiCloser struct {
156+
closers []io.Closer
157+
}
158+
159+
var _ io.Closer = &multiCloser{}
160+
161+
func (m *multiCloser) Add(closer io.Closer) {
162+
m.closers = append(m.closers, closer)
163+
}
164+
165+
func (m *multiCloser) Close() error {
166+
var mErr error
167+
for _, closer := range m.closers {
168+
if err := closer.Close(); err != nil {
169+
mErr = xerrors.Errorf("close %T: %w", closer, err)
170+
}
171+
}
172+
return mErr
173+
}

enterprise/coderd/coderd.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/coder/quartz"
@@ -22,10 +23,12 @@ import (
2223
agplportsharing "github.com/coder/coder/v2/coderd/portsharing"
2324
agplprebuilds "github.com/coder/coder/v2/coderd/prebuilds"
2425
"github.com/coder/coder/v2/coderd/rbac/policy"
26+
agplusage "github.com/coder/coder/v2/coderd/usage"
2527
"github.com/coder/coder/v2/coderd/wsbuilder"
2628
"github.com/coder/coder/v2/enterprise/coderd/connectionlog"
2729
"github.com/coder/coder/v2/enterprise/coderd/enidpsync"
2830
"github.com/coder/coder/v2/enterprise/coderd/portsharing"
31+
"github.com/coder/coder/v2/enterprise/coderd/usage"
2932

3033
"golang.org/x/xerrors"
3134
"tailscale.com/tailcfg"
@@ -90,6 +93,13 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
9093
if options.Entitlements == nil {
9194
options.Entitlements = entitlements.New()
9295
}
96+
if options.Options.UsageCollector == nil {
97+
options.Options.UsageCollector = &atomic.Pointer[agplusage.Collector]{}
98+
}
99+
if options.Options.UsageCollector.Load() == nil {
100+
collector := usage.NewDBCollector()
101+
options.Options.UsageCollector.Store(&collector)
102+
}
93103

94104
ctx, cancelFunc := context.WithCancel(ctx)
95105

enterprise/coderd/provisionerdaemons.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ func (api *API) provisionerDaemonServe(rw http.ResponseWriter, r *http.Request)
352352
&api.AGPL.Auditor,
353353
api.AGPL.TemplateScheduleStore,
354354
api.AGPL.UserQuietHoursScheduleStore,
355+
api.AGPL.UsageCollector,
355356
api.DeploymentValues,
356357
provisionerdserver.Options{
357358
ExternalAuthConfigs: api.ExternalAuthConfigs,

0 commit comments

Comments
 (0)