Opening (✿) and Closing (❀)

Graceful shutdown and startup system for async job processing.

Symbols:

  ✿ Opening: graceful start
  ❀ Closing: graceful shutdown

(Formerly codenamed GRACE: Graceful Async Cancellation Engine)

Implementation Summary

❀ Closing (Graceful Shutdown)

✿ Opening (Graceful Start)

Key Files

Testing

Verified by:

# Fast tests (~10s)
go test ./pulse/async -run TestGRACE -short

# Full integration tests (~60s)
go test ./pulse/async -run TestGRACE

Phase Recovery Architecture

Two-Phase Job Pattern

Some jobs use a two-phase execution pattern:

  1. "ingest" phase: Process data, create sub-entities, enqueue child tasks
  2. "aggregate" phase: Wait for child tasks to complete, aggregate results

This pattern solves parent-child job coordination without blocking worker threads.
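A minimal Go sketch of the two-phase pattern, assuming (as in the recovery code below) that an empty phase means ingest. The Job type, runStep, enqueue, childResult and errRequeue are hypothetical names, not the Pulse API. The point it illustrates is that the aggregate step re-queues itself while children are still running instead of blocking the worker.

```go
package main

import (
	"errors"
	"fmt"
)

// Job is a hypothetical stand-in for a two-phase job record.
// An empty Phase means "ingest"; "aggregate" means children were enqueued.
type Job struct {
	ID       string
	Phase    string
	Children []string
	Result   int
}

// errRequeue tells the (hypothetical) worker loop to put the job back on the
// queue and move on, instead of blocking while child tasks run.
var errRequeue = errors.New("requeue: waiting on child tasks")

// runStep executes one phase per invocation. enqueue creates child tasks and
// returns their IDs; childResult reports a child's result once it has finished.
func runStep(job *Job, enqueue func(parentID string) []string, childResult func(id string) (int, bool)) error {
	switch job.Phase {
	case "": // ingest: do the work, spawn children, advance the phase
		job.Children = enqueue(job.ID)
		job.Phase = "aggregate"
		return errRequeue
	case "aggregate":
		sum := 0
		for _, id := range job.Children {
			v, done := childResult(id)
			if !done {
				return errRequeue // free the worker; check again later
			}
			sum += v
		}
		job.Result = sum
		return nil
	default:
		return fmt.Errorf("unknown phase %q", job.Phase)
	}
}

func main() {
	finished := map[string]int{}
	enqueue := func(parentID string) []string {
		return []string{parentID + "-c0", parentID + "-c1"}
	}
	childResult := func(id string) (int, bool) {
		v, ok := finished[id]
		return v, ok
	}

	job := &Job{ID: "JB_demo"}
	fmt.Println(runStep(job, enqueue, childResult)) // ingest pass: requeue
	fmt.Println(runStep(job, enqueue, childResult)) // children pending: requeue
	finished["JB_demo-c0"], finished["JB_demo-c1"] = 3, 4
	err := runStep(job, enqueue, childResult)
	fmt.Println(err, job.Result) // <nil> 7
}
```

Each call handles one phase and returns, so a worker is only occupied for a single step at a time.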

Smart Phase Recovery (Implemented)

Location: pulse/async/worker.go:failOrphanedJob()

When recovering orphaned jobs, GRACE validates phase consistency:

// Example: Check whether child tasks actually exist for a job in the aggregate phase
if job.Metadata != nil && job.Metadata.Phase == "aggregate" {
    tasks, err := wp.queue.ListTasksByParent(job.ID)
    if err != nil || len(tasks) == 0 {
        // No tasks found (or lookup failed): reset to ingest phase (empty phase = ingest)
        job.Metadata.Phase = ""
        log.Printf("GRACE: Reset job %s to 'ingest' phase (no child tasks found)", job.ID)
    } else {
        // Tasks exist: keep aggregate phase
        log.Printf("GRACE: Job %s staying in 'aggregate' phase (%d child tasks found)", job.ID, len(tasks))
    }
}

Scenarios handled:

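  1. Crash during phase transition: the job is marked "aggregate" but no child tasks exist, so GRACE resets it to the ingest phase.

  2. Normal crash after task creation: the job is in "aggregate" and its child tasks exist, so GRACE keeps the aggregate phase.

  3. Crash during ingest phase: the phase was never advanced, so no phase correction is needed.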

Testing

Verified by:

go test ./pulse/async -run TestGRACEPhaseRecovery -v

Implementation Details

Error handling: If the child-task lookup fails (e.g. a DB error), recovery falls back to the safe default and resets the job to the ingest phase.

Logging: Every phase decision is logged with a "GRACE:" prefix for debugging.

Backward compatibility: Only jobs with two-phase metadata are affected; other job types are unaffected.

Parent-Child Job Lifecycle

Overview

Parent jobs spawn child tasks (subtasks). The system ensures child tasks are properly managed throughout the parent's lifecycle.

Cascade Deletion

Location: pulse/async/queue.go:DeleteJobWithChildren()

When a parent job is deleted:

  1. System finds all child tasks associated with parent
  2. Marks all active child tasks as cancelled with reason "parent job deleted"
  3. Deletes the parent job from database
  4. Preserves completed/failed children for audit trail

Race condition protection: Before enqueueing children, the parent job checks whether it still exists in the database. This prevents enqueueing tasks for a parent that was deleted while it was executing.

// Check if parent job still exists before enqueueing children
if _, err := queue.GetJob(job.ID); err != nil {
    return fmt.Errorf("parent job deleted during execution: %w", err)
}

Orphan Cleanup

Location: pulse/async/queue.go:cancelOrphanedChildren()

When a parent job completes or fails:

  1. System finds all child tasks still active (queued/running/paused)
  2. Cancels each child with reason "parent job completed"
  3. Preserves completed/failed/cancelled children for history

Behavior:

Retry Logic

Location: pulse/async/error.go:RetryableError()

Failed tasks can be retried automatically (max 2 retries = 3 total attempts):

  1. Task fails with retryable error (AI failure, network error, timeout)
  2. System increments retry_count and re-queues job
  3. Logs retry attempt: ꩜ Retry 1/2: operation failed | job:JB_abc123
  4. After max retries exceeded, logs: ꩜ Max retries exceeded (2): operation failed | job:JB_abc123

Database tracking: Each retry attempt updates the job record with the retry count and error details, providing a full audit trail.

Testing

Verified by:

go test ./pulse/async -run TestParentJobHierarchy -v

Integration Guide

Application Shutdown

Applications using Pulse should propagate shutdown signals:

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    // plus the Pulse async package
)

func main() {
    // db, cfg, poolCfg and logger are assumed to be initialized elsewhere.

    // Create worker pool with application context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    workerPool := async.NewWorkerPool(ctx, db, cfg, poolCfg, logger)
    workerPool.Start()

    // Handle shutdown signals
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    <-sigChan
    log.Println("Shutdown signal received, stopping workers...")

    // Stop() cancels context and waits for workers with timeout (default 20s, configurable via poolCfg.WorkerStopTimeout)
    workerPool.Stop()
}

Handler Context Checks

Job handlers should check context at task boundaries:

func (h *MyHandler) Execute(ctx context.Context, job *async.Job) error {
    for _, item := range items {
        // Check for cancellation before each task
        select {
        case <-ctx.Done():
            return ctx.Err() // Job will be checkpointed
        default:
        }

        // Process item
        if err := processItem(ctx, item); err != nil {
            return err
        }

        // Update progress
        job.Progress.Current++
    }

    return nil
}

Configuration

Worker Pool Config

type WorkerPoolConfig struct {
    Workers              int            // Number of concurrent workers
    PollInterval         *time.Duration // Poll interval: nil = gradual ramp-up (default), 0 = no polling, positive = fixed interval
    PauseOnBudget        bool           // Pause jobs when the budget is exceeded
    GracefulStartPhase   time.Duration  // Duration of each graceful start phase (default: 5min, test: 10s)
    WorkerStopTimeout    time.Duration  // Max time to wait for workers to checkpoint and exit (default: 20s)
    MaxConsecutiveErrors int            // Threshold for applying exponential backoff (default: 5)
    MaxBackoff           time.Duration  // Maximum exponential backoff duration (default: 30s)
}

Test Mode

For faster testing, use shorter intervals:

pollInterval := 100 * time.Millisecond
config := async.WorkerPoolConfig{
    Workers:              1,
    PollInterval:         &pollInterval,
    GracefulStartPhase:   10 * time.Second,
    WorkerStopTimeout:    2 * time.Second,
    MaxConsecutiveErrors: 3,
    MaxBackoff:           5 * time.Second,
}

Diagnostics

pprof (always on)

net/http/pprof is registered on the main HTTP server, so these endpoints are available whenever QNTX is running:

Endpoint                           Use
/debug/pprof/goroutine?debug=2     Full goroutine stacks with wait times; primary deadlock diagnostic
/debug/pprof/mutex                 Mutex contention profile
/debug/pprof/heap                  Memory allocation profile
/debug/pprof/profile?seconds=30    CPU profile (30s sample)

Example: to diagnose a hanging ATS store, hit http://localhost:{port}/debug/pprof/goroutine?debug=2 and look for goroutines blocked on mutex acquisition.

Signal behavior

Signal    Trigger                      Behavior
SIGINT    Ctrl+C                       Graceful shutdown: stop workers (20s timeout), checkpoint jobs, stop plugins, exit 0
SIGTERM   kill <pid>                   Same as SIGINT
SIGQUIT   Ctrl+\ or kill -QUIT <pid>   Go default: goroutine stacks to stderr, exit 2 (fallback when HTTP is unreachable)

Mutex watchdog

The RustStore shared mutex (ats/storage/sqlitecgo/storage_cgo.go) serializes all SQLite access. A leaked transaction can hold this mutex indefinitely (and a slow CGO call for a long time), deadlocking all attestation operations.

A background goroutine periodically attempts to acquire the mutex with a timeout. If acquisition takes longer than the threshold, it logs a warning with the current goroutine stacks. This provides early warning before a full deadlock develops.

Database backup

Hot backup runs on the Pulse ticker without holding the Go mutex. See database-backup.md for architecture, write-load behavior, and why run_to_completion stalls under sustained writes.


Status: Implemented and tested