Status: Phases 1-4 Complete ✓
Date: 2026-01-31
Branch: research/plugin-pulse-integration
Goal: Enable plugins to announce async handlers and receive execution requests
Implemented:
domain.proto: Initialize returns InitializeResponse with handler_names[]ExecuteJob RPC to protocolGoal: Python plugin can execute jobs forwarded by Pulse
Implemented:
["python.script"] in initialize()execute_job() RPCPluginProxyHandler in Go forwards jobs to plugins via gRPCqntx-code import from server/ats_parser.goArchitecture working:
Job with handler_name="python.script"
→ Pulse Worker picks up job
→ PluginProxyHandler.Execute()
→ gRPC call to Python plugin
→ Plugin executes code
→ Returns result/progress/cost to Pulse
Goal: Python plugin reads saved scripts from ATS store and announces them as handlers
Implemented:
qntx handler create <name> --code <code> or --file <path>initialize() for handler attestationsInitializeResponse with python. prefix"python.vacancies"Verification:
[python v0.3.10] Discovering Python handlers from ATS store
[python v0.3.10] Discovered 2 handler(s): ["test-handler", "vacancies"]
[python v0.3.10] Announcing: ["python.script", "python.test-handler", "python.vacancies"]
Goal: Fix async race where handlers announced but not registered with Pulse
Problem: Pulse starts during server init, plugins load asynchronously afterward, handler registration ran before plugins finished loading.
Implemented:
GetDaemon() method to QNTXServer for accessing Pulse worker poolserver/init.go to cmd/qntx/main.go after InitializeAll()TestGetDaemon verifies daemon/registry accessVerification:
[plugin-loader] Registering plugin async handlers with Pulse
[plugin-loader] Registering plugin async handler plugin=python handler=python.script
[plugin-loader] Registering plugin async handler plugin=python handler=python.test-handler
[plugin-loader] Registering plugin async handler plugin=python handler=python.vacancies
[plugin-loader] Plugin async handler registration complete
Goal: Move remaining hardcoded handler to plugin for pure plugin-based architecture
Plan:
code.git handlerOriginal Context: Feature branch feature/dynamic-ix-routing attempts to add Python script execution via Pulse, but does so through an ad-hoc Go handler that makes gRPC calls to the plugin. This document proposes a general architecture for plugins to register async handlers directly with Pulse.
Pulse (async job system) and the Plugin system exist as parallel, largely disconnected systems:
┌─────────────────────────────────────────────┐
│ QNTX Core │
│ │
│ ┌──────────────┐ ┌─────────────────┐ │
│ │ Pulse │ │ Plugin System │ │
│ │ │ │ │ │
│ │ - Registry │ │ - Python plugin │ │
│ │ - Workers │ │ - Code plugin │ │
│ │ - Queue │◄─────┤ (can enqueue) │ │
│ └──────────────┘ └─────────────────┘ │
│ │ │
│ │ Handlers registered in Go: │
│ ├─ ixgest.git (qntx-code package) │
│ └─ python.script (NEW, ad-hoc) ❌ │
│ (Go code calls plugin) │
└─────────────────────────────────────────────┘
Current state:
queue_endpoint passed during Initialize)pulse/async/cmd/qntx/commands/pulse.goExample from feature/dynamic-ix-routing:
// cmd/qntx/commands/pulse.go:94
registry.Register(async.NewPythonScriptHandler(pythonURL, logger.Logger))
This creates a Go shim (PythonScriptHandler) that forwards execution to the Python plugin.
Plugins receive service endpoints during initialization:
message InitializeRequest {
string ats_store_endpoint = 1; // For creating attestations
string queue_endpoint = 2; // For enqueuing jobs ✓
string auth_token = 3;
map<string, string> config = 4;
}
Plugins can already:
QueueService gRPCPlugins cannot:
Enable bidirectional integration between Pulse and plugins:
┌─────────────────────────────────────────────┐
│ QNTX Core │
│ │
│ ┌──────────────┐ ┌─────────────────┐ │
│ │ Pulse │◄────►│ Plugin System │ │
│ │ │ │ │ │
│ │ - Registry │ │ - Python plugin │ │
│ │ - Workers │ │ announces: │ │
│ │ - Queue │ │ "python.*" │ │
│ │ │ │ │ │
│ │ Handlers: │ │ - Code plugin │ │
│ │ ├─ ixgest.git│ │ announces: │ │
│ │ │ (Go) │ │ "ixgest.git" │ │
│ │ └─ python.* │ │ │ │
│ │ (proxied │──────► │ │
│ │ to plugin) └─────────────────┘ │
└─────────────────────────────────────────────┘
Key concepts:
job.HandlerNameModify plugin/grpc/protocol/domain.proto:
// Add to InitializeRequest response
message InitializeResponse {
// Handler names this plugin can execute
// Examples: ["python.script", "python.webhook", "python.analysis"]
repeated string handler_names = 1;
}
// NEW: RPC for executing jobs
service DomainPluginService {
// ... existing RPCs ...
// ExecuteJob executes an async job
rpc ExecuteJob(ExecuteJobRequest) returns (ExecuteJobResponse);
}
message ExecuteJobRequest {
string job_id = 1; // For logging/tracking
string handler_name = 2; // Which handler to invoke
bytes payload = 3; // Job-specific data (JSON)
int64 timeout_secs = 4; // Execution timeout
}
message ExecuteJobResponse {
bool success = 1;
string error = 2; // Error message if failed
bytes result = 3; // Optional result data
// Progress tracking (optional)
int32 progress_current = 4;
int32 progress_total = 5;
// Cost tracking (optional)
double cost_actual = 6;
}
Design notes:
Initialize now returns handler names instead of EmptyExecuteJob RPC allows Pulse to invoke plugin handlersPlugins implement handler registration:
Example: Python plugin (qntx-python/src/service.rs)
#[tonic::async_trait]
impl DomainPluginService for PythonPluginService {
async fn initialize(&self, request: Request<InitializeRequest>)
-> Result<Response<InitializeResponse>, Status> {
// ... existing initialization logic ...
// Announce async handler capabilities
Ok(Response::new(InitializeResponse {
handler_names: vec![
"python.script".to_string(),
"python.webhook".to_string(),
"python.csv".to_string(),
],
}))
}
async fn execute_job(&self, request: Request<ExecuteJobRequest>)
-> Result<Response<ExecuteJobResponse>, Status> {
let req = request.into_inner();
// Route to internal handler based on handler_name
match req.handler_name.as_str() {
"python.script" => self.execute_python_script(req).await,
"python.webhook" => self.execute_webhook_handler(req).await,
"python.csv" => self.execute_csv_handler(req).await,
_ => Err(Status::not_found(format!(
"Unknown handler: {}", req.handler_name
))),
}
}
}
Create a generic proxy handler in pulse/async/:
New file: pulse/async/plugin_proxy_handler.go
package async
import (
"context"
"github.com/teranos/QNTX/plugin/grpc"
"github.com/teranos/QNTX/plugin/grpc/protocol"
)
// PluginProxyHandler forwards job execution to a plugin via gRPC
type PluginProxyHandler struct {
handlerName string
plugin *grpc.ExternalDomainProxy
}
func NewPluginProxyHandler(handlerName string, plugin *grpc.ExternalDomainProxy) *PluginProxyHandler {
return &PluginProxyHandler{
handlerName: handlerName,
plugin: plugin,
}
}
func (h *PluginProxyHandler) Name() string {
return h.handlerName
}
func (h *PluginProxyHandler) Execute(ctx context.Context, job *Job) error {
// Forward job to plugin via gRPC
req := &protocol.ExecuteJobRequest{
JobId: job.ID,
HandlerName: h.handlerName,
Payload: job.Payload,
TimeoutSecs: 300, // TODO: configurable
}
resp, err := h.plugin.Client().ExecuteJob(ctx, req)
if err != nil {
return errors.Wrap(err, "plugin execution failed")
}
if !resp.Success {
return errors.New(resp.Error)
}
// Update job progress/cost from plugin response
if resp.ProgressTotal > 0 {
job.Progress = Progress{
Current: int(resp.ProgressCurrent),
Total: int(resp.ProgressTotal),
}
}
if resp.CostActual > 0 {
job.CostActual = resp.CostActual
}
return nil
}
Modify plugin initialization to register handlers with Pulse:
Updated: server/init.go or plugin initialization code
func initializePlugins(ctx context.Context, db *sql.DB, logger *zap.SugaredLogger) error {
pluginManager := grpcplugin.NewPluginManager(logger)
// Load plugins from config
if err := pluginManager.LoadPlugins(ctx, pluginConfigs); err != nil {
return err
}
// Create Pulse handler registry
pulseRegistry := async.NewHandlerRegistry()
// Register handlers from each plugin
for _, plugin := range pluginManager.GetAllPlugins() {
metadata := plugin.Metadata()
// Get handler names from plugin (via Initialize response)
handlerNames := plugin.GetHandlerNames() // NEW method
for _, handlerName := range handlerNames {
logger.Infof("Registering plugin handler: %s (from %s plugin)",
handlerName, metadata.Name)
proxyHandler := async.NewPluginProxyHandler(handlerName, plugin)
pulseRegistry.Register(proxyHandler)
}
}
return nil
}
Phase 1: Add protocol support (no behavior change)
InitializeResponse with handler_names to protobufExecuteJob RPC to protobufInitializeResponse (empty list for now)Phase 2: Implement plugin-side handlers
execute_job methodinitialize responsePhase 3: Implement Pulse proxy handlers
PluginProxyHandler in GoPhase 4: Remove ad-hoc handlers
pulse/async/python_handler.go (from branch #2)cmd/qntx/commands/pulse.goPhase 5: Migrate existing handlers
ixgest.git handler into qntx-code pluginServiceRegistry patternats_parser.go queries attestations for handlerspulse/async/Scenario: Plugin process crashes while executing a job
Mitigation:
Scenario: Two plugins claim the same handler name
Mitigation:
python.python.script vs rust.python.script)Scenario: gRPC call overhead for every job execution
Analysis:
Mitigation:
Scenario: Existing deployments break
Mitigation:
Handler discovery: Should plugins announce handlers dynamically (via attestations) or statically (via Initialize)?
Handler namespacing: Should handler names include plugin name prefix?
python:script)Job lifecycle: Should plugins update job status themselves?
Timeouts: Who enforces timeout - Pulse or plugin?
Existing Go handlers: Migrate all to plugins, or keep some in core?
InitializeResponse message with handler_names fieldExecuteJob RPC to DomainPluginServiceExecuteJobRequest and ExecuteJobResponse messagesmake proto or similarqntx-python/initialize() to return InitializeResponseexecute_job() RPC handlerhandler_namepulse/async/plugin_proxy_handler.goGetHandlerNames() method to ExternalDomainProxyserver/init.go to wire plugin handlers to Pulsepython.script jobsdocs/api/grpc-plugin.md with new RPCdocs/development/external-plugin-guide.mdpulse/async/python_handler.go (from feature branch)cmd/qntx/commands/pulse.goixgest.git to qntx-code pluginBranch #2 approach:
Job "python.script"
↓
Pulse worker
↓
PythonScriptHandler (Go)
↓
HTTP POST to /api/python/execute (actually gRPC)
↓
Python plugin
Proposed approach:
Job "python.script"
↓
Pulse worker
↓
PluginProxyHandler (Go)
↓
ExecuteJob RPC
↓
Python plugin
Differences:
Effort comparison:
Plugin-Pulse integration is a natural evolution of QNTX's plugin architecture. By enabling plugins to register async handlers, we:
The implementation is straightforward, backward compatible, and provides immediate value for the Python plugin while establishing a pattern for all future plugins.