Skip to content

Commit abe66a3

Browse files
authored
feat: implement agent socket api, client and cli (#20758) (#20976)
1 parent cd9d3ef commit abe66a3

37 files changed

+1313
-276
lines changed

agent/agent.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/coder/coder/v2/agent/agentcontainers"
4242
"github.com/coder/coder/v2/agent/agentexec"
4343
"github.com/coder/coder/v2/agent/agentscripts"
44+
"github.com/coder/coder/v2/agent/agentsocket"
4445
"github.com/coder/coder/v2/agent/agentssh"
4546
"github.com/coder/coder/v2/agent/proto"
4647
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
@@ -97,6 +98,8 @@ type Options struct {
9798
Devcontainers bool
9899
DevcontainerAPIOptions []agentcontainers.Option // Enable Devcontainers for these to be effective.
99100
Clock quartz.Clock
101+
SocketServerEnabled bool
102+
SocketPath string // Path for the agent socket server socket
100103
}
101104

102105
type Client interface {
@@ -202,6 +205,8 @@ func New(options Options) Agent {
202205

203206
devcontainers: options.Devcontainers,
204207
containerAPIOptions: options.DevcontainerAPIOptions,
208+
socketPath: options.SocketPath,
209+
socketServerEnabled: options.SocketServerEnabled,
205210
}
206211
// Initially, we have a closed channel, reflecting the fact that we are not initially connected.
207212
// Each time we connect we replace the channel (while holding the closeMutex) with a new one
@@ -279,6 +284,10 @@ type agent struct {
279284
devcontainers bool
280285
containerAPIOptions []agentcontainers.Option
281286
containerAPI *agentcontainers.API
287+
288+
socketServerEnabled bool
289+
socketPath string
290+
socketServer *agentsocket.Server
282291
}
283292

284293
func (a *agent) TailnetConn() *tailnet.Conn {
@@ -358,9 +367,32 @@ func (a *agent) init() {
358367
s.ExperimentalContainers = a.devcontainers
359368
},
360369
)
370+
371+
a.initSocketServer()
372+
361373
go a.runLoop()
362374
}
363375

376+
// initSocketServer initializes server that allows direct communication with a workspace agent using IPC.
377+
func (a *agent) initSocketServer() {
378+
if !a.socketServerEnabled {
379+
a.logger.Info(a.hardCtx, "socket server is disabled")
380+
return
381+
}
382+
383+
server, err := agentsocket.NewServer(
384+
a.logger.Named("socket"),
385+
agentsocket.WithPath(a.socketPath),
386+
)
387+
if err != nil {
388+
a.logger.Warn(a.hardCtx, "failed to create socket server", slog.Error(err), slog.F("path", a.socketPath))
389+
return
390+
}
391+
392+
a.socketServer = server
393+
a.logger.Debug(a.hardCtx, "socket server started", slog.F("path", a.socketPath))
394+
}
395+
364396
// runLoop attempts to start the agent in a retry loop.
365397
// Coder may be offline temporarily, a connection issue
366398
// may be happening, but regardless after the intermittent
@@ -1928,13 +1960,20 @@ func (a *agent) Close() error {
19281960
lifecycleState = codersdk.WorkspaceAgentLifecycleShutdownError
19291961
}
19301962
}
1963+
19311964
a.setLifecycle(lifecycleState)
19321965

19331966
err = a.scriptRunner.Close()
19341967
if err != nil {
19351968
a.logger.Error(a.hardCtx, "script runner close", slog.Error(err))
19361969
}
19371970

1971+
if a.socketServer != nil {
1972+
if err := a.socketServer.Close(); err != nil {
1973+
a.logger.Error(a.hardCtx, "socket server close", slog.Error(err))
1974+
}
1975+
}
1976+
19381977
if err := a.containerAPI.Close(); err != nil {
19391978
a.logger.Error(a.hardCtx, "container API close", slog.Error(err))
19401979
}

agent/agentsocket/client.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package agentsocket
2+
3+
import (
4+
"context"
5+
6+
"golang.org/x/xerrors"
7+
"storj.io/drpc"
8+
"storj.io/drpc/drpcconn"
9+
10+
"github.com/coder/coder/v2/agent/agentsocket/proto"
11+
"github.com/coder/coder/v2/agent/unit"
12+
)
13+
14+
// Option represents a configuration option for NewClient and NewServer.
15+
type Option func(*options)
16+
17+
type options struct {
18+
path string
19+
}
20+
21+
// WithPath sets the socket path. If not provided or empty, the client will
22+
// auto-discover the default socket path.
23+
func WithPath(path string) Option {
24+
return func(opts *options) {
25+
if path == "" {
26+
return
27+
}
28+
opts.path = path
29+
}
30+
}
31+
32+
// Client provides a client for communicating with the workspace agentsocket API.
33+
type Client struct {
34+
client proto.DRPCAgentSocketClient
35+
conn drpc.Conn
36+
}
37+
38+
// NewClient creates a new socket client and opens a connection to the socket.
39+
// If path is not provided via WithPath or is empty, it will auto-discover the
40+
// default socket path.
41+
func NewClient(ctx context.Context, opts ...Option) (*Client, error) {
42+
options := &options{}
43+
for _, opt := range opts {
44+
opt(options)
45+
}
46+
47+
conn, err := dialSocket(ctx, options.path)
48+
if err != nil {
49+
return nil, xerrors.Errorf("connect to socket: %w", err)
50+
}
51+
52+
drpcConn := drpcconn.New(conn)
53+
client := proto.NewDRPCAgentSocketClient(drpcConn)
54+
55+
return &Client{
56+
client: client,
57+
conn: drpcConn,
58+
}, nil
59+
}
60+
61+
// Close closes the socket connection.
62+
func (c *Client) Close() error {
63+
return c.conn.Close()
64+
}
65+
66+
// Ping sends a ping request to the agent.
67+
func (c *Client) Ping(ctx context.Context) error {
68+
_, err := c.client.Ping(ctx, &proto.PingRequest{})
69+
return err
70+
}
71+
72+
// SyncStart starts a unit in the dependency graph.
73+
func (c *Client) SyncStart(ctx context.Context, unitName unit.ID) error {
74+
_, err := c.client.SyncStart(ctx, &proto.SyncStartRequest{
75+
Unit: string(unitName),
76+
})
77+
return err
78+
}
79+
80+
// SyncWant declares a dependency between units.
81+
func (c *Client) SyncWant(ctx context.Context, unitName, dependsOn unit.ID) error {
82+
_, err := c.client.SyncWant(ctx, &proto.SyncWantRequest{
83+
Unit: string(unitName),
84+
DependsOn: string(dependsOn),
85+
})
86+
return err
87+
}
88+
89+
// SyncComplete marks a unit as complete in the dependency graph.
90+
func (c *Client) SyncComplete(ctx context.Context, unitName unit.ID) error {
91+
_, err := c.client.SyncComplete(ctx, &proto.SyncCompleteRequest{
92+
Unit: string(unitName),
93+
})
94+
return err
95+
}
96+
97+
// SyncReady requests whether a unit is ready to be started. That is, all dependencies are satisfied.
98+
func (c *Client) SyncReady(ctx context.Context, unitName unit.ID) (bool, error) {
99+
resp, err := c.client.SyncReady(ctx, &proto.SyncReadyRequest{
100+
Unit: string(unitName),
101+
})
102+
return resp.Ready, err
103+
}
104+
105+
// SyncStatus gets the status of a unit and its dependencies.
106+
func (c *Client) SyncStatus(ctx context.Context, unitName unit.ID) (SyncStatusResponse, error) {
107+
resp, err := c.client.SyncStatus(ctx, &proto.SyncStatusRequest{
108+
Unit: string(unitName),
109+
})
110+
if err != nil {
111+
return SyncStatusResponse{}, err
112+
}
113+
114+
var dependencies []DependencyInfo
115+
for _, dep := range resp.Dependencies {
116+
dependencies = append(dependencies, DependencyInfo{
117+
DependsOn: unit.ID(dep.DependsOn),
118+
RequiredStatus: unit.Status(dep.RequiredStatus),
119+
CurrentStatus: unit.Status(dep.CurrentStatus),
120+
IsSatisfied: dep.IsSatisfied,
121+
})
122+
}
123+
124+
return SyncStatusResponse{
125+
UnitName: unitName,
126+
Status: unit.Status(resp.Status),
127+
IsReady: resp.IsReady,
128+
Dependencies: dependencies,
129+
}, nil
130+
}
131+
132+
// SyncStatusResponse contains the status information for a unit.
133+
type SyncStatusResponse struct {
134+
UnitName unit.ID `table:"unit,default_sort" json:"unit_name"`
135+
Status unit.Status `table:"status" json:"status"`
136+
IsReady bool `table:"ready" json:"is_ready"`
137+
Dependencies []DependencyInfo `table:"dependencies" json:"dependencies"`
138+
}
139+
140+
// DependencyInfo contains information about a unit dependency.
141+
type DependencyInfo struct {
142+
DependsOn unit.ID `table:"depends on,default_sort" json:"depends_on"`
143+
RequiredStatus unit.Status `table:"required status" json:"required_status"`
144+
CurrentStatus unit.Status `table:"current status" json:"current_status"`
145+
IsSatisfied bool `table:"satisfied" json:"is_satisfied"`
146+
}

agent/agentsocket/server.go

Lines changed: 11 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"sync"
88

99
"golang.org/x/xerrors"
10-
11-
"github.com/hashicorp/yamux"
1210
"storj.io/drpc/drpcmux"
1311
"storj.io/drpc/drpcserver"
1412

@@ -33,11 +31,17 @@ type Server struct {
3331
wg sync.WaitGroup
3432
}
3533

36-
func NewServer(path string, logger slog.Logger) (*Server, error) {
34+
// NewServer creates a new agent socket server.
35+
func NewServer(logger slog.Logger, opts ...Option) (*Server, error) {
36+
options := &options{}
37+
for _, opt := range opts {
38+
opt(options)
39+
}
40+
3741
logger = logger.Named("agentsocket-server")
3842
server := &Server{
3943
logger: logger,
40-
path: path,
44+
path: options.path,
4145
service: &DRPCAgentSocketService{
4246
logger: logger,
4347
unitManager: unit.NewManager(),
@@ -61,14 +65,6 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
6165
},
6266
})
6367

64-
if server.path == "" {
65-
var err error
66-
server.path, err = getDefaultSocketPath()
67-
if err != nil {
68-
return nil, xerrors.Errorf("get default socket path: %w", err)
69-
}
70-
}
71-
7268
listener, err := createSocket(server.path)
7369
if err != nil {
7470
return nil, xerrors.Errorf("create socket: %w", err)
@@ -91,6 +87,7 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
9187
return server, nil
9288
}
9389

90+
// Close stops the server and cleans up resources.
9491
func (s *Server) Close() error {
9592
s.mu.Lock()
9693

@@ -134,52 +131,8 @@ func (s *Server) acceptConnections() {
134131
return
135132
}
136133

137-
for {
138-
select {
139-
case <-s.ctx.Done():
140-
return
141-
default:
142-
}
143-
144-
conn, err := listener.Accept()
145-
if err != nil {
146-
s.logger.Warn(s.ctx, "error accepting connection", slog.Error(err))
147-
continue
148-
}
149-
150-
s.mu.Lock()
151-
if s.listener == nil {
152-
s.mu.Unlock()
153-
_ = conn.Close()
154-
return
155-
}
156-
s.wg.Add(1)
157-
s.mu.Unlock()
158-
159-
go func() {
160-
defer s.wg.Done()
161-
s.handleConnection(conn)
162-
}()
163-
}
164-
}
165-
166-
func (s *Server) handleConnection(conn net.Conn) {
167-
defer conn.Close()
168-
169-
s.logger.Debug(s.ctx, "new connection accepted", slog.F("remote_addr", conn.RemoteAddr()))
170-
171-
config := yamux.DefaultConfig()
172-
config.LogOutput = nil
173-
config.Logger = slog.Stdlib(s.ctx, s.logger.Named("agentsocket-yamux"), slog.LevelInfo)
174-
session, err := yamux.Server(conn, config)
175-
if err != nil {
176-
s.logger.Warn(s.ctx, "failed to create yamux session", slog.Error(err))
177-
return
178-
}
179-
defer session.Close()
180-
181-
err = s.drpcServer.Serve(s.ctx, session)
134+
err := s.drpcServer.Serve(s.ctx, listener)
182135
if err != nil {
183-
s.logger.Debug(s.ctx, "drpc server finished", slog.Error(err))
136+
s.logger.Warn(s.ctx, "error serving drpc server", slog.Error(err))
184137
}
185138
}

0 commit comments

Comments
 (0)