Skip to content

Commit f161c82

Browse files
committed
fix: send prebuild job notification after job build db commit
1 parent 476df47 commit f161c82

File tree

2 files changed

+67
-27
lines changed

2 files changed

+67
-27
lines changed

enterprise/coderd/prebuilds/reconcile.go

Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,8 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
697697
return xerrors.Errorf("failed to generate unique prebuild ID: %w", err)
698698
}
699699

700-
return c.store.InTx(func(db database.Store) error {
700+
var provisionerJob *database.ProvisionerJob
701+
err = c.store.InTx(func(db database.Store) error {
701702
template, err := db.GetTemplateByID(ctx, templateID)
702703
if err != nil {
703704
return xerrors.Errorf("failed to get template: %w", err)
@@ -732,11 +733,20 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
732733
c.logger.Info(ctx, "attempting to create prebuild", slog.F("name", name),
733734
slog.F("workspace_id", prebuiltWorkspaceID.String()), slog.F("preset_id", presetID.String()))
734735

735-
return c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
736+
provisionerJob, err = c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
737+
return err
736738
}, &database.TxOptions{
737739
Isolation: sql.LevelRepeatableRead,
738740
ReadOnly: false,
739741
})
742+
if err != nil {
743+
return err
744+
}
745+
746+
// Publish provisioner job event to notify the acquirer that a new job was posted
747+
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)
748+
749+
return nil
740750
}
741751

742752
// provisionDelete provisions a delete transition for a prebuilt workspace.
@@ -748,26 +758,25 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
748758
//
749759
// IMPORTANT: This function must be called within a database transaction. It does not create its own transaction.
750760
// The caller is responsible for managing the transaction boundary via db.InTx().
751-
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) error {
761+
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) (*database.ProvisionerJob, error) {
752762
workspace, err := db.GetWorkspaceByID(ctx, workspaceID)
753763
if err != nil {
754-
return xerrors.Errorf("get workspace by ID: %w", err)
764+
return nil, xerrors.Errorf("get workspace by ID: %w", err)
755765
}
756766

757767
template, err := db.GetTemplateByID(ctx, templateID)
758768
if err != nil {
759-
return xerrors.Errorf("failed to get template: %w", err)
769+
return nil, xerrors.Errorf("failed to get template: %w", err)
760770
}
761771

762772
if workspace.OwnerID != database.PrebuildsSystemUserID {
763-
return xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
773+
return nil, xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
764774
}
765775

766776
c.logger.Info(ctx, "attempting to delete prebuild", slog.F("orphan", mode.String()),
767777
slog.F("name", workspace.Name), slog.F("workspace_id", workspaceID.String()), slog.F("preset_id", presetID.String()))
768778

769-
return c.provision(ctx, db, workspaceID, template, presetID,
770-
database.WorkspaceTransitionDelete, workspace, mode)
779+
return c.provision(ctx, db, workspaceID, template, presetID, database.WorkspaceTransitionDelete, workspace, mode)
771780
}
772781

