Pulse & Async IX: Budget-Controlled Asynchronous Job Processing

Overview

Pulse is QNTX's rate-limiting and budget control system for asynchronous operations. It lets long-running, cost-sensitive operations run asynchronously while staying within API rate limits and monetary budgets. For the HTTP API that controls Pulse, see the Pulse API Documentation.

Motivation

Modern AI-powered applications often involve:

Without controls:

Architecture

Pulse System

Pulse acts as a smart rate limiter and budget manager for outgoing API calls.

┌─────────────────────────────────────────┐
│          Pulse Controller               │
├─────────────────────────────────────────┤
│  ├─ Rate Limiter (calls/minute)         │
│  ├─ Budget Tracker (daily/monthly USD)  │
│  ├─ Cost Estimator (per operation)      │
│  ├─ Pause/Resume Control                │
│  └─ Observability Integration           │
└─────────────────────────────────────────┘
              ↓  ↑
       ┌──────────────┐
       │ Async Job    │
       └──────────────┘

Key Responsibilities:

Async Job System

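Jobs run asynchronously under Pulse control. The architecture is generic and handler-based: each job carries a HandlerName and a domain-owned JSON Payload, and is executed by the JobHandler registered under that name.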

// Generic Async Job (handler-based architecture)
type Job struct {
    ID           string          // Unique job ID (ASID)
    HandlerName  string          // Handler identifier (e.g., "domain.operation")
    Payload      json.RawMessage // Handler-specific data (domain-owned)
    Source       string          // Data source (for deduplication)
    Status       JobStatus       // "queued", "running", "paused", "completed", "failed"
    Progress     Progress        // Current/total operations
    CostEstimate float64         // Estimated USD cost
    CostActual   float64         // Actual USD cost so far
    PulseState   *PulseState     // Rate limit, budget status
    Error        string          // Error message if failed
    ParentJobID  string          // For task hierarchies
    RetryCount   int             // Retry attempts (max 2)
    CreatedAt    time.Time
    StartedAt    *time.Time
    CompletedAt  *time.Time
    UpdatedAt    time.Time
}

type PulseState struct {
    CallsThisMinute  int
    CallsRemaining   int
    SpendToday       float64
    SpendThisMonth   float64
    BudgetRemaining  float64
    IsPaused         bool
    PauseReason      string  // "budget_exceeded", "rate_limit", "user_requested"
}

// Handler-based execution
type JobHandler interface {
    Name() string                                    // "domain.operation"
    Execute(ctx context.Context, job *Job) error
}

Type Reference: See Job, PulseState, and Progress type definitions.

Generic Architecture:

Configuration

Example Pulse configuration (see am package for full config system):

[pulse]
max_calls_per_minute = 10
daily_budget_usd = 5.0
monthly_budget_usd = 100.0
pause_on_budget_exceeded = true
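The [pulse] keys map naturally onto a typed struct. A sketch, assuming hypothetical field names, `toml` struct tags for whatever TOML decoder the am package uses, and illustrative validation rules:

```go
package main

import (
	"errors"
	"fmt"
)

// PulseConfig mirrors the [pulse] keys above. Field names and validation
// rules are hypothetical; the am package defines the real config types.
type PulseConfig struct {
	MaxCallsPerMinute     int     `toml:"max_calls_per_minute"`
	DailyBudgetUSD        float64 `toml:"daily_budget_usd"`
	MonthlyBudgetUSD      float64 `toml:"monthly_budget_usd"`
	PauseOnBudgetExceeded bool    `toml:"pause_on_budget_exceeded"`
}

// Validate rejects values that would make the limiter or tracker meaningless.
func (c PulseConfig) Validate() error {
	if c.MaxCallsPerMinute <= 0 {
		return fmt.Errorf("max_calls_per_minute must be positive, got %d", c.MaxCallsPerMinute)
	}
	if c.DailyBudgetUSD < 0 || c.MonthlyBudgetUSD < 0 {
		return errors.New("budgets must be non-negative")
	}
	return nil
}
```

With the sample values, the limiter permits at most 10 × 60 × 24 = 14,400 calls per day, while at Example 1's illustrative $0.001 per call the $5.00 daily budget covers 5,000 calls, so the budget rather than the rate limit is the binding constraint.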

Implementation

Database Schema

Pulse Budget Tracking

CREATE TABLE pulse_budget (
    date TEXT PRIMARY KEY,           -- "2025-11-23" for daily, "2025-11" for monthly
    type TEXT NOT NULL,              -- "daily" or "monthly"
    spend_usd REAL NOT NULL,         -- Current spend in USD
    operations_count INTEGER NOT NULL,
    created_at DATETIME,
    updated_at DATETIME
);

CREATE INDEX idx_pulse_budget_type ON pulse_budget(type);

Async Job Queue

CREATE TABLE async_ix_jobs (
    id TEXT PRIMARY KEY,             -- Job ID (ASID)
    handler_name TEXT,               -- Handler identifier
    source TEXT NOT NULL,            -- Data source (for deduplication)
    status TEXT NOT NULL,            -- "queued", "running", "paused", "completed", "failed"
    progress_current INTEGER,        -- Current operations completed
    progress_total INTEGER,          -- Total operations
    cost_estimate REAL,              -- Estimated USD cost
    cost_actual REAL,                -- Actual USD cost
    pulse_state TEXT,                -- JSON: PulseState
    error TEXT,                      -- Error message if failed
    payload TEXT,                    -- JSON: Handler-specific data
    parent_job_id TEXT,              -- Parent job for task hierarchies
    retry_count INTEGER DEFAULT 0,   -- Retry attempts (max 2)
    created_at DATETIME,
    started_at DATETIME,
    completed_at DATETIME,
    updated_at DATETIME
);

CREATE INDEX idx_async_ix_jobs_status ON async_ix_jobs(status);
CREATE INDEX idx_async_ix_jobs_created ON async_ix_jobs(created_at DESC);
CREATE INDEX idx_async_ix_jobs_handler ON async_ix_jobs(handler_name);
CREATE INDEX idx_async_ix_jobs_source_handler ON async_ix_jobs(source, handler_name);
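The (source, handler_name) index supports deduplication at enqueue time. A sketch of the rule, assuming that a job counts as a duplicate only while it is queued, running, or paused; `findDuplicate` and `jobRow` are hypothetical, and the SQL constant shows an equivalent lookup the index could serve:

```go
package main

// activeStatuses lists statuses assumed to block a duplicate enqueue.
var activeStatuses = map[string]bool{"queued": true, "running": true, "paused": true}

// jobRow is a hypothetical projection of async_ix_jobs.
type jobRow struct {
	ID, Source, HandlerName, Status string
}

// findDuplicate returns the ID of an active job with the same source and
// handler, or "" if none exists.
func findDuplicate(existing []jobRow, source, handler string) string {
	for _, j := range existing {
		if j.Source == source && j.HandlerName == handler && activeStatuses[j.Status] {
			return j.ID
		}
	}
	return ""
}

// findDuplicateSQL is the equivalent lookup, served by
// idx_async_ix_jobs_source_handler.
const findDuplicateSQL = `
SELECT id FROM async_ix_jobs
WHERE source = ? AND handler_name = ?
  AND status IN ('queued', 'running', 'paused')
LIMIT 1`
```

Completed and failed jobs are deliberately excluded here, so the same source can be reprocessed later.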

Core Components

Rate Limiter (pulse/budget/limiter.go)

Sliding window rate limiter with configurable calls per minute:

type Limiter struct {
    maxCallsPerMinute int
    window            time.Duration
    mu                sync.Mutex
    callTimes         []time.Time
}

func (r *Limiter) Allow() error {
    // Check if call allowed within rate limit
    // Returns error if rate limit exceeded
}

func (r *Limiter) Wait(ctx context.Context) error {
    // Blocks until call is allowed or context cancelled
}

Features:

Budget Tracker (internal/pulse/budget/tracker.go)

Tracks daily/monthly spend with persistence:

type Tracker struct {
    store  *Store
    config BudgetConfig
    mu     sync.RWMutex
}

func (b *Tracker) CheckBudget(estimatedCost float64) error {
    // Check if operation would exceed budget
}

func (b *Tracker) RecordOperation(actualCost float64) error {
    // Record actual cost in database
}

func (b *Tracker) GetStatus() (*Status, error) {
    // Returns current budget status from ai_model_usage table
}

Type Reference: See Limiter, Tracker, BudgetConfig, and Status type definitions.

Package: pulse/budget - Separated from async to eliminate import cycles
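CheckBudget and RecordOperation follow a check-then-record pattern: reject an operation whose estimate would overrun either period, then add its actual cost once it completes. A sketch with a hypothetical in-memory `memTracker`; the real Tracker persists spend through its Store:

```go
package main

import (
	"fmt"
	"sync"
)

// memTracker is a hypothetical in-memory stand-in for Tracker.
type memTracker struct {
	mu                       sync.Mutex
	dailyLimit, monthlyLimit float64
	spendToday, spendMonth   float64
}

// CheckBudget rejects an operation whose estimated cost would push either
// period over its limit.
func (b *memTracker) CheckBudget(estimatedCost float64) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.spendToday+estimatedCost > b.dailyLimit {
		return fmt.Errorf("daily budget exceeded")
	}
	if b.spendMonth+estimatedCost > b.monthlyLimit {
		return fmt.Errorf("monthly budget exceeded")
	}
	return nil
}

// RecordOperation adds the actual cost to both periods.
func (b *memTracker) RecordOperation(actualCost float64) error {
	if actualCost < 0 {
		return fmt.Errorf("negative cost %.4f", actualCost)
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.spendToday += actualCost
	b.spendMonth += actualCost
	return nil
}
```

Checking against the estimate but recording the actual cost means a run of underestimates can still overshoot slightly; the next CheckBudget then blocks further work.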

Job Queue (pulse/async/queue.go)

Manages async job lifecycle:

type Queue struct {
    store *Store
}

// Enqueue adds job to queue
func (q *Queue) Enqueue(job *Job) error

// Dequeue gets next runnable job (queued or scheduled, not paused)
func (q *Queue) Dequeue() (*Job, error)

// PauseJob pauses a running job
func (q *Queue) PauseJob(jobID string, reason string) error

// ResumeJob resumes a paused job
func (q *Queue) ResumeJob(jobID string) error

// CompleteJob marks job as completed
func (q *Queue) CompleteJob(jobID string) error

// FailJob marks job as failed with error
func (q *Queue) FailJob(jobID string, err error) error
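The Queue methods imply a small status state machine. A sketch of one plausible transition table, assuming that ResumeJob returns a job to queued and that a retried job re-enters the queue from running; neither detail is specified above, and `transition` is hypothetical:

```go
package main

import "fmt"

// allowed maps each status to the statuses it may move to (assumed).
// completed and failed are terminal, so they have no entries.
var allowed = map[string][]string{
	"queued":  {"running", "paused"},
	"running": {"paused", "completed", "failed", "queued"}, // queued: retry
	"paused":  {"queued"},                                  // ResumeJob
}

// transition validates a status change and returns the new status.
func transition(from, to string) (string, error) {
	for _, s := range allowed[from] {
		if s == to {
			return to, nil
		}
	}
	return from, fmt.Errorf("invalid transition %s -> %s", from, to)
}
```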

Worker Pool (pulse/async/worker.go)

Processes jobs with Pulse integration (rate-limit and budget checks before execution):

type WorkerPool struct {
    queue         *Queue
    budgetTracker *budget.Tracker  // Optional - can be nil for tests
    rateLimiter   *budget.Limiter  // Optional - can be nil for tests
    workers       int
    executor      JobExecutor
}

// Start begins processing jobs
func (wp *WorkerPool) Start()

// Stop gracefully stops workers
func (wp *WorkerPool) Stop()

// processNextJob processes one job with rate limiting and budget checks
func (wp *WorkerPool) processNextJob() error {
    // 1. Dequeue job
    // 2. Check rate limit (pause if exceeded)
    // 3. Check budget (pause if exceeded)
    // 4. Execute job via handler registry
    // 5. Mark complete/failed
}

Worker Pool Features:

Implementation Status

Phase 1: Pulse Foundation ✅ COMPLETE

Files:

Architecture Benefits:

Phase 2: Async Job System ✅ COMPLETE

Files:

Testing Strategy

Unit Tests

Total: 41/41 tests passing

Integration Tests

Full async workflow end-to-end:

Future Enhancements

Dynamic Cost Estimation

Query pricing APIs for real-time cost updates:

Priority Queues

Allow high-priority jobs to skip the queue:

Scheduling

Schedule expensive operations for specific times:

Multi-Model Support

Configure different models for different operations:

Cost Optimization

Reduce API costs through intelligent caching:

Use Case Examples

Example 1: Batch Data Processing

// Define payload type for your domain
type BatchProcessPayload struct {
    SourceURL string   `json:"source_url"`
    RecordIDs []string `json:"record_ids"`
    BatchType string   `json:"batch_type"`
}

// Implement handler
type BatchProcessHandler struct {
    dataService DataService
    queue       *async.Queue
    logger      *zap.Logger
}

func (h *BatchProcessHandler) Name() string {
    return "data.batch-process"
}

func (h *BatchProcessHandler) Execute(ctx context.Context, job *async.Job) error {
    var payload BatchProcessPayload
    if err := json.Unmarshal(job.Payload, &payload); err != nil {
        return fmt.Errorf("invalid payload: %w", err)
    }

    // Process with progress tracking and cost recording
    for i, recordID := range payload.RecordIDs {
        // Check for cancellation
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // Process record (with API call cost)
        if err := h.dataService.ProcessRecord(ctx, recordID); err != nil {
            return fmt.Errorf("failed to process record %s: %w", recordID, err)
        }

        // Update progress
        job.Progress.Current = i + 1
        job.CostActual += 0.001 // $0.001 per API call
        if err := h.queue.UpdateJob(job); err != nil {
            h.logger.Warn("Failed to update job progress", zap.Error(err))
        }
    }

    return nil
}

// Register handler
registry := async.NewHandlerRegistry()
registry.Register(&BatchProcessHandler{
    dataService: myDataService,
    queue:       queue,
    logger:      logger,
})

Example 2: ML Model Inference

type InferencePayload struct {
    ModelName string   `json:"model_name"`
    InputData []string `json:"input_data"`
    BatchSize int      `json:"batch_size"`
}

type InferenceHandler struct {
    mlService MLService
    queue     *async.Queue
}

func (h *InferenceHandler) Name() string {
    return "ml.inference"
}

func (h *InferenceHandler) Execute(ctx context.Context, job *async.Job) error {
    var payload InferencePayload
    if err := json.Unmarshal(job.Payload, &payload); err != nil {
        return fmt.Errorf("invalid payload: %w", err)
    }

    // Run inference with cost tracking; only the cost is used in this example
    _, cost, err := h.mlService.RunInference(ctx, payload.ModelName, payload.InputData)
    if err != nil {
        return err
    }

    job.CostActual += cost
    return h.queue.UpdateJob(job)
}

Budget Exceeded Workflow

When the daily or monthly budget is exceeded, jobs are automatically paused:

User Operation:
  → Pulse checks budget
  → Daily budget exceeded: $5.02 / $5.00

Response:
  ✗ Daily budget exceeded: $5.02 / $5.00
    Projected cost: $0.040 USD
    Options:
      1. Reduce batch size
      2. Increase daily budget (config: pulse.daily_budget_usd)
      3. Wait until tomorrow (budget resets at midnight UTC)
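The pause decision combines the budget check with pause_on_budget_exceeded and records the outcome in PulseState. A sketch with a hypothetical `applyBudgetCheck` that reproduces the first lines of the response above for the daily limit only:

```go
package main

import "fmt"

// PulseState holds the subset of the documented fields used here.
type PulseState struct {
	SpendToday      float64
	BudgetRemaining float64
	IsPaused        bool
	PauseReason     string
}

// applyBudgetCheck decides whether an operation may run. If the projected
// spend would exceed dailyLimit, it pauses the state (when pauseOnExceeded is
// set) and returns a user-facing message.
func applyBudgetCheck(st *PulseState, dailyLimit, projected float64, pauseOnExceeded bool) (string, bool) {
	st.BudgetRemaining = dailyLimit - st.SpendToday
	if st.BudgetRemaining < 0 {
		st.BudgetRemaining = 0
	}
	if st.SpendToday+projected <= dailyLimit {
		return "", true
	}
	if pauseOnExceeded {
		st.IsPaused = true
		st.PauseReason = "budget_exceeded"
	}
	msg := fmt.Sprintf("Daily budget exceeded: $%.2f / $%.2f\nProjected cost: $%.3f USD",
		st.SpendToday, dailyLimit, projected)
	return msg, false
}
```

With pause_on_budget_exceeded = false, the operation is still refused, but the state is not marked paused.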

Automatic Pause Behavior:

Budget Reset:

Related Documentation