Skip to content

Commit 0fb3b51

Browse files
committed
feat(coderd/database/dbpurge): add retention for connection logs
Add the `DeleteOldConnectionLogs` query and integrate it into the `dbpurge` routine. Retention is controlled by the `--retention-connection-logs` flag, falling back to `--retention-global` when it is not set. Retention is disabled (0) by default. Depends on #21021. Updates #20743.
1 parent b34ee61 commit 0fb3b51

File tree

9 files changed

+316
-0
lines changed

9 files changed

+316
-0
lines changed

coderd/database/dbauthz/dbauthz.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1749,6 +1749,13 @@ func (q *querier) DeleteOldAuditLogConnectionEvents(ctx context.Context, thresho
17491749
return q.db.DeleteOldAuditLogConnectionEvents(ctx, threshold)
17501750
}
17511751

1752+
// DeleteOldConnectionLogs authorizes policy.ActionDelete on rbac.ResourceSystem
// via q.authorizeContext and, when that succeeds, forwards arg unchanged to the
// wrapped store's DeleteOldConnectionLogs, returning its result as-is.
// If authorization fails it returns 0 and the authorization error without
// calling the wrapped store.
func (q *querier) DeleteOldConnectionLogs(ctx context.Context, arg database.DeleteOldConnectionLogsParams) (int64, error) {
1753+
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
1754+
return 0, err
1755+
}
1756+
return q.db.DeleteOldConnectionLogs(ctx, arg)
1757+
}
1758+
17521759
func (q *querier) DeleteOldNotificationMessages(ctx context.Context) error {
17531760
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceNotificationMessage); err != nil {
17541761
return err

coderd/database/dbauthz/dbauthz_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,10 @@ func (s *MethodTestSuite) TestConnectionLogs() {
355355
dbm.EXPECT().CountConnectionLogs(gomock.Any(), database.CountConnectionLogsParams{}).Return(int64(0), nil).AnyTimes()
356356
check.Args(database.CountConnectionLogsParams{}, emptyPreparedAuthorized{}).Asserts(rbac.ResourceConnectionLog, policy.ActionRead)
357357
}))
358+
s.Run("DeleteOldConnectionLogs", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
359+
dbm.EXPECT().DeleteOldConnectionLogs(gomock.Any(), database.DeleteOldConnectionLogsParams{}).Return(int64(0), nil).AnyTimes()
360+
check.Args(database.DeleteOldConnectionLogsParams{}).Asserts(rbac.ResourceSystem, policy.ActionDelete)
361+
}))
358362
}
359363

360364
func (s *MethodTestSuite) TestFile() {

coderd/database/dbmetrics/querymetrics.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbpurge/dbpurge.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ const (
2525
// but we won't touch the `connection_logs` table.
2626
maxAuditLogConnectionEventAge = 90 * 24 * time.Hour // 90 days
2727
auditLogConnectionEventBatchSize = 1000
28+
// Batch size for connection log deletion. Smaller batches prevent long-held
29+
// locks that could impact concurrent database operations.
30+
connectionLogsBatchSize = 1000
2831
// Telemetry heartbeats are used to deduplicate events across replicas. We
2932
// don't need to persist heartbeat rows for longer than 24 hours, as they
3033
// are only used for deduplication across replicas. The time needs to be
@@ -111,9 +114,26 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
111114
return xerrors.Errorf("failed to delete old aibridge records: %w", err)
112115
}
113116

117+
var purgedConnectionLogs int64
118+
connectionLogsRetention := vals.Retention.ConnectionLogs.Value()
119+
if connectionLogsRetention == 0 {
120+
connectionLogsRetention = vals.Retention.Global.Value()
121+
}
122+
if connectionLogsRetention > 0 {
123+
deleteConnectionLogsBefore := start.Add(-connectionLogsRetention)
124+
purgedConnectionLogs, err = tx.DeleteOldConnectionLogs(ctx, database.DeleteOldConnectionLogsParams{
125+
BeforeTime: deleteConnectionLogsBefore,
126+
LimitCount: connectionLogsBatchSize,
127+
})
128+
if err != nil {
129+
return xerrors.Errorf("failed to delete old connection logs: %w", err)
130+
}
131+
}
132+
114133
logger.Debug(ctx, "purged old database entries",
115134
slog.F("expired_api_keys", expiredAPIKeys),
116135
slog.F("aibridge_records", purgedAIBridgeRecords),
136+
slog.F("connection_logs", purgedConnectionLogs),
117137
slog.F("duration", clk.Since(start)),
118138
)
119139

coderd/database/dbpurge/dbpurge_test.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,223 @@ func TestDeleteOldTelemetryHeartbeats(t *testing.T) {
759759
}, testutil.WaitShort, testutil.IntervalFast, "it should delete old telemetry heartbeats")
760760
}
761761

