Graceful shutdown and startup system for async job processing.
(Formerly codename: GRACE - Graceful Async Cancellation Engine)

Key behaviors:
- On graceful shutdown, in-flight jobs are returned to queued status with checkpoint intact
- On startup, jobs left in running state after a crash are recovered

Key files:
- pulse/async/worker.go - Graceful start/stop logic
- pulse/async/grace_test.go - Test suite

```bash
# Fast tests (~10s)
go test ./pulse/async -run TestGRACE -short

# Full integration tests (~60s)
go test ./pulse/async -run TestGRACE
```
Some jobs use a two-phase execution pattern:
1. Ingest phase (empty phase value) - the parent job creates its child tasks
2. Aggregate phase - the parent job collects the results of those child tasks
This pattern solves parent-child job coordination without blocking worker threads.
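As a rough illustration, a two-phase handler can branch on the phase stored in the job metadata. This is a minimal sketch, assuming a hypothetical TwoPhaseHandler with enqueueChildTasks/aggregateResults helpers; only job.Metadata.Phase and the empty-string-means-ingest convention come from the recovery code shown below.

```go
// Minimal sketch of a two-phase handler. TwoPhaseHandler, enqueueChildTasks and
// aggregateResults are hypothetical; job.Metadata.Phase matches the recovery code below.
func (h *TwoPhaseHandler) Execute(ctx context.Context, job *async.Job) error {
	phase := ""
	if job.Metadata != nil {
		phase = job.Metadata.Phase
	}

	if phase == "aggregate" {
		// Aggregate phase: child tasks already exist, collect their results.
		return h.aggregateResults(ctx, job)
	}

	// Ingest phase (empty phase): create child tasks, then hand the job back in the
	// "aggregate" phase instead of blocking this worker while the children run.
	if err := h.enqueueChildTasks(ctx, job); err != nil {
		return err
	}
	job.Metadata.Phase = "aggregate" // assumes Metadata was initialized at job creation
	return nil
}
```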
Location: pulse/async/worker.go:requeueOrphanedJob()
When recovering orphaned jobs, GRACE validates phase consistency:
```go
// Example: Check if child tasks actually exist for aggregate phase
if job.Metadata != nil && job.Metadata.Phase == "aggregate" {
	tasks, err := wp.queue.ListTasksByParent(job.ID)
	if err != nil || len(tasks) == 0 {
		// No tasks found - reset to ingest phase
		job.Metadata.Phase = ""
		log.Printf("GRACE: Reset job %s to 'ingest' phase (no child tasks found)", job.ID)
	} else {
		// Tasks exist - keep aggregate phase
		log.Printf("GRACE: Job %s staying in 'aggregate' phase (%d child tasks found)", job.ID, len(tasks))
	}
}
```
Scenarios handled:
- Crash during phase transition: the job reads as "aggregate" but its child tasks were never created; recovery detects the missing tasks and resets the job to the ingest phase.
- Normal crash after task creation: child tasks exist, so the job keeps the aggregate phase and resumes there.
- Crash during ingest phase: the phase is still empty, so the consistency check does not apply and the job is simply re-queued to run ingest again.
Phase recovery is tested in pulse/async/grace_test.go:
- TestGRACEPhaseRecoveryNoChildTasks - Validates reset when no tasks exist
- TestGRACEPhaseRecoveryWithChildTasks - Validates preservation when tasks exist

```bash
# Run phase recovery tests
go test ./pulse/async -run TestGRACEPhaseRecovery -v
```
- Error handling: if the child-task lookup fails (e.g. a DB error), recovery defaults to the safe behavior and resets the phase.
- Logging: every phase decision is logged for debugging (see the GRACE log lines in the snippet above).
- Backward compatibility: only jobs with two-phase metadata are affected; other job types are untouched.
Parent jobs spawn child tasks (subtasks). The system ensures child tasks are properly managed throughout the parent's lifecycle.
Location: pulse/async/queue.go:DeleteJobWithChildren()
When a parent job is deleted:
- Its child tasks are cancelled with reason "parent job deleted"

Race condition protection: before enqueueing children, the parent checks that it still exists in the database. This prevents enqueueing tasks after the parent was deleted mid-execution.
```go
// Check if parent job still exists before enqueueing children
if _, err := queue.GetJob(job.ID); err != nil {
	return fmt.Errorf("parent job deleted during execution: %w", err)
}
```
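For orientation, the cascade delete itself might look roughly like the sketch below. ListTasksByParent is the same call used during phase recovery; the CancelTask and DeleteJob methods and the Queue type name are assumptions made for illustration.

```go
// Sketch of cascade deletion (not the actual implementation). CancelTask,
// DeleteJob and the Queue type name are assumed for illustration.
func (q *Queue) DeleteJobWithChildren(jobID string) error {
	tasks, err := q.ListTasksByParent(jobID)
	if err != nil {
		return fmt.Errorf("listing child tasks: %w", err)
	}
	// Cancel the children first so none keep running after the parent is gone.
	for _, t := range tasks {
		if err := q.CancelTask(t.ID, "parent job deleted"); err != nil {
			return fmt.Errorf("cancelling child task %s: %w", t.ID, err)
		}
	}
	return q.DeleteJob(jobID)
}
```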
Location: pulse/async/queue.go:cancelOrphanedChildren()
When a parent job completes or fails:
- Any of its child tasks that are still outstanding are cancelled, so no children are left orphaned
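A minimal sketch of that cleanup, under the same assumptions as the deletion sketch above (the task status values, the cancel reason, and CancelTask are illustrative, not the confirmed API):

```go
// Sketch of orphaned-child cleanup run when a parent finishes. The status
// values, the cancel reason, and CancelTask are illustrative assumptions.
func (q *Queue) cancelOrphanedChildren(parentID string) error {
	tasks, err := q.ListTasksByParent(parentID)
	if err != nil {
		return err
	}
	for _, t := range tasks {
		// Leave finished children alone; only cancel work that is still queued or running.
		switch t.Status {
		case "completed", "failed", "cancelled":
			continue
		}
		if err := q.CancelTask(t.ID, "parent job finished"); err != nil {
			return err
		}
	}
	return nil
}
```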
Location: pulse/async/error.go:RetryableError()
Failed tasks can be retried automatically (max 3 attempts total):
- On a retryable error, the worker increments retry_count and re-queues the job

Example log output:

```
꩜ Retry 1/2: operation failed | job:JB_abc123
꩜ Max retries exceeded (2): operation failed | job:JB_abc123
```

Database tracking: each retry attempt updates the job record with the retry count and error details, providing a full audit trail.
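As a sketch of how a handler might opt into retries, the snippet below wraps transient failures with RetryableError. The exact RetryableError signature and the callUpstream/isTransient helpers are assumptions, not the confirmed API.

```go
// Sketch only: callUpstream and isTransient are hypothetical, and the exact
// RetryableError signature is assumed (it lives in pulse/async/error.go).
func (h *MyHandler) Execute(ctx context.Context, job *async.Job) error {
	if err := callUpstream(ctx, job); err != nil {
		if isTransient(err) {
			// Worker increments retry_count and re-queues (up to 2 retries, 3 attempts total).
			return async.RetryableError(err)
		}
		return err // permanent failure: fail the job without retrying
	}
	return nil
}
```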
```bash
# Test cascade deletion
go test ./pulse/async -run TestDeleteJobWithChildren -v

# Test parent-child lifecycle
go test ./pulse/async -run TestParentChild -v
```
Applications using Pulse should propagate shutdown signals:
```go
// Create worker pool with application context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

workerPool := async.NewWorkerPool(ctx, db, queue, executor, config)
workerPool.Start()

// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

<-sigChan
log.Println("Shutdown signal received, stopping workers...")
cancel() // Triggers graceful shutdown

// Wait for workers to finish (with timeout)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
workerPool.StopWithContext(shutdownCtx)
```
Job handlers should check context at task boundaries:
```go
func (h *MyHandler) Execute(ctx context.Context, job *async.Job) error {
	for _, item := range items {
		// Check for cancellation before each task
		select {
		case <-ctx.Done():
			return ctx.Err() // Job will be checkpointed
		default:
		}

		// Process item
		if err := processItem(ctx, item); err != nil {
			return err
		}

		// Update progress
		job.Progress.Current++
	}
	return nil
}
```
```go
type WorkerPoolConfig struct {
	Workers         int           // Number of concurrent workers
	PollInterval    time.Duration // How often to check for jobs
	ShutdownTimeout time.Duration // Max time to wait for graceful shutdown
}
```
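For comparison with the test configuration below, a production-leaning setup might look something like this; the values are illustrative, not defaults taken from the codebase.

```go
// Illustrative values only, not defaults from the codebase.
config := async.WorkerPoolConfig{
	Workers:         4,                // a handful of concurrent workers
	PollInterval:    1 * time.Second,  // poll the queue once per second
	ShutdownTimeout: 30 * time.Second, // matches the 30s shutdown context shown earlier
}
```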
For faster testing, use shorter intervals:
```go
config := async.WorkerPoolConfig{
	Workers:         1,
	PollInterval:    100 * time.Millisecond,
	ShutdownTimeout: 2 * time.Second,
}
```
Last updated: 2025-12-27
Status: Implemented and tested