Skip to content

Commit 2c77682

Browse files
committed
fix: send prebuild job notification after job build db commit
1 parent 0e21480 commit 2c77682

File tree

2 files changed

+68
-27
lines changed

2 files changed

+68
-27
lines changed

enterprise/coderd/prebuilds/reconcile.go

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,8 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
697697
return xerrors.Errorf("failed to generate unique prebuild ID: %w", err)
698698
}
699699

700-
return c.store.InTx(func(db database.Store) error {
700+
var provisionerJob *database.ProvisionerJob
701+
err = c.store.InTx(func(db database.Store) error {
701702
template, err := db.GetTemplateByID(ctx, templateID)
702703
if err != nil {
703704
return xerrors.Errorf("failed to get template: %w", err)
@@ -732,11 +733,20 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
732733
c.logger.Info(ctx, "attempting to create prebuild", slog.F("name", name),
733734
slog.F("workspace_id", prebuiltWorkspaceID.String()), slog.F("preset_id", presetID.String()))
734735

735-
return c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
736+
provisionerJob, err = c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
737+
return err
736738
}, &database.TxOptions{
737739
Isolation: sql.LevelRepeatableRead,
738740
ReadOnly: false,
739741
})
742+
if err != nil {
743+
return err
744+
}
745+
746+
// Publish provisioner job event to notify the acquirer that a new job was posted
747+
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)
748+
749+
return nil
740750
}
741751

742752
// provisionDelete provisions a delete transition for a prebuilt workspace.
@@ -748,26 +758,26 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
748758
//
749759
// IMPORTANT: This function must be called within a database transaction. It does not create its own transaction.
750760
// The caller is responsible for managing the transaction boundary via db.InTx().
751-
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) error {
761+
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) (*database.ProvisionerJob, error) {
752762
workspace, err := db.GetWorkspaceByID(ctx, workspaceID)
753763
if err != nil {
754-
return xerrors.Errorf("get workspace by ID: %w", err)
764+
return nil, xerrors.Errorf("get workspace by ID: %w", err)
755765
}
756766

757767
template, err := db.GetTemplateByID(ctx, templateID)
758768
if err != nil {
759-
return xerrors.Errorf("failed to get template: %w", err)
769+
return nil, xerrors.Errorf("failed to get template: %w", err)
760770
}
761771

762772
if workspace.OwnerID != database.PrebuildsSystemUserID {
763-
return xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
773+
return nil, xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
764774
}
765775

766776
c.logger.Info(ctx, "attempting to delete prebuild", slog.F("orphan", mode.String()),
767777
slog.F("name", workspace.Name), slog.F("workspace_id", workspaceID.String()), slog.F("preset_id", presetID.String()))
768778

769-
return c.provision(ctx, db, workspaceID, template, presetID,
770-
database.WorkspaceTransitionDelete, workspace, mode)
779+
provisionerJob, err := c.provision(ctx, db, workspaceID, template, presetID, database.WorkspaceTransitionDelete, workspace, mode)
780+
return provisionerJob, err
771781
}
772782

773783
// cancelAndOrphanDeletePendingPrebuilds cancels pending prebuild jobs from inactive template versions
@@ -779,7 +789,8 @@ func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store
779789
// Since these jobs were never processed by a provisioner, no Terraform resources were created,
780790
// making it safe to orphan-delete the workspaces (skipping Terraform destroy).
781791
func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Context, templateID uuid.UUID, templateVersionID uuid.UUID, presetID uuid.UUID) error {
782-
return c.store.InTx(func(db database.Store) error {
792+
provisionerJobs := make(map[uuid.UUID]*database.ProvisionerJob)
793+
err := c.store.InTx(func(db database.Store) error {
783794
canceledJobs, err := db.UpdatePrebuildProvisionerJobWithCancel(
784795
ctx,
785796
database.UpdatePrebuildProvisionerJobWithCancelParams{
@@ -808,11 +819,13 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
808819

809820
var multiErr multierror.Error
810821
for _, job := range canceledJobs {
811-
err = c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
822+
provisionerJob, err := c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
812823
if err != nil {
813824
c.logger.Error(ctx, "failed to orphan delete canceled prebuild",
814825
slog.F("workspace_id", job.WorkspaceID.String()), slog.Error(err))
815826
multiErr.Errors = append(multiErr.Errors, err)
827+
} else {
828+
provisionerJobs[job.WorkspaceID] = provisionerJob
816829
}
817830
}
818831

@@ -821,15 +834,35 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
821834
Isolation: sql.LevelRepeatableRead,
822835
ReadOnly: false,
823836
})
837+
if err != nil {
838+
return err
839+
}
840+
841+
// Publish provisioner job events to notify the acquirer that new jobs were posted
842+
for workspaceID, job := range provisionerJobs {
843+
c.publishProvisionerJob(ctx, job, workspaceID)
844+
}
845+
846+
return nil
824847
}
825848

826849
func (c *StoreReconciler) deletePrebuiltWorkspace(ctx context.Context, prebuiltWorkspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error {
827-
return c.store.InTx(func(db database.Store) error {
828-
return c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
850+
var provisionerJob *database.ProvisionerJob
851+
err := c.store.InTx(func(db database.Store) (err error) {
852+
provisionerJob, err = c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
853+
return err
829854
}, &database.TxOptions{
830855
Isolation: sql.LevelRepeatableRead,
831856
ReadOnly: false,
832857
})
858+
if err != nil {
859+
return err
860+
}
861+
862+
// Publish provisioner job event to notify the acquirer that a new job was posted
863+
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)
864+
865+
return nil
833866
}
834867

835868
func (c *StoreReconciler) provision(
@@ -841,10 +874,10 @@ func (c *StoreReconciler) provision(
841874
transition database.WorkspaceTransition,
842875
workspace database.Workspace,
843876
mode DeprovisionMode,
844-
) error {
877+
) (*database.ProvisionerJob, error) {
845878
tvp, err := db.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID)
846879
if err != nil {
847-
return xerrors.Errorf("fetch preset details: %w", err)
880+
return nil, xerrors.Errorf("fetch preset details: %w", err)
848881
}
849882

850883
var params []codersdk.WorkspaceBuildParameter
@@ -893,26 +926,34 @@ func (c *StoreReconciler) provision(
893926
audit.WorkspaceBuildBaggage{},
894927
)
895928
if err != nil {
896-
return xerrors.Errorf("provision workspace: %w", err)
929+
return nil, xerrors.Errorf("provision workspace: %w", err)
897930
}
898-
899931
if provisionerJob == nil {
900-
return nil
901-
}
902-
903-
// Publish provisioner job event outside of transaction.
904-
select {
905-
case c.provisionNotifyCh <- *provisionerJob:
906-
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
907-
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
908-
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
932+
// This should not happen, builder.Build() should either return a job or an error.
933+
// Returning an error to fail fast if we hit this unexpected case.
934+
return nil, xerrors.Errorf("provision succeeded but returned no job")
909935
}
910936

911937
c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
912938
slog.F("prebuild_id", prebuildID.String()), slog.F("preset_id", presetID.String()),
913939
slog.F("job_id", provisionerJob.ID))
914940

915-
return nil
941+
return provisionerJob, nil
942+
}
943+
944+
// publishProvisionerJob publishes a provisioner job event to notify the acquirer that a new job has been created.
945+
// This must be called after the database transaction that creates the job has committed to ensure
946+
// the job is visible to provisioners when they query the database.
947+
func (c *StoreReconciler) publishProvisionerJob(ctx context.Context, provisionerJob *database.ProvisionerJob, workspaceID uuid.UUID) {
948+
if provisionerJob == nil {
949+
return
950+
}
951+
select {
952+
case c.provisionNotifyCh <- *provisionerJob:
953+
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check
954+
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
955+
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", workspaceID.String()))
956+
}
916957
}
917958

918959
// ForceMetricsUpdate forces the metrics collector, if defined, to update its state (we cache the metrics state to

enterprise/coderd/workspaceagents_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestReinitializeAgent(t *testing.T) {
9595

9696
// Ensure that workspace agents can reinitialize against claimed prebuilds in non-default organizations:
9797
for _, useDefaultOrg := range []bool{true, false} {
98-
t.Run("", func(t *testing.T) {
98+
t.Run(fmt.Sprintf("useDefaultOrg=%t", useDefaultOrg), func(t *testing.T) {
9999
t.Parallel()
100100

101101
tempAgentLog := testutil.CreateTemp(t, "", "testReinitializeAgent")

0 commit comments

Comments
 (0)