Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 71 additions & 26 deletions enterprise/coderd/prebuilds/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,8 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
return xerrors.Errorf("failed to generate unique prebuild ID: %w", err)
}

return c.store.InTx(func(db database.Store) error {
var provisionerJob *database.ProvisionerJob
err = c.store.InTx(func(db database.Store) error {
template, err := db.GetTemplateByID(ctx, templateID)
if err != nil {
return xerrors.Errorf("failed to get template: %w", err)
Expand Down Expand Up @@ -732,11 +733,20 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
c.logger.Info(ctx, "attempting to create prebuild", slog.F("name", name),
slog.F("workspace_id", prebuiltWorkspaceID.String()), slog.F("preset_id", presetID.String()))

return c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
provisionerJob, err = c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
return err
}, &database.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})
if err != nil {
return err
}

// Publish provisioner job event to notify the acquirer that a new job was posted
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)

Comment on lines +746 to +748
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only do this if provisionerJob is not nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provisionerJob should never be nil, we already check it in provision and return an error in that case:

if provisionerJob == nil {
// This should not happen, builder.Build() should either return a job or an error.
// Returning an error to fail fast if we hit this unexpected case.
return nil, xerrors.Errorf("provision succeeded but returned no job")
}

Nevertheless, I'm also checking in publishProvisionerJob if the job is nil, just in case:

if provisionerJob == nil {
return
}

return nil
}

// provisionDelete provisions a delete transition for a prebuilt workspace.
Expand All @@ -748,26 +758,25 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
//
// IMPORTANT: This function must be called within a database transaction. It does not create its own transaction.
// The caller is responsible for managing the transaction boundary via db.InTx().
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) error {
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) (*database.ProvisionerJob, error) {
workspace, err := db.GetWorkspaceByID(ctx, workspaceID)
if err != nil {
return xerrors.Errorf("get workspace by ID: %w", err)
return nil, xerrors.Errorf("get workspace by ID: %w", err)
}

template, err := db.GetTemplateByID(ctx, templateID)
if err != nil {
return xerrors.Errorf("failed to get template: %w", err)
return nil, xerrors.Errorf("failed to get template: %w", err)
}

if workspace.OwnerID != database.PrebuildsSystemUserID {
return xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
return nil, xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
}

c.logger.Info(ctx, "attempting to delete prebuild", slog.F("orphan", mode.String()),
slog.F("name", workspace.Name), slog.F("workspace_id", workspaceID.String()), slog.F("preset_id", presetID.String()))

return c.provision(ctx, db, workspaceID, template, presetID,
database.WorkspaceTransitionDelete, workspace, mode)
return c.provision(ctx, db, workspaceID, template, presetID, database.WorkspaceTransitionDelete, workspace, mode)
}

// cancelAndOrphanDeletePendingPrebuilds cancels pending prebuild jobs from inactive template versions
Expand All @@ -779,7 +788,9 @@ func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store
// Since these jobs were never processed by a provisioner, no Terraform resources were created,
// making it safe to orphan-delete the workspaces (skipping Terraform destroy).
func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Context, templateID uuid.UUID, templateVersionID uuid.UUID, presetID uuid.UUID) error {
return c.store.InTx(func(db database.Store) error {
var canceledProvisionerJob *database.ProvisionerJob
var canceledWorkspaceID uuid.UUID
err := c.store.InTx(func(db database.Store) error {
canceledJobs, err := db.UpdatePrebuildProvisionerJobWithCancel(
ctx,
database.UpdatePrebuildProvisionerJobWithCancelParams{
Expand Down Expand Up @@ -808,11 +819,14 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont

var multiErr multierror.Error
for _, job := range canceledJobs {
err = c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
provisionerJob, err := c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
if err != nil {
c.logger.Error(ctx, "failed to orphan delete canceled prebuild",
slog.F("workspace_id", job.WorkspaceID.String()), slog.Error(err))
multiErr.Errors = append(multiErr.Errors, err)
} else if canceledProvisionerJob == nil {
canceledProvisionerJob = provisionerJob
canceledWorkspaceID = job.WorkspaceID
}
}

Expand All @@ -821,15 +835,38 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})
if err != nil {
return err
}

// Job event notifications contain organization, provisioner type, and tags.
// Since all canceled jobs have the same values, we only send one notification
// for the first successfully canceled job, which is sufficient to trigger the
// provisioner chain that processes all remaining jobs.
if canceledProvisionerJob != nil {
c.publishProvisionerJob(ctx, canceledProvisionerJob, canceledWorkspaceID)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the same channel name is signaled multiple times with identical payload strings within the same transaction, only one instance of the notification event is delivered to listeners.

Link

We are now publishing these job events outside of the transaction, so we don't get this automatic de-duping. I'm not sure if this will cause issues with larger deployments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, we were never publishing the events within the transaction. Previously, we would send the job to the provisionNotifyCh channel, which would then publish the job outside the transaction https://github.com/coder/coder/blob/main/enterprise/coderd/prebuilds/reconcile.go#L164
Therefore, this deduplication was not happening in the previous implementation as well. The only difference here is that we guarantee that we only publish the job after the transaction commits, ensuring provisioners can see the job when they query the database.

Also, there seems to be a rule to avoid calling publish within a transaction: https://github.com/coder/coder/blob/8274251f/scripts/rules.go#L143

Nevertheless, this is a good point about deduping. Because it seems the job is published as (link):

msg, err := json.Marshal(JobPosting{
    OrganizationID:  job.OrganizationID,
    ProvisionerType: job.Provisioner,
    Tags:            job.Tags,
})

Meaning that all of these canceled jobs would have the same payload. So theoretically, we only need to send one job event, right? 🤔

Copy link
Member

@johnstcn johnstcn Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Therefore, this deduplication was not happening in the previous implementation as well.

Fair enough 👍 Just wanted to make sure this was called out.

Also, there seems to be a rule to avoid calling publish within a transaction: https://github.com/coder/coder/blob/8274251f/scripts/rules.go#L143

I completely forgot about that 😁 good callout!

Meaning that all of these canceled jobs would have the same payload. So theoretically, we only need to send one job event, right? 🤔

That's how I understand it, yes! provisionerdserver.Acquirer listens to all provisioner_job_posted events and 'wakes up' its provisioner if the tags match.

EDIT: now that I think about it more, I'm not sure that pubsub events actually impact canceled jobs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to only send 1 job event notification: fef8176

I think we still need to send the notification because these orphan-delete jobs are still handled by provisioners if any are available. Only when no provisioner is available is the job marked as completed automatically (and the workspace deleted). If provisioners exist, the notification ensures they pick up jobs immediately instead of waiting up to 30 seconds for the backup polling mechanism.


return nil
}

func (c *StoreReconciler) deletePrebuiltWorkspace(ctx context.Context, prebuiltWorkspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error {
return c.store.InTx(func(db database.Store) error {
return c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
var provisionerJob *database.ProvisionerJob
err := c.store.InTx(func(db database.Store) (err error) {
provisionerJob, err = c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
return err
}, &database.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})
if err != nil {
return err
}

// Publish provisioner job event to notify the acquirer that a new job was posted
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)

return nil
}

func (c *StoreReconciler) provision(
Expand All @@ -841,10 +878,10 @@ func (c *StoreReconciler) provision(
transition database.WorkspaceTransition,
workspace database.Workspace,
mode DeprovisionMode,
) error {
) (*database.ProvisionerJob, error) {
tvp, err := db.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID)
if err != nil {
return xerrors.Errorf("fetch preset details: %w", err)
return nil, xerrors.Errorf("fetch preset details: %w", err)
}

var params []codersdk.WorkspaceBuildParameter
Expand Down Expand Up @@ -893,26 +930,34 @@ func (c *StoreReconciler) provision(
audit.WorkspaceBuildBaggage{},
)
if err != nil {
return xerrors.Errorf("provision workspace: %w", err)
return nil, xerrors.Errorf("provision workspace: %w", err)
}

if provisionerJob == nil {
return nil
}

// Publish provisioner job event outside of transaction.
select {
case c.provisionNotifyCh <- *provisionerJob:
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
// This should not happen, builder.Build() should either return a job or an error.
// Returning an error to fail fast if we hit this unexpected case.
return nil, xerrors.Errorf("provision succeeded but returned no job")
}

c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
slog.F("prebuild_id", prebuildID.String()), slog.F("preset_id", presetID.String()),
slog.F("job_id", provisionerJob.ID))

return nil
return provisionerJob, nil
}

// publishProvisionerJob publishes a provisioner job event to notify the acquirer that a new job has been created.
// This must be called after the database transaction that creates the job has committed to ensure
// the job is visible to provisioners when they query the database.
func (c *StoreReconciler) publishProvisionerJob(ctx context.Context, provisionerJob *database.ProvisionerJob, workspaceID uuid.UUID) {
if provisionerJob == nil {
return
}
select {
case c.provisionNotifyCh <- *provisionerJob:
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", workspaceID.String()))
}
}

// ForceMetricsUpdate forces the metrics collector, if defined, to update its state (we cache the metrics state to
Expand Down
2 changes: 1 addition & 1 deletion enterprise/coderd/workspaceagents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestReinitializeAgent(t *testing.T) {

// Ensure that workspace agents can reinitialize against claimed prebuilds in non-default organizations:
for _, useDefaultOrg := range []bool{true, false} {
t.Run("", func(t *testing.T) {
t.Run(fmt.Sprintf("useDefaultOrg=%t", useDefaultOrg), func(t *testing.T) {
t.Parallel()

tempAgentLog := testutil.CreateTemp(t, "", "testReinitializeAgent")
Expand Down
Loading