From 9cc149f31abfc438d9a557e87d200db08f0e829f Mon Sep 17 00:00:00 2001
From: Spike Curtis
Date: Fri, 18 Apr 2025 11:42:27 +0000
Subject: [PATCH 1/2] feat: modify coordinators to send errors and peers to
 log them

---
 enterprise/tailnet/connio.go                |  6 +-
 enterprise/tailnet/multiagent_test.go       |  3 +-
 enterprise/tailnet/pgcoord.go               |  4 ++
 enterprise/tailnet/pgcoord_internal_test.go |  4 +-
 enterprise/tailnet/pgcoord_test.go          | 31 ++++++-----
 tailnet/controllers.go                      |  5 ++
 tailnet/coordinator.go                      | 61 ++++++++++++++++-----
 tailnet/coordinator_test.go                 | 16 +++---
 tailnet/peer.go                             | 10 ++--
 tailnet/test/cases.go                       |  2 +-
 tailnet/test/peer.go                        | 10 +++-
 tailnet/tunnel.go                           | 23 +++++++-
 12 files changed, 126 insertions(+), 49 deletions(-)

diff --git a/enterprise/tailnet/connio.go b/enterprise/tailnet/connio.go
index 923af4bee080d..5cd2b90419f69 100644
--- a/enterprise/tailnet/connio.go
+++ b/enterprise/tailnet/connio.go
@@ -113,6 +113,7 @@ func (c *connIO) recvLoop() {
 		select {
 		case <-c.coordCtx.Done():
 			c.logger.Debug(c.coordCtx, "exiting io recvLoop; coordinator exit")
+			_ = c.Enqueue(&proto.CoordinateResponse{Error: agpl.CloseErrCoordinatorClose})
 			return
 		case <-c.peerCtx.Done():
 			c.logger.Debug(c.peerCtx, "exiting io recvLoop; peer context canceled")
@@ -123,6 +124,9 @@ func (c *connIO) recvLoop() {
 			return
 		}
 		if err := c.handleRequest(req); err != nil {
+			if !xerrors.Is(err, errDisconnect) {
+				_ = c.Enqueue(&proto.CoordinateResponse{Error: err.Error()})
+			}
 			return
 		}
 	}
@@ -136,7 +140,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
 	err := c.auth.Authorize(c.peerCtx, req)
 	if err != nil {
 		c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err))
-		return xerrors.Errorf("authorize request: %w", err)
+		return agpl.AuthorizationError{Wrapped: err}
 	}
 
 	if req.UpdateSelf != nil {
diff --git a/enterprise/tailnet/multiagent_test.go b/enterprise/tailnet/multiagent_test.go
index 0206681d1a375..fe3c3eaee04d3 100644
--- a/enterprise/tailnet/multiagent_test.go
+++ b/enterprise/tailnet/multiagent_test.go
@@ -10,6 +10,7 @@ import (
 	"cdr.dev/slog/sloggers/slogtest"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
 	"github.com/coder/coder/v2/enterprise/tailnet"
+	agpl "github.com/coder/coder/v2/tailnet"
 	agpltest "github.com/coder/coder/v2/tailnet/test"
 	"github.com/coder/coder/v2/testutil"
 )
@@ -77,7 +78,7 @@ func TestPGCoordinator_MultiAgent_CoordClose(t *testing.T) {
 	err = coord1.Close()
 	require.NoError(t, err)
 
-	ma1.AssertEventuallyResponsesClosed()
+	ma1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 }
 
 // TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with
diff --git a/enterprise/tailnet/pgcoord.go b/enterprise/tailnet/pgcoord.go
index da19f280ca617..1283d9f3531b7 100644
--- a/enterprise/tailnet/pgcoord.go
+++ b/enterprise/tailnet/pgcoord.go
@@ -37,6 +37,7 @@ const (
 	numHandshakerWorkers = 5
 	dbMaxBackoff         = 10 * time.Second
 	cleanupPeriod        = time.Hour
+	CloseErrUnhealthy    = "coordinator unhealthy"
 )
 
 // pgCoord is a postgres-backed coordinator
@@ -235,6 +236,7 @@ func (c *pgCoord) Coordinate(
 		c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
 			slog.F("peer_id", id),
 		)
+		resps <- &proto.CoordinateResponse{Error: CloseErrUnhealthy}
 		close(resps)
 		return reqs, resps
 	}
@@ -882,6 +884,7 @@ func (q *querier) newConn(c *connIO) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if !q.healthy {
+		_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 		err := c.Close()
 		// This can only happen during a narrow window where we were healthy
 		// when pgCoord checked before accepting the connection, but now are
@@ -1271,6 +1274,7 @@ func (q *querier) unhealthyCloseAll() {
 	for _, mpr := range q.mappers {
 		// close connections async so that we don't block the querier routine that responds to updates
 		go func(c *connIO) {
+			_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 			err := c.Close()
 			if err != nil {
 				q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))
diff --git a/enterprise/tailnet/pgcoord_internal_test.go b/enterprise/tailnet/pgcoord_internal_test.go
index 709fb0c225bcc..8d9d4386b4852 100644
--- a/enterprise/tailnet/pgcoord_internal_test.go
+++ b/enterprise/tailnet/pgcoord_internal_test.go
@@ -427,7 +427,9 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {
 
 	pID := uuid.UUID{5}
 	_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
-	resp := testutil.TryReceive(ctx, t, resps)
+	resp := testutil.RequireReceive(ctx, t, resps)
+	require.Equal(t, CloseErrUnhealthy, resp.Error)
+	resp = testutil.TryReceive(ctx, t, resps)
 	require.Nil(t, resp, "channel should be closed")
 
 	// give the coordinator some time to process any pending work. We are
diff --git a/enterprise/tailnet/pgcoord_test.go b/enterprise/tailnet/pgcoord_test.go
index 97f68daec9f4e..3c97c5dcec072 100644
--- a/enterprise/tailnet/pgcoord_test.go
+++ b/enterprise/tailnet/pgcoord_test.go
@@ -118,15 +118,15 @@ func TestPGCoordinatorSingle_AgentInvalidIP(t *testing.T) {
 
 	agent := agpltest.NewAgent(ctx, t, coordinator, "agent")
 	defer agent.Close(ctx)
+	prefix := agpl.TailscaleServicePrefix.RandomPrefix()
 	agent.UpdateNode(&proto.Node{
-		Addresses: []string{
-			agpl.TailscaleServicePrefix.RandomPrefix().String(),
-		},
+		Addresses:     []string{prefix.String()},
 		PreferredDerp: 10,
 	})
 
 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidNodeAddressError{Addr: prefix.Addr().String()}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }
 
@@ -153,7 +153,8 @@ func TestPGCoordinatorSingle_AgentInvalidIPBits(t *testing.T) {
 	})
 
 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidAddressBitsError{Bits: 64}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }
 
@@ -493,9 +494,9 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
 	require.NoError(t, err)
 
 	// this closes agent2, client22, client21
-	agent2.AssertEventuallyResponsesClosed()
-	client22.AssertEventuallyResponsesClosed()
-	client21.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client22.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client21.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent2.ID)
 	assertEventuallyLost(ctx, t, store, client21.ID)
 	assertEventuallyLost(ctx, t, store, client22.ID)
@@ -503,9 +504,9 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
 	err = coord1.Close()
 	require.NoError(t, err)
 	// this closes agent1, client12, client11
-	agent1.AssertEventuallyResponsesClosed()
-	client12.AssertEventuallyResponsesClosed()
-	client11.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client12.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client11.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent1.ID)
 	assertEventuallyLost(ctx, t, store, client11.ID)
 	assertEventuallyLost(ctx, t, store, client12.ID)
@@ -636,12 +637,12 @@ func TestPGCoordinator_Unhealthy(t *testing.T) {
 		}
 	}
 	// connected agent should be disconnected
-	agent1.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)
 
 	// new agent should immediately disconnect
 	agent2 := agpltest.NewAgent(ctx, t, uut, "agent2")
 	defer agent2.Close(ctx)
-	agent2.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)
 
 	// next heartbeats succeed, so we are healthy
 	for i := 0; i < 2; i++ {
@@ -836,7 +837,7 @@ func TestPGCoordinatorDual_FailedHeartbeat(t *testing.T) {
 	// we eventually disconnect from the coordinator.
 	err = sdb1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected
@@ -891,7 +892,7 @@ func TestPGCoordinatorDual_PeerReconnect(t *testing.T) {
 	// never send a DISCONNECTED update.
 	err = c1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected
diff --git a/tailnet/controllers.go b/tailnet/controllers.go
index a257667fbe7a9..b256d5ee5b2fc 100644
--- a/tailnet/controllers.go
+++ b/tailnet/controllers.go
@@ -284,6 +284,11 @@ func (c *BasicCoordination) respLoop() {
 			return
 		}
 
+		if resp.Error != "" {
+			c.logger.Error(context.Background(),
+				"coordination protocol error", slog.F("error", resp.Error))
+		}
+
 		err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
 		if err != nil {
 			c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err))
diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go
index f0f2c311f6e23..aa73e53ee8642 100644
--- a/tailnet/coordinator.go
+++ b/tailnet/coordinator.go
@@ -24,7 +24,9 @@ const (
 	// dropping updates
 	ResponseBufferSize = 512
 	// RequestBufferSize is the max number of requests to buffer per connection
-	RequestBufferSize = 32
+	RequestBufferSize        = 32
+	CloseErrOverwritten      = "peer ID overwritten by new connection"
+	CloseErrCoordinatorClose = "coordinator closed"
 )
 
 // Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +99,18 @@ var (
 	ErrAlreadyRemoved = xerrors.New("already removed")
 )
 
+type AuthorizationError struct {
+	Wrapped error
+}
+
+func (e AuthorizationError) Error() string {
+	return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
+}
+
+func (e AuthorizationError) Unwrap() error {
+	return e.Wrapped
+}
+
 // NewCoordinator constructs a new in-memory connection coordinator. This
 // coordinator is incompatible with multiple Coder replicas as all node data is
 // in-memory.
@@ -161,8 +175,12 @@ func (c *coordinator) Coordinate(
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		p.reqLoop(ctx, logger, c.core.handleRequest)
-		err := c.core.lostPeer(p)
+		loopErr := p.reqLoop(ctx, logger, c.core.handleRequest)
+		closeErrStr := ""
+		if loopErr != nil {
+			closeErrStr = loopErr.Error()
+		}
+		err := c.core.lostPeer(p, closeErrStr)
 		if xerrors.Is(err, ErrClosed) || xerrors.Is(err, ErrAlreadyRemoved) {
 			return
 		}
@@ -227,7 +245,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 	}
 
 	if err := pr.auth.Authorize(ctx, req); err != nil {
-		return xerrors.Errorf("authorize request: %w", err)
+		return AuthorizationError{Wrapped: err}
 	}
 
 	if req.UpdateSelf != nil {
@@ -270,7 +288,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 		}
 	}
 	if req.Disconnect != nil {
-		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect")
+		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect", "")
 	}
 	if rfhs := req.ReadyForHandshake; rfhs != nil {
 		err := c.handleReadyForHandshakeLocked(pr, rfhs)
@@ -344,7 +362,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
 		err := other.updateMappingLocked(id, n, k, reason)
 		if err != nil {
 			other.logger.Error(context.Background(), "failed to update mapping", slog.Error(err))
-			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to update tunnel peer mapping")
 		}
 	}
 }
@@ -360,7 +378,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := src.updateMappingLocked(dstID, dst.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			src.logger.Error(context.Background(), "failed update of tunnel src", slog.Error(err))
-			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel dest mapping")
 			// if the source fails, then the tunnel is also removed and there is no reason to continue
 			// processing.
 			return err
@@ -370,7 +389,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := dst.updateMappingLocked(src.id, src.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed update of tunnel dst", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel src mapping")
 		}
 	}
 }
@@ -381,7 +401,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 	err := src.updateMappingLocked(dstID, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 	if err != nil {
 		src.logger.Error(context.Background(), "failed to update", slog.Error(err))
-		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel dest mapping")
 		// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
 		// return here.
 		return err
@@ -391,7 +411,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err = dst.updateMappingLocked(src.id, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed to update", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel src mapping")
 			// don't return here because we still want to remove the tunnel, and an error at the
 			// destination doesn't count as an error removing the tunnel at the source.
 		}
@@ -413,6 +433,11 @@ func (c *core) initPeer(p *peer) error {
 	if old, ok := c.peers[p.id]; ok {
 		// rare and interesting enough to log at Info, but it isn't an error per se
 		old.logger.Info(context.Background(), "overwritten by new connection")
+		select {
+		case old.resps <- &proto.CoordinateResponse{Error: CloseErrOverwritten}:
+		default:
+			// pass
+		}
 		close(old.resps)
 		p.overwrites = old.overwrites + 1
 	}
@@ -433,7 +458,7 @@ func (c *core) initPeer(p *peer) error {
 
 // removePeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
 // all tunnels from which the removed peer is the source.
-func (c *core) lostPeer(p *peer) error {
+func (c *core) lostPeer(p *peer, closeErr string) error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 	c.logger.Debug(context.Background(), "lostPeer", slog.F("peer_id", p.id))
@@ -443,11 +468,11 @@ func (c *core) lostPeer(p *peer) error {
 	if existing, ok := c.peers[p.id]; !ok || existing != p {
 		return ErrAlreadyRemoved
 	}
-	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost")
+	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost", closeErr)
 	return nil
 }
 
-func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason string) {
+func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason, closeErr string) {
 	p, ok := c.peers[id]
 	if !ok {
 		c.logger.Critical(context.Background(), "removed non-existent peer %s", id)
@@ -455,6 +480,13 @@ func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_Peer
 	}
 	c.updateTunnelPeersLocked(id, nil, kind, reason)
 	c.tunnels.removeAll(id)
+	if closeErr != "" {
+		select {
+		case p.resps <- &proto.CoordinateResponse{Error: closeErr}:
+		default:
+			// blocked, pass.
+		}
+	}
 	close(p.resps)
 	delete(c.peers, id)
 }
@@ -487,7 +519,8 @@ func (c *core) close() error {
 	for id := range c.peers {
 		// when closing, mark them as LOST so that we don't disrupt in-progress
 		// connections.
-		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close")
+		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close",
+			CloseErrCoordinatorClose)
 	}
 	return nil
 }
diff --git a/tailnet/coordinator_test.go b/tailnet/coordinator_test.go
index 81a4ddc2182fc..3d2655a1950c7 100644
--- a/tailnet/coordinator_test.go
+++ b/tailnet/coordinator_test.go
@@ -58,7 +58,8 @@ func TestCoordinator(t *testing.T) {
 			},
 			PreferredDerp: 10,
 		})
-		client.AssertEventuallyResponsesClosed()
+		client.AssertEventuallyResponsesClosed(
+			tailnet.AuthorizationError{Wrapped: tailnet.InvalidAddressBitsError{Bits: 64}}.Error())
 	})
 
 	t.Run("AgentWithoutClients", func(t *testing.T) {
@@ -95,13 +96,13 @@ func TestCoordinator(t *testing.T) {
 		}()
 		agent := test.NewAgent(ctx, t, coordinator, "agent")
 		defer agent.Close(ctx)
+		prefix := tailnet.TailscaleServicePrefix.RandomPrefix()
 		agent.UpdateNode(&proto.Node{
-			Addresses: []string{
-				tailnet.TailscaleServicePrefix.RandomPrefix().String(),
-			},
+			Addresses:     []string{prefix.String()},
 			PreferredDerp: 10,
 		})
-		agent.AssertEventuallyResponsesClosed()
+		agent.AssertEventuallyResponsesClosed(
+			tailnet.AuthorizationError{Wrapped: tailnet.InvalidNodeAddressError{Addr: prefix.Addr().String()}}.Error())
 	})
 
 	t.Run("AgentWithoutClients_InvalidBits", func(t *testing.T) {
@@ -122,7 +123,8 @@ func TestCoordinator(t *testing.T) {
 			},
 			PreferredDerp: 10,
 		})
-		agent.AssertEventuallyResponsesClosed()
+		agent.AssertEventuallyResponsesClosed(
+			tailnet.AuthorizationError{Wrapped: tailnet.InvalidAddressBitsError{Bits: 64}}.Error())
 	})
 
 	t.Run("AgentWithClient", func(t *testing.T) {
@@ -198,7 +200,7 @@ func TestCoordinator(t *testing.T) {
 		agent2.AssertEventuallyHasDERP(client.ID, 2)
 
 		// This original agent channels should've been closed forcefully.
-		agent1.AssertEventuallyResponsesClosed()
+		agent1.AssertEventuallyResponsesClosed(tailnet.CloseErrOverwritten)
 	})
 
 	t.Run("AgentAck", func(t *testing.T) {
diff --git a/tailnet/peer.go b/tailnet/peer.go
index 0b265a1300074..ab7bd52e11d8a 100644
--- a/tailnet/peer.go
+++ b/tailnet/peer.go
@@ -121,24 +121,24 @@ func (p *peer) storeMappingLocked(
 	}, nil
 }
 
-func (p *peer) reqLoop(ctx context.Context, logger slog.Logger, handler func(context.Context, *peer, *proto.CoordinateRequest) error) {
+func (p *peer) reqLoop(ctx context.Context, logger slog.Logger, handler func(context.Context, *peer, *proto.CoordinateRequest) error) error {
 	for {
 		select {
 		case <-ctx.Done():
 			logger.Debug(ctx, "peerReadLoop context done")
-			return
+			return ctx.Err()
 		case req, ok := <-p.reqs:
 			if !ok {
 				logger.Debug(ctx, "peerReadLoop channel closed")
-				return
+				return nil
 			}
 			logger.Debug(ctx, "peerReadLoop got request")
 			if err := handler(ctx, p, req); err != nil {
 				if xerrors.Is(err, ErrAlreadyRemoved) || xerrors.Is(err, ErrClosed) {
-					return
+					return nil
 				}
 				logger.Error(ctx, "peerReadLoop error handling request", slog.Error(err), slog.F("request", req))
-				return
+				return err
 			}
 		}
 	}
diff --git a/tailnet/test/cases.go b/tailnet/test/cases.go
index 8361c77f4db94..37917e93964a0 100644
--- a/tailnet/test/cases.go
+++ b/tailnet/test/cases.go
@@ -22,7 +22,7 @@ func GracefulDisconnectTest(ctx context.Context, t *testing.T, coordinator tailn
 
 	p2.Disconnect()
 	p1.AssertEventuallyDisconnected(p2.ID)
-	p2.AssertEventuallyResponsesClosed()
+	p2.AssertEventuallyResponsesClosed("")
 }
 
 func LostTest(ctx context.Context, t *testing.T, coordinator tailnet.CoordinatorV2) {
diff --git a/tailnet/test/peer.go b/tailnet/test/peer.go
index e3064389d7dc9..601dc748d42d7 100644
--- a/tailnet/test/peer.go
+++ b/tailnet/test/peer.go
@@ -230,13 +230,21 @@ func (p *Peer) AssertEventuallyLost(other uuid.UUID) {
 	}
 }
 
-func (p *Peer) AssertEventuallyResponsesClosed() {
+func (p *Peer) AssertEventuallyResponsesClosed(expectedError string) {
+	gotErr := false
 	p.t.Helper()
 	for {
 		err := p.readOneResp()
 		if xerrors.Is(err, errResponsesClosed) {
+			if !gotErr && expectedError != "" {
+				p.t.Errorf("responses closed without error '%s'", expectedError)
+			}
 			return
 		}
+		if err != nil && expectedError != "" && err.Error() == expectedError {
+			gotErr = true
+			continue
+		}
 		if !assert.NoError(p.t, err) {
 			return
 		}
diff --git a/tailnet/tunnel.go b/tailnet/tunnel.go
index c1335f4c17d01..75a943ba13249 100644
--- a/tailnet/tunnel.go
+++ b/tailnet/tunnel.go
@@ -2,6 +2,7 @@ package tailnet
 
 import (
 	"context"
+	"fmt"
 	"net/netip"
 
 	"github.com/google/uuid"
@@ -12,6 +13,22 @@ import (
 
 var legacyWorkspaceAgentIP = netip.MustParseAddr("fd7a:115c:a1e0:49d6:b259:b7ac:b1b2:48f4")
 
+type InvalidAddressBitsError struct {
+	Bits int
+}
+
+func (e InvalidAddressBitsError) Error() string {
+	return fmt.Sprintf("invalid address bits, expected 128, got %d", e.Bits)
+}
+
+type InvalidNodeAddressError struct {
+	Addr string
+}
+
+func (e InvalidNodeAddressError) Error() string {
+	return fmt.Sprintf("invalid node address, got %s", e.Addr)
+}
+
 type CoordinateeAuth interface {
 	Authorize(ctx context.Context, req *proto.CoordinateRequest) error
 }
@@ -61,13 +78,13 @@ func (a AgentCoordinateeAuth) Authorize(_ context.Context, req *proto.Coordinate
 			}
 
 			if pre.Bits() != 128 {
-				return xerrors.Errorf("invalid address bits, expected 128, got %d", pre.Bits())
+				return InvalidAddressBitsError{pre.Bits()}
 			}
 
 			if TailscaleServicePrefix.AddrFromUUID(a.ID).Compare(pre.Addr()) != 0 &&
 				CoderServicePrefix.AddrFromUUID(a.ID).Compare(pre.Addr()) != 0 &&
 				legacyWorkspaceAgentIP.Compare(pre.Addr()) != 0 {
-				return xerrors.Errorf("invalid node address, got %s", pre.Addr().String())
+				return InvalidNodeAddressError{pre.Addr().String()}
 			}
 		}
 	}
@@ -104,7 +121,7 @@ func handleClientNodeRequests(req *proto.CoordinateRequest) error {
 			}
 
 			if pre.Bits() != 128 {
-				return xerrors.Errorf("invalid address bits, expected 128, got %d", pre.Bits())
+				return InvalidAddressBitsError{pre.Bits()}
 			}
 		}
 	}

From befa9cb52e2f9620233978c2621f8910b9a8fa7c Mon Sep 17 00:00:00 2001
From: Spike Curtis
Date: Mon, 21 Apr 2025 06:55:34 +0000
Subject: [PATCH 2/2] only warn on ready for handshake errors

---
 enterprise/tailnet/connio.go |  2 +-
 tailnet/controllers.go       | 10 ++++++++--
 tailnet/coordinator.go       |  3 ++-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/enterprise/tailnet/connio.go b/enterprise/tailnet/connio.go
index 5cd2b90419f69..df39b6227149b 100644
--- a/enterprise/tailnet/connio.go
+++ b/enterprise/tailnet/connio.go
@@ -221,7 +221,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
 				slog.F("dst", dst.String()),
 			)
 			_ = c.Enqueue(&proto.CoordinateResponse{
-				Error: fmt.Sprintf("you do not share a tunnel with %q", dst.String()),
+				Error: fmt.Sprintf("%s: you do not share a tunnel with %q", agpl.ReadyForHandshakeError, dst.String()),
 			})
 			return nil
 		}
diff --git a/tailnet/controllers.go b/tailnet/controllers.go
index b256d5ee5b2fc..2328e19640a4d 100644
--- a/tailnet/controllers.go
+++ b/tailnet/controllers.go
@@ -285,8 +285,14 @@ func (c *BasicCoordination) respLoop() {
 		}
 
 		if resp.Error != "" {
-			c.logger.Error(context.Background(),
-				"coordination protocol error", slog.F("error", resp.Error))
+			// ReadyForHandshake error can occur during race conditions, where we send a ReadyForHandshake message,
+			// but the source has already disconnected from the tunnel by the time we do. So, just log at warning.
+			if strings.HasPrefix(resp.Error, ReadyForHandshakeError) {
+				c.logger.Warn(context.Background(), "coordination warning", slog.F("msg", resp.Error))
+			} else {
+				c.logger.Error(context.Background(),
+					"coordination protocol error", slog.F("error", resp.Error))
+			}
 		}
 
 		err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go
index aa73e53ee8642..38e4ebe90da06 100644
--- a/tailnet/coordinator.go
+++ b/tailnet/coordinator.go
@@ -27,6 +27,7 @@ const (
 	RequestBufferSize        = 32
 	CloseErrOverwritten      = "peer ID overwritten by new connection"
 	CloseErrCoordinatorClose = "coordinator closed"
+	ReadyForHandshakeError   = "ready for handshake error"
 )
 
 // Coordinator exchanges nodes with agents to establish connections.
@@ -316,7 +317,7 @@ func (c *core) handleReadyForHandshakeLocked(src *peer, rfhs []*proto.Coordinate
 			// don't want to kill its connection.
 			select {
 			case src.resps <- &proto.CoordinateResponse{
-				Error: fmt.Sprintf("you do not share a tunnel with %q", dstID.String()),
+				Error: fmt.Sprintf("%s: you do not share a tunnel with %q", ReadyForHandshakeError, dstID.String()),
 			}:
 			default:
 				return ErrWouldBlock
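
The two patches above give every close path a human-readable reason: coordinators set CoordinateResponse.Error just before closing a peer's response channel (using a non-blocking send so a slow peer cannot stall removal), and BasicCoordination.respLoop logs that field, demoting errors prefixed with ReadyForHandshakeError to warnings because those can arise from a benign race. The Go sketch below is not part of the patch series; it is a minimal, self-contained illustration of that consumer-side handling, with a local CoordinateResponse struct and constant standing in for the real proto.CoordinateResponse and tailnet.ReadyForHandshakeError, and a hypothetical drainResponses helper in place of the real respLoop.

package main

import (
	"fmt"
	"strings"
)

// CoordinateResponse is a stand-in for proto.CoordinateResponse; only the
// Error field added by the patch is modeled here.
type CoordinateResponse struct {
	Error string
}

// ReadyForHandshakeError mirrors the constant introduced in tailnet/coordinator.go.
const ReadyForHandshakeError = "ready for handshake error"

// drainResponses is a hypothetical helper shaped like BasicCoordination.respLoop:
// it reads until the channel closes, logs any Error it sees, and treats
// ready-for-handshake errors as warnings rather than protocol errors.
func drainResponses(resps <-chan *CoordinateResponse) {
	for resp := range resps {
		if resp.Error == "" {
			continue // a real client would process peer updates here
		}
		if strings.HasPrefix(resp.Error, ReadyForHandshakeError) {
			fmt.Println("WARN  coordination warning:", resp.Error)
			continue
		}
		fmt.Println("ERROR coordination protocol error:", resp.Error)
	}
	fmt.Println("response channel closed by coordinator")
}

func main() {
	resps := make(chan *CoordinateResponse, 3)
	resps <- &CoordinateResponse{} // ordinary update, no error set
	resps <- &CoordinateResponse{Error: ReadyForHandshakeError + `: you do not share a tunnel with "p2"`}
	resps <- &CoordinateResponse{Error: "coordinator closed"}
	close(resps)
	drainResponses(resps)
}

Run as-is, the sketch prints one warning, one protocol error, and then reports the channel close, which mirrors the order of events a peer observes when a coordinator sends its final error and shuts the response stream.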