762+
// TestDeleteOldConnectionLogs exercises connection-log purging through
// dbpurge.New under three retention configurations:
//   - RetentionEnabled: Retention.ConnectionLogs is set to 30 days.
//   - RetentionDisabled: Retention.ConnectionLogs is 0 and Global is left unset.
//   - GlobalRetentionFallback: Retention.ConnectionLogs is 0, Global is 30 days.
//
// Each subtest pins a mock clock to a fixed instant, inserts connection logs
// with chosen timestamps, runs the purger, and then checks which log IDs are
// still returned by GetConnectionLogsOffset.
func TestDeleteOldConnectionLogs(t *testing.T) {
763+
t.Parallel()
764+
765+
// An explicit 30-day ConnectionLogs retention: the 31-day-old log must be
// purged and the 15-day-old log must remain.
t.Run("RetentionEnabled", func(t *testing.T) {
766+
t.Parallel()
767+
768+
ctx := testutil.Context(t, testutil.WaitShort)
769+
770+
clk := quartz.NewMock(t)
771+
now := time.Date(2025, 1, 15, 7, 30, 0, 0, time.UTC)
772+
retentionPeriod := 30 * 24 * time.Hour // 30 days
773+
// NOTE: the names describe age relative to the retention cutoff, not clock
// order: afterThreshold is the OLDER timestamp (now - 31 days) and
// beforeThreshold is the NEWER one (now - 15 days).
afterThreshold := now.Add(-retentionPeriod).Add(-24 * time.Hour) // 31 days ago (older than threshold)
774+
beforeThreshold := now.Add(-15 * 24 * time.Hour) // 15 days ago (newer than threshold)
775+
clk.Set(now).MustWait(ctx)
776+
777+
db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
778+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
779+
user := dbgen.User(t, db, database.User{})
780+
org := dbgen.Organization(t, db, database.Organization{})
781+
_ = dbgen.OrganizationMember(t, db, database.OrganizationMember{UserID: user.ID, OrganizationID: org.ID})
782+
tv := dbgen.TemplateVersion(t, db, database.TemplateVersion{OrganizationID: org.ID, CreatedBy: user.ID})
783+
tmpl := dbgen.Template(t, db, database.Template{OrganizationID: org.ID, ActiveVersionID: tv.ID, CreatedBy: user.ID})
784+
workspace := dbgen.Workspace(t, db, database.WorkspaceTable{
785+
OwnerID: user.ID,
786+
OrganizationID: org.ID,
787+
TemplateID: tmpl.ID,
788+
})
789+
790+
// Create old connection log (should be deleted)
791+
oldLog := dbgen.ConnectionLog(t, db, database.UpsertConnectionLogParams{
792+
ID: uuid.New(),
793+
Time: afterThreshold,
794+
OrganizationID: org.ID,
795+
WorkspaceOwnerID: user.ID,
796+
WorkspaceID: workspace.ID,
797+
WorkspaceName: workspace.Name,
798+
AgentName: "agent1",
799+
Type: database.ConnectionTypeSsh,
800+
ConnectionStatus: database.ConnectionStatusConnected,
801+
})
802+
803+
// Create recent connection log (should be kept)
804+
recentLog := dbgen.ConnectionLog(t, db, database.UpsertConnectionLogParams{
805+
ID: uuid.New(),
806+
Time: beforeThreshold,
807+
OrganizationID: org.ID,
808+
WorkspaceOwnerID: user.ID,
809+
WorkspaceID: workspace.ID,
810+
WorkspaceName: workspace.Name,
811+
AgentName: "agent2",
812+
Type: database.ConnectionTypeSsh,
813+
ConnectionStatus: database.ConnectionStatusConnected,
814+
})
815+
816+
// Run the purge with configured retention period
817+
// awaitDoTick is defined elsewhere in this file; presumably done is
// signalled once a purge tick has run on the mock clock (TODO confirm).
done := awaitDoTick(ctx, t, clk)
818+
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
819+
Retention: codersdk.RetentionConfig{
820+
ConnectionLogs: serpent.Duration(retentionPeriod),
821+
},
822+
}, clk)
823+
defer closer.Close()
824+
testutil.TryReceive(ctx, t, done)
825+
826+
// Verify results by querying all connection logs
827+
logs, err := db.GetConnectionLogsOffset(ctx, database.GetConnectionLogsOffsetParams{
828+
LimitOpt: 100,
829+
})
830+
require.NoError(t, err)
831+
832+
// Collect the surviving log IDs so membership can be asserted directly.
logIDs := make([]uuid.UUID, len(logs))
833+
for i, log := range logs {
834+
logIDs[i] = log.ConnectionLog.ID
835+
}
836+
837+
require.NotContains(t, logIDs, oldLog.ID, "old connection log should be deleted")
838+
require.Contains(t, logIDs, recentLog.ID, "recent connection log should be kept")
839+
})
840+
841+
// ConnectionLogs retention is 0 and Global is left at its zero value, so
// even a one-year-old log must survive the purge.
t.Run("RetentionDisabled", func(t *testing.T) {
842+
t.Parallel()
843+
844+
ctx := testutil.Context(t, testutil.WaitShort)
845+
846+
clk := quartz.NewMock(t)
847+
now := time.Date(2025, 1, 15, 7, 30, 0, 0, time.UTC)
848+
oldTime := now.Add(-365 * 24 * time.Hour) // 1 year ago
849+
clk.Set(now).MustWait(ctx)
850+
851+
db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
852+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
853+
user := dbgen.User(t, db, database.User{})
854+
org := dbgen.Organization(t, db, database.Organization{})
855+
_ = dbgen.OrganizationMember(t, db, database.OrganizationMember{UserID: user.ID, OrganizationID: org.ID})
856+
tv := dbgen.TemplateVersion(t, db, database.TemplateVersion{OrganizationID: org.ID, CreatedBy: user.ID})
857+
tmpl := dbgen.Template(t, db, database.Template{OrganizationID: org.ID, ActiveVersionID: tv.ID, CreatedBy: user.ID})
858+
workspace := dbgen.Workspace(t, db, database.WorkspaceTable{
859+
OwnerID: user.ID,
860+
OrganizationID: org.ID,
861+
TemplateID: tmpl.ID,
862+
})
863+
864+
// Create old connection log (should NOT be deleted when retention is 0)
865+
oldLog := dbgen.ConnectionLog(t, db, database.UpsertConnectionLogParams{
866+
ID: uuid.New(),
867+
Time: oldTime,
868+
OrganizationID: org.ID,
869+
WorkspaceOwnerID: user.ID,
870+
WorkspaceID: workspace.ID,
871+
WorkspaceName: workspace.Name,
872+
AgentName: "agent1",
873+
Type: database.ConnectionTypeSsh,
874+
ConnectionStatus: database.ConnectionStatusConnected,
875+
})
876+
877+
// Run the purge with retention disabled (0)
878+
done := awaitDoTick(ctx, t, clk)
879+
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
880+
Retention: codersdk.RetentionConfig{
881+
ConnectionLogs: serpent.Duration(0), // disabled
882+
},
883+
}, clk)
884+
defer closer.Close()
885+
testutil.TryReceive(ctx, t, done)
886+
887+
// Verify old log is still present
888+
logs, err := db.GetConnectionLogsOffset(ctx, database.GetConnectionLogsOffsetParams{
889+
LimitOpt: 100,
890+
})
891+
require.NoError(t, err)
892+
893+
logIDs := make([]uuid.UUID, len(logs))
894+
for i, log := range logs {
895+
logIDs[i] = log.ConnectionLog.ID
896+
}
897+
898+
require.Contains(t, logIDs, oldLog.ID, "old connection log should NOT be deleted when retention is disabled")
899+
})
900+
901+
// ConnectionLogs retention is 0 but Global is 30 days: the purge must apply
// the Global value, deleting the 31-day-old log and keeping the 15-day-old one.
t.Run("GlobalRetentionFallback", func(t *testing.T) {
902+
t.Parallel()
903+
904+
ctx := testutil.Context(t, testutil.WaitShort)
905+
906+
clk := quartz.NewMock(t)
907+
now := time.Date(2025, 1, 15, 7, 30, 0, 0, time.UTC)
908+
retentionPeriod := 30 * 24 * time.Hour // 30 days
909+
// NOTE: as in RetentionEnabled, afterThreshold is the older timestamp and
// beforeThreshold the newer one.
afterThreshold := now.Add(-retentionPeriod).Add(-24 * time.Hour) // 31 days ago (older than threshold)
910+
beforeThreshold := now.Add(-15 * 24 * time.Hour) // 15 days ago (newer than threshold)
911+
clk.Set(now).MustWait(ctx)
912+
913+
db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
914+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
915+
user := dbgen.User(t, db, database.User{})
916+
org := dbgen.Organization(t, db, database.Organization{})
917+
_ = dbgen.OrganizationMember(t, db, database.OrganizationMember{UserID: user.ID, OrganizationID: org.ID})
918+
tv := dbgen.TemplateVersion(t, db, database.TemplateVersion{OrganizationID: org.ID, CreatedBy: user.ID})
919+
tmpl := dbgen.Template(t, db, database.Template{OrganizationID: org.ID, ActiveVersionID: tv.ID, CreatedBy: user.ID})
920+
workspace := dbgen.Workspace(t, db, database.WorkspaceTable{
921+
OwnerID: user.ID,
922+
OrganizationID: org.ID,
923+
TemplateID: tmpl.ID,
924+
})
925+
926+
// Create old connection log (should be deleted)
927+
oldLog := dbgen.ConnectionLog(t, db, database.UpsertConnectionLogParams{
928+
ID: uuid.New(),
929+
Time: afterThreshold,
930+
OrganizationID: org.ID,
931+
WorkspaceOwnerID: user.ID,
932+
WorkspaceID: workspace.ID,
933+
WorkspaceName: workspace.Name,
934+
AgentName: "agent1",
935+
Type: database.ConnectionTypeSsh,
936+
ConnectionStatus: database.ConnectionStatusConnected,
937+
})
938+
939+
// Create recent connection log (should be kept)
940+
recentLog := dbgen.ConnectionLog(t, db, database.UpsertConnectionLogParams{
941+
ID: uuid.New(),
942+
Time: beforeThreshold,
943+
OrganizationID: org.ID,
944+
WorkspaceOwnerID: user.ID,
945+
WorkspaceID: workspace.ID,
946+
WorkspaceName: workspace.Name,
947+
AgentName: "agent2",
948+
Type: database.ConnectionTypeSsh,
949+
ConnectionStatus: database.ConnectionStatusConnected,
950+
})
951+
952+
// Run the purge with global retention (connection logs retention is 0, so it falls back)
953+
done := awaitDoTick(ctx, t, clk)
954+
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
955+
Retention: codersdk.RetentionConfig{
956+
Global: serpent.Duration(retentionPeriod), // Use global
957+
ConnectionLogs: serpent.Duration(0), // Not set, should fall back to global
958+
},
959+
}, clk)
960+
defer closer.Close()
961+
testutil.TryReceive(ctx, t, done)
962+
963+
// Verify results
964+
logs, err := db.GetConnectionLogsOffset(ctx, database.GetConnectionLogsOffsetParams{
965+
LimitOpt: 100,
966+
})
967+
require.NoError(t, err)
968+
969+
logIDs := make([]uuid.UUID, len(logs))
970+
for i, log := range logs {
971+
logIDs[i] = log.ConnectionLog.ID
972+
}
973+
974+
require.NotContains(t, logIDs, oldLog.ID, "old connection log should be deleted via global retention")
975+
require.Contains(t, logIDs, recentLog.ID, "recent connection log should be kept")
976+
})
977+
}
978+
762979
func TestDeleteOldAIBridgeRecords(t *testing.T) {
763980
t.Parallel()
764981

coderd/database/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/connectionlogs.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,22 @@ WHERE
239239
-- @authorize_filter
240240
;
241241

242+
-- name: DeleteOldConnectionLogs :one
-- Deletes at most @limit_count rows from connection_logs whose connect_time is
-- strictly earlier than @before_time, selecting the oldest rows first, and
-- returns the number of rows actually deleted as deleted_count.
243+
WITH old_logs AS (
244+
SELECT id
245+
FROM connection_logs
246+
WHERE connect_time < @before_time::timestamp with time zone
247+
ORDER BY connect_time ASC
248+
LIMIT @limit_count
249+
),
250+
deleted_rows AS (
251+
DELETE FROM connection_logs
252+
USING old_logs
253+
WHERE connection_logs.id = old_logs.id
254+
RETURNING connection_logs.id
255+
)
256+
SELECT COUNT(deleted_rows.id) AS deleted_count FROM deleted_rows;
257+
242258
-- name: UpsertConnectionLog :one
243259
INSERT INTO connection_logs (
244260
id,

0 commit comments

Comments
 (0)