fix(enterprise): mark nodes from unhealthy coordinators as lost #13123

Merged

13 changes: 10 additions & 3 deletions enterprise/tailnet/pgcoord.go
@@ -1485,10 +1485,17 @@ func (h *heartbeats) filter(mappings []mapping) []mapping {
 		ok := m.coordinator == h.self
 		if !ok {
 			_, ok = h.coordinators[m.coordinator]
+			if !ok {
+				// If a mapping exists to a coordinator lost to heartbeats,
+				// still add the mapping as LOST. If a coordinator misses
+				// heartbeats but a client is still connected to it, this may be
+				// the only mapping available for it. Newer mappings will take
+				// precedence.
+				m.kind = proto.CoordinateResponse_PeerUpdate_LOST
+			}
 		}
-		if ok {
-			out = append(out, m)
-		}
+
+		out = append(out, m)
 	}
 	return out
 }
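
To make the change easier to read outside the diff, here is the whole method as it stands after this PR. This is a reconstruction from the hunk above: the declaration of out and the loop header are not shown in the diff and are assumed.

func (h *heartbeats) filter(mappings []mapping) []mapping {
	// Assumed preamble: the hunk starts mid-function, so the slice
	// allocation and loop header here are inferred, not quoted.
	out := make([]mapping, 0, len(mappings))
	for _, m := range mappings {
		ok := m.coordinator == h.self
		if !ok {
			_, ok = h.coordinators[m.coordinator]
			if !ok {
				// If a mapping exists to a coordinator lost to heartbeats,
				// still add the mapping as LOST. If a coordinator misses
				// heartbeats but a client is still connected to it, this may be
				// the only mapping available for it. Newer mappings will take
				// precedence.
				m.kind = proto.CoordinateResponse_PeerUpdate_LOST
			}
		}

		out = append(out, m)
	}
	return out
}

Before this change, a mapping whose coordinator had dropped out of h.coordinators was silently filtered away, so peers still connected through an unhealthy coordinator simply vanished instead of being reported as LOST.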
40 changes: 38 additions & 2 deletions enterprise/tailnet/pgcoord_internal_test.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/mock/gomock"
 	"golang.org/x/xerrors"
@@ -33,9 +34,9 @@ import (
 // make update-golden-files
 var UpdateGoldenFiles = flag.Bool("update", false, "update .golden files")
 
-// TestHeartbeat_Cleanup is internal so that we can overwrite the cleanup period and not wait an hour for the timed
+// TestHeartbeats_Cleanup is internal so that we can overwrite the cleanup period and not wait an hour for the timed
 // cleanup.
-func TestHeartbeat_Cleanup(t *testing.T) {
+func TestHeartbeats_Cleanup(t *testing.T) {
 	t.Parallel()
 
 	ctrl := gomock.NewController(t)
@@ -78,6 +79,41 @@ func TestHeartbeat_Cleanup(t *testing.T) {
 	close(waitForCleanup)
 }
 
+func TestHeartbeats_LostCoordinator_MarkLost(t *testing.T) {
+	t.Parallel()
+
+	ctrl := gomock.NewController(t)
+	mStore := dbmock.NewMockStore(ctrl)
+
+	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
+	defer cancel()
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+
+	uut := &heartbeats{
+		ctx:           ctx,
+		logger:        logger,
+		store:         mStore,
+		cleanupPeriod: time.Millisecond,
+		coordinators: map[uuid.UUID]time.Time{
+			uuid.New(): time.Now(),
+		},
+	}
+
+	mpngs := []mapping{{
+		peer:        uuid.New(),
+		coordinator: uuid.New(),
+		updatedAt:   time.Now(),
+		node:        &proto.Node{},
+		kind:        proto.CoordinateResponse_PeerUpdate_NODE,
+	}}
+
+	// Filter should still return the mapping without a coordinator, but marked
+	// as LOST.
+	got := uut.filter(mpngs)
+	require.Len(t, got, 1)
+	assert.Equal(t, proto.CoordinateResponse_PeerUpdate_LOST, got[0].kind)
+}
+
 // TestLostPeerCleanupQueries tests that our SQL queries to clean up lost peers do what we expect,
 // that is, clean up peers and associated tunnels that have been lost for over 24 hours.
 func TestLostPeerCleanupQueries(t *testing.T) {
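
As a reading aid for the new unit test above, its composite literal implies a mapping struct roughly like the sketch below. This is inferred from the fields used in this file, not the verbatim definition, which lives elsewhere in pgcoord.go and may carry additional fields.

// Inferred from usage in the tests; field types follow the values assigned.
type mapping struct {
	peer        uuid.UUID
	coordinator uuid.UUID
	updatedAt   time.Time
	node        *proto.Node
	kind        proto.CoordinateResponse_PeerUpdate_Kind
}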
66 changes: 56 additions & 10 deletions enterprise/tailnet/pgcoord_test.go
@@ -415,6 +415,52 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
 	assertEventuallyLost(ctx, t, store, client.id)
 }
 
+func TestPGCoordinatorSingle_MissedHeartbeats_NoDrop(t *testing.T) {
+	t.Parallel()
+	if !dbtestutil.WillUsePostgres() {
+		t.Skip("test only with postgres")
+	}
+	store, ps := dbtestutil.NewDB(t)
+	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
+	defer cancel()
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+
+	coordinator, err := tailnet.NewPGCoord(ctx, logger, ps, store)
+	require.NoError(t, err)
+	defer coordinator.Close()
+
+	agentID := uuid.New()
+
+	client := agpltest.NewPeer(ctx, t, coordinator, "client")
+	defer client.Close(ctx)
+	client.AddTunnel(agentID)
+
+	client.UpdateDERP(11)
+
+	// simulate a second coordinator via DB calls only --- our goal is to test
+	// broken heart-beating, so we can't use a real coordinator
+	fCoord2 := &fakeCoordinator{
+		ctx:   ctx,
+		t:     t,
+		store: store,
+		id:    uuid.New(),
+	}
+	// simulate a single heartbeat, the coordinator is healthy
+	fCoord2.heartbeat()
+
+	fCoord2.agentNode(agentID, &agpl.Node{PreferredDERP: 12})
+	// since it's healthy the client should get the new node.
+	client.AssertEventuallyHasDERP(agentID, 12)
+
+	// the heartbeat should then timeout and we'll get sent a LOST update, NOT a
+	// disconnect.
+	client.AssertEventuallyLost(agentID)
+
+	client.Close(ctx)
+
+	assertEventuallyLost(ctx, t, store, client.ID)
+}
+
 func TestPGCoordinatorSingle_SendsHeartbeats(t *testing.T) {
 	t.Parallel()
 	if !dbtestutil.WillUsePostgres() {
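
The comments in the new test state the key behavioral contract: a coordinator that misses heartbeats now yields a LOST update rather than a disconnect. Below is a sketch of how a consumer of these updates might branch on the kind; handlePeerUpdate is hypothetical, and DISCONNECTED is assumed as the counterpart to the NODE and LOST kinds that appear in this diff.

// Illustrative sketch only; not code from this PR.
func handlePeerUpdate(u *proto.CoordinateResponse_PeerUpdate) {
	switch u.Kind {
	case proto.CoordinateResponse_PeerUpdate_NODE:
		// Fresh node information: create or update the peer.
	case proto.CoordinateResponse_PeerUpdate_LOST:
		// The peer's coordinator went unhealthy; keep the peer's last known
		// node, since a newer mapping from a healthy coordinator takes
		// precedence when one arrives.
	case proto.CoordinateResponse_PeerUpdate_DISCONNECTED: // assumed kind
		// The peer is actually gone: tear down its state.
	}
}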
@@ -857,6 +903,16 @@ func newTestAgent(t *testing.T, coord agpl.CoordinatorV1, name string, id ...uui
 	return a
 }
 
+func newTestClient(t *testing.T, coord agpl.CoordinatorV1, agentID uuid.UUID, id ...uuid.UUID) *testConn {
+	c := newTestConn(id)
+	go func() {
+		err := coord.ServeClient(c.serverWS, c.id, agentID)
+		assert.NoError(t, err)
+		close(c.closeChan)
+	}()
+	return c
+}
+
 func (c *testConn) close() error {
 	return c.ws.Close()
 }
@@ -902,16 +958,6 @@ func (c *testConn) waitForClose(ctx context.Context, t *testing.T) {
 	}
 }
 
-func newTestClient(t *testing.T, coord agpl.CoordinatorV1, agentID uuid.UUID, id ...uuid.UUID) *testConn {
-	c := newTestConn(id)
-	go func() {
-		err := coord.ServeClient(c.serverWS, c.id, agentID)
-		assert.NoError(t, err)
-		close(c.closeChan)
-	}()
-	return c
-}
-
 func assertEventuallyHasDERPs(ctx context.Context, t *testing.T, c *testConn, expected ...int) {
 	t.Helper()
 	for {