Pulse is QNTX's rate-limiting and budget control system for asynchronous operations. It enables long-running, cost-sensitive operations to run asynchronously while adhering to API rate limits and money budgets. For the HTTP API that controls Pulse, see the Pulse API Documentation.
Modern AI-powered applications often involve:
Without controls:
Pulse acts as a smart rate limiter and budget manager for outgoing API calls.
┌─────────────────────────────────────────┐
│ Pulse Controller │
├─────────────────────────────────────────┤
│ ├─ Rate Limiter (calls/minute) │
│ ├─ Budget Tracker (daily/monthly USD) │
│ ├─ Cost Estimator (per operation) │
│ ├─ Pause/Resume Control │
│ └─ Observability Integration │
└─────────────────────────────────────────┘
↓ ↑
┌──────────────┐
│ Async Job │
└──────────────┘
Key Responsibilities:
Jobs run asynchronously with pulse control using a generic handler-based architecture.
// Generic Async Job (handler-based architecture)
type Job struct {
ID string // Unique job ID (ASID)
HandlerName string // Handler identifier (e.g., "domain.operation")
Payload json.RawMessage // Handler-specific data (domain-owned)
Source string // Data source (for deduplication)
Status JobStatus // "queued", "running", "paused", "completed", "failed"
Progress Progress // Current/total operations
CostEstimate float64 // Estimated USD cost
CostActual float64 // Actual USD cost so far
PulseState *PulseState // Rate limit, budget status
Error string // Error message if failed
ParentJobID string // For task hierarchies
RetryCount int // Retry attempts (max 2)
CreatedAt time.Time
StartedAt *time.Time
CompletedAt *time.Time
UpdatedAt time.Time
}
type PulseState struct {
CallsThisMinute int
CallsRemaining int
SpendToday float64
SpendThisMonth float64
BudgetRemaining float64
IsPaused bool
PauseReason string // "budget_exceeded", "rate_limit", "user_requested"
}
// Handler-based execution
type JobHandler interface {
Name() string // "domain.operation"
Execute(ctx context.Context, job *Job) error
}
Type Reference: See Job, PulseState, and Progress type definitions.
Generic Architecture:
Example Pulse configuration (see am package for full config system):
[pulse]
max_calls_per_minute = 10
daily_budget_usd = 5.0
monthly_budget_usd = 100.0
pause_on_budget_exceeded = true
CREATE TABLE pulse_budget (
date TEXT PRIMARY KEY, -- "2025-11-23" for daily, "2025-11" for monthly
type TEXT NOT NULL, -- "daily" or "monthly"
spend_usd REAL NOT NULL, -- Current spend in USD
operations_count INTEGER NOT NULL,
created_at DATETIME,
updated_at DATETIME
);
CREATE INDEX idx_pulse_budget_type ON pulse_budget(type);
CREATE TABLE async_ix_jobs (
id TEXT PRIMARY KEY, -- Job ID (ASID)
handler_name TEXT, -- Handler identifier
source TEXT NOT NULL, -- Data source (for deduplication)
status TEXT NOT NULL, -- "queued", "running", "paused", "completed", "failed"
progress_current INTEGER, -- Current operations completed
progress_total INTEGER, -- Total operations
cost_estimate REAL, -- Estimated USD cost
cost_actual REAL, -- Actual USD cost
pulse_state TEXT, -- JSON: PulseState
error TEXT, -- Error message if failed
payload TEXT, -- JSON: Handler-specific data
parent_job_id TEXT, -- Parent job for task hierarchies
retry_count INTEGER DEFAULT 0, -- Retry attempts (max 2)
created_at DATETIME,
started_at DATETIME,
completed_at DATETIME,
updated_at DATETIME
);
CREATE INDEX idx_async_ix_jobs_status ON async_ix_jobs(status);
CREATE INDEX idx_async_ix_jobs_created ON async_ix_jobs(created_at DESC);
CREATE INDEX idx_async_ix_jobs_handler ON async_ix_jobs(handler_name);
CREATE INDEX idx_async_ix_jobs_source_handler ON async_ix_jobs(source, handler_name);
pulse/budget/limiter.go)Sliding window rate limiter with configurable calls per minute:
type Limiter struct {
maxCallsPerMinute int
window time.Duration
mu sync.Mutex
callTimes []time.Time
}
func (r *Limiter) Allow() error {
// Check if call allowed within rate limit
// Returns error if rate limit exceeded
}
func (r *Limiter) Wait(ctx context.Context) error {
// Blocks until call is allowed or context cancelled
}
Features:
internal/pulse/budget/tracker.go)Tracks daily/monthly spend with persistence:
type Tracker struct {
store *Store
config BudgetConfig
mu sync.RWMutex
}
func (b *Tracker) CheckBudget(estimatedCost float64) error {
// Check if operation would exceed budget
}
func (b *Tracker) RecordOperation(actualCost float64) error {
// Record actual cost in database
}
func (b *Tracker) GetStatus() (*Status, error) {
// Returns current budget status from ai_model_usage table
}
Type Reference: See Limiter, Tracker, BudgetConfig, and Status type definitions.
Package: pulse/budget - Separated from async to eliminate import cycles
pulse/async/queue.go)Manages async job lifecycle:
type Queue struct {
store *Store
}
// Enqueue adds job to queue
func (q *Queue) Enqueue(job *Job) error
// Dequeue gets next runnable job (queued or scheduled, not paused)
func (q *Queue) Dequeue() (*Job, error)
// PauseJob pauses a running job
func (q *Queue) PauseJob(jobID string, reason string) error
// ResumeJob resumes a paused job
func (q *Queue) ResumeJob(jobID string) error
// CompleteJob marks job as completed
func (q *Queue) CompleteJob(jobID string) error
// FailJob marks job as failed with error
func (q *Queue) FailJob(jobID string, err error) error
pulse/async/worker.go)Processes jobs with pulse integration:
type WorkerPool struct {
queue *Queue
budgetTracker *budget.Tracker // Optional - can be nil for tests
rateLimiter *budget.Limiter // Optional - can be nil for tests
workers int
executor JobExecutor
}
// Start begins processing jobs
func (wp *WorkerPool) Start()
// Stop gracefully stops workers
func (wp *WorkerPool) Stop()
// processNextJob processes one job with rate limiting and budget checks
func (wp *WorkerPool) processNextJob() error {
// 1. Dequeue job
// 2. Check rate limit (pause if exceeded)
// 3. Check budget (pause if exceeded)
// 4. Execute job via handler registry
// 5. Mark complete/failed
}
Worker Pool Features:
Current Limitation: Handler availability validation happens at job execution time, not at job creation time. This can lead to jobs being created successfully but failing when executed because the required handler is not registered.
Example Scenario:
1. User creates IX job: "ix https://github.com/user/repo"
2. ATS parser (server/ats_parser.go) hardcodes HandlerName: "ixgest.git"
3. Job is created successfully in scheduled_pulse_jobs table
4. Job is executed by ticker and enqueued to async queue
5. Worker pool tries to execute job
6. Handler lookup fails: "no handler registered for handler name: ixgest.git"
7. Job fails with error, pulse_execution record updated to 'failed'
Why This Happens:
Current Behavior:
Future Improvements (separate issue):
Early validation: Check handler availability during job creation
Plugin-aware parsing: ATS parser queries plugin registry before hardcoding handler names
Graceful degradation: Allow jobs to be created but mark as "pending handler availability"
References:
server/ats_parser.go:137pulse/budget packageFiles:
pulse/budget/limiter.go - Rate limiterpulse/budget/tracker.go - Budget trackerpulse/budget/store.go - Budget persistenceArchitecture Benefits:
Files:
pulse/async/job.go - Generic job model (handler-based)pulse/async/handler.go - JobHandler interface and registrypulse/async/store.go - Job persistencepulse/async/queue.go - Queue operationspulse/async/worker.go - Worker pool with budget integrationpulse/async/grace_test.go - Opening/Closing testspulse/budget/limiter_test.go - Rate limiting (9/9 tests)pulse/budget/tracker_test.go - Budget calculationspulse/async/job_test.go - Job models and statepulse/async/queue_test.go - Queue operationspulse/async/store_test.go - Persistencepulse/async/worker_test.go - Worker pool and integrationTotal: 41/41 tests passing
Full async workflow end-to-end:
Query pricing APIs for real-time cost updates:
Allow high-priority jobs to skip the queue:
Schedule expensive operations for specific times:
Configure different models for different operations:
Reduce API costs through intelligent caching:
// Define payload type for your domain
type BatchProcessPayload struct {
SourceURL string `json:"source_url"`
RecordIDs []string `json:"record_ids"`
BatchType string `json:"batch_type"`
}
// Implement handler
type BatchProcessHandler struct {
dataService DataService
queue *async.Queue
logger *zap.Logger
}
func (h *BatchProcessHandler) Name() string {
return "data.batch-process"
}
func (h *BatchProcessHandler) Execute(ctx context.Context, job *async.Job) error {
var payload BatchProcessPayload
if err := json.Unmarshal(job.Payload, &payload); err != nil {
return fmt.Errorf("invalid payload: %w", err)
}
// Process with progress tracking and cost recording
for i, recordID := range payload.RecordIDs {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Process record (with API call cost)
if err := h.dataService.ProcessRecord(ctx, recordID); err != nil {
return fmt.Errorf("failed to process record %s: %w", recordID, err)
}
// Update progress
job.Progress.Current = i + 1
job.CostActual += 0.001 // $0.001 per API call
if err := h.queue.UpdateJob(job); err != nil {
h.logger.Warn("Failed to update job progress", zap.Error(err))
}
}
return nil
}
// Register handler
registry := async.NewHandlerRegistry()
registry.Register(&BatchProcessHandler{
dataService: myDataService,
queue: queue,
logger: logger,
})
type InferencePayload struct {
ModelName string `json:"model_name"`
InputData []string `json:"input_data"`
BatchSize int `json:"batch_size"`
}
type InferenceHandler struct {
mlService MLService
queue *async.Queue
}
func (h *InferenceHandler) Name() string {
return "ml.inference"
}
func (h *InferenceHandler) Execute(ctx context.Context, job *async.Job) error {
var payload InferencePayload
if err := json.Unmarshal(job.Payload, &payload); err != nil {
return fmt.Errorf("invalid payload: %w", err)
}
// Run inference with cost tracking
results, cost, err := h.mlService.RunInference(ctx, payload.ModelName, payload.InputData)
if err != nil {
return err
}
job.CostActual += cost
return h.queue.UpdateJob(job)
}
When daily or monthly budget is exceeded, jobs are automatically paused:
User Operation:
→ Pulse checks budget
→ Daily budget exceeded: $5.02 / $5.00
Response:
✗ Daily budget exceeded: $5.02 / $5.00
Projected cost: $0.040 USD
Options:
1. Reduce batch size
2. Increase daily budget (config: pulse.daily_budget_usd)
3. Wait until tomorrow (budget resets at midnight UTC)
Automatic Pause Behavior:
paused statusPulseState.IsPaused = truePauseReason = "budget_exceeded"Budget Reset: