Pulse & Async IX: Budget-Controlled Asynchronous Job Processing

Overview

Pulse is QNTX's rate-limiting and budget control system for asynchronous operations. It lets long-running, cost-sensitive operations run in the background while staying within API rate limits and monetary budgets. For the HTTP API that controls Pulse, see the Pulse API Documentation.

Motivation

Modern AI-powered applications often involve:

Without controls:

Architecture

Pulse System

Pulse acts as a smart rate limiter and budget manager for outgoing API calls.

┌─────────────────────────────────────────┐
│          Pulse Controller               │
├─────────────────────────────────────────┤
│  ├─ Rate Limiter (calls/minute)         │
│  ├─ Budget Tracker (daily/monthly USD)  │
│  ├─ Cost Estimator (per operation)      │
│  ├─ Pause/Resume Control                │
│  └─ Observability Integration           │
└─────────────────────────────────────────┘
              ↓  ↑
       ┌──────────────┐
       │ Async Job    │
       └──────────────┘

Key Responsibilities:

Async Job System

Jobs run asynchronously under Pulse control, using a generic handler-based architecture.

// Generic Async Job (handler-based architecture)
type Job struct {
    ID           string          // Unique job ID (ASID)
    HandlerName  string          // Handler identifier (e.g., "domain.operation")
    Payload      json.RawMessage // Handler-specific data (domain-owned)
    Source       string          // Data source (for deduplication)
    Status       JobStatus       // "queued", "running", "paused", "completed", "failed"
    Progress     Progress        // Current/total operations
    CostEstimate float64         // Estimated USD cost
    CostActual   float64         // Actual USD cost so far
    PulseState   *PulseState     // Rate limit, budget status
    Error        string          // Error message if failed
    ParentJobID  string          // For task hierarchies
    RetryCount   int             // Retry attempts (max 2)
    CreatedAt    time.Time
    StartedAt    *time.Time
    CompletedAt  *time.Time
    UpdatedAt    time.Time
}

type PulseState struct {
    CallsThisMinute  int
    CallsRemaining   int
    SpendToday       float64
    SpendThisMonth   float64
    BudgetRemaining  float64
    IsPaused         bool
    PauseReason      string  // "budget_exceeded", "rate_limit", "user_requested"
}

// Handler-based execution
type JobHandler interface {
    Name() string                                    // "domain.operation"
    Execute(ctx context.Context, job *Job) error
}

Type Reference: See Job, PulseState, and Progress type definitions.

Generic Architecture:

Configuration

Example Pulse configuration (see the am package for the full configuration system):

[pulse]
max_calls_per_minute = 10
daily_budget_usd = 5.0
monthly_budget_usd = 100.0
pause_on_budget_exceeded = true
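These settings can be mirrored in a Go struct. The sketch below makes three assumptions that the am package does not confirm: the field names, the validation rule, and the choice to fail (rather than pause) a job when pause_on_budget_exceeded is false.

```go
package main

import "errors"

// PulseConfig mirrors the [pulse] keys above. Field names are illustrative.
type PulseConfig struct {
	MaxCallsPerMinute     int
	DailyBudgetUSD        float64
	MonthlyBudgetUSD      float64
	PauseOnBudgetExceeded bool
}

// DefaultPulseConfig returns the values from the example configuration.
func DefaultPulseConfig() PulseConfig {
	return PulseConfig{
		MaxCallsPerMinute:     10,
		DailyBudgetUSD:        5.0,
		MonthlyBudgetUSD:      100.0,
		PauseOnBudgetExceeded: true,
	}
}

// Validate rejects settings that could never admit an operation (assumed rule).
func (c PulseConfig) Validate() error {
	if c.MaxCallsPerMinute <= 0 {
		return errors.New("pulse: max_calls_per_minute must be positive")
	}
	if c.DailyBudgetUSD < 0 || c.MonthlyBudgetUSD < 0 {
		return errors.New("pulse: budgets must not be negative")
	}
	return nil
}

// OnBudgetExceeded returns the status to apply when a budget check fails:
// "paused" if pause_on_budget_exceeded is set, otherwise "failed" (assumed).
func (c PulseConfig) OnBudgetExceeded() string {
	if c.PauseOnBudgetExceeded {
		return "paused"
	}
	return "failed"
}
```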

Implementation

Database Schema

Pulse Budget Tracking

CREATE TABLE pulse_budget (
    date TEXT PRIMARY KEY,           -- "2025-11-23" for daily, "2025-11" for monthly
    type TEXT NOT NULL,              -- "daily" or "monthly"
    spend_usd REAL NOT NULL,         -- Current spend in USD
    operations_count INTEGER NOT NULL,
    created_at DATETIME,
    updated_at DATETIME
);

CREATE INDEX idx_pulse_budget_type ON pulse_budget(type);

Async Job Queue

CREATE TABLE async_ix_jobs (
    id TEXT PRIMARY KEY,             -- Job ID (ASID)
    handler_name TEXT,               -- Handler identifier
    source TEXT NOT NULL,            -- Data source (for deduplication)
    status TEXT NOT NULL,            -- "queued", "running", "paused", "completed", "failed"
    progress_current INTEGER,        -- Current operations completed
    progress_total INTEGER,          -- Total operations
    cost_estimate REAL,              -- Estimated USD cost
    cost_actual REAL,                -- Actual USD cost
    pulse_state TEXT,                -- JSON: PulseState
    error TEXT,                      -- Error message if failed
    payload TEXT,                    -- JSON: Handler-specific data
    parent_job_id TEXT,              -- Parent job for task hierarchies
    retry_count INTEGER DEFAULT 0,   -- Retry attempts (max 2)
    created_at DATETIME,
    started_at DATETIME,
    completed_at DATETIME,
    updated_at DATETIME
);

CREATE INDEX idx_async_ix_jobs_status ON async_ix_jobs(status);
CREATE INDEX idx_async_ix_jobs_created ON async_ix_jobs(created_at DESC);
CREATE INDEX idx_async_ix_jobs_handler ON async_ix_jobs(handler_name);
CREATE INDEX idx_async_ix_jobs_source_handler ON async_ix_jobs(source, handler_name);
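The composite index on (source, handler_name) supports deduplication lookups. The sketch shows the check in memory, together with an equivalent SQL query. Treating only queued, running and paused jobs as duplicates is an assumption, not documented behavior.

```go
package main

// Job carries only the fields used for deduplication here.
type Job struct {
	ID          string
	HandlerName string
	Source      string
	Status      string
}

// activeStatuses are the states in which a job still counts as pending work (assumed).
var activeStatuses = map[string]bool{"queued": true, "running": true, "paused": true}

// findDuplicate returns the ID of an active job with the same source and handler.
func findDuplicate(jobs []Job, source, handler string) (string, bool) {
	for _, j := range jobs {
		if j.Source == source && j.HandlerName == handler && activeStatuses[j.Status] {
			return j.ID, true
		}
	}
	return "", false
}

// dedupQuery is the equivalent lookup against async_ix_jobs; it can use
// idx_async_ix_jobs_source_handler.
const dedupQuery = `SELECT id FROM async_ix_jobs
WHERE source = ? AND handler_name = ?
  AND status IN ('queued', 'running', 'paused')
LIMIT 1`
```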

Core Components

Rate Limiter (pulse/budget/limiter.go)

Sliding window rate limiter with configurable calls per minute:

type Limiter struct {
    maxCallsPerMinute int
    window            time.Duration
    mu                sync.Mutex
    callTimes         []time.Time
}

func (r *Limiter) Allow() error {
    // Check if call allowed within rate limit
    // Returns error if rate limit exceeded
}

func (r *Limiter) Wait(ctx context.Context) error {
    // Blocks until call is allowed or context cancelled
}

Features:

Budget Tracker (internal/pulse/budget/tracker.go)

Tracks daily/monthly spend with persistence:

type Tracker struct {
    store  *Store
    config BudgetConfig
    mu     sync.RWMutex
}

func (b *Tracker) CheckBudget(estimatedCost float64) error {
    // Check if operation would exceed budget
}

func (b *Tracker) RecordOperation(actualCost float64) error {
    // Record actual cost in database
}

func (b *Tracker) GetStatus() (*Status, error) {
    // Returns current budget status from ai_model_usage table
}

Type Reference: See Limiter, Tracker, BudgetConfig, and Status type definitions.

Package: pulse/budget (kept separate from async to avoid import cycles)

Job Queue (pulse/async/queue.go)

Manages the async job lifecycle:

type Queue struct {
    store *Store
}

// Enqueue adds job to queue
func (q *Queue) Enqueue(job *Job) error

// Dequeue gets next runnable job (queued or scheduled, not paused)
func (q *Queue) Dequeue() (*Job, error)

// PauseJob pauses a running job
func (q *Queue) PauseJob(jobID string, reason string) error

// ResumeJob resumes a paused job
func (q *Queue) ResumeJob(jobID string) error

// CompleteJob marks job as completed
func (q *Queue) CompleteJob(jobID string) error

// FailJob marks job as failed with error
func (q *Queue) FailJob(jobID string, err error) error
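The queue methods imply a small state machine over the job statuses. The transition table below is an assumption inferred from the method names, for example that ResumeJob returns a paused job to queued. Retries driven by RetryCount are left out.

```go
package main

import "fmt"

type JobStatus string

const (
	StatusQueued    JobStatus = "queued"
	StatusRunning   JobStatus = "running"
	StatusPaused    JobStatus = "paused"
	StatusCompleted JobStatus = "completed"
	StatusFailed    JobStatus = "failed"
)

// allowed lists the transitions this sketch accepts; completed and failed
// have no entries and are therefore terminal.
var allowed = map[JobStatus][]JobStatus{
	StatusQueued:  {StatusRunning, StatusPaused, StatusFailed},
	StatusRunning: {StatusPaused, StatusCompleted, StatusFailed},
	StatusPaused:  {StatusQueued, StatusFailed},
}

// transition returns an error if moving from -> to is not permitted.
func transition(from, to JobStatus) error {
	for _, s := range allowed[from] {
		if s == to {
			return nil
		}
	}
	return fmt.Errorf("invalid job transition %s -> %s", from, to)
}
```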

Worker Pool (pulse/async/worker.go)

Processes jobs with Pulse rate limiting and budget checks:

type WorkerPool struct {
    queue         *Queue
    budgetTracker *budget.Tracker  // Optional - can be nil for tests
    rateLimiter   *budget.Limiter  // Optional - can be nil for tests
    workers       int
    executor      JobExecutor
}

// Start begins processing jobs
func (wp *WorkerPool) Start()

// Stop gracefully stops workers
func (wp *WorkerPool) Stop()

// processNextJob processes one job with rate limiting and budget checks
func (wp *WorkerPool) processNextJob() error {
    // 1. Dequeue job
    // 2. Check rate limit (pause if exceeded)
    // 3. Check budget (pause if exceeded)
    // 4. Execute job via handler registry
    // 5. Mark complete/failed
}

Worker Pool Features:

Handler Registration

Current Limitation: Handler availability is validated when a job executes, not when it is created. A job can therefore be created successfully and then fail at execution time because its handler is not registered.

Example Scenario:

1. A user creates an IX job: "ix https://github.com/user/repo"
2. The ATS parser (server/ats_parser.go) hardcodes HandlerName: "ixgest.git"
3. The job is created successfully in the scheduled_pulse_jobs table
4. The ticker executes the scheduled job and enqueues it on the async queue
5. The worker pool tries to execute the job
6. Handler lookup fails: "no handler registered for handler name: ixgest.git"
7. The job fails with an error, and its pulse_execution record is updated to 'failed'

Why This Happens:

Current Behavior:

Future Improvements (separate issue):

  1. Early validation: Check handler availability during job creation

  2. Plugin-aware parsing: The ATS parser queries the plugin registry before hardcoding handler names

  3. Graceful degradation: Allow jobs to be created but mark them as "pending handler availability"
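Improvements 1 and 3 can be sketched with a minimal registry lookup. The handlerLookup interface, the staticRegistry type and the "pending_handler" status string are hypothetical names used for illustration only.

```go
package main

import "fmt"

// handlerLookup is the minimal view of a handler registry needed here.
type handlerLookup interface {
	Has(name string) bool
}

type staticRegistry map[string]bool

func (r staticRegistry) Has(name string) bool { return r[name] }

// validateHandler sketches improvement 1: reject the job at creation time
// instead of letting it fail later in the worker pool.
func validateHandler(reg handlerLookup, handlerName string) error {
	if !reg.Has(handlerName) {
		return fmt.Errorf("cannot create job: no handler registered for handler name: %s", handlerName)
	}
	return nil
}

// initialStatus sketches improvement 3: accept the job but park it in a
// hypothetical "pending_handler" status until its handler is registered.
func initialStatus(reg handlerLookup, handlerName string) string {
	if reg.Has(handlerName) {
		return "queued"
	}
	return "pending_handler"
}
```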

References:

Implementation Status

Phase 1: Pulse Foundation ✅ COMPLETE

Files:

Architecture Benefits:

Phase 2: Async Job System ✅ COMPLETE

Files:

Testing Strategy

Unit Tests

Total: 41/41 tests passing

Integration Tests

Full async workflow end-to-end:

Future Enhancements

Dynamic Cost Estimation

Query pricing APIs for real-time cost updates:

Priority Queues

Allow high-priority jobs to skip the queue:
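A priority queue of this kind is commonly built on Go's container/heap. The sketch assumes a hypothetical integer priority per job, where higher values run first. Ties are broken by insertion order, so equal-priority jobs stay FIFO.

```go
package main

import "container/heap"

// prioritizedJob pairs a job ID with a hypothetical priority.
type prioritizedJob struct {
	ID       string
	Priority int
	seq      int // insertion order, keeps FIFO among equal priorities
}

type jobHeap []prioritizedJob

func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool {
	if h[i].Priority != h[j].Priority {
		return h[i].Priority > h[j].Priority
	}
	return h[i].seq < h[j].seq
}
func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x any)   { *h = append(*h, x.(prioritizedJob)) }
func (h *jobHeap) Pop() any {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

// PriorityQueue is usable as a zero value.
type PriorityQueue struct {
	h   jobHeap
	seq int
}

func (q *PriorityQueue) Enqueue(id string, priority int) {
	heap.Push(&q.h, prioritizedJob{ID: id, Priority: priority, seq: q.seq})
	q.seq++
}

// Dequeue returns the highest-priority job ID, or false if the queue is empty.
func (q *PriorityQueue) Dequeue() (string, bool) {
	if q.h.Len() == 0 {
		return "", false
	}
	return heap.Pop(&q.h).(prioritizedJob).ID, true
}
```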

Scheduling

Schedule expensive operations for specific times:

Multi-Model Support

Configure different models for different operations:

Cost Optimization

Reduce API costs through intelligent caching:
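A result cache keyed by request avoids paying twice for identical calls. The sketch below is hypothetical and has two simplifications. It has no expiry, and two concurrent misses on the same key can both trigger a paid fetch.

```go
package main

import "sync"

// costCache memoizes results by request key and tracks money saved by hits.
type costCache struct {
	mu      sync.Mutex
	results map[string]string
	saved   float64 // USD not spent thanks to cache hits
}

func newCostCache() *costCache { return &costCache{results: make(map[string]string)} }

// Get returns the cached result for key, calling fetch on a miss. cost is the
// price of one fetch; on a hit it is added to saved instead of being spent.
func (c *costCache) Get(key string, cost float64, fetch func(string) (string, error)) (string, error) {
	c.mu.Lock()
	if r, ok := c.results[key]; ok {
		c.saved += cost
		c.mu.Unlock()
		return r, nil
	}
	c.mu.Unlock()
	r, err := fetch(key) // paid call, made without holding the lock
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.results[key] = r
	c.mu.Unlock()
	return r, nil
}
```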

Use Case Examples

Example 1: Batch Data Processing

// Define payload type for your domain
type BatchProcessPayload struct {
    SourceURL string   `json:"source_url"`
    RecordIDs []string `json:"record_ids"`
    BatchType string   `json:"batch_type"`
}

// Implement handler
type BatchProcessHandler struct {
    dataService DataService
    queue       *async.Queue
    logger      *zap.Logger
}

func (h *BatchProcessHandler) Name() string {
    return "data.batch-process"
}

func (h *BatchProcessHandler) Execute(ctx context.Context, job *async.Job) error {
    var payload BatchProcessPayload
    if err := json.Unmarshal(job.Payload, &payload); err != nil {
        return fmt.Errorf("invalid payload: %w", err)
    }

    // Process with progress tracking and cost recording
    for i, recordID := range payload.RecordIDs {
        // Check for cancellation
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // Process record (with API call cost)
        if err := h.dataService.ProcessRecord(ctx, recordID); err != nil {
            return fmt.Errorf("failed to process record %s: %w", recordID, err)
        }

        // Update progress
        job.Progress.Current = i + 1
        job.CostActual += 0.001 // $0.001 per API call
        if err := h.queue.UpdateJob(job); err != nil {
            h.logger.Warn("Failed to update job progress", zap.Error(err))
        }
    }

    return nil
}

// Register handler
registry := async.NewHandlerRegistry()
registry.Register(&BatchProcessHandler{
    dataService: myDataService,
    queue:       queue,
    logger:      logger,
})

Example 2: ML Model Inference

type InferencePayload struct {
    ModelName  string   `json:"model_name"`
    InputData  []string `json:"input_data"`
    BatchSize  int      `json:"batch_size"`
}

type InferenceHandler struct {
    mlService MLService
    queue     *async.Queue
}

func (h *InferenceHandler) Name() string {
    return "ml.inference"
}

func (h *InferenceHandler) Execute(ctx context.Context, job *async.Job) error {
    var payload InferencePayload
    if err := json.Unmarshal(job.Payload, &payload); err != nil {
        return fmt.Errorf("invalid payload: %w", err)
    }

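    // Run inference with cost tracking. Results are discarded in this example;
    // a real handler would persist them before returning.
    _, cost, err := h.mlService.RunInference(ctx, payload.ModelName, payload.InputData)
    if err != nil {
        return fmt.Errorf("inference with model %s failed: %w", payload.ModelName, err)
    }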

    job.CostActual += cost
    return h.queue.UpdateJob(job)
}

Budget Exceeded Workflow

When the daily or monthly budget is exceeded, jobs are paused automatically:

User Operation:
  → Pulse checks budget
  → Daily budget exceeded: $5.02 / $5.00

Response:
  ✗ Daily budget exceeded: $5.02 / $5.00
    Projected cost: $0.040 USD
    Options:
      1. Reduce batch size
      2. Increase daily budget (config: pulse.daily_budget_usd)
      3. Wait until tomorrow (budget resets at midnight UTC)

Automatic Pause Behavior:

Budget Reset:

Related Documentation