Skip to content

Commit 83b6690

Browse files
committed
feat: add exp scaletest task-status command
1 parent 3551500 commit 83b6690

File tree

2 files changed

+265
-0
lines changed

2 files changed

+265
-0
lines changed

cli/exp_scaletest.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (r *RootCmd) scaletestCmd() *serpent.Command {
6464
r.scaletestWorkspaceTraffic(),
6565
r.scaletestAutostart(),
6666
r.scaletestNotifications(),
67+
r.scaletestTaskStatus(),
6768
r.scaletestSMTP(),
6869
r.scaletestPrebuilds(),
6970
},

cli/exp_scaletest_taskstatus.go

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
//go:build !slim
2+
3+
package cli
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"net/http"
9+
"sync"
10+
"time"
11+
12+
"github.com/google/uuid"
13+
"github.com/prometheus/client_golang/prometheus"
14+
"github.com/prometheus/client_golang/prometheus/promhttp"
15+
"golang.org/x/xerrors"
16+
17+
"cdr.dev/slog"
18+
"cdr.dev/slog/sloggers/sloghuman"
19+
"github.com/coder/serpent"
20+
21+
"github.com/coder/coder/v2/codersdk"
22+
"github.com/coder/coder/v2/scaletest/harness"
23+
"github.com/coder/coder/v2/scaletest/taskstatus"
24+
)
25+
26+
const (
	// taskStatusTestName identifies this scaletest; it is used as the run
	// name in the test harness and as the prefix for trace span names.
	taskStatusTestName = "task-status"
)
29+
30+
func (r *RootCmd) scaletestTaskStatus() *serpent.Command {
31+
var (
32+
count int64
33+
template string
34+
workspaceNamePrefix string
35+
appSlug string
36+
reportStatusPeriod time.Duration
37+
reportStatusDuration time.Duration
38+
baselineDuration time.Duration
39+
tracingFlags = &scaletestTracingFlags{}
40+
prometheusFlags = &scaletestPrometheusFlags{}
41+
timeoutStrategy = &timeoutFlags{}
42+
cleanupStrategy = newScaletestCleanupStrategy()
43+
output = &scaletestOutputFlags{}
44+
)
45+
orgContext := NewOrganizationContext()
46+
47+
cmd := &serpent.Command{
48+
Use: "task-status",
49+
Short: "Generates load on the Coder server by simulating task status reporting",
50+
Long: `This test creates external workspaces and simulates AI agents reporting task status.
51+
After all runners connect, it waits for the baseline duration before triggering status reporting.`,
52+
Handler: func(inv *serpent.Invocation) error {
53+
ctx := inv.Context()
54+
55+
outputs, err := output.parse()
56+
if err != nil {
57+
return xerrors.Errorf("could not parse --output flags: %w", err)
58+
}
59+
60+
client, err := r.InitClient(inv)
61+
if err != nil {
62+
return err
63+
}
64+
65+
org, err := orgContext.Selected(inv, client)
66+
if err != nil {
67+
return err
68+
}
69+
70+
_, err = requireAdmin(ctx, client)
71+
if err != nil {
72+
return err
73+
}
74+
75+
// Disable rate limits for this test
76+
client.HTTPClient = &http.Client{
77+
Transport: &codersdk.HeaderTransport{
78+
Transport: http.DefaultTransport,
79+
Header: map[string][]string{
80+
codersdk.BypassRatelimitHeader: {"true"},
81+
},
82+
},
83+
}
84+
85+
// Find the template
86+
tpl, err := parseTemplate(ctx, client, []uuid.UUID{org.ID}, template)
87+
if err != nil {
88+
return xerrors.Errorf("parse template %q: %w", template, err)
89+
}
90+
templateID := tpl.ID
91+
92+
reg := prometheus.NewRegistry()
93+
metrics := taskstatus.NewMetrics(reg)
94+
95+
logger := slog.Make(sloghuman.Sink(inv.Stdout)).Leveled(slog.LevelDebug)
96+
prometheusSrvClose := ServeHandler(ctx, logger, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), prometheusFlags.Address, "prometheus")
97+
defer prometheusSrvClose()
98+
99+
tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx)
100+
if err != nil {
101+
return xerrors.Errorf("create tracer provider: %w", err)
102+
}
103+
defer func() {
104+
// Allow time for traces to flush even if command context is
105+
// canceled. This is a no-op if tracing is not enabled.
106+
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
107+
if err := closeTracing(ctx); err != nil {
108+
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
109+
}
110+
// Wait for prometheus metrics to be scraped
111+
_, _ = fmt.Fprintf(inv.Stderr, "Waiting %s for prometheus metrics to be scraped\n", prometheusFlags.Wait)
112+
<-time.After(prometheusFlags.Wait)
113+
}()
114+
tracer := tracerProvider.Tracer(scaletestTracerName)
115+
116+
// Setup shared resources for coordination
117+
connectedWaitGroup := &sync.WaitGroup{}
118+
connectedWaitGroup.Add(int(count))
119+
startReporting := make(chan struct{})
120+
121+
// Create the test harness
122+
th := harness.NewTestHarness(
123+
timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}),
124+
cleanupStrategy.toStrategy(),
125+
)
126+
127+
// Create runners
128+
for i := range count {
129+
workspaceName := fmt.Sprintf("%s-%d", workspaceNamePrefix, i)
130+
cfg := taskstatus.Config{
131+
TemplateID: templateID,
132+
WorkspaceName: workspaceName,
133+
AppSlug: appSlug,
134+
ConnectedWaitGroup: connectedWaitGroup,
135+
StartReporting: startReporting,
136+
ReportStatusPeriod: reportStatusPeriod,
137+
ReportStatusDuration: reportStatusDuration,
138+
Metrics: metrics,
139+
MetricLabelValues: []string{},
140+
}
141+
142+
if err := cfg.Validate(); err != nil {
143+
return xerrors.Errorf("validate config for runner %d: %w", i, err)
144+
}
145+
146+
var runner harness.Runnable = taskstatus.NewRunner(client, cfg)
147+
if tracingEnabled {
148+
runner = &runnableTraceWrapper{
149+
tracer: tracer,
150+
spanName: fmt.Sprintf("%s/%d", taskStatusTestName, i),
151+
runner: runner,
152+
}
153+
}
154+
th.AddRun(taskStatusTestName, workspaceName, runner)
155+
}
156+
157+
// Start the test in a separate goroutine so we can coordinate timing
158+
testCtx, testCancel := timeoutStrategy.toContext(ctx)
159+
defer testCancel()
160+
testDone := make(chan error)
161+
go func() {
162+
testDone <- th.Run(testCtx)
163+
}()
164+
165+
// Wait for all runners to connect
166+
logger.Info(ctx, "waiting for all runners to connect")
167+
waitCtx, waitCancel := context.WithTimeout(ctx, 5*time.Minute)
168+
defer waitCancel()
169+
170+
connectDone := make(chan struct{})
171+
go func() {
172+
connectedWaitGroup.Wait()
173+
close(connectDone)
174+
}()
175+
176+
select {
177+
case <-waitCtx.Done():
178+
return xerrors.Errorf("timeout waiting for runners to connect")
179+
case <-connectDone:
180+
logger.Info(ctx, "all runners connected")
181+
}
182+
183+
// Wait for baseline duration
184+
logger.Info(ctx, "waiting for baseline duration", slog.F("duration", baselineDuration))
185+
select {
186+
case <-ctx.Done():
187+
return ctx.Err()
188+
case <-time.After(baselineDuration):
189+
}
190+
191+
// Trigger all runners to start reporting
192+
logger.Info(ctx, "triggering runners to start reporting task status")
193+
close(startReporting)
194+
195+
// Wait for the test to complete
196+
err = <-testDone
197+
if err != nil {
198+
return xerrors.Errorf("run test harness: %w", err)
199+
}
200+
201+
res := th.Results()
202+
for _, o := range outputs {
203+
err = o.write(res, inv.Stdout)
204+
if err != nil {
205+
return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err)
206+
}
207+
}
208+
209+
return nil
210+
},
211+
}
212+
213+
cmd.Options = serpent.OptionSet{
214+
{
215+
Flag: "count",
216+
Description: "Number of concurrent runners to create.",
217+
Default: "10",
218+
Value: serpent.Int64Of(&count),
219+
},
220+
{
221+
Flag: "template",
222+
Description: "Name or UUID of the template to use for the scale test. The template MUST include a coder_external_agent and a coder_app.",
223+
Default: "scaletest-task-status",
224+
Value: serpent.StringOf(&template),
225+
},
226+
{
227+
Flag: "workspace-name-prefix",
228+
Description: "Prefix for workspace names (will be suffixed with index).",
229+
Default: "scaletest-task-status",
230+
Value: serpent.StringOf(&workspaceNamePrefix),
231+
},
232+
{
233+
Flag: "app-slug",
234+
Description: "Slug of the app designated as the AI Agent.",
235+
Default: "ai-agent",
236+
Value: serpent.StringOf(&appSlug),
237+
},
238+
{
239+
Flag: "report-status-period",
240+
Description: "Time between reporting task statuses.",
241+
Default: "10s",
242+
Value: serpent.DurationOf(&reportStatusPeriod),
243+
},
244+
{
245+
Flag: "report-status-duration",
246+
Description: "Total time to report task statuses after baseline.",
247+
Default: "15m",
248+
Value: serpent.DurationOf(&reportStatusDuration),
249+
},
250+
{
251+
Flag: "baseline-duration",
252+
Description: "Duration to wait after all runners connect before starting to report status.",
253+
Default: "10m",
254+
Value: serpent.DurationOf(&baselineDuration),
255+
},
256+
}
257+
orgContext.AttachOptions(cmd)
258+
output.attach(&cmd.Options)
259+
tracingFlags.attach(&cmd.Options)
260+
prometheusFlags.attach(&cmd.Options)
261+
timeoutStrategy.attach(&cmd.Options)
262+
cleanupStrategy.attach(&cmd.Options)
263+
return cmd
264+
}

0 commit comments

Comments (0)