Pulse is QNTX's rate-limiting and budget control system for asynchronous operations. It lets long-running, cost-sensitive operations run asynchronously while staying within API rate limits and spending budgets. For the HTTP API that controls Pulse, see the Pulse API Documentation.
Modern AI-powered applications often involve:
Without controls:
Pulse acts as a smart rate limiter and budget manager for outgoing API calls.
┌─────────────────────────────────────────┐
│            Pulse Controller             │
├─────────────────────────────────────────┤
│  ├─ Rate Limiter (calls/minute)         │
│  ├─ Budget Tracker (daily/monthly USD)  │
│  ├─ Cost Estimator (per operation)      │
│  ├─ Pause/Resume Control                │
│  └─ Observability Integration           │
└─────────────────────────────────────────┘
                    ↓ ↑
             ┌──────────────┐
             │  Async Job   │
             └──────────────┘
Key Responsibilities:
Jobs run asynchronously with pulse control using a generic handler-based architecture.
// Generic Async Job (handler-based architecture)
type Job struct {
ID string // Unique job ID (ASID)
HandlerName string // Handler identifier (e.g., "domain.operation")
Payload json.RawMessage // Handler-specific data (domain-owned)
Source string // Data source (for deduplication)
Status JobStatus // "queued", "running", "paused", "completed", "failed"
Progress Progress // Current/total operations
CostEstimate float64 // Estimated USD cost
CostActual float64 // Actual USD cost so far
PulseState *PulseState // Rate limit, budget status
Error string // Error message if failed
ParentJobID string // For task hierarchies
RetryCount int // Retry attempts (max 2)
CreatedAt time.Time
StartedAt *time.Time
CompletedAt *time.Time
UpdatedAt time.Time
}
type PulseState struct {
CallsThisMinute int
CallsRemaining int
SpendToday float64
SpendThisMonth float64
BudgetRemaining float64
IsPaused bool
PauseReason string // "budget_exceeded", "rate_limit", "user_requested"
}
// Handler-based execution
type JobHandler interface {
Name() string // "domain.operation"
Execute(ctx context.Context, job *Job) error
}
Type Reference: See Job, PulseState, and Progress type definitions.
Generic Architecture:
Example Pulse configuration (see am package for full config system):
[pulse]
max_calls_per_minute = 10
daily_budget_usd = 5.0
monthly_budget_usd = 100.0
pause_on_budget_exceeded = true
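As a rough illustration of how these keys might be represented in Go, the sketch below defines a struct mirroring the four settings plus a sanity check. The PulseConfig type, its field names, and the validation rules are assumptions; the real configuration is handled by the am package and may differ.

```go
package main

import "fmt"

// PulseConfig mirrors the [pulse] keys from the example configuration.
// The type and field names are hypothetical.
type PulseConfig struct {
	MaxCallsPerMinute     int     // max_calls_per_minute
	DailyBudgetUSD        float64 // daily_budget_usd
	MonthlyBudgetUSD      float64 // monthly_budget_usd
	PauseOnBudgetExceeded bool    // pause_on_budget_exceeded
}

// Validate returns the first problem found, or nil if the values look sane.
// These rules are assumptions, not documented Pulse behavior.
func (c PulseConfig) Validate() error {
	if c.MaxCallsPerMinute <= 0 {
		return fmt.Errorf("max_calls_per_minute must be positive, got %d", c.MaxCallsPerMinute)
	}
	if c.DailyBudgetUSD < 0 || c.MonthlyBudgetUSD < 0 {
		return fmt.Errorf("budgets must not be negative")
	}
	if c.DailyBudgetUSD > c.MonthlyBudgetUSD {
		return fmt.Errorf("daily budget %.2f exceeds monthly budget %.2f",
			c.DailyBudgetUSD, c.MonthlyBudgetUSD)
	}
	return nil
}
```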
CREATE TABLE pulse_budget (
date TEXT PRIMARY KEY, -- "2025-11-23" for daily, "2025-11" for monthly
type TEXT NOT NULL, -- "daily" or "monthly"
spend_usd REAL NOT NULL, -- Current spend in USD
operations_count INTEGER NOT NULL,
created_at DATETIME,
updated_at DATETIME
);
CREATE INDEX idx_pulse_budget_type ON pulse_budget(type);
CREATE TABLE async_ix_jobs (
id TEXT PRIMARY KEY, -- Job ID (ASID)
handler_name TEXT, -- Handler identifier
source TEXT NOT NULL, -- Data source (for deduplication)
status TEXT NOT NULL, -- "queued", "running", "paused", "completed", "failed"
progress_current INTEGER, -- Current operations completed
progress_total INTEGER, -- Total operations
cost_estimate REAL, -- Estimated USD cost
cost_actual REAL, -- Actual USD cost
pulse_state TEXT, -- JSON: PulseState
error TEXT, -- Error message if failed
payload TEXT, -- JSON: Handler-specific data
parent_job_id TEXT, -- Parent job for task hierarchies
retry_count INTEGER DEFAULT 0, -- Retry attempts (max 2)
created_at DATETIME,
started_at DATETIME,
completed_at DATETIME,
updated_at DATETIME
);
CREATE INDEX idx_async_ix_jobs_status ON async_ix_jobs(status);
CREATE INDEX idx_async_ix_jobs_created ON async_ix_jobs(created_at DESC);
CREATE INDEX idx_async_ix_jobs_handler ON async_ix_jobs(handler_name);
CREATE INDEX idx_async_ix_jobs_source_handler ON async_ix_jobs(source, handler_name);
Rate limiter (pulse/budget/limiter.go)
Sliding window rate limiter with configurable calls per minute:
type Limiter struct {
maxCallsPerMinute int
window time.Duration
mu sync.Mutex
callTimes []time.Time
}
func (r *Limiter) Allow() error {
// Check if call allowed within rate limit
// Returns error if rate limit exceeded
}
func (r *Limiter) Wait(ctx context.Context) error {
// Blocks until call is allowed or context cancelled
}
Features:
Budget tracker (internal/pulse/budget/tracker.go)
Tracks daily/monthly spend with persistence:
type Tracker struct {
store *Store
config BudgetConfig
mu sync.RWMutex
}
func (b *Tracker) CheckBudget(estimatedCost float64) error {
// Check if operation would exceed budget
}
func (b *Tracker) RecordOperation(actualCost float64) error {
// Record actual cost in database
}
func (b *Tracker) GetStatus() (*Status, error) {
// Returns current budget status from ai_model_usage table
}
Type Reference: See Limiter, Tracker, BudgetConfig, and Status type definitions.
Package: pulse/budget - Separated from async to eliminate import cycles
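To make the CheckBudget and RecordOperation contract concrete, here is a minimal in-memory sketch. It leaves out persistence and period rollover entirely. MemTracker, the BudgetConfig field names, and the error wording are assumptions; the sketch also assumes the check compares current spend plus the estimated cost against each limit.

```go
package main

import (
	"fmt"
	"sync"
)

// BudgetConfig field names are hypothetical.
type BudgetConfig struct {
	DailyBudgetUSD   float64
	MonthlyBudgetUSD float64
}

// MemTracker is an in-memory stand-in for the persistent Tracker.
type MemTracker struct {
	mu         sync.RWMutex
	config     BudgetConfig
	spendToday float64
	spendMonth float64
}

// CheckBudget returns an error if spending estimatedCost would push the
// daily or monthly total past its configured limit.
func (t *MemTracker) CheckBudget(estimatedCost float64) error {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if total := t.spendToday + estimatedCost; total > t.config.DailyBudgetUSD {
		return fmt.Errorf("daily budget exceeded: $%.2f / $%.2f", total, t.config.DailyBudgetUSD)
	}
	if total := t.spendMonth + estimatedCost; total > t.config.MonthlyBudgetUSD {
		return fmt.Errorf("monthly budget exceeded: $%.2f / $%.2f", total, t.config.MonthlyBudgetUSD)
	}
	return nil
}

// RecordOperation adds the actual cost to both running totals.
func (t *MemTracker) RecordOperation(actualCost float64) error {
	if actualCost < 0 {
		return fmt.Errorf("cost must not be negative, got %f", actualCost)
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	t.spendToday += actualCost
	t.spendMonth += actualCost
	return nil
}
```

Checking against the estimate before the call and recording the actual cost afterwards keeps the two concerns separate: an inaccurate estimate can let a single operation overshoot slightly, but the recorded totals stay accurate.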
Queue (pulse/async/queue.go)
Manages async job lifecycle:
type Queue struct {
store *Store
}
// Enqueue adds job to queue
func (q *Queue) Enqueue(job *Job) error
// Dequeue gets next runnable job (queued or scheduled, not paused)
func (q *Queue) Dequeue() (*Job, error)
// PauseJob pauses a running job
func (q *Queue) PauseJob(jobID string, reason string) error
// ResumeJob resumes a paused job
func (q *Queue) ResumeJob(jobID string) error
// CompleteJob marks job as completed
func (q *Queue) CompleteJob(jobID string) error
// FailJob marks job as failed with error
func (q *Queue) FailJob(jobID string, err error) error
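The status transitions behind these methods can be sketched with a small in-memory queue. This is an illustration under assumptions, not the store-backed implementation: MemQueue and ErrEmpty are hypothetical, the sketch is not safe for concurrent use, and it assumes ResumeJob returns a paused job to "queued" so that a later Dequeue picks it up again.

```go
package main

import (
	"errors"
	"fmt"
)

// Job is trimmed to the fields this sketch needs.
type Job struct{ ID, Status, PauseReason string }

// ErrEmpty is a hypothetical sentinel for "nothing runnable".
var ErrEmpty = errors.New("no runnable job")

// MemQueue keeps jobs in insertion order; it is not goroutine-safe.
type MemQueue struct {
	order []string
	jobs  map[string]*Job
}

func NewMemQueue() *MemQueue { return &MemQueue{jobs: map[string]*Job{}} }

// Enqueue adds a job in "queued" status.
func (q *MemQueue) Enqueue(job *Job) error {
	if _, dup := q.jobs[job.ID]; dup {
		return fmt.Errorf("job %s already enqueued", job.ID)
	}
	job.Status = "queued"
	q.jobs[job.ID] = job
	q.order = append(q.order, job.ID)
	return nil
}

// Dequeue returns the oldest queued job and marks it running.
// Paused, running, and finished jobs are skipped.
func (q *MemQueue) Dequeue() (*Job, error) {
	for _, id := range q.order {
		if j := q.jobs[id]; j.Status == "queued" {
			j.Status = "running"
			return j, nil
		}
	}
	return nil, ErrEmpty
}

// PauseJob moves a running job to "paused" and records the reason.
func (q *MemQueue) PauseJob(id, reason string) error {
	j, ok := q.jobs[id]
	if !ok || j.Status != "running" {
		return fmt.Errorf("job %s is not running", id)
	}
	j.Status, j.PauseReason = "paused", reason
	return nil
}

// ResumeJob moves a paused job back to "queued" (assumed behavior).
func (q *MemQueue) ResumeJob(id string) error {
	j, ok := q.jobs[id]
	if !ok || j.Status != "paused" {
		return fmt.Errorf("job %s is not paused", id)
	}
	j.Status, j.PauseReason = "queued", ""
	return nil
}
```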
Worker pool (pulse/async/worker.go)
Processes jobs with pulse integration:
type WorkerPool struct {
queue *Queue
budgetTracker *budget.Tracker // Optional - can be nil for tests
rateLimiter *budget.Limiter // Optional - can be nil for tests
workers int
executor JobExecutor
}
// Start begins processing jobs
func (wp *WorkerPool) Start()
// Stop gracefully stops workers
func (wp *WorkerPool) Stop()
// processNextJob processes one job with rate limiting and budget checks
func (wp *WorkerPool) processNextJob() error {
// 1. Dequeue job
// 2. Check rate limit (pause if exceeded)
// 3. Check budget (pause if exceeded)
// 4. Execute job via handler registry
// 5. Mark complete/failed
}
Worker Pool Features:
pulse/budget package
Files:
- pulse/budget/limiter.go - Rate limiter
- pulse/budget/tracker.go - Budget tracker
- pulse/budget/store.go - Budget persistence
Architecture Benefits:
Files:
- pulse/async/job.go - Generic job model (handler-based)
- pulse/async/handler.go - JobHandler interface and registry
- pulse/async/store.go - Job persistence
- pulse/async/queue.go - Queue operations
- pulse/async/worker.go - Worker pool with budget integration
- pulse/async/grace_test.go - Opening/Closing tests
- pulse/budget/limiter_test.go - Rate limiting (9/9 tests)
- pulse/budget/tracker_test.go - Budget calculations
- pulse/async/job_test.go - Job models and state
- pulse/async/queue_test.go - Queue operations
- pulse/async/store_test.go - Persistence
- pulse/async/worker_test.go - Worker pool and integration
Total: 41/41 tests passing
Full async workflow end-to-end:
Query pricing APIs for real-time cost updates:
Allow high-priority jobs to skip the queue:
Schedule expensive operations for specific times:
Configure different models for different operations:
Reduce API costs through intelligent caching:
// Define payload type for your domain
type BatchProcessPayload struct {
SourceURL string `json:"source_url"`
RecordIDs []string `json:"record_ids"`
BatchType string `json:"batch_type"`
}
// Implement handler
type BatchProcessHandler struct {
dataService DataService
queue *async.Queue
logger *zap.Logger
}
func (h *BatchProcessHandler) Name() string {
return "data.batch-process"
}
func (h *BatchProcessHandler) Execute(ctx context.Context, job *async.Job) error {
var payload BatchProcessPayload
if err := json.Unmarshal(job.Payload, &payload); err != nil {
return fmt.Errorf("invalid payload: %w", err)
}
// Process with progress tracking and cost recording
for i, recordID := range payload.RecordIDs {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Process record (with API call cost)
if err := h.dataService.ProcessRecord(ctx, recordID); err != nil {
return fmt.Errorf("failed to process record %s: %w", recordID, err)
}
// Update progress
job.Progress.Current = i + 1
job.CostActual += 0.001 // $0.001 per API call
if err := h.queue.UpdateJob(job); err != nil {
h.logger.Warn("Failed to update job progress", zap.Error(err))
}
}
return nil
}
// Register handler
registry := async.NewHandlerRegistry()
registry.Register(&BatchProcessHandler{
dataService: myDataService,
queue: queue,
logger: logger,
})
type InferencePayload struct {
ModelName string `json:"model_name"`
InputData []string `json:"input_data"`
BatchSize int `json:"batch_size"`
}
type InferenceHandler struct {
mlService MLService
queue *async.Queue
}
func (h *InferenceHandler) Name() string {
return "ml.inference"
}
func (h *InferenceHandler) Execute(ctx context.Context, job *async.Job) error {
var payload InferencePayload
if err := json.Unmarshal(job.Payload, &payload); err != nil {
return fmt.Errorf("invalid payload: %w", err)
}
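// Run inference with cost tracking. The results are discarded here because
// Go rejects unused variables; a real handler would persist them.
_, cost, err := h.mlService.RunInference(ctx, payload.ModelName, payload.InputData)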
if err != nil {
return err
}
job.CostActual += cost
return h.queue.UpdateJob(job)
}
When daily or monthly budget is exceeded, jobs are automatically paused:
User Operation:
→ Pulse checks budget
→ Daily budget exceeded: $5.02 / $5.00
Response:
✗ Daily budget exceeded: $5.02 / $5.00
Projected cost: $0.040 USD
Options:
1. Reduce batch size
2. Increase daily budget (config: pulse.daily_budget_usd)
3. Wait until tomorrow (budget resets at midnight UTC)
Automatic Pause Behavior:
- Jobs move to paused status
- PulseState.IsPaused = true
- PauseReason = "budget_exceeded"
Budget Reset:
- docs/development/grace.md - Graceful startup/shutdown system
- docs/architecture/config-system.md - Configuration system including Pulse settings
- docs/architecture/pulse-resource-coordination.md - GPU and system resource management