Skip to content

Commit ca94588

Browse files
authored
fix: send prebuild job notification after job build db commit (#20693)
## Problem Fix race condition in prebuilds reconciler. Previously, a job notification event was sent to a Go channel before the provisioning database transaction completed. The notification is consumed by a separate goroutine that publishes to PostgreSQL's LISTEN/NOTIFY, using a separate database connection. This creates a potential race: if a provisioner daemon receives the notification and queries for the job before the provisioning transaction commits, it won't find the job in the database. This manifested as a flaky test failure in `TestReinitializeAgent`, where provisioners would occasionally miss newly created jobs. The test uses a 25-second timeout context, while the acquirer's backup polling mechanism checks for jobs every 30 seconds. This made the race condition visible in tests, though in production the backup polling would eventually pick up the job. The solution presented here guarantees that a job notification is only sent after the provisioning database transaction commits. ## Changes * The `provision()` and `provisionDelete()` functions now return the provisioner job instead of sending notifications internally. * A new `publishProvisionerJob()` helper centralizes the notification logic and is called after each transaction completes. Closes: coder/internal#963
1 parent e61b0fc commit ca94588

File tree

2 files changed

+72
-27
lines changed

2 files changed

+72
-27
lines changed

enterprise/coderd/prebuilds/reconcile.go

Lines changed: 71 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,8 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
697697
return xerrors.Errorf("failed to generate unique prebuild ID: %w", err)
698698
}
699699

700-
return c.store.InTx(func(db database.Store) error {
700+
var provisionerJob *database.ProvisionerJob
701+
err = c.store.InTx(func(db database.Store) error {
701702
template, err := db.GetTemplateByID(ctx, templateID)
702703
if err != nil {
703704
return xerrors.Errorf("failed to get template: %w", err)
@@ -732,11 +733,20 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
732733
c.logger.Info(ctx, "attempting to create prebuild", slog.F("name", name),
733734
slog.F("workspace_id", prebuiltWorkspaceID.String()), slog.F("preset_id", presetID.String()))
734735

735-
return c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
736+
provisionerJob, err = c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
737+
return err
736738
}, &database.TxOptions{
737739
Isolation: sql.LevelRepeatableRead,
738740
ReadOnly: false,
739741
})
742+
if err != nil {
743+
return err
744+
}
745+
746+
// Publish provisioner job event to notify the acquirer that a new job was posted
747+
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)
748+
749+
return nil
740750
}
741751

742752
// provisionDelete provisions a delete transition for a prebuilt workspace.
@@ -748,26 +758,25 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
748758
//
749759
// IMPORTANT: This function must be called within a database transaction. It does not create its own transaction.
750760
// The caller is responsible for managing the transaction boundary via db.InTx().
751-
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) error {
761+
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) (*database.ProvisionerJob, error) {
752762
workspace, err := db.GetWorkspaceByID(ctx, workspaceID)
753763
if err != nil {
754-
return xerrors.Errorf("get workspace by ID: %w", err)
764+
return nil, xerrors.Errorf("get workspace by ID: %w", err)
755765
}
756766

757767
template, err := db.GetTemplateByID(ctx, templateID)
758768
if err != nil {
759-
return xerrors.Errorf("failed to get template: %w", err)
769+
return nil, xerrors.Errorf("failed to get template: %w", err)
760770
}
761771

762772
if workspace.OwnerID != database.PrebuildsSystemUserID {
763-
return xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
773+
return nil, xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
764774
}
765775

766776
c.logger.Info(ctx, "attempting to delete prebuild", slog.F("orphan", mode.String()),
767777
slog.F("name", workspace.Name), slog.F("workspace_id", workspaceID.String()), slog.F("preset_id", presetID.String()))
768778

769-
return c.provision(ctx, db, workspaceID, template, presetID,
770-
database.WorkspaceTransitionDelete, workspace, mode)
779+
return c.provision(ctx, db, workspaceID, template, presetID, database.WorkspaceTransitionDelete, workspace, mode)
771780
}
772781

773782
// cancelAndOrphanDeletePendingPrebuilds cancels pending prebuild jobs from inactive template versions
@@ -779,7 +788,9 @@ func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store
779788
// Since these jobs were never processed by a provisioner, no Terraform resources were created,
780789
// making it safe to orphan-delete the workspaces (skipping Terraform destroy).
781790
func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Context, templateID uuid.UUID, templateVersionID uuid.UUID, presetID uuid.UUID) error {
782-
return c.store.InTx(func(db database.Store) error {
791+
var canceledProvisionerJob *database.ProvisionerJob
792+
var canceledWorkspaceID uuid.UUID
793+
err := c.store.InTx(func(db database.Store) error {
783794
canceledJobs, err := db.UpdatePrebuildProvisionerJobWithCancel(
784795
ctx,
785796
database.UpdatePrebuildProvisionerJobWithCancelParams{
@@ -808,11 +819,14 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
808819

809820
var multiErr multierror.Error
810821
for _, job := range canceledJobs {
811-
err = c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
822+
provisionerJob, err := c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
812823
if err != nil {
813824
c.logger.Error(ctx, "failed to orphan delete canceled prebuild",
814825
slog.F("workspace_id", job.WorkspaceID.String()), slog.Error(err))
815826
multiErr.Errors = append(multiErr.Errors, err)
827+
} else if canceledProvisionerJob == nil {
828+
canceledProvisionerJob = provisionerJob
829+
canceledWorkspaceID = job.WorkspaceID
816830
}
817831
}
818832

@@ -821,15 +835,38 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
821835
Isolation: sql.LevelRepeatableRead,
822836
ReadOnly: false,
823837
})
838+
if err != nil {
839+
return err
840+
}
841+
842+
// Job event notifications contain organization, provisioner type, and tags.
843+
// Since all canceled jobs have the same values, we only send one notification
844+
// for the first successfully canceled job, which is sufficient to trigger the
845+
// provisioner chain that processes all remaining jobs.
846+
if canceledProvisionerJob != nil {
847+
c.publishProvisionerJob(ctx, canceledProvisionerJob, canceledWorkspaceID)
848+
}
849+
850+
return nil
824851
}
825852

826853
func (c *StoreReconciler) deletePrebuiltWorkspace(ctx context.Context, prebuiltWorkspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error {
827-
return c.store.InTx(func(db database.Store) error {
828-
return c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
854+
var provisionerJob *database.ProvisionerJob
855+
err := c.store.InTx(func(db database.Store) (err error) {
856+
provisionerJob, err = c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
857+
return err
829858
}, &database.TxOptions{
830859
Isolation: sql.LevelRepeatableRead,
831860
ReadOnly: false,
832861
})
862+
if err != nil {
863+
return err
864+
}
865+
866+
// Publish provisioner job event to notify the acquirer that a new job was posted
867+
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)
868+
869+
return nil
833870
}
834871

835872
func (c *StoreReconciler) provision(
@@ -841,10 +878,10 @@ func (c *StoreReconciler) provision(
841878
transition database.WorkspaceTransition,
842879
workspace database.Workspace,
843880
mode DeprovisionMode,
844-
) error {
881+
) (*database.ProvisionerJob, error) {
845882
tvp, err := db.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID)
846883
if err != nil {
847-
return xerrors.Errorf("fetch preset details: %w", err)
884+
return nil, xerrors.Errorf("fetch preset details: %w", err)
848885
}
849886

850887
var params []codersdk.WorkspaceBuildParameter
@@ -893,26 +930,34 @@ func (c *StoreReconciler) provision(
893930
audit.WorkspaceBuildBaggage{},
894931
)
895932
if err != nil {
896-
return xerrors.Errorf("provision workspace: %w", err)
933+
return nil, xerrors.Errorf("provision workspace: %w", err)
897934
}
898-
899935
if provisionerJob == nil {
900-
return nil
901-
}
902-
903-
// Publish provisioner job event outside of transaction.
904-
select {
905-
case c.provisionNotifyCh <- *provisionerJob:
906-
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
907-
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
908-
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
936+
// This should not happen, builder.Build() should either return a job or an error.
937+
// Returning an error to fail fast if we hit this unexpected case.
938+
return nil, xerrors.Errorf("provision succeeded but returned no job")
909939
}
910940

911941
c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
912942
slog.F("prebuild_id", prebuildID.String()), slog.F("preset_id", presetID.String()),
913943
slog.F("job_id", provisionerJob.ID))
914944

915-
return nil
945+
return provisionerJob, nil
946+
}
947+
948+
// publishProvisionerJob publishes a provisioner job event to notify the acquirer that a new job has been created.
949+
// This must be called after the database transaction that creates the job has committed to ensure
950+
// the job is visible to provisioners when they query the database.
951+
func (c *StoreReconciler) publishProvisionerJob(ctx context.Context, provisionerJob *database.ProvisionerJob, workspaceID uuid.UUID) {
952+
if provisionerJob == nil {
953+
return
954+
}
955+
select {
956+
case c.provisionNotifyCh <- *provisionerJob:
957+
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check
958+
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
959+
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", workspaceID.String()))
960+
}
916961
}
917962

918963
// ForceMetricsUpdate forces the metrics collector, if defined, to update its state (we cache the metrics state to

enterprise/coderd/workspaceagents_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestReinitializeAgent(t *testing.T) {
9595

9696
// Ensure that workspace agents can reinitialize against claimed prebuilds in non-default organizations:
9797
for _, useDefaultOrg := range []bool{true, false} {
98-
t.Run("", func(t *testing.T) {
98+
t.Run(fmt.Sprintf("useDefaultOrg=%t", useDefaultOrg), func(t *testing.T) {
9999
t.Parallel()
100100

101101
tempAgentLog := testutil.CreateTemp(t, "", "testReinitializeAgent")

0 commit comments

Comments
 (0)