Skip to content

Commit 321ce8c

Browse files
authored
Merge branch 'main' into jakehwll/routing-ai-governance
2 parents 2361aae + 650dc86 commit 321ce8c

File tree

91 files changed

+5509
-985
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+5509
-985
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1537,7 +1537,7 @@ jobs:
15371537
steps:
15381538
- name: Send Slack notification
15391539
run: |
1540-
ESCAPED_PROMPT=$(printf "%s" "<@U08TJ4YNCA3> $BLINK_CI_FAILURE_PROMPT" | jq -Rsa .)
1540+
ESCAPED_PROMPT=$(printf "%s" "<@U09LQ75AHKR> $BLINK_CI_FAILURE_PROMPT" | jq -Rsa .)
15411541
curl -X POST -H 'Content-type: application/json' \
15421542
--data '{
15431543
"blocks": [

.github/workflows/nightly-gauntlet.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ jobs:
170170
steps:
171171
- name: Send Slack notification
172172
run: |
173-
ESCAPED_PROMPT=$(printf "%s" "<@U08TJ4YNCA3> $BLINK_CI_FAILURE_PROMPT" | jq -Rsa .)
173+
ESCAPED_PROMPT=$(printf "%s" "<@U09LQ75AHKR> $BLINK_CI_FAILURE_PROMPT" | jq -Rsa .)
174174
curl -X POST -H 'Content-type: application/json' \
175175
--data '{
176176
"blocks": [

cli/cliui/agent.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ func Agent(ctx context.Context, writer io.Writer, agentID uuid.UUID, opts AgentO
5353
t := time.NewTimer(0)
5454
defer t.Stop()
5555

56+
startTime := time.Now()
57+
baseInterval := opts.FetchInterval
58+
5659
for {
5760
select {
5861
case <-ctx.Done():
@@ -68,7 +71,11 @@ func Agent(ctx context.Context, writer io.Writer, agentID uuid.UUID, opts AgentO
6871
return
6972
}
7073
fetchedAgent <- fetchAgent{agent: agent}
71-
t.Reset(opts.FetchInterval)
74+
75+
// Adjust the interval based on how long we've been waiting.
76+
elapsed := time.Since(startTime)
77+
currentInterval := GetProgressiveInterval(baseInterval, elapsed)
78+
t.Reset(currentInterval)
7279
}
7380
}
7481
}()
@@ -293,6 +300,24 @@ func safeDuration(sw *stageWriter, a, b *time.Time) time.Duration {
293300
return a.Sub(*b)
294301
}
295302

303+
// GetProgressiveInterval returns an interval that increases over time.
304+
// The interval starts at baseInterval and increases to
305+
// a maximum of baseInterval * 16 over time.
306+
func GetProgressiveInterval(baseInterval time.Duration, elapsed time.Duration) time.Duration {
307+
switch {
308+
case elapsed < 60*time.Second:
309+
return baseInterval // 500ms for first 60 seconds
310+
case elapsed < 2*time.Minute:
311+
return baseInterval * 2 // 1s for next 1 minute
312+
case elapsed < 5*time.Minute:
313+
return baseInterval * 4 // 2s for next 3 minutes
314+
case elapsed < 10*time.Minute:
315+
return baseInterval * 8 // 4s for next 5 minutes
316+
default:
317+
return baseInterval * 16 // 8s after 10 minutes
318+
}
319+
}
320+
296321
type closeFunc func() error
297322

298323
func (c closeFunc) Close() error {

cli/cliui/agent_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,3 +866,31 @@ func TestConnDiagnostics(t *testing.T) {
866866
})
867867
}
868868
}
869+
870+
func TestGetProgressiveInterval(t *testing.T) {
871+
t.Parallel()
872+
873+
baseInterval := 500 * time.Millisecond
874+
875+
testCases := []struct {
876+
name string
877+
elapsed time.Duration
878+
expected time.Duration
879+
}{
880+
{"first_minute", 30 * time.Second, baseInterval},
881+
{"second_minute", 90 * time.Second, baseInterval * 2},
882+
{"third_to_fifth_minute", 3 * time.Minute, baseInterval * 4},
883+
{"sixth_to_tenth_minute", 7 * time.Minute, baseInterval * 8},
884+
{"after_ten_minutes", 15 * time.Minute, baseInterval * 16},
885+
{"boundary_first_minute", 59 * time.Second, baseInterval},
886+
{"boundary_second_minute", 61 * time.Second, baseInterval * 2},
887+
}
888+
889+
for _, tc := range testCases {
890+
t.Run(tc.name, func(t *testing.T) {
891+
t.Parallel()
892+
result := cliui.GetProgressiveInterval(baseInterval, tc.elapsed)
893+
require.Equal(t, tc.expected, result)
894+
})
895+
}
896+
}

cli/exp_scaletest_dynamicparameters.go

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,35 @@ package cli
44

55
import (
66
"fmt"
7+
"net/http"
8+
"time"
79

810
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/client_golang/prometheus/promhttp"
912
"golang.org/x/xerrors"
1013

1114
"cdr.dev/slog"
1215
"cdr.dev/slog/sloggers/sloghuman"
16+
"github.com/coder/serpent"
1317

18+
"github.com/coder/coder/v2/codersdk"
1419
"github.com/coder/coder/v2/scaletest/dynamicparameters"
1520
"github.com/coder/coder/v2/scaletest/harness"
16-
"github.com/coder/serpent"
1721
)
1822

1923
const (
2024
dynamicParametersTestName = "dynamic-parameters"
2125
)
2226

2327
func (r *RootCmd) scaletestDynamicParameters() *serpent.Command {
24-
var templateName string
25-
var numEvals int64
28+
var (
29+
templateName string
30+
numEvals int64
31+
tracingFlags = &scaletestTracingFlags{}
32+
prometheusFlags = &scaletestPrometheusFlags{}
33+
// This test requires unlimited concurrency
34+
timeoutStrategy = &timeoutFlags{}
35+
)
2636
orgContext := NewOrganizationContext()
2737
output := &scaletestOutputFlags{}
2838

@@ -51,15 +61,53 @@ func (r *RootCmd) scaletestDynamicParameters() *serpent.Command {
5161
return err
5262
}
5363

64+
_, err = requireAdmin(ctx, client)
65+
if err != nil {
66+
return err
67+
}
68+
69+
client.HTTPClient = &http.Client{
70+
Transport: &codersdk.HeaderTransport{
71+
Transport: http.DefaultTransport,
72+
Header: map[string][]string{
73+
codersdk.BypassRatelimitHeader: {"true"},
74+
},
75+
},
76+
}
77+
78+
reg := prometheus.NewRegistry()
79+
metrics := dynamicparameters.NewMetrics(reg, "concurrent_evaluations")
80+
5481
logger := slog.Make(sloghuman.Sink(inv.Stdout)).Leveled(slog.LevelDebug)
82+
prometheusSrvClose := ServeHandler(ctx, logger, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), prometheusFlags.Address, "prometheus")
83+
defer prometheusSrvClose()
84+
85+
tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx)
86+
if err != nil {
87+
return xerrors.Errorf("create tracer provider: %w", err)
88+
}
89+
defer func() {
90+
// Allow time for traces to flush even if command context is
91+
// canceled. This is a no-op if tracing is not enabled.
92+
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
93+
if err := closeTracing(ctx); err != nil {
94+
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
95+
}
96+
// Wait for prometheus metrics to be scraped
97+
_, _ = fmt.Fprintf(inv.Stderr, "Waiting %s for prometheus metrics to be scraped\n", prometheusFlags.Wait)
98+
<-time.After(prometheusFlags.Wait)
99+
}()
100+
tracer := tracerProvider.Tracer(scaletestTracerName)
101+
55102
partitions, err := dynamicparameters.SetupPartitions(ctx, client, org.ID, templateName, numEvals, logger)
56103
if err != nil {
57104
return xerrors.Errorf("setup dynamic parameters partitions: %w", err)
58105
}
59106

60-
th := harness.NewTestHarness(harness.ConcurrentExecutionStrategy{}, harness.ConcurrentExecutionStrategy{})
61-
reg := prometheus.NewRegistry()
62-
metrics := dynamicparameters.NewMetrics(reg, "concurrent_evaluations")
107+
th := harness.NewTestHarness(
108+
timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}),
109+
// there is no cleanup since it's just a connection that we sever.
110+
nil)
63111

64112
for i, part := range partitions {
65113
for j := range part.ConcurrentEvaluations {
@@ -68,12 +116,21 @@ func (r *RootCmd) scaletestDynamicParameters() *serpent.Command {
68116
Metrics: metrics,
69117
MetricLabelValues: []string{fmt.Sprintf("%d", part.ConcurrentEvaluations)},
70118
}
71-
runner := dynamicparameters.NewRunner(client, cfg)
119+
var runner harness.Runnable = dynamicparameters.NewRunner(client, cfg)
120+
if tracingEnabled {
121+
runner = &runnableTraceWrapper{
122+
tracer: tracer,
123+
spanName: fmt.Sprintf("%s/%d/%d", dynamicParametersTestName, i, j),
124+
runner: runner,
125+
}
126+
}
72127
th.AddRun(dynamicParametersTestName, fmt.Sprintf("%d/%d", j, i), runner)
73128
}
74129
}
75130

76-
err = th.Run(ctx)
131+
testCtx, testCancel := timeoutStrategy.toContext(ctx)
132+
defer testCancel()
133+
err = th.Run(testCtx)
77134
if err != nil {
78135
return xerrors.Errorf("run test harness: %w", err)
79136
}
@@ -106,5 +163,8 @@ func (r *RootCmd) scaletestDynamicParameters() *serpent.Command {
106163
}
107164
orgContext.AttachOptions(cmd)
108165
output.attach(&cmd.Options)
166+
tracingFlags.attach(&cmd.Options)
167+
prometheusFlags.attach(&cmd.Options)
168+
timeoutStrategy.attach(&cmd.Options)
109169
return cmd
110170
}

0 commit comments

Comments
 (0)