# Envoy ExtProc Integration
The Semantic Router leverages Envoy's External Processing (ExtProc) filter to implement intelligent routing decisions. This integration provides a clean separation between traffic management (Envoy) and business logic (Semantic Router), enabling sophisticated routing capabilities while maintaining high performance.
## Understanding Envoy ExtProc

### What is ExtProc?
External Processing (ExtProc) is an Envoy filter that allows external services to participate in request and response processing. Unlike other extension mechanisms, ExtProc provides:
- Streaming Processing: Handle requests and responses as they flow through Envoy
- Full Control: Modify headers, body, and routing decisions
- Low Latency: Optimized gRPC communication between Envoy and external services
- Fault Tolerance: Built-in failure handling and timeout management
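Stripped of Envoy's generated protobuf types, the ExtProc interaction is a receive/process/send loop over one bidirectional stream per HTTP request. The sketch below uses a hypothetical `stream` interface and string messages (not the real `ext_proc` API) purely to illustrate the streaming shape:

```go
package main

import (
	"fmt"
	"io"
)

// stream stands in for the bidirectional gRPC stream ExtProc runs over.
type stream interface {
	Recv() (string, error) // next processing phase sent by Envoy
	Send(verdict string) error
}

// serve is the shape of every ExtProc server: loop until the stream
// closes, answering each phase Envoy sends with a processing verdict.
func serve(s stream) error {
	for {
		phase, err := s.Recv()
		if err == io.EOF {
			return nil // Envoy closed the stream; the request is done
		}
		if err != nil {
			return err
		}
		if err := s.Send("continue:" + phase); err != nil {
			return err
		}
	}
}

// fakeStream drives serve in-process for illustration.
type fakeStream struct {
	in   []string
	sent []string
}

func (f *fakeStream) Recv() (string, error) {
	if len(f.in) == 0 {
		return "", io.EOF
	}
	phase := f.in[0]
	f.in = f.in[1:]
	return phase, nil
}

func (f *fakeStream) Send(v string) error {
	f.sent = append(f.sent, v)
	return nil
}

func main() {
	fs := &fakeStream{in: []string{"request_headers", "request_body"}}
	if err := serve(fs); err != nil {
		panic(err)
	}
	fmt.Println(fs.sent) // [continue:request_headers continue:request_body]
}
```

The real implementation shown later follows exactly this loop, with `ProcessingRequest`/`ProcessingResponse` messages in place of strings.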
### ExtProc vs Other Extension Methods

| Extension Method | Use Case | Latency | Flexibility | Complexity |
|---|---|---|---|---|
| HTTP Filters | Simple transformations | Lowest | Limited | Low |
| WebAssembly (WASM) | Sandboxed logic | Low | Medium | Medium |
| ExtProc | Complex business logic | Medium | High | Medium |
| HTTP Callouts | External API calls | High | High | High |
**Why ExtProc for Semantic Routing?**
- Complex ML Models: Need full Python/Go ecosystem for BERT models
- Dynamic Decision Making: Requires sophisticated classification logic
- State Management: Needs caching and request tracking
- Observability: Requires comprehensive metrics and logging
## ExtProc Protocol Architecture

### Communication Flow

For each HTTP request, Envoy opens a bidirectional gRPC stream to the external processor. Each configured phase (request headers, request body, response headers, response body) arrives as a `ProcessingRequest`, and the processor answers with a `ProcessingResponse` that can mutate headers or the body and influence routing.
### Processing Modes

ExtProc can be configured to process different parts of the request/response lifecycle:

```yaml
# Envoy ExtProc Configuration
processing_mode:
  request_header_mode: "SEND"       # Process request headers
  response_header_mode: "SEND"      # Process response headers
  request_body_mode: "BUFFERED"     # Process entire request body
  response_body_mode: "BUFFERED"    # Process entire response body
  request_trailer_mode: "SKIP"      # Skip request trailers
  response_trailer_mode: "SKIP"     # Skip response trailers
```
**Mode Options:**

- `SKIP`: Don't send to ExtProc (fastest)
- `SEND`: Send headers/trailers only
- `BUFFERED`: Send entire body (required for content analysis)
- `STREAMED`: Send body in chunks (for streaming)
## Semantic Router ExtProc Implementation

### Go Implementation Structure
```go
// Server is the main ExtProc server wrapping the gRPC listener
type Server struct {
	router *OpenAIRouter
	server *grpc.Server
	port   int
}

// OpenAIRouter implements the ExtProc service interface
type OpenAIRouter struct {
	Config               *config.RouterConfig
	CategoryDescriptions []string
	Classifier           *classification.Classifier
	PIIChecker           *pii.PolicyChecker
	Cache                *cache.SemanticCache
	ToolsDatabase        *tools.ToolsDatabase

	pendingRequests     map[string][]byte
	pendingRequestsLock sync.Mutex
}

// Compile-time check that OpenAIRouter satisfies the ExtProc service interface
var _ ext_proc.ExternalProcessorServer = &OpenAIRouter{}
```
### gRPC Service Implementation
```go
// Process handles the bidirectional streaming RPC
func (r *OpenAIRouter) Process(stream ext_proc.ExternalProcessor_ProcessServer) error {
	log.Println("Started processing a new request")

	ctx := &RequestContext{
		Headers:   make(map[string]string),
		RequestID: generateRequestID(),
	}

	for {
		// Receive the next message from Envoy
		req, err := stream.Recv()
		if err != nil {
			return r.handleStreamError(err)
		}

		// Process based on request type (headers, body, trailers)
		response, err := r.processRequest(ctx, req)
		if err != nil {
			return err
		}

		// Send the response back to Envoy
		if err := stream.Send(response); err != nil {
			return err
		}
	}
}
```
## Request Processing Pipeline

### 1. Request Headers Processing
```go
func (r *OpenAIRouter) handleRequestHeaders(
	ctx *RequestContext,
	headers *ext_proc.ProcessingRequest_RequestHeaders,
) (*ext_proc.ProcessingResponse, error) {
	// Extract and store headers
	for _, header := range headers.RequestHeaders.Headers.Headers {
		ctx.Headers[header.Key] = header.Value
	}

	// Extract request metadata (":method" and ":path" are HTTP/2 pseudo-headers)
	ctx.Method = ctx.Headers[":method"]
	ctx.Path = ctx.Headers[":path"]
	ctx.ContentType = ctx.Headers["content-type"]

	// Continue processing - we need the body for classification
	return &ext_proc.ProcessingResponse{
		Response: &ext_proc.ProcessingResponse_RequestHeaders_{
			RequestHeaders: &ext_proc.HeadersResponse{},
		},
	}, nil
}
```
### 2. Request Body Processing (Core Logic)
```go
func (r *OpenAIRouter) handleRequestBody(
	ctx *RequestContext,
	body *ext_proc.ProcessingRequest_RequestBody,
) (*ext_proc.ProcessingResponse, error) {
	// Extract the request body
	requestBody := body.RequestBody.Body

	// Parse the OpenAI API request
	var openAIRequest OpenAIRequest
	if err := json.Unmarshal(requestBody, &openAIRequest); err != nil {
		return nil, fmt.Errorf("failed to parse OpenAI request: %w", err)
	}

	// Extract the user query from the messages
	userQuery := extractUserQuery(openAIRequest.Messages)

	// Step 1: Check the semantic cache
	if cachedResponse, found := r.Cache.Get(userQuery); found {
		return r.handleCacheHit(cachedResponse)
	}

	// Step 2: Security checks
	if blocked, reason := r.performSecurityChecks(userQuery); blocked {
		return r.handleSecurityBlock(reason)
	}

	// Step 3: Classify query intent
	classification, err := r.Classifier.ClassifyIntent(userQuery)
	if err != nil {
		return nil, err
	}

	// Step 4: Select the optimal model endpoint
	selectedEndpoint := r.selectModelEndpoint(classification)

	// Step 5: Auto-select relevant tools
	selectedTools := r.autoSelectTools(userQuery, openAIRequest.Tools)

	// Step 6: Modify the request if needed
	modifiedRequest := r.modifyRequest(openAIRequest, selectedTools)
	modifiedBody, err := json.Marshal(modifiedRequest)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize modified request: %w", err)
	}

	// Step 7: Build routing headers consumed by Envoy's route matchers
	headerMutations := []*core.HeaderValueOption{
		{
			Header: &core.HeaderValue{
				Key:   "x-semantic-destination-endpoint",
				Value: selectedEndpoint,
			},
			Append: &wrapperspb.BoolValue{Value: false},
		},
		{
			Header: &core.HeaderValue{
				Key:   "x-selected-model",
				Value: classification.Category,
			},
			Append: &wrapperspb.BoolValue{Value: false},
		},
		{
			Header: &core.HeaderValue{
				Key:   "x-routing-confidence",
				Value: fmt.Sprintf("%.3f", classification.Confidence),
			},
			Append: &wrapperspb.BoolValue{Value: false},
		},
	}

	// Record the routing decision for monitoring
	r.recordRoutingDecision(ctx, classification, selectedEndpoint)

	return &ext_proc.ProcessingResponse{
		Response: &ext_proc.ProcessingResponse_RequestBody_{
			RequestBody: &ext_proc.BodyResponse{
				Response: &ext_proc.CommonResponse{
					// Apply the routing headers built above
					HeaderMutation: &ext_proc.HeaderMutation{
						SetHeaders: headerMutations,
					},
					BodyMutation: &ext_proc.BodyMutation{
						Mutation: &ext_proc.BodyMutation_Body{
							Body: modifiedBody,
						},
					},
				},
			},
		},
		ModeOverride: &ext_proc.ProcessingMode{
			RequestHeaderMode:  ext_proc.ProcessingMode_SEND,
			ResponseHeaderMode: ext_proc.ProcessingMode_SEND,
		},
		DynamicMetadata: r.buildDynamicMetadata(classification),
	}, nil
}
```
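The `extractUserQuery` helper is referenced above but not shown. A plausible implementation (a sketch mirroring the OpenAI chat schema, not necessarily the project's actual code) takes the most recent user-authored message as the classification input:

```go
package main

import "fmt"

// Message mirrors the OpenAI chat message shape the handler parses.
type Message struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

// extractUserQuery returns the most recent user-authored message,
// which is what classification and cache lookup run on. System and
// assistant turns are skipped.
func extractUserQuery(messages []Message) string {
	for i := len(messages) - 1; i >= 0; i-- {
		if messages[i].Role == "user" {
			return messages[i].Content
		}
	}
	return ""
}

func main() {
	msgs := []Message{
		{Role: "system", Content: "You are helpful."},
		{Role: "user", Content: "Integrate x^2 from 0 to 1"},
	}
	fmt.Println(extractUserQuery(msgs))
}
```

Classifying only the latest user turn keeps cache keys and intent labels stable across a multi-turn conversation; classifying the full transcript is a reasonable alternative design.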
### 3. Response Processing
```go
func (r *OpenAIRouter) handleResponseBody(
	ctx *RequestContext,
	responseBody *ext_proc.ProcessingRequest_ResponseBody,
) (*ext_proc.ProcessingResponse, error) {
	// Parse the model response
	var modelResponse OpenAIResponse
	if err := json.Unmarshal(responseBody.ResponseBody.Body, &modelResponse); err != nil {
		return nil, err
	}

	// Store in the semantic cache for future requests
	if ctx.UserQuery != "" {
		r.Cache.Store(ctx.UserQuery, modelResponse, ctx.SelectedModel)
	}

	// Record response metrics
	r.recordResponseMetrics(ctx, modelResponse)

	// Add routing metadata to the response
	modifiedResponse := r.addRoutingMetadata(modelResponse, ctx)
	modifiedBody, err := json.Marshal(modifiedResponse)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize modified response: %w", err)
	}

	return &ext_proc.ProcessingResponse{
		Response: &ext_proc.ProcessingResponse_ResponseBody_{
			ResponseBody: &ext_proc.BodyResponse{
				Response: &ext_proc.CommonResponse{
					BodyMutation: &ext_proc.BodyMutation{
						Mutation: &ext_proc.BodyMutation_Body{
							Body: modifiedBody,
						},
					},
				},
			},
		},
	}, nil
}
```
## Envoy Configuration for ExtProc

### Complete Configuration Example
```yaml
# config/envoy.yaml
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 8801
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http

          # Comprehensive access logging
          access_log:
          - name: envoy.access_loggers.stdout
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
              log_format:
                json_format:
                  time: "%START_TIME%"
                  method: "%REQ(:METHOD)%"
                  path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%"
                  response_code: "%RESPONSE_CODE%"
                  duration: "%DURATION%"
                  selected_model: "%REQ(X-SELECTED-MODEL)%"
                  selected_endpoint: "%REQ(X-SEMANTIC-DESTINATION-ENDPOINT)%"
                  routing_confidence: "%REQ(X-ROUTING-CONFIDENCE)%"

          # Route configuration with dynamic routing
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              # Dynamic routing based on ExtProc decisions
              - match:
                  prefix: "/"
                  headers:
                  - name: "x-semantic-destination-endpoint"
                    string_match:
                      exact: "endpoint1"
                route:
                  cluster: math_model_cluster
                  timeout: 300s
              - match:
                  prefix: "/"
                  headers:
                  - name: "x-semantic-destination-endpoint"
                    string_match:
                      exact: "endpoint2"
                route:
                  cluster: creative_model_cluster
                  timeout: 300s
              - match:
                  prefix: "/"
                  headers:
                  - name: "x-semantic-destination-endpoint"
                    string_match:
                      exact: "endpoint3"
                route:
                  cluster: code_model_cluster
                  timeout: 300s
              # Fallback route
              - match:
                  prefix: "/"
                route:
                  cluster: general_model_cluster
                  timeout: 300s

          # HTTP filter chain
          http_filters:
          # ExtProc filter - MUST come before the router filter
          - name: envoy.filters.http.ext_proc
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor

              # gRPC service configuration
              grpc_service:
                envoy_grpc:
                  cluster_name: semantic_router_extproc
                timeout: 30s

              # Processing mode configuration
              processing_mode:
                request_header_mode: "SEND"
                response_header_mode: "SEND"
                request_body_mode: "BUFFERED"    # Required for content analysis
                response_body_mode: "BUFFERED"   # Required for caching
                request_trailer_mode: "SKIP"
                response_trailer_mode: "SKIP"

              # Failure handling
              failure_mode_allow: true     # Continue on ExtProc failure
              allow_mode_override: true    # Allow ExtProc to change modes
              message_timeout: 300s        # Timeout for ExtProc responses
              max_message_timeout: 600s    # Maximum allowed timeout

              # Header mutation rules
              mutation_rules:
                allow_all_routing: true
                allow_envoy: true
                disallow_system: false
                disallow_x_forwarded: false

              # Prefix for ExtProc filter stats
              stat_prefix: semantic_router

          # Router filter - MUST come after ExtProc
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
              suppress_envoy_headers: true

  # Backend model clusters
  clusters:
  # ExtProc service cluster
  - name: semantic_router_extproc
    connect_timeout: 5s
    type: STATIC
    lb_policy: ROUND_ROBIN
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options:
            # Optimize for ExtProc communication
            connection_keepalive:
              interval: 30s
              timeout: 5s
            max_concurrent_streams: 1000
    load_assignment:
      cluster_name: semantic_router_extproc
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 50051
    # Health checking for ExtProc
    health_checks:
    - timeout: 5s
      interval: 10s
      unhealthy_threshold: 3
      healthy_threshold: 2
      grpc_health_check:
        service_name: "semantic-router"

  # Model endpoint clusters
  - name: math_model_cluster
    connect_timeout: 30s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: math_model_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 11434
    # Health checks for model endpoints
    health_checks:
    - timeout: 10s
      interval: 15s
      unhealthy_threshold: 3
      healthy_threshold: 2
      http_health_check:
        path: "/health"
        expected_statuses:
        - start: 200
          end: 300   # end of the range is exclusive

  - name: creative_model_cluster
    # Similar configuration for the creative model...
  - name: code_model_cluster
    # Similar configuration for the code model...
  - name: general_model_cluster
    # Similar configuration for the general model...
```
## Performance Optimization

### Reducing ExtProc Latency

#### 1. Connection Pooling and Keepalives
```yaml
# Optimize the gRPC connection to ExtProc
grpc_service:
  envoy_grpc:
    cluster_name: semantic_router_extproc
  timeout: 10s   # Reduced from the 30s used above

# Cluster optimization
typed_extension_protocol_options:
  envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
    explicit_http_config:
      http2_protocol_options:
        connection_keepalive:
          interval: 30s   # Keep connections alive
          timeout: 5s
        max_concurrent_streams: 1000   # Allow many parallel requests
```
#### 2. Selective Processing

```yaml
# Only process what's necessary
processing_mode:
  request_header_mode: "SEND"      # Always needed for routing
  response_header_mode: "SKIP"     # Skip if not needed
  request_body_mode: "BUFFERED"    # Required for classification
  response_body_mode: "BUFFERED"   # Only if caching is enabled
```
#### 3. Failure Mode Configuration

```yaml
# Fast failure handling
failure_mode_allow: true    # Don't block traffic on ExtProc failure
message_timeout: 30s        # Reasonable timeout
max_message_timeout: 60s    # Emergency timeout
```
### Memory Management

#### Request Context Pooling
```go
// Pool request contexts to reduce GC pressure
var requestContextPool = sync.Pool{
	New: func() interface{} {
		return &RequestContext{
			Headers: make(map[string]string, 10),
		}
	},
}

func (r *OpenAIRouter) Process(stream ext_proc.ExternalProcessor_ProcessServer) error {
	// Get a context from the pool
	ctx := requestContextPool.Get().(*RequestContext)
	defer func() {
		// Clear per-request state and return the context to the pool
		ctx.Reset()
		requestContextPool.Put(ctx)
	}()

	// Process the request as in the streaming loop above...
	return nil
}
```
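`ctx.Reset()` is called above but not defined. A plausible implementation (the field set is hypothetical) clears the header map in place, so the map's allocated buckets survive reuse, which is the point of pooling:

```go
package main

import "fmt"

// RequestContext mirrors the pooled per-request state used above.
type RequestContext struct {
	Headers       map[string]string
	RequestID     string
	UserQuery     string
	SelectedModel string
}

// Reset clears per-request state so a pooled context can be reused
// without leaking data between requests. Deleting keys in place keeps
// the map's capacity instead of reallocating it.
func (c *RequestContext) Reset() {
	for k := range c.Headers {
		delete(c.Headers, k)
	}
	c.RequestID, c.UserQuery, c.SelectedModel = "", "", ""
}

func main() {
	ctx := &RequestContext{
		Headers:   map[string]string{":path": "/v1/chat/completions"},
		RequestID: "req-1",
	}
	ctx.Reset()
	fmt.Println(len(ctx.Headers), ctx.RequestID == "") // 0 true
}
```

Forgetting to reset even one field here is a data-leak bug across requests, which is the usual argument against pooling mutable contexts unless the allocation pressure is measurable.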
## Error Handling and Resilience

### ExtProc Error Handling
```go
func (r *OpenAIRouter) handleStreamError(err error) error {
	if err == io.EOF {
		log.Println("Stream ended gracefully")
		return nil
	}

	// Handle gRPC status-based errors
	if s, ok := status.FromError(err); ok {
		switch s.Code() {
		case codes.Canceled, codes.DeadlineExceeded:
			log.Println("Stream canceled gracefully")
			return nil
		case codes.Unavailable:
			log.Printf("ExtProc temporarily unavailable: %v", err)
			return err
		default:
			log.Printf("gRPC error: %v", err)
			return err
		}
	}

	// Handle context cancellation
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		log.Println("Stream canceled gracefully")
		return nil
	}

	log.Printf("Unexpected error receiving request: %v", err)
	return err
}
```
### Graceful Degradation
```go
func (r *OpenAIRouter) handleClassificationFailure(
	query string,
	err error,
) *RoutingDecision {
	log.Printf("Classification failed: %v, using fallback", err)

	// Increment failure metrics
	classificationFailures.Inc()

	// Return a safe fallback decision
	return &RoutingDecision{
		Category:      "general",
		Confidence:    0.0,
		SelectedModel: r.Config.DefaultModel,
		Fallback:      true,
		FailureReason: err.Error(),
	}
}
```
## Monitoring ExtProc Integration

### Key Metrics to Track
```go
// ExtProc-specific metrics
var (
	extprocRequestsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "extproc_requests_total",
			Help: "Total ExtProc requests by type",
		},
		[]string{"request_type", "status"},
	)

	extprocProcessingDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "extproc_processing_duration_seconds",
			Help:    "Time spent processing ExtProc requests",
			Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0},
		},
		[]string{"request_type"},
	)

	extprocStreamErrors = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "extproc_stream_errors_total",
			Help: "Total ExtProc stream errors",
		},
		[]string{"error_type"},
	)
)
```
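These metrics are typically recorded by wrapping each phase handler. Below is a dependency-free sketch of that timing pattern; in the real server the `observe` callback would be `extprocProcessingDuration.WithLabelValues(requestType).Observe`:

```go
package main

import (
	"fmt"
	"time"
)

// timed runs one phase handler and reports its wall-clock duration in
// seconds to the supplied observer, mirroring a histogram Observe call.
func timed(requestType string, observe func(requestType string, seconds float64), handler func() error) error {
	start := time.Now()
	err := handler()
	observe(requestType, time.Since(start).Seconds())
	return err
}

func main() {
	var labels []string
	observe := func(rt string, secs float64) {
		labels = append(labels, rt)
		fmt.Printf("%s took %.6fs\n", rt, secs)
	}
	_ = timed("request_body", observe, func() error {
		time.Sleep(time.Millisecond) // stand-in for classification work
		return nil
	})
	fmt.Println(labels)
}
```

Observing the duration even when the handler errors (as above) keeps latency histograms honest under failure; recording only successes would hide slow failing paths.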
### Health Check Implementation
```go
// Check implements the gRPC health-check protocol for the ExtProc service
func (s *Server) Check(
	ctx context.Context,
	req *grpc_health_v1.HealthCheckRequest,
) (*grpc_health_v1.HealthCheckResponse, error) {
	// Check classifier health
	if !s.router.Classifier.IsHealthy() {
		return &grpc_health_v1.HealthCheckResponse{
			Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
		}, nil
	}

	// Check cache health
	if !s.router.Cache.IsHealthy() {
		return &grpc_health_v1.HealthCheckResponse{
			Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
		}, nil
	}

	return &grpc_health_v1.HealthCheckResponse{
		Status: grpc_health_v1.HealthCheckResponse_SERVING,
	}, nil
}
```
The ExtProc integration provides a powerful foundation for implementing complex routing logic while maintaining high performance and reliability. Next, explore the Router Implementation to understand the detailed classification and routing algorithms.