This guide explains how to develop external QNTX domain plugins that run as separate processes and communicate via gRPC. For the complete gRPC API reference, see Plugin gRPC API.
There is one plugin interface: DomainPlugin. Both built-in and external plugins implement the same interface:
- Built-in plugins implement DomainPlugin directly (e.g., code.Plugin)
- ExternalDomainProxy implements DomainPlugin by proxying gRPC calls to a sidecar process

From the Registry's perspective, there is no difference. This enables:
            ┌─────────────────────────────────────┐
            │          domains.Registry           │
            │  (treats all plugins identically)   │
            └─────────────────────────────────────┘
                               │
          ┌────────────────────┼────────────────────┐
          │                    │                    │
          ▼                    ▼                    ▼
┌──────────────────┐ ┌──────────────────┐ ┌───────────────────┐
│   code.Plugin    │ │  finance.Plugin  │ │ExternalDomainProxy│
│    (built-in)    │ │    (built-in)    │ │     (adapter)     │
│                  │ │                  │ │                   │
│    implements    │ │    implements    │ │    implements     │
│   DomainPlugin   │ │   DomainPlugin   │ │   DomainPlugin    │
└──────────────────┘ └──────────────────┘ └─────────┬─────────┘
                                                    │ gRPC
                                                    ▼
                                          ┌───────────────────┐
                                          │  External Plugin  │
                                          │ (sidecar process) │
                                          │                   │
                                          │   PluginServer    │
                                          │ wraps DomainPlugin│
                                          └───────────────────┘
The ExternalDomainProxy is simply an adapter that:
- implements the DomainPlugin interface
- forwards each call over gRPC to the sidecar's PluginServer

mkdir my-plugin
cd my-plugin
go mod init github.com/myorg/qntx-myplugin
go get github.com/teranos/QNTX/plugin
go get github.com/teranos/QNTX/plugin/grpc
go get google.golang.org/grpc
// plugin.go
package main
import (
"context"
"net/http"
"github.com/spf13/cobra"
"github.com/teranos/QNTX/plugin"
)
// MyPlugin is the example plugin type. It holds on to the ServiceRegistry it
// receives in Initialize so that later calls can reach host services.
type MyPlugin struct {
	// services is assigned in Initialize.
	services domains.ServiceRegistry
}
// NewMyPlugin returns a fresh, zero-valued MyPlugin. Its services field stays
// unset until Initialize is called.
func NewMyPlugin() *MyPlugin {
	return new(MyPlugin)
}
// Metadata describes this plugin to QNTX: its name and version, the QNTX
// version constraint it declares, and descriptive author/license fields.
func (p *MyPlugin) Metadata() domains.Metadata {
	var meta domains.Metadata
	meta.Name = "myplugin"
	meta.Version = "1.0.0"
	meta.QNTXVersion = ">= 0.1.0"
	meta.Description = "My custom QNTX plugin"
	meta.Author = "Your Name"
	meta.License = "MIT"
	return meta
}
// Initialize stores the service registry on the plugin and emits a startup
// message through the "myplugin" logger. It always returns nil.
func (p *MyPlugin) Initialize(ctx context.Context, services domains.ServiceRegistry) error {
	p.services = services
	services.Logger("myplugin").Info("MyPlugin initialized")
	return nil
}
// Shutdown has nothing to release in this minimal example, so it reports
// success immediately.
func (p *MyPlugin) Shutdown(ctx context.Context) error {
	return nil
}
// Commands returns the CLI commands this plugin contributes: a single
// "myplugin" command that prints a greeting.
func (p *MyPlugin) Commands() []*cobra.Command {
	hello := &cobra.Command{
		Use:   "myplugin",
		Short: "My plugin commands",
		Run: func(cmd *cobra.Command, args []string) {
			cmd.Println("Hello from MyPlugin!")
		},
	}
	return []*cobra.Command{hello}
}
// RegisterHTTP mounts the plugin's single endpoint, /api/myplugin/hello, which
// answers every request with a fixed greeting. It always returns nil.
func (p *MyPlugin) RegisterHTTP(mux *http.ServeMux) error {
	hello := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("Hello from MyPlugin!"))
	})
	mux.Handle("/api/myplugin/hello", hello)
	return nil
}
// RegisterWebSocket reports that this example plugin exposes no WebSocket
// handlers: it returns a nil map and a nil error.
func (p *MyPlugin) RegisterWebSocket() (map[string]domains.WebSocketHandler, error) {
	return nil, nil
}
// Health unconditionally reports the plugin as healthy; this minimal example
// has no dependencies to probe.
func (p *MyPlugin) Health(ctx context.Context) domains.HealthStatus {
	status := domains.HealthStatus{Healthy: true}
	status.Message = "MyPlugin is healthy"
	return status
}
// main.go
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
plugingrpc "github.com/teranos/QNTX/plugin/grpc"
"go.uber.org/zap"
)
// port is the TCP port the plugin's gRPC server listens on (--port flag).
var port = flag.Int("port", 9000, "gRPC server port")

// main wraps MyPlugin in a gRPC PluginServer and serves it until the process
// receives SIGINT or SIGTERM.
func main() {
	flag.Parse()

	// A failed logger construction would leave logger nil and make the
	// Sugar() call below panic, so report it on stderr and exit instead.
	logger, err := zap.NewProduction()
	if err != nil {
		fmt.Fprintf(os.Stderr, "creating logger: %v\n", err)
		os.Exit(1)
	}
	sugar := logger.Sugar()
	defer logger.Sync()

	plugin := NewMyPlugin()
	server := plugingrpc.NewPluginServer(plugin, sugar)

	// ctx is cancelled on the first SIGINT/SIGTERM; stop unregisters the
	// signal handler when main returns.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	addr := fmt.Sprintf(":%d", *port)
	sugar.Infow("Starting plugin", "address", addr)
	if err := server.Serve(ctx, addr); err != nil {
		sugar.Fatalw("Server error", "error", err)
	}
}
go build -o qntx-myplugin .
./qntx-myplugin --port 9001
The PluginManager loads external plugins and returns DomainPlugin instances that can be registered with the Registry:
// In QNTX server initialization
// NOTE(review): `grpc` here appears to be the QNTX plugin gRPC package (the
// one main.go imports as plugingrpc), not google.golang.org/grpc — confirm.
manager := grpc.NewPluginManager(logger)
// Load external plugins from config
configs := []grpc.PluginConfig{
{Name: "myplugin", Enabled: true, Address: "localhost:9001"},
}
// NOTE(review): any value returned by LoadPlugins is discarded here — confirm
// whether it reports load failures that should be handled.
manager.LoadPlugins(ctx, configs)
// Get plugins as DomainPlugin instances and register them
for _, plugin := range manager.GetAllPlugins() {
registry.Register(plugin) // Same API as built-in plugins
}
Or configure via ~/.qntx/am.plugins.toml:
[[plugins]]
name = "myplugin"
enabled = true
address = "localhost:9001"
For auto-start:
[[plugins]]
name = "myplugin"
enabled = true
binary = "qntx-myplugin"
auto_start = true
qntx-myplugin/
├── main.go # Entry point with gRPC server
├── plugin.go # DomainPlugin implementation
├── commands.go # CLI command definitions
├── handlers.go # HTTP handler implementations
├── go.mod
├── go.sum
├── README.md
└── config/
    └── am.myplugin.toml # Default configuration
Every plugin must implement:
// DomainPlugin is the single interface every QNTX plugin implements, whether
// it is built in or proxied to an external process.
type DomainPlugin interface {
	// Metadata describes the plugin (name, version, QNTX version constraint).
	Metadata() Metadata

	// Initialize hands the plugin its ServiceRegistry. Returning an error
	// causes QNTX to fail fast.
	Initialize(ctx context.Context, services ServiceRegistry) error

	// Shutdown gives the plugin a chance to stop workers and close
	// connections.
	Shutdown(ctx context.Context) error

	// Commands returns the CLI commands the plugin contributes.
	Commands() []*cobra.Command

	// RegisterHTTP registers the plugin's handlers on mux. All routes must be
	// under /api/<plugin-name>/.
	RegisterHTTP(mux *http.ServeMux) error

	// RegisterWebSocket returns the plugin's WebSocket handlers.
	// NOTE(review): the meaning of the map key is not shown here — confirm.
	RegisterWebSocket() (map[string]WebSocketHandler, error)

	// Health reports the plugin's current health.
	Health(ctx context.Context) HealthStatus
}
The gRPC protocol is defined in domains/grpc/protocol/domain.proto:
// DomainPluginService is the gRPC service an external plugin process exposes.
// Metadata, Initialize, Shutdown, Commands and Health correspond to the
// DomainPlugin methods of the same name.
service DomainPluginService {
rpc Metadata(Empty) returns (MetadataResponse);
rpc Initialize(InitializeRequest) returns (Empty);
rpc Shutdown(Empty) returns (Empty);
rpc Commands(Empty) returns (CommandsResponse);
// ExecuteCommand runs a CLI command in the plugin process.
rpc ExecuteCommand(ExecuteCommandRequest) returns (ExecuteCommandResponse);
// HandleHTTP carries HTTP requests for /api/<plugin-name>/* to the plugin.
rpc HandleHTTP(HTTPRequest) returns (HTTPResponse);
// HandleWebSocket is a bidirectional stream of WebSocketMessage values.
rpc HandleWebSocket(stream WebSocketMessage) returns (stream WebSocketMessage);
rpc Health(Empty) returns (HealthResponse);
}
HTTP requests to /api/<plugin-name>/* are forwarded to the plugin via HandleHTTP:
- QNTX wraps the incoming request in an HTTPRequest protobuf
- The plugin answers with an HTTPResponse

CLI commands are executed via ExecuteCommand:
- The user runs qntx <plugin> <subcommand>
- QNTX sends the plugin an ExecuteCommandRequest with args/flags

# Build for current platform
go build -o qntx-myplugin .
# Cross-compile for Linux
GOOS=linux GOARCH=amd64 go build -o qntx-myplugin-linux .
Install to the QNTX plugins directory:
mkdir -p ~/.qntx/plugins
cp qntx-myplugin ~/.qntx/plugins/
chmod +x ~/.qntx/plugins/qntx-myplugin
Create plugin configuration at ~/.qntx/am.myplugin.toml:
# MyPlugin configuration
api_key = "${MYPLUGIN_API_KEY}"
endpoint = "https://api.example.com"
cache_ttl_seconds = 300
func TestMyPlugin_Initialize(t *testing.T) {
plugin := NewMyPlugin()
// Create mock service registry
logger := zaptest.NewLogger(t).Sugar()
services := &mockServiceRegistry{logger: logger}
err := plugin.Initialize(context.Background(), services)
assert.NoError(t, err)
}
Use the provided test helpers:
// TestPluginIntegration serves MyPlugin over a real gRPC listener on a random
// local port and checks that a plugin client sees the expected metadata.
func TestPluginIntegration(t *testing.T) {
	logger := zaptest.NewLogger(t).Sugar()
	plugin := NewMyPlugin()
	server := plugingrpc.NewPluginServer(plugin, logger)

	// Start server on random port. Fail the test right away if the listener
	// cannot be created, rather than panicking later on a nil listener.
	listener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)
	defer listener.Close()

	grpcServer := grpc.NewServer()
	protocol.RegisterDomainPluginServiceServer(grpcServer, server)
	go grpcServer.Serve(listener)
	defer grpcServer.Stop()

	// Connect client
	client, err := plugingrpc.NewPluginClient(listener.Addr().String(), logger)
	require.NoError(t, err)

	// Test plugin via client
	meta := client.Metadata()
	assert.Equal(t, "myplugin", meta.Name)
}
Initialize (causes QNTX to fail-fast)func (p *MyPlugin) Initialize(ctx context.Context, services domains.ServiceRegistry) error {
logger := services.Logger("myplugin")
if err := p.connectToAPI(); err != nil {
logger.Errorw("Failed to connect to API", "error", err)
return fmt.Errorf("API connection failed: %w", err)
}
return nil
}
Implement meaningful health checks:
// Health pings the backing API. A failed ping yields an unhealthy status whose
// details carry the error text; otherwise the API is reported as connected.
func (p *MyPlugin) Health(ctx context.Context) domains.HealthStatus {
	// Check API connection
	pingErr := p.api.Ping(ctx)
	if pingErr != nil {
		return domains.HealthStatus{
			Healthy: false,
			Message: "API unreachable",
			Details: map[string]interface{}{"api_error": pingErr.Error()},
		}
	}

	return domains.HealthStatus{
		Healthy: true,
		Message: "All systems operational",
		Details: map[string]interface{}{"api": "connected"},
	}
}
All routes must be under /api/<plugin-name>/:
// RegisterHTTP mounts the plugin's handlers on mux, keeping every route inside
// the /api/myplugin/ namespace. It always returns nil.
func (p *MyPlugin) RegisterHTTP(mux *http.ServeMux) error {
	// ✅ Correct: namespaced routes
	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{"/api/myplugin/", p.handleRoot},
		{"/api/myplugin/data", p.handleData},
	}
	for _, route := range routes {
		mux.HandleFunc(route.pattern, route.handler)
	}

	// ❌ Wrong: will conflict with other plugins
	// mux.HandleFunc("/data", p.handleData)
	return nil
}
Handle shutdown signals properly:
// Shutdown stops the background worker and closes the API client when they
// are set. A close failure is logged as a warning rather than returned, so
// Shutdown always returns nil.
func (p *MyPlugin) Shutdown(ctx context.Context) error {
	lg := p.services.Logger("myplugin")

	// Stop background workers
	if worker := p.worker; worker != nil {
		worker.Stop()
	}

	// Close connections
	if client := p.apiClient; client != nil {
		closeErr := client.Close()
		if closeErr != nil {
			lg.Warnw("API client close error", "error", closeErr)
		}
	}

	lg.Info("Plugin shutdown complete")
	return nil
}
Specify QNTX version constraints:
// Metadata declares the plugin's identity together with the range of QNTX
// versions it supports, expressed as a semver constraint.
func (p *MyPlugin) Metadata() domains.Metadata {
	meta := domains.Metadata{Name: "myplugin", Version: "1.0.0"}
	meta.QNTXVersion = ">= 0.1.0, < 2.0.0" // Semver constraint
	// ...
	return meta
}
See the code domain plugin for a complete reference:
- cmd/plugins/code/main.go
- domains/code/plugin.go
- make plugins