773782
// cancelAndOrphanDeletePendingPrebuilds cancels pending prebuild jobs from inactive template versions
@@ -779,7 +788,8 @@ func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store
779788
// Since these jobs were never processed by a provisioner, no Terraform resources were created,
780789
// making it safe to orphan-delete the workspaces (skipping Terraform destroy).
781790
func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Context, templateID uuid.UUID, templateVersionID uuid.UUID, presetID uuid.UUID) error {
782-
return c.store.InTx(func(db database.Store) error {
791+
provisionerJobs := make(map[uuid.UUID]*database.ProvisionerJob)
792+
err := c.store.InTx(func(db database.Store) error {
783793
canceledJobs, err := db.UpdatePrebuildProvisionerJobWithCancel(
784794
ctx,
785795
database.UpdatePrebuildProvisionerJobWithCancelParams{
@@ -808,11 +818,13 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
808818

809819
var multiErr multierror.Error
810820
for _, job := range canceledJobs {
811-
err = c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
821+
provisionerJob, err := c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
812822
if err != nil {
813823
c.logger.Error(ctx, "failed to orphan delete canceled prebuild",
814824
slog.F("workspace_id", job.WorkspaceID.String()), slog.Error(err))
815825
multiErr.Errors = append(multiErr.Errors, err)
826+
} else {
827+
provisionerJobs[job.WorkspaceID] = provisionerJob
816828
}
817829
}
818830

@@ -821,15 +833,35 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
821833
Isolation: sql.LevelRepeatableRead,
822834
ReadOnly: false,
823835
})
836+
if err != nil {
837+
return err
838+
}
839+
840+
// Publish provisioner job events to notify the acquirer that new jobs were posted
841+
for workspaceID, job := range provisionerJobs {
842+
c.publishProvisionerJob(ctx, job, workspaceID)
843+
}
844+
845+
return nil
824846
}
825847

826848
func (c *StoreReconciler) deletePrebuiltWorkspace(ctx context.Context, prebuiltWorkspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error {
827-
return c.store.InTx(func(db database.Store) error {
828-
return c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
849+
var provisionerJob *database.ProvisionerJob
850+
err := c.store.InTx(func(db database.Store) (err error) {
851+
provisionerJob, err = c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
852+
return err
829853
}, &database.TxOptions{
830854
Isolation: sql.LevelRepeatableRead,
831855
ReadOnly: false,
832856
})
857+
if err != nil {
858+
return err
859+
}
860+
861+
// Publish provisioner job event to notify the acquirer that a new job was posted
862+
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)
863+
864+
return nil
833865
}
834866

835867
func (c *StoreReconciler) provision(
@@ -841,10 +873,10 @@ func (c *StoreReconciler) provision(
841873
transition database.WorkspaceTransition,
842874
workspace database.Workspace,
843875
mode DeprovisionMode,
844-
) error {
876+
) (*database.ProvisionerJob, error) {
845877
tvp, err := db.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID)
846878
if err != nil {
847-
return xerrors.Errorf("fetch preset details: %w", err)
879+
return nil, xerrors.Errorf("fetch preset details: %w", err)
848880
}
849881

850882
var params []codersdk.WorkspaceBuildParameter
@@ -893,26 +925,34 @@ func (c *StoreReconciler) provision(
893925
audit.WorkspaceBuildBaggage{},
894926
)
895927
if err != nil {
896-
return xerrors.Errorf("provision workspace: %w", err)
928+
return nil, xerrors.Errorf("provision workspace: %w", err)
897929
}
898-
899930
if provisionerJob == nil {
900-
return nil
901-
}
902-
903-
// Publish provisioner job event outside of transaction.
904-
select {
905-
case c.provisionNotifyCh <- *provisionerJob:
906-
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
907-
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
908-
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
931+
// This should not happen, builder.Build() should either return a job or an error.
932+
// Returning an error to fail fast if we hit this unexpected case.
933+
return nil, xerrors.Errorf("provision succeeded but returned no job")
909934
}
910935

911936
c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
912937
slog.F("prebuild_id", prebuildID.String()), slog.F("preset_id", presetID.String()),
913938
slog.F("job_id", provisionerJob.ID))
914939

915-
return nil
940+
return provisionerJob, nil
941+
}
942+
943+
// publishProvisionerJob publishes a provisioner job event to notify the acquirer that a new job has been created.
944+
// This must be called after the database transaction that creates the job has committed to ensure
945+
// the job is visible to provisioners when they query the database.
946+
func (c *StoreReconciler) publishProvisionerJob(ctx context.Context, provisionerJob *database.ProvisionerJob, workspaceID uuid.UUID) {
947+
if provisionerJob == nil {
948+
return
949+
}
950+
select {
951+
case c.provisionNotifyCh <- *provisionerJob:
952+
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check
953+
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
954+
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", workspaceID.String()))
955+
}
916956
}
917957

918958
// ForceMetricsUpdate forces the metrics collector, if defined, to update its state (we cache the metrics state to

enterprise/coderd/workspaceagents_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestReinitializeAgent(t *testing.T) {
9595

9696
// Ensure that workspace agents can reinitialize against claimed prebuilds in non-default organizations:
9797
for _, useDefaultOrg := range []bool{true, false} {
98-
t.Run("", func(t *testing.T) {
98+
t.Run(fmt.Sprintf("useDefaultOrg=%t", useDefaultOrg), func(t *testing.T) {
9999
t.Parallel()
100100

101101
tempAgentLog := testutil.CreateTemp(t, "", "testReinitializeAgent")

0 commit comments

Comments
 (0)