Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 91 additions & 84 deletions docs/admin/integrations/prometheus.md

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions enterprise/aibridged/aibridged.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ func New(ctx context.Context, pool Pooler, rpcDialer Dialer, logger slog.Logger)

ctx, cancel := context.WithCancel(ctx)
daemon := &Server{
logger: logger,
clientDialer: rpcDialer,
logger: logger,
clientDialer: rpcDialer,
clientCh: make(chan DRPCClient),
lifecycleCtx: ctx,
cancelFn: cancel,
initConnectionCh: make(chan struct{}),

requestBridgePool: pool,
clientCh: make(chan DRPCClient),
lifecycleCtx: ctx,
cancelFn: cancel,
initConnectionCh: make(chan struct{}),
}

daemon.wg.Add(1)
Expand Down
110 changes: 109 additions & 1 deletion enterprise/aibridged/aibridged_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"

"github.com/coder/aibridge"
Expand Down Expand Up @@ -166,7 +168,7 @@ func TestIntegration(t *testing.T) {

logger := testutil.Logger(t)
providers := []aibridge.Provider{aibridge.NewOpenAIProvider(aibridge.OpenAIConfig{BaseURL: mockOpenAI.URL})}
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, logger)
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, nil, logger)
require.NoError(t, err)

// Given: aibridged is started.
Expand Down Expand Up @@ -253,3 +255,109 @@ func TestIntegration(t *testing.T) {
// Then: the MCP server was initialized.
require.Contains(t, mcpTokenReceived, authLink.OAuthAccessToken, "mock MCP server not requested")
}

// TestIntegrationWithMetrics validates that Prometheus metrics are correctly incremented
// when requests are processed through aibridged.
func TestIntegrationWithMetrics(t *testing.T) {
t.Parallel()

ctx := testutil.Context(t, testutil.WaitLong)

// Create prometheus registry and metrics.
registry := prometheus.NewRegistry()
metrics := aibridge.NewMetrics(registry)

// Set up mock OpenAI server.
mockOpenAI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "chatcmpl-test",
"object": "chat.completion",
"created": 1753343279,
"model": "gpt-4.1",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "test response"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 10,
"completion_tokens": 5,
"total_tokens": 15
}
}`))
}))
t.Cleanup(mockOpenAI.Close)

// Database and coderd setup.
db, ps := dbtestutil.NewDB(t)
client, _, api, firstUser := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
Options: &coderdtest.Options{
Database: db,
Pubsub: ps,
},
})

userClient, _ := coderdtest.CreateAnotherUser(t, client, firstUser.OrganizationID)

// Create an API token for the user.
apiKey, err := userClient.CreateToken(ctx, "me", codersdk.CreateTokenRequest{
TokenName: fmt.Sprintf("test-key-%d", time.Now().UnixNano()),
Lifetime: time.Hour,
Scope: codersdk.APIKeyScopeCoderAll,
})
require.NoError(t, err)

// Create aibridge client.
aiBridgeClient, err := api.CreateInMemoryAIBridgeServer(ctx)
require.NoError(t, err)

logger := testutil.Logger(t)
providers := []aibridge.Provider{aibridge.NewOpenAIProvider(aibridge.OpenAIConfig{BaseURL: mockOpenAI.URL})}

// Create pool with metrics.
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, metrics, logger)
require.NoError(t, err)

// Given: aibridged is started.
srv, err := aibridged.New(ctx, pool, func(ctx context.Context) (aibridged.DRPCClient, error) {
return aiBridgeClient, nil
}, logger)
require.NoError(t, err, "create new aibridged")
t.Cleanup(func() {
_ = srv.Shutdown(ctx)
})

// When: a request is made to aibridged.
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/openai/v1/chat/completions", bytes.NewBufferString(`{
"messages": [
{
"role": "user",
"content": "test message"
}
],
"model": "gpt-4.1"
}`))
require.NoError(t, err, "make request to test server")
req.Header.Add("Authorization", "Bearer "+apiKey.Key)
req.Header.Add("Accept", "application/json")

// When: aibridged handles the request.
rec := httptest.NewRecorder()
srv.ServeHTTP(rec, req)
require.Equal(t, http.StatusOK, rec.Code)

// Then: the interceptions metric should increase to 1.
// This is not exhaustively checking the available metrics; just an indicative one to prove
// the plumbing is working.
require.Eventually(t, func() bool {
count := promtest.ToFloat64(metrics.InterceptionCount)
return count == 1
}, testutil.WaitShort, testutil.IntervalFast, "interceptions_total metric should be 1")
}
5 changes: 2 additions & 3 deletions enterprise/aibridged/aibridged_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func newTestServer(t *testing.T) (*aibridged.Server, *mock.MockDRPCClient, *mock
pool,
func(ctx context.Context) (aibridged.DRPCClient, error) {
return client, nil
},
logger)
}, logger)
require.NoError(t, err, "create new aibridged")
t.Cleanup(func() {
srv.Shutdown(context.Background())
Expand Down Expand Up @@ -291,7 +290,7 @@ func TestRouting(t *testing.T) {
aibridge.NewOpenAIProvider(aibridge.OpenAIConfig{BaseURL: openaiSrv.URL}),
aibridge.NewAnthropicProvider(aibridge.AnthropicConfig{BaseURL: antSrv.URL}, nil),
}
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, logger)
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, nil, logger)
require.NoError(t, err)
conn := &mockDRPCConn{}
client.EXPECT().DRPCConn().AnyTimes().Return(conn)
Expand Down
10 changes: 7 additions & 3 deletions enterprise/aibridged/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ type CachedBridgePool struct {

singleflight *singleflight.Group[string, *aibridge.RequestBridge]

metrics *aibridge.Metrics

shutDownOnce sync.Once
shuttingDownCh chan struct{}
}

func NewCachedBridgePool(options PoolOptions, providers []aibridge.Provider, logger slog.Logger) (*CachedBridgePool, error) {
func NewCachedBridgePool(options PoolOptions, providers []aibridge.Provider, metrics *aibridge.Metrics, logger slog.Logger) (*CachedBridgePool, error) {
cache, err := ristretto.NewCache(&ristretto.Config[string, *aibridge.RequestBridge]{
NumCounters: options.MaxItems * 10, // Docs suggest setting this 10x number of keys.
MaxCost: options.MaxItems * cacheCost, // Up to n instances.
Expand Down Expand Up @@ -88,6 +90,8 @@ func NewCachedBridgePool(options PoolOptions, providers []aibridge.Provider, log

singleflight: &singleflight.Group[string, *aibridge.RequestBridge]{},

metrics: metrics,

shuttingDownCh: make(chan struct{}),
}, nil
}
Expand Down Expand Up @@ -154,7 +158,7 @@ func (p *CachedBridgePool) Acquire(ctx context.Context, req Request, clientFn Cl
}
}

bridge, err := aibridge.NewRequestBridge(ctx, p.providers, p.logger, recorder, mcpServers)
bridge, err := aibridge.NewRequestBridge(ctx, p.providers, recorder, mcpServers, p.metrics, p.logger)
if err != nil {
return nil, xerrors.Errorf("create new request bridge: %w", err)
}
Expand All @@ -167,7 +171,7 @@ func (p *CachedBridgePool) Acquire(ctx context.Context, req Request, clientFn Cl
return instance, err
}

func (p *CachedBridgePool) Metrics() PoolMetrics {
func (p *CachedBridgePool) CacheMetrics() PoolMetrics {
if p.cache == nil {
return nil
}
Expand Down
32 changes: 16 additions & 16 deletions enterprise/aibridged/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestPool(t *testing.T) {
mcpProxy := mcpmock.NewMockServerProxier(ctrl)

opts := aibridged.PoolOptions{MaxItems: 1, TTL: time.Second}
pool, err := aibridged.NewCachedBridgePool(opts, nil, logger)
pool, err := aibridged.NewCachedBridgePool(opts, nil, nil, logger)
require.NoError(t, err)
t.Cleanup(func() { pool.Shutdown(context.Background()) })

Expand Down Expand Up @@ -63,11 +63,11 @@ func TestPool(t *testing.T) {
require.NoError(t, err, "acquire pool instance")
require.Same(t, inst, instB)

metrics := pool.Metrics()
require.EqualValues(t, 1, metrics.KeysAdded())
require.EqualValues(t, 0, metrics.KeysEvicted())
require.EqualValues(t, 1, metrics.Hits())
require.EqualValues(t, 1, metrics.Misses())
cacheMetrics := pool.CacheMetrics()
require.EqualValues(t, 1, cacheMetrics.KeysAdded())
require.EqualValues(t, 0, cacheMetrics.KeysEvicted())
require.EqualValues(t, 1, cacheMetrics.Hits())
require.EqualValues(t, 1, cacheMetrics.Misses())

// This will get called again because a new instance will be created.
mcpProxy.EXPECT().Init(gomock.Any()).Times(1).Return(nil)
Expand All @@ -81,11 +81,11 @@ func TestPool(t *testing.T) {
require.NoError(t, err, "acquire pool instance")
require.NotSame(t, inst, inst2)

metrics = pool.Metrics()
require.EqualValues(t, 2, metrics.KeysAdded())
require.EqualValues(t, 1, metrics.KeysEvicted())
require.EqualValues(t, 1, metrics.Hits())
require.EqualValues(t, 2, metrics.Misses())
cacheMetrics = pool.CacheMetrics()
require.EqualValues(t, 2, cacheMetrics.KeysAdded())
require.EqualValues(t, 1, cacheMetrics.KeysEvicted())
require.EqualValues(t, 1, cacheMetrics.Hits())
require.EqualValues(t, 2, cacheMetrics.Misses())

// This will get called again because a new instance will be created.
mcpProxy.EXPECT().Init(gomock.Any()).Times(1).Return(nil)
Expand All @@ -99,11 +99,11 @@ func TestPool(t *testing.T) {
require.NoError(t, err, "acquire pool instance 2B")
require.NotSame(t, inst2, inst2B)

metrics = pool.Metrics()
require.EqualValues(t, 3, metrics.KeysAdded())
require.EqualValues(t, 2, metrics.KeysEvicted())
require.EqualValues(t, 1, metrics.Hits())
require.EqualValues(t, 3, metrics.Misses())
cacheMetrics = pool.CacheMetrics()
require.EqualValues(t, 3, cacheMetrics.KeysAdded())
require.EqualValues(t, 2, cacheMetrics.KeysEvicted())
require.EqualValues(t, 1, cacheMetrics.Hits())
require.EqualValues(t, 3, cacheMetrics.Misses())

// TODO: add test for expiry.
// This requires Go 1.25's [synctest](https://pkg.go.dev/testing/synctest) since the
Expand Down
10 changes: 10 additions & 0 deletions enterprise/aibridged/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ func (t *recorderTranslation) RecordPromptUsage(ctx context.Context, req *aibrid
}

func (t *recorderTranslation) RecordTokenUsage(ctx context.Context, req *aibridge.TokenUsageRecord) error {
merged := req.Metadata
if merged == nil {
merged = aibridge.Metadata{}
}

// Merge the token usage values into metadata; later we might want to store some of these in their own fields.
for k, v := range req.ExtraTokenTypes {
merged[k] = v
}

_, err := t.client.RecordTokenUsage(ctx, &proto.RecordTokenUsageRequest{
InterceptionId: req.InterceptionID,
MsgId: req.MsgID,
Expand Down
7 changes: 6 additions & 1 deletion enterprise/cli/aibridged.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"golang.org/x/xerrors"

"github.com/prometheus/client_golang/prometheus"

"github.com/coder/aibridge"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/enterprise/aibridged"
Expand All @@ -31,8 +33,11 @@ func newAIBridgeDaemon(coderAPI *coderd.API) (*aibridged.Server, error) {
}, getBedrockConfig(coderAPI.DeploymentValues.AI.BridgeConfig.Bedrock)),
}

reg := prometheus.WrapRegistererWithPrefix("coder_aibridged_", coderAPI.PrometheusRegistry)
metrics := aibridge.NewMetrics(reg)

// Create pool for reusable stateful [aibridge.RequestBridge] instances (one per user).
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, logger.Named("pool")) // TODO: configurable.
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, metrics, logger.Named("pool")) // TODO: configurable size.
if err != nil {
return nil, xerrors.Errorf("create request pool: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ require (
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e
github.com/pkg/sftp v1.13.7
github.com/prometheus-community/pro-bing v0.7.0
github.com/prometheus/client_golang v1.23.0
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.65.0
github.com/prometheus/common v0.66.1
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/robfig/cron/v3 v3.0.1
github.com/shirou/gopsutil/v4 v4.25.5
Expand Down Expand Up @@ -476,7 +476,7 @@ require (
github.com/anthropics/anthropic-sdk-go v1.18.0
github.com/brianvoe/gofakeit/v7 v7.9.0
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
github.com/coder/aibridge v0.1.7
github.com/coder/aibridge v0.2.0
github.com/coder/aisdk-go v0.0.9
github.com/coder/boundary v1.0.1-0.20250925154134-55a44f2a7945
github.com/coder/preview v1.0.4
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -919,8 +919,8 @@ github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8=
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4=
github.com/coder/aibridge v0.1.7 h1:GTAM8nHawXMeb/pxAIwvzr76dyVGu9hw9qV6Gvpc7nw=
github.com/coder/aibridge v0.1.7/go.mod h1:7GhrLbzf6uM3sCA7OPaDzvq9QNrCjNuzMy+WgipYwfQ=
github.com/coder/aibridge v0.2.0 h1:kAWhHD6fsmDLH1WxIwXPu9Ineijj+lVniko45C003Vo=
github.com/coder/aibridge v0.2.0/go.mod h1:2T0RSnIX1WTqFajzXsaNsoNe6mmNsNeCTxiHBWEsFnE=
github.com/coder/aisdk-go v0.0.9 h1:Vzo/k2qwVGLTR10ESDeP2Ecek1SdPfZlEjtTfMveiVo=
github.com/coder/aisdk-go v0.0.9/go.mod h1:KF6/Vkono0FJJOtWtveh5j7yfNrSctVTpwgweYWSp5M=
github.com/coder/boundary v1.0.1-0.20250925154134-55a44f2a7945 h1:hDUf02kTX8EGR3+5B+v5KdYvORs4YNfDPci0zCs+pC0=
Expand Down Expand Up @@ -1718,15 +1718,15 @@ github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus-community/pro-bing v0.7.0 h1:KFYFbxC2f2Fp6c+TyxbCOEarf7rbnzr9Gw8eIb0RfZA=
github.com/prometheus-community/pro-bing v0.7.0/go.mod h1:Moob9dvlY50Bfq6i88xIwfyw7xLFHH69LUgx9n5zqCE=
github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc=
github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE=
github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg=
Expand Down
2 changes: 1 addition & 1 deletion scripts/metricsdocgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func readMetrics() ([]*dto.MetricFamily, error) {

var metrics []*dto.MetricFamily

decoder := expfmt.NewDecoder(f, expfmt.NewFormat(expfmt.TypeProtoText))
decoder := expfmt.NewDecoder(f, expfmt.NewFormat(expfmt.TypeTextPlain))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by, this fixes metricsdocgen.

for {
var m dto.MetricFamily
err = decoder.Decode(&m)
Expand Down
Loading
Loading