Graceful shutdown and startup system for async job processing.
(Formerly codename: GRACE - Graceful Async Cancellation Engine)
Guarantees:
- Workers checkpoint and exit within the stop timeout (configurable via WorkerPoolConfig.WorkerStopTimeout)
- Interrupted jobs return to queued status with checkpoint intact
- Jobs left in running state after a crash are recovered on startup

Files:
- pulse/async/worker.go - Graceful start/stop logic
- pulse/async/grace_test.go - Test suite

Verified by:
- TestGRACEShutdownFlow - pulse/async/grace_test.go:25
- TestGRACECheckpointSaving - pulse/async/grace_test.go:145
- TestGRACEWorkerShutdownTimeout - pulse/async/grace_test.go:183
- TestGRACEGracefulStart - pulse/async/grace_test.go:228
- TestGRACEGradualRecovery - pulse/async/grace_test.go:349

# Fast tests (~10s)
go test ./pulse/async -run TestGRACE -short

# Full integration tests (~60s)
go test ./pulse/async -run TestGRACE
Some jobs use a two-phase execution pattern: an "ingest" phase that creates child tasks, followed by an "aggregate" phase that combines their results.
This pattern solves parent-child job coordination without blocking worker threads.
Location: pulse/async/worker.go:requeueOrphanedJob()
When recovering orphaned jobs, GRACE validates phase consistency:
// Example: Check if child tasks actually exist for aggregate phase
if job.Metadata != nil && job.Metadata.Phase == "aggregate" {
	tasks, err := wp.queue.ListTasksByParent(job.ID)
	if err != nil || len(tasks) == 0 {
		// No tasks found - reset to ingest phase
		job.Metadata.Phase = ""
		log.Printf("GRACE: Reset job %s to 'ingest' phase (no child tasks found)", job.ID)
	} else {
		// Tasks exist - keep aggregate phase
		log.Printf("GRACE: Job %s staying in 'aggregate' phase (%d child tasks found)", job.ID, len(tasks))
	}
}
Scenarios handled:
- Crash during phase transition: job is marked "aggregate" but no child tasks were created; phase is reset to "ingest"
- Normal crash after task creation: child tasks exist, so the job stays in the "aggregate" phase
- Crash during ingest phase: no phase metadata is set, so the job simply re-runs the ingest phase
Verified by:
- TestGRACEPhaseRecoveryNoChildTasks - pulse/async/grace_test.go:492
- TestGRACEPhaseRecoveryWithChildTasks - pulse/async/grace_test.go:542

go test ./pulse/async -run TestGRACEPhaseRecovery -v
Error handling: If checking tasks fails (DB error), defaults to safe behavior (reset phase)
Logging: All phase decisions are logged for debugging.
Backward compatibility: Only affects jobs with two-phase metadata; other job types unaffected
Parent jobs spawn child tasks (subtasks). The system ensures child tasks are properly managed throughout the parent's lifecycle.
Location: pulse/async/queue.go:DeleteJobWithChildren()
When a parent job is deleted:
- All child tasks are cancelled with reason "parent job deleted"

Race condition protection: Before enqueueing children, the parent checks that it still exists in the database. This prevents enqueueing tasks after the parent has been deleted mid-execution.
// Check if parent job still exists before enqueueing children
if _, err := queue.GetJob(job.ID); err != nil {
return fmt.Errorf("parent job deleted during execution: %w", err)
}
Location: pulse/async/queue.go:cancelOrphanedChildren()
When a parent job completes or fails:
Behavior: any child tasks still queued or running are cancelled as orphans, so they do not execute with no parent left to collect their results.
Location: pulse/async/error.go:RetryableError()
Failed tasks can be retried automatically (max 2 retries = 3 total attempts):
- On a retryable error, the worker increments retry_count and re-queues the job
- Log output:
  ꩜ Retry 1/2: operation failed | job:JB_abc123
  ꩜ Max retries exceeded (2): operation failed | job:JB_abc123

Database tracking: Each retry attempt updates the job record with the retry count and error details, providing a full audit trail.
Verified by:
- TestParentJobHierarchy - pulse/async/job_test.go:369
- TestTASBotParentJobHierarchy - pulse/async/store_test.go:250

go test ./pulse/async -run TestParentJobHierarchy -v
Applications using Pulse should propagate shutdown signals:
// Create worker pool with application context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
workerPool := async.NewWorkerPool(ctx, db, cfg, poolCfg, logger)
workerPool.Start()
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan
log.Println("Shutdown signal received, stopping workers...")
// Stop() cancels context and waits for workers with timeout (default 20s, configurable via poolCfg.WorkerStopTimeout)
workerPool.Stop()
Job handlers should check context at task boundaries:
func (h *MyHandler) Execute(ctx context.Context, job *async.Job) error {
for _, item := range items {
// Check for cancellation before each task
select {
case <-ctx.Done():
return ctx.Err() // Job will be checkpointed
default:
}
// Process item
if err := processItem(ctx, item); err != nil {
return err
}
// Update progress
job.Progress.Current++
}
return nil
}
type WorkerPoolConfig struct {
Workers int // Number of concurrent workers
PollInterval *time.Duration // Poll interval: nil = gradual ramp-up (default), 0 = no polling, positive = fixed interval
PauseOnBudget bool // Pause jobs when budget exceeded
GracefulStartPhase time.Duration // Duration of each graceful start phase (default: 5min, test: 10s)
WorkerStopTimeout time.Duration // Max time to wait for workers to checkpoint and exit (default: 20s)
MaxConsecutiveErrors int // Threshold for applying exponential backoff (default: 5)
MaxBackoff time.Duration // Maximum exponential backoff duration (default: 30s)
}
For faster testing, use shorter intervals:
pollInterval := 100 * time.Millisecond
config := async.WorkerPoolConfig{
Workers: 1,
PollInterval: &pollInterval,
GracefulStartPhase: 10 * time.Second,
WorkerStopTimeout: 2 * time.Second,
MaxConsecutiveErrors: 3,
MaxBackoff: 5 * time.Second,
}
Status: Implemented and tested