Skip to content

Commit 500c17e

Browse files
authored
feat(agent): add agent unit manager (#20715)
Relates to: coder/internal#1094. This is the first of five pull requests in an effort to add agent script ordering. It adds a unit manager, which uses an underlying DAG and a list of subscribers to inform units when their dependencies have changed in status. In follow-up PRs: * This unit manager will be plumbed into the workspace agent struct. * It will then be exposed to users via a new socket-based dRPC API. * The agentsocket API will then become accessible via CLI commands that allow coder scripts to express their dependencies on one another. This is an experimental feature. There may be ways to improve the efficiency of the manager struct, but it is more important to validate this feature with customers before we invest in such optimizations. See the tests for examples of how units may communicate with one another. Actual CLI usage will be analogous. I used an LLM to produce some of these changes, but I have conducted a thorough self-review and consider this contribution to be ready for an external reviewer.
1 parent 35b9df8 commit 500c17e

File tree

4 files changed

+853
-6
lines changed

4 files changed

+853
-6
lines changed

agent/unit/graph.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (g *Graph[EdgeType, VertexType]) AddEdge(from, to VertexType, edge EdgeType
5858
toID := g.getOrCreateVertexID(to)
5959

6060
if g.canReach(to, from) {
61-
return xerrors.Errorf("adding edge (%v -> %v) would create a cycle", from, to)
61+
return xerrors.Errorf("adding edge (%v -> %v): %w", from, to, ErrCycleDetected)
6262
}
6363

6464
g.gonumGraph.SetEdge(simple.Edge{F: simple.Node(fromID), T: simple.Node(toID)})

agent/unit/graph_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,7 @@ func TestGraph(t *testing.T) {
148148
graph := &testGraph{}
149149
unit1 := &testGraphVertex{Name: "unit1"}
150150
err := graph.AddEdge(unit1, unit1, testEdgeCompleted)
151-
require.Error(t, err)
152-
require.ErrorContains(t, err, fmt.Sprintf("adding edge (%v -> %v) would create a cycle", unit1, unit1))
151+
require.ErrorIs(t, err, unit.ErrCycleDetected)
153152

154153
return graph
155154
},
@@ -160,8 +159,7 @@ func TestGraph(t *testing.T) {
160159
err := graph.AddEdge(unit1, unit2, testEdgeCompleted)
161160
require.NoError(t, err)
162161
err = graph.AddEdge(unit2, unit1, testEdgeStarted)
163-
require.Error(t, err)
164-
require.ErrorContains(t, err, fmt.Sprintf("adding edge (%v -> %v) would create a cycle", unit2, unit1))
162+
require.ErrorIs(t, err, unit.ErrCycleDetected)
165163

166164
return graph
167165
},
@@ -341,7 +339,7 @@ func TestGraphThreadSafety(t *testing.T) {
341339
// Verify all attempts correctly returned cycle error
342340
for i, err := range cycleErrors {
343341
require.Error(t, err, "goroutine %d should have detected cycle", i)
344-
require.Contains(t, err.Error(), "would create a cycle")
342+
require.ErrorIs(t, err, unit.ErrCycleDetected)
345343
}
346344

347345
// Verify graph remains valid (original chain intact)

agent/unit/manager.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package unit
2+
3+
import (
4+
"errors"
5+
"sync"
6+
7+
"golang.org/x/xerrors"
8+
9+
"github.com/coder/coder/v2/coderd/util/slice"
10+
)
11+
12+
var (
13+
ErrUnitNotFound = xerrors.New("unit not found")
14+
ErrUnitAlreadyRegistered = xerrors.New("unit already registered")
15+
ErrCannotUpdateOtherUnit = xerrors.New("cannot update other unit's status")
16+
ErrDependenciesNotSatisfied = xerrors.New("unit dependencies not satisfied")
17+
ErrSameStatusAlreadySet = xerrors.New("same status already set")
18+
ErrCycleDetected = xerrors.New("cycle detected")
19+
ErrFailedToAddDependency = xerrors.New("failed to add dependency")
20+
)
21+
22+
// Status represents the status of a unit.
type Status string

// Status constants for dependency tracking.
const (
	// StatusNotRegistered is the zero value of Status. A lookup of an unknown
	// ID in a map of units yields this status, which is how the Manager tells
	// registered units apart from placeholders.
	StatusNotRegistered Status = ""
	// StatusPending is the status a unit is given when it is registered.
	StatusPending Status = "pending"
	// StatusStarted marks a unit that has begun its work.
	StatusStarted Status = "started"
	// StatusComplete marks a unit that has finished its work. Note that the
	// string value is "completed", not "complete".
	StatusComplete Status = "completed"
)

// ID provides a type-narrowed representation of the unique identifier of a unit.
type ID string
35+
36+
// Unit represents a point-in-time snapshot of a vertex in the dependency graph.
37+
// Units may depend on other units, or be depended on by other units. The unit struct
38+
// is not aware of updates made to the dependency graph after it is initialized and should
39+
// not be cached.
40+
type Unit struct {
41+
id ID
42+
status Status
43+
// ready is true if all dependencies are satisfied.
44+
// It does not have an accessor method on Unit, because a unit cannot know whether it is ready.
45+
// Only the Manager can calculate whether a unit is ready based on knowledge of the dependency graph.
46+
// To discourage use of an outdated readiness value, only the Manager should set and return this field.
47+
ready bool
48+
}
49+
50+
func (u Unit) ID() ID {
51+
return u.id
52+
}
53+
54+
func (u Unit) Status() Status {
55+
return u.status
56+
}
57+
58+
// Dependency represents a dependency relationship between units.
59+
type Dependency struct {
60+
Unit ID
61+
DependsOn ID
62+
RequiredStatus Status
63+
CurrentStatus Status
64+
IsSatisfied bool
65+
}
66+
67+
// Manager provides reactive dependency tracking over a Graph.
68+
// It manages Unit registration, dependency relationships, and status updates
69+
// with automatic recalculation of readiness when dependencies are satisfied.
70+
type Manager struct {
71+
mu sync.RWMutex
72+
73+
// The underlying graph that stores dependency relationships
74+
graph *Graph[Status, ID]
75+
76+
// Store vertex instances for each unit to ensure consistent references
77+
units map[ID]Unit
78+
}
79+
80+
// NewManager creates a new Manager instance.
81+
func NewManager() *Manager {
82+
return &Manager{
83+
graph: &Graph[Status, ID]{},
84+
units: make(map[ID]Unit),
85+
}
86+
}
87+
88+
// Register adds a unit to the manager if it is not already registered.
89+
// If a Unit is already registered (per the ID field), it is not updated.
90+
func (m *Manager) Register(id ID) error {
91+
m.mu.Lock()
92+
defer m.mu.Unlock()
93+
94+
if m.registered(id) {
95+
return xerrors.Errorf("registering unit %q: %w", id, ErrUnitAlreadyRegistered)
96+
}
97+
98+
m.units[id] = Unit{
99+
id: id,
100+
status: StatusPending,
101+
ready: true,
102+
}
103+
104+
return nil
105+
}
106+
107+
// registered checks if a unit is registered in the manager.
108+
func (m *Manager) registered(id ID) bool {
109+
return m.units[id].status != StatusNotRegistered
110+
}
111+
112+
// Unit fetches a unit from the manager. If the unit does not exist,
113+
// it returns the Unit zero-value as a placeholder unit, because
114+
// units may depend on other units that have not yet been created.
115+
func (m *Manager) Unit(id ID) Unit {
116+
m.mu.RLock()
117+
defer m.mu.RUnlock()
118+
119+
return m.units[id]
120+
}
121+
122+
func (m *Manager) IsReady(id ID) bool {
123+
m.mu.RLock()
124+
defer m.mu.RUnlock()
125+
126+
if !m.registered(id) {
127+
return false
128+
}
129+
130+
return m.units[id].ready
131+
}
132+
133+
// AddDependency adds a dependency relationship between units.
134+
// The unit depends on the dependsOn unit reaching the requiredStatus.
135+
func (m *Manager) AddDependency(unit ID, dependsOn ID, requiredStatus Status) error {
136+
m.mu.Lock()
137+
defer m.mu.Unlock()
138+
139+
if !m.registered(unit) {
140+
return xerrors.Errorf("checking registration for unit %q: %w", unit, ErrUnitNotFound)
141+
}
142+
143+
// Add the dependency edge to the graph
144+
// The edge goes from unit to dependsOn, representing the dependency
145+
err := m.graph.AddEdge(unit, dependsOn, requiredStatus)
146+
if err != nil {
147+
return xerrors.Errorf("adding edge for unit %q: %w", unit, errors.Join(ErrFailedToAddDependency, err))
148+
}
149+
150+
// Recalculate readiness for the unit since it now has a new dependency
151+
m.recalculateReadinessUnsafe(unit)
152+
153+
return nil
154+
}
155+
156+
// UpdateStatus updates a unit's status and recalculates readiness for affected dependents.
157+
func (m *Manager) UpdateStatus(unit ID, newStatus Status) error {
158+
m.mu.Lock()
159+
defer m.mu.Unlock()
160+
161+
if !m.registered(unit) {
162+
return xerrors.Errorf("checking registration for unit %q: %w", unit, ErrUnitNotFound)
163+
}
164+
165+
u := m.units[unit]
166+
if u.status == newStatus {
167+
return xerrors.Errorf("checking status for unit %q: %w", unit, ErrSameStatusAlreadySet)
168+
}
169+
170+
u.status = newStatus
171+
m.units[unit] = u
172+
173+
// Get all units that depend on this one (reverse adjacent vertices)
174+
dependents := m.graph.GetReverseAdjacentVertices(unit)
175+
176+
// Recalculate readiness for all dependents
177+
for _, dependent := range dependents {
178+
m.recalculateReadinessUnsafe(dependent.From)
179+
}
180+
181+
return nil
182+
}
183+
184+
// recalculateReadinessUnsafe recalculates the readiness state for a unit.
185+
// This method assumes the caller holds the write lock.
186+
func (m *Manager) recalculateReadinessUnsafe(unit ID) {
187+
u := m.units[unit]
188+
dependencies := m.graph.GetForwardAdjacentVertices(unit)
189+
190+
allSatisfied := true
191+
for _, dependency := range dependencies {
192+
requiredStatus := dependency.Edge
193+
dependsOnUnit := m.units[dependency.To]
194+
if dependsOnUnit.status != requiredStatus {
195+
allSatisfied = false
196+
break
197+
}
198+
}
199+
200+
u.ready = allSatisfied
201+
m.units[unit] = u
202+
}
203+
204+
// GetGraph returns the underlying graph for visualization and debugging.
205+
// This should be used carefully as it exposes the internal graph structure.
206+
func (m *Manager) GetGraph() *Graph[Status, ID] {
207+
return m.graph
208+
}
209+
210+
// GetAllDependencies returns all dependencies for a unit, both satisfied and unsatisfied.
211+
func (m *Manager) GetAllDependencies(unit ID) ([]Dependency, error) {
212+
m.mu.RLock()
213+
defer m.mu.RUnlock()
214+
215+
if !m.registered(unit) {
216+
return nil, xerrors.Errorf("checking registration for unit %q: %w", unit, ErrUnitNotFound)
217+
}
218+
219+
dependencies := m.graph.GetForwardAdjacentVertices(unit)
220+
221+
var allDependencies []Dependency
222+
223+
for _, dependency := range dependencies {
224+
dependsOnUnit := m.units[dependency.To]
225+
requiredStatus := dependency.Edge
226+
allDependencies = append(allDependencies, Dependency{
227+
Unit: unit,
228+
DependsOn: dependsOnUnit.id,
229+
RequiredStatus: requiredStatus,
230+
CurrentStatus: dependsOnUnit.status,
231+
IsSatisfied: dependsOnUnit.status == requiredStatus,
232+
})
233+
}
234+
235+
return allDependencies, nil
236+
}
237+
238+
// GetUnmetDependencies returns a list of unsatisfied dependencies for a unit.
239+
func (m *Manager) GetUnmetDependencies(unit ID) ([]Dependency, error) {
240+
allDependencies, err := m.GetAllDependencies(unit)
241+
if err != nil {
242+
return nil, err
243+
}
244+
245+
var unmetDependencies []Dependency = slice.Filter(allDependencies, func(dependency Dependency) bool {
246+
return !dependency.IsSatisfied
247+
})
248+
249+
return unmetDependencies, nil
250+
}
251+
252+
// ExportDOT exports the dependency graph to DOT format for visualization.
253+
func (m *Manager) ExportDOT(name string) (string, error) {
254+
return m.graph.ToDOT(name)
255+
}

0 commit comments

Comments
 (0)