Opening (✿) and Closing (❀)

Graceful shutdown and startup system for async job processing.

Symbols:

  ✿  Opening (graceful start)
  ❀  Closing (graceful shutdown)
  ꩜  Retry log marker (see Retry Logic)

(Formerly codename: GRACE - Graceful Async Cancellation Engine)

Implementation Summary

❀ Closing (Graceful Shutdown)

✿ Opening (Graceful Start)

Key Files

Testing

# Fast tests (~10s)
go test ./pulse/async -run TestGRACE -short

# Full integration tests (~60s)
go test ./pulse/async -run TestGRACE

Phase Recovery Architecture

Two-Phase Job Pattern

Some jobs use a two-phase execution pattern:

  1. "ingest" phase: Process data, create sub-entities, enqueue child tasks
  2. "aggregate" phase: Wait for child tasks to complete, aggregate results

This pattern solves parent-child job coordination without blocking worker threads.
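
As a minimal sketch of the pattern (not the actual handler code), a two-phase handler might branch on the stored phase like below; ImportHandler, enqueueChildTasks, and aggregateChildResults are hypothetical names, and metadata initialization is elided:

func (h *ImportHandler) Execute(ctx context.Context, job *async.Job) error {
    if job.Metadata.Phase != "aggregate" {
        // Phase 1 (ingest): process input data and enqueue child tasks.
        if err := h.enqueueChildTasks(ctx, job); err != nil {
            return err
        }
        // Record the transition so recovery knows child tasks should now exist.
        job.Metadata.Phase = "aggregate"
        return nil // how the job gets re-queued for the aggregate phase is framework-specific
    }

    // Phase 2 (aggregate): child tasks exist; aggregate their results.
    return h.aggregateChildResults(ctx, job)
}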

Smart Phase Recovery (Implemented)

Location: pulse/async/worker.go:requeueOrphanedJob()

When recovering orphaned jobs, GRACE validates phase consistency:

// Example: Check if child tasks actually exist for the aggregate phase
if job.Metadata != nil && job.Metadata.Phase == "aggregate" {
    tasks, err := wp.queue.ListTasksByParent(job.ID)
    if err != nil || len(tasks) == 0 {
        // No tasks found (or lookup failed) - reset to ingest phase
        job.Metadata.Phase = ""
        log.Printf("GRACE: Reset job %s to 'ingest' phase (no child tasks found)", job.ID)
    } else {
        // Tasks exist - keep aggregate phase
        log.Printf("GRACE: Job %s staying in 'aggregate' phase (%d child tasks found)", job.ID, len(tasks))
    }
}

Scenarios handled:

  1. Crash during phase transition: the job was marked "aggregate" but its child tasks were never enqueued; recovery resets it to the ingest phase.

  2. Normal crash after task creation: child tasks exist, so the job stays in the aggregate phase and resumes waiting on them.

  3. Crash during ingest phase: no aggregate phase was recorded, so the job is re-queued and runs ingest again.

Testing

Phase recovery is tested in pulse/async/grace_test.go:

# Run phase recovery tests
go test ./pulse/async -run TestGRACEPhaseRecovery -v

Implementation Details

Error handling: if checking child tasks fails (e.g. a database error), recovery defaults to the safe behavior of resetting the phase

Logging: every phase decision is logged with a "GRACE:" prefix (see the snippet above) for debugging

Backward compatibility: only jobs carrying two-phase metadata are affected; all other job types are untouched

Parent-Child Job Lifecycle

Overview

Parent jobs spawn child tasks (subtasks). The system ensures child tasks are properly managed throughout the parent's lifecycle.

Cascade Deletion

Location: pulse/async/queue.go:DeleteJobWithChildren()

When a parent job is deleted:

  1. System finds all child tasks associated with the parent
  2. Marks all active child tasks as cancelled with reason "parent job deleted"
  3. Deletes the parent job from database
  4. Preserves completed/failed children for audit trail
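
A condensed sketch of that flow, using assumed names (Queue, IsActive, CancelTask, DeleteJob; only ListTasksByParent is referenced elsewhere in this doc):

func (q *Queue) DeleteJobWithChildren(jobID string) error {
    tasks, err := q.ListTasksByParent(jobID)
    if err != nil {
        return err
    }
    for _, t := range tasks {
        // Only active children are cancelled; completed/failed ones stay for the audit trail.
        if t.IsActive() {
            if err := q.CancelTask(t.ID, "parent job deleted"); err != nil {
                return err
            }
        }
    }
    return q.DeleteJob(jobID)
}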

Race condition protection: Before enqueueing children, the parent checks that it still exists in the database. This prevents child tasks from being enqueued after the parent has been deleted mid-execution.

// Check if parent job still exists before enqueueing children
if _, err := queue.GetJob(job.ID); err != nil {
    return fmt.Errorf("parent job deleted during execution: %w", err)
}

Orphan Cleanup

Location: pulse/async/queue.go:cancelOrphanedChildren()

When a parent job completes or fails:

  1. System finds all child tasks still active (queued/running/paused)
  2. Cancels each child with reason "parent job completed"
  3. Preserves completed/failed/cancelled children for history
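
This cleanup hook runs when a job reaches a terminal state; a rough sketch of the call site, with finishJob and the surrounding bookkeeping as assumed names:

func (wp *WorkerPool) finishJob(job *async.Job, execErr error) {
    // ... persist the job's completed or failed status (elided) ...

    // Cancel any still-active children now that the parent is done.
    if err := wp.queue.cancelOrphanedChildren(job.ID); err != nil {
        log.Printf("GRACE: failed to cancel orphaned children of job %s: %v", job.ID, err)
    }
}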

Behavior: only active children (queued/running/paused) are cancelled; children already in a terminal state are left untouched.

Retry Logic

Location: pulse/async/error.go:RetryableError()

Failed tasks can be retried automatically (max 3 attempts total):

  1. Task fails with retryable error (AI failure, network error, timeout)
  2. System increments retry_count and re-queues job
  3. Logs retry attempt: ꩜ Retry 1/2: operation failed | job:JB_abc123
  4. After max retries, logs: ꩜ Max retries exceeded (2): operation failed | job:JB_abc123

Database tracking: Each retry attempt updates the job record with retry count and error details, providing full audit trail.
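
A simplified sketch of the retry decision on the worker side; isRetryable, RetryCount, Requeue, and markFailed are assumed names chosen to match the behavior and log lines above:

const maxRetries = 2 // three attempts total: the initial run plus two retries

func (wp *WorkerPool) handleFailure(job *async.Job, execErr error) {
    if isRetryable(execErr) && job.RetryCount < maxRetries {
        job.RetryCount++
        log.Printf("꩜ Retry %d/%d: %v | job:%s", job.RetryCount, maxRetries, execErr, job.ID)
        wp.queue.Requeue(job) // persists retry_count and error details alongside the re-queue
        return
    }
    log.Printf("꩜ Max retries exceeded (%d): %v | job:%s", maxRetries, execErr, job.ID)
    wp.markFailed(job, execErr)
}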

Testing

# Test cascade deletion
go test ./pulse/async -run TestDeleteJobWithChildren -v

# Test parent-child lifecycle
go test ./pulse/async -run TestParentChild -v

Integration Guide

Application Shutdown

Applications using Pulse should propagate shutdown signals:

// Create worker pool with application context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

workerPool := async.NewWorkerPool(ctx, db, queue, executor, config)
workerPool.Start()

// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

<-sigChan
log.Println("Shutdown signal received, stopping workers...")
cancel() // Triggers graceful shutdown

// Wait for workers to finish (with timeout)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
workerPool.StopWithContext(shutdownCtx)

Handler Context Checks

Job handlers should check context at task boundaries:

func (h *MyHandler) Execute(ctx context.Context, job *async.Job) error {
    for _, item := range items {
        // Check for cancellation before each task
        select {
        case <-ctx.Done():
            return ctx.Err() // Job will be checkpointed
        default:
        }

        // Process item
        if err := processItem(ctx, item); err != nil {
            return err
        }

        // Update progress
        job.Progress.Current++
    }

    return nil
}

Configuration

Worker Pool Config

type WorkerPoolConfig struct {
    Workers         int           // Number of concurrent workers
    PollInterval    time.Duration // How often to check for jobs
    ShutdownTimeout time.Duration // Max time to wait for graceful shutdown
}

Test Mode

For faster testing, use shorter intervals:

config := async.WorkerPoolConfig{
    Workers:         1,
    PollInterval:    100 * time.Millisecond,
    ShutdownTimeout: 2 * time.Second,
}

Last updated: 2025-12-27
Status: Implemented and tested