Writing an AI Policy for the Self-Hosted Gateway¶
AI policies allow you to inspect, control, and modify traffic going to and from Large Language Models (LLMs), such as OpenAI, Anthropic, or other providers.
AI policies use the same Policy interface as standard gateway policies. For full API details, see the SDK Documentation.
The key difference is how you handle LLM request and response bodies, especially:
- JSON responses
- Streaming responses (SSE)
Key Idea¶
LLM responses come in two formats:
| Mode | Format |
|---|---|
| Non-streaming | Single JSON object |
| Streaming | SSE events (data: {...}) |
Your policy must be implemented to handle both formats.
Which Interfaces to Implement¶
Choose based on what your policy needs to do:
| Goal | Interface | Mode Setting |
|---|---|---|
| Inspect prompt / model | RequestPolicy |
RequestBodyMode: BodyModeBuffer |
| Inspect headers (auth, routing) | RequestHeaderPolicy |
RequestHeaderMode: HeaderModeProcess |
| Inspect or modify buffered (in-memory) response | ResponsePolicy |
ResponseBodyMode: BodyModeBuffer |
| Inspect or modify streaming response | StreamingResponsePolicy (embeds ResponsePolicy) |
ResponseBodyMode: BodyModeStream |
Recommended Pattern¶
For most AI policies, implement both:
ResponsePolicy- Handles buffered responses where the entire response is available at once. This could be a non-streaming JSON response or concatenated SSE events.StreamingResponsePolicy- Handles streaming responses, which could be JSON or SSE events.
Tip
The gateway automatically chooses which handler to call. OnResponseBodyChunk is invoked only if the entire policy chain is streaming-compatible. If any policy in the chain does not support streaming, OnResponseBody is used as a fallback.
Minimal AI Policy Example¶
package myaipolicy
import (
"context"
policy "github.com/wso2/api-platform/sdk/core/policy/v1alpha2"
)
type MyAIPolicy struct{}
func (p *MyAIPolicy) Mode() policy.ProcessingMode {
return policy.ProcessingMode{
RequestBodyMode: policy.BodyModeBuffer,
ResponseBodyMode: policy.BodyModeStream,
}
}
// Request Inspection
func (p *MyAIPolicy) OnRequestBody(
ctx context.Context,
reqCtx *policy.RequestContext,
params map[string]interface{},
) policy.RequestAction {
// Inspect model + messages
return nil
}
// Streaming Response Handling
func (p *MyAIPolicy) OnResponseBodyChunk(
ctx context.Context,
respCtx *policy.ResponseStreamContext,
chunk *policy.StreamBody,
params map[string]interface{},
) policy.StreamingResponseAction {
// Accumulate + process
return policy.ForwardResponseChunk{}
}
// Gating response chunks before processing response
func (p *MyAIPolicy) NeedsMoreResponseData(_ []byte) bool {
return false
}
// Buffered Fallback
func (p *MyAIPolicy) OnResponseBody(
ctx context.Context,
respCtx *policy.ResponseContext,
params map[string]interface{},
) policy.ResponseAction {
// Same logic as streaming, applied to the full body
return nil
}
Best Practices¶
- Always handle both streaming and non-streaming - The gateway may fall back to buffered mode if any policy in the chain does not support streaming.
- Use Metadata to share state - Pass data between request and response phases using the
Metadatamap. - Implement streaming + fallback for compatibility - Ensure your policy works correctly regardless of whether the chain runs in streaming or buffered mode.