This reference documents production-ready WebSocket patterns from the khatru Nostr relay implementation in Go.
It uses github.com/fasthttp/websocket as the underlying WebSocket library.

// relay.go, lines 54-119
// Relay is the core relay server. It owns the HTTP-to-WebSocket upgrade
// path, per-connection client tracking, and the ordered hook slices that
// embedders append to in order to customize connection, event, query, and
// broadcast behavior. Hooks in each slice are invoked in order.
type Relay struct {
	// Service configuration
	ServiceURL string
	upgrader   websocket.Upgrader

	// WebSocket lifecycle hooks. RejectConnection runs before the upgrade
	// (returning true refuses the connection); OnConnect/OnDisconnect run
	// on session start and teardown.
	RejectConnection []func(r *http.Request) bool
	OnConnect        []func(ctx context.Context)
	OnDisconnect     []func(ctx context.Context)

	// Event processing hooks, consulted when an EVENT message arrives.
	RejectEvent              []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
	OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
	StoreEvent               []func(ctx context.Context, event *nostr.Event) error
	ReplaceEvent             []func(ctx context.Context, event *nostr.Event) error
	DeleteEvent              []func(ctx context.Context, event *nostr.Event) error
	OnEventSaved             []func(ctx context.Context, event *nostr.Event)
	OnEphemeralEvent         []func(ctx context.Context, event *nostr.Event)

	// Filter/query hooks, consulted when a REQ or COUNT message arrives.
	RejectFilter    []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
	OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
	QueryEvents     []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
	CountEvents     []func(ctx context.Context, filter nostr.Filter) (int64, error)
	CountEventsHLL  []func(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error)

	// Broadcast control: PreventBroadcast can veto delivery to a specific
	// subscriber; OverwriteResponseEvent can mutate outgoing events.
	PreventBroadcast       []func(ws *WebSocket, event *nostr.Event) bool
	OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)

	// Client tracking; both structures are guarded by clientsMutex.
	clients      map[*WebSocket][]listenerSpec
	listeners    []listener
	clientsMutex sync.Mutex

	// WebSocket keep-alive and sizing parameters.
	WriteWait      time.Duration // Default: 10 seconds
	PongWait       time.Duration // Default: 60 seconds
	PingPeriod     time.Duration // Default: 30 seconds (must be < PongWait)
	MaxMessageSize int64         // Default: 512000 bytes

	// Router support (for multi-relay setups)
	routes                []Route
	getSubRelayFromEvent  func(*nostr.Event) *Relay
	getSubRelayFromFilter func(nostr.Filter) *Relay

	// Protocol extensions
	Negentropy bool // NIP-77 support
}
// relay.go, lines 31-35
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
},
Key configuration choices: small 1024-byte read/write buffers, and a CheckOrigin that accepts any origin (appropriate for a public relay). Consider also setting EnableCompression: true. Recommended production settings:
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
EnableCompression: true, // 60-80% bandwidth reduction
CheckOrigin: func(r *http.Request) bool {
// For public relays: return true
// For private relays: validate origin
origin := r.Header.Get("Origin")
return isAllowedOrigin(origin)
},
},
// websocket.go, lines 12-32
// WebSocket wraps a single client connection together with its originating
// HTTP request, lifecycle context, and NIP-42/NIP-77 session state. All
// writes to conn must go through WriteJSON/WriteMessage, which hold mutex.
type WebSocket struct {
	conn  *websocket.Conn
	mutex sync.Mutex // Protects all write operations on conn

	// Original HTTP request (for IP, headers, etc.)
	Request *http.Request

	// Connection lifecycle context; cancel tears this connection down.
	Context context.Context
	cancel  context.CancelFunc

	// NIP-42 authentication state.
	Challenge       string        // Random 8-byte hex string sent in the AUTH challenge
	AuthedPublicKey string        // Authenticated pubkey after a successful AUTH
	Authed          chan struct{} // Closed when authenticated; access guarded by authLock
	authLock        sync.Mutex

	// NIP-77 negentropy sessions (for efficient set reconciliation).
	negentropySessions *xsync.MapOf[string, *NegentropySession]
}
Design decisions:
xsync.MapOf provides concurrent access without explicit locking.

// websocket.go, lines 34-46
// WriteJSON marshals v to JSON and writes it to the connection as a single
// message. The mutex serializes all writes: the underlying WebSocket
// connection supports at most one concurrent writer, so every outgoing
// frame must go through this lock.
//
// Note: the parameter was renamed from "any" to "v" — naming a parameter
// "any" shadows the predeclared identifier within the function body.
func (ws *WebSocket) WriteJSON(v any) error {
	ws.mutex.Lock()
	defer ws.mutex.Unlock()
	return ws.conn.WriteJSON(v)
}
// WriteMessage writes one raw WebSocket frame of type t carrying payload b.
// The write mutex guarantees concurrent goroutines never interleave frames
// on the shared connection.
func (ws *WebSocket) WriteMessage(t int, b []byte) error {
	ws.mutex.Lock()
	defer ws.mutex.Unlock()
	return ws.conn.WriteMessage(t, b)
}
Critical pattern: ALL writes to WebSocket MUST be protected by mutex
Common mistake:
// DON'T DO THIS - Race condition!
go func() {
ws.conn.WriteJSON(msg1) // Not protected
}()
go func() {
ws.conn.WriteJSON(msg2) // Not protected
}()
Correct approach:
// DO THIS - Protected writes
go func() {
ws.WriteJSON(msg1) // Uses mutex
}()
go func() {
ws.WriteJSON(msg2) // Uses mutex
}()
// handlers.go, lines 29-52
// ServeHTTP is the single entry point for all HTTP traffic. It inspects
// request headers to decide whether this is a WebSocket upgrade, a NIP-11
// relay-information request, a NIP-86 management RPC, or a plain HTTP route.
func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// CORS middleware for non-WebSocket requests.
	// NOTE(review): this middleware is rebuilt on every request; hoisting it
	// to a Relay field would avoid the per-request allocation — confirm
	// cors.New is cheap enough before leaving it here.
	corsMiddleware := cors.New(cors.Options{
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{
			http.MethodHead,
			http.MethodGet,
			http.MethodPost,
			http.MethodPut,
			http.MethodPatch,
			http.MethodDelete,
		},
		AllowedHeaders: []string{"Authorization", "*"},
		MaxAge:         86400,
	})
	// Route by headers: Upgrade → WebSocket, Accept → NIP-11,
	// Content-Type → NIP-86, anything else → the regular mux.
	if r.Header.Get("Upgrade") == "websocket" {
		rl.HandleWebsocket(w, r) // WebSocket connection
	} else if r.Header.Get("Accept") == "application/nostr+json" {
		corsMiddleware.Handler(http.HandlerFunc(rl.HandleNIP11)).ServeHTTP(w, r) // NIP-11 metadata
	} else if r.Header.Get("Content-Type") == "application/nostr+json+rpc" {
		corsMiddleware.Handler(http.HandlerFunc(rl.HandleNIP86)).ServeHTTP(w, r) // NIP-86 management
	} else {
		corsMiddleware.Handler(rl.serveMux).ServeHTTP(w, r) // Other routes
	}
}
Pattern: Single HTTP handler multiplexes all request types by headers
// handlers.go, lines 55-105
// HandleWebsocket upgrades an HTTP request to a WebSocket connection,
// registers the client, and launches the read and write loops. Rejection
// hooks run before the upgrade so blocked clients never consume a socket.
func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
	// Pre-upgrade rejection hooks: any hook returning true aborts with 429.
	for _, reject := range rl.RejectConnection {
		if reject(r) {
			w.WriteHeader(429) // Too Many Requests
			return
		}
	}
	// Perform WebSocket upgrade
	conn, err := rl.upgrader.Upgrade(w, r, nil)
	if err != nil {
		rl.Log.Printf("failed to upgrade websocket: %v\n", err)
		return
	}
	// Create ping ticker for keep-alive (fires every PingPeriod).
	ticker := time.NewTicker(rl.PingPeriod)
	// Generate NIP-42 authentication challenge (8 random bytes, hex-encoded).
	// NOTE(review): the error from rand.Read is ignored; confirm this is
	// crypto/rand (which only fails in pathological cases) and not math/rand —
	// a predictable challenge would weaken NIP-42 authentication.
	challenge := make([]byte, 8)
	rand.Read(challenge)
	// Initialize WebSocket wrapper
	ws := &WebSocket{
		conn:               conn,
		Request:            r,
		Challenge:          hex.EncodeToString(challenge),
		negentropySessions: xsync.NewMapOf[string, *NegentropySession](),
	}
	ws.Context, ws.cancel = context.WithCancel(context.Background())
	// Register client under the shared mutex.
	rl.clientsMutex.Lock()
	rl.clients[ws] = make([]listenerSpec, 0, 2)
	rl.clientsMutex.Unlock()
	// Create connection context carrying the WebSocket reference so hooks
	// can recover it later via GetConnection(ctx).
	ctx, cancel := context.WithCancel(
		context.WithValue(context.Background(), wsKey, ws),
	)
	// Cleanup function shared by both goroutines.
	// NOTE(review): both loops defer kill(), so it may run twice; Stop,
	// cancel, and Close tolerate repeats, but the OnDisconnect hooks would
	// also fire twice — confirm that is intended or guard with sync.Once.
	kill := func() {
		// Trigger disconnect hooks
		for _, ondisconnect := range rl.OnDisconnect {
			ondisconnect(ctx)
		}
		// Stop timers and cancel contexts
		ticker.Stop()
		cancel()
		ws.cancel()
		// Close connection
		ws.conn.Close()
		// Remove from tracking
		rl.removeClientAndListeners(ws)
	}
	// Launch read and write goroutines
	go readLoop(ws, ctx, kill)
	go writeLoop(ws, ctx, ticker, kill)
}
Key steps:
// handlers.go, lines 107-414
// Read loop (runs inside HandleWebsocket): consumes frames from the client
// until the connection drops, parses each Nostr envelope, and dispatches
// handling to a fresh goroutine so a slow handler never blocks reads.
go func() {
	defer kill()
	// Configure read constraints
	ws.conn.SetReadLimit(rl.MaxMessageSize)
	ws.conn.SetReadDeadline(time.Now().Add(rl.PongWait))
	// Auto-refresh deadline on Pong receipt; a client that stops answering
	// pings times out after PongWait and ReadMessage returns an error.
	ws.conn.SetPongHandler(func(string) error {
		ws.conn.SetReadDeadline(time.Now().Add(rl.PongWait))
		return nil
	})
	// Trigger connection hooks
	for _, onconnect := range rl.OnConnect {
		onconnect(ctx)
	}
	// Create message parser (sonic parser is stateful, so it must not be
	// shared across goroutines — parsing therefore stays in this loop).
	smp := nostr.NewMessageParser()
	for {
		// Read message (blocks until data available)
		typ, msgb, err := ws.conn.ReadMessage()
		if err != nil {
			// Log only unexpected close codes; normal shutdown is silent.
			if websocket.IsUnexpectedCloseError(
				err,
				websocket.CloseNormalClosure,    // 1000
				websocket.CloseGoingAway,        // 1001
				websocket.CloseNoStatusReceived, // 1005
				websocket.CloseAbnormalClosure,  // 1006
				4537,                            // Custom: client preference
			) {
				rl.Log.Printf("unexpected close error from %s: %v\n",
					GetIPFromRequest(r), err)
			}
			ws.cancel()
			return
		}
		// Handle Ping manually (library should auto-respond, but...)
		if typ == websocket.PingMessage {
			ws.WriteMessage(websocket.PongMessage, nil)
			continue
		}
		// Zero-copy conversion to string: safe only because ReadMessage
		// returns a freshly allocated buffer that is never reused.
		message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
		// Parse message (sequential due to sonic parser constraint).
		// NOTE(review): err is not checked here; a parse failure leaves
		// envelope nil, which falls through to the default branch below —
		// confirm that "unknown message type" is the intended error path.
		envelope, err := smp.ParseMessage(message)
		// Handle message in separate goroutine (concurrent processing);
		// message is passed as an argument to avoid a data race on the
		// loop variable.
		go func(message string) {
			switch env := envelope.(type) {
			case *nostr.EventEnvelope:
				handleEvent(ctx, ws, env, rl)
			case *nostr.ReqEnvelope:
				handleReq(ctx, ws, env, rl)
			case *nostr.CloseEnvelope:
				handleClose(ctx, ws, env, rl)
			case *nostr.CountEnvelope:
				handleCount(ctx, ws, env, rl)
			case *nostr.AuthEnvelope:
				handleAuth(ctx, ws, env, rl)
			case *nip77.OpenEnvelope:
				handleNegentropyOpen(ctx, ws, env, rl)
			case *nip77.MessageEnvelope:
				handleNegentropyMsg(ctx, ws, env, rl)
			case *nip77.CloseEnvelope:
				handleNegentropyClose(ctx, ws, env, rl)
			default:
				ws.WriteJSON(nostr.NoticeEnvelope("unknown message type"))
			}
		}(message)
	}
}()
Critical patterns:
- Read blocks up to PongWait (60s)
- Pong receipt resets deadline
- No Pong = timeout error = connection dead
`go
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
`
- Avoids allocation when converting []byte to string
- Safe because msgb is newly allocated by ReadMessage()
- smp.ParseMessage() called sequentially (parser has state)
- Message handling dispatched to goroutine (concurrent)
- Balances correctness and performance
`go
go func(message string) {
// Handle message
}(message)
`
- Allows next message to be read immediately
- Prevents slow handler blocking read loop
- Captures message to avoid data race
// handlers.go, lines 416-433
// Write loop (runs inside HandleWebsocket): sends a Ping every PingPeriod
// and exits when the connection context is canceled or a write fails.
go func() {
	defer kill()
	for {
		select {
		case <-ctx.Done():
			// Connection closed or context canceled
			return
		case <-ticker.C:
			// Send ping every PingPeriod (30s); the client's Pong resets
			// the read deadline maintained by the read loop.
			err := ws.WriteMessage(websocket.PingMessage, nil)
			if err != nil {
				// "use of closed network connection" is the normal race on
				// shutdown, so it is not worth logging.
				if !strings.HasSuffix(err.Error(), "use of closed network connection") {
					rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
				}
				return
			}
		}
	}
}()
Purpose: send a keep-alive ping every PingPeriod, using select to monitor context cancellation so the goroutine exits as soon as the connection closes.

Timing relationship:
PingPeriod: 30 seconds (send ping every 30s)
PongWait: 60 seconds (expect pong within 60s)
Rule: PingPeriod < PongWait
If client doesn't respond to 2 consecutive pings,
connection times out after 60 seconds.
kill := func() {
// 1. Trigger disconnect hooks
for _, ondisconnect := range rl.OnDisconnect {
ondisconnect(ctx)
}
// 2. Stop timers
ticker.Stop()
// 3. Cancel contexts
cancel()
ws.cancel()
// 4. Close connection
ws.conn.Close()
// 5. Remove from tracking
rl.removeClientAndListeners(ws)
}
defer kill()
Cleanup order:
Why defer? Ensures cleanup runs even if goroutine panics
// handlers.go, lines 163-258
case *nostr.EventEnvelope:
// Validate event ID (must match hash of content)
if !env.Event.CheckID() {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: id is computed incorrectly",
})
return
}
// Validate signature
if ok, err := env.Event.CheckSignature(); err != nil {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to verify signature",
})
return
} else if !ok {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: signature is invalid",
})
return
}
// Check NIP-70 protected events
if nip70.IsProtected(env.Event) {
authed := GetAuthed(ctx)
if authed == "" {
// Request authentication
RequestAuth(ctx)
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "auth-required: must be published by authenticated event author",
})
return
}
}
// Route to subrelay if using relay routing
srl := rl
if rl.getSubRelayFromEvent != nil {
srl = rl.getSubRelayFromEvent(&env.Event)
}
// Handle event based on kind
var skipBroadcast bool
var writeErr error
if env.Event.Kind == 5 {
// Deletion event
writeErr = srl.handleDeleteRequest(ctx, &env.Event)
} else if nostr.IsEphemeralKind(env.Event.Kind) {
// Ephemeral event (20000-29999)
writeErr = srl.handleEphemeral(ctx, &env.Event)
} else {
// Normal event
skipBroadcast, writeErr = srl.handleNormal(ctx, &env.Event)
}
// Broadcast to subscribers (unless prevented)
if !skipBroadcast {
n := srl.notifyListeners(&env.Event)
// Can update reason with broadcast count
}
// Send OK response
ok := writeErr == nil
reason := ""
if writeErr != nil {
reason = writeErr.Error()
}
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: ok,
Reason: reason,
})
Validation sequence:
// handlers.go, lines 289-324
case *nostr.ReqEnvelope:
// Create WaitGroup for EOSE synchronization
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
// Create cancelable context for subscription
reqCtx, cancelReqCtx := context.WithCancelCause(ctx)
// Expose subscription ID in context
reqCtx = context.WithValue(reqCtx, subscriptionIdKey, env.SubscriptionID)
// Handle each filter
for _, filter := range env.Filters {
// Route to appropriate subrelay
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(filter)
}
// Query stored events
err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
if err != nil {
// Fail entire subscription if any filter rejected
reason := err.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
ws.WriteJSON(nostr.ClosedEnvelope{
SubscriptionID: env.SubscriptionID,
Reason: reason,
})
cancelReqCtx(errors.New("filter rejected"))
return
} else {
// Add listener for real-time events
rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx)
}
}
// Send EOSE when all stored events dispatched
go func() {
eose.Wait()
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
}()
Subscription lifecycle:
WaitGroup pattern:
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
// Each query handler calls eose.Done() when complete
go func() {
eose.Wait() // Wait for all queries
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
}()
// handlers.go, lines 325-327
case *nostr.CloseEnvelope:
id := string(*env)
rl.removeListenerId(ws, id)
Simple unsubscribe: Remove listener by subscription ID
// handlers.go, lines 328-341
case *nostr.AuthEnvelope:
// Compute relay WebSocket URL
wsBaseUrl := strings.Replace(rl.getBaseURL(r), "http", "ws", 1)
// Validate AUTH event
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {
// Store authenticated pubkey
ws.AuthedPublicKey = pubkey
// Close Authed channel (unblocks any waiting goroutines)
ws.authLock.Lock()
if ws.Authed != nil {
close(ws.Authed)
ws.Authed = nil
}
ws.authLock.Unlock()
// Send OK response
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: true})
} else {
// Validation failed
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to authenticate",
})
}
NIP-42 authentication:
On success the authenticated pubkey is stored in ws.AuthedPublicKey.

// listener.go, lines 13-24
// listenerSpec is the per-client record of one subscription: enough to
// cancel it and to locate its entry in the owning subrelay's flat
// listeners array for O(1) removal.
type listenerSpec struct {
	id       string                  // Subscription ID from REQ
	cancel   context.CancelCauseFunc // Cancels this subscription
	index    int                     // Position in subrelay.listeners array
	subrelay *Relay                  // Reference to (sub)relay handling this
}

// listener is the broadcast-side record: the flat array of these is
// scanned linearly on every incoming event to find matching subscriptions.
type listener struct {
	id     string       // Subscription ID
	filter nostr.Filter // Filter for matching events
	ws     *WebSocket   // WebSocket connection
}
Two-level tracking:
1. clients map[*WebSocket][]listenerSpec — tracks what subscriptions each client has; enables cleanup when a client disconnects.
2. listeners []listener — a flat array for fast iteration when broadcasting; no maps and no allocations during broadcast.
// listener.go, lines 36-60
// addListener registers subscription id for client ws. It records a spec on
// the client (used for cleanup) and appends a matching entry to the
// subrelay's flat listeners array (used for broadcast). Both updates happen
// under the shared clients mutex so the index stored in the spec stays
// consistent with the listener's actual position.
func (rl *Relay) addListener(
	ws *WebSocket,
	id string,
	subrelay *Relay,
	filter nostr.Filter,
	cancel context.CancelCauseFunc,
) {
	rl.clientsMutex.Lock()
	defer rl.clientsMutex.Unlock()

	specs, registered := rl.clients[ws]
	if !registered {
		// Client already removed from tracking; nothing to attach to.
		return
	}

	// The new listener will land at the current end of the subrelay's array.
	position := len(subrelay.listeners)

	rl.clients[ws] = append(specs, listenerSpec{
		id:       id,
		cancel:   cancel,
		subrelay: subrelay,
		index:    position,
	})
	subrelay.listeners = append(subrelay.listeners, listener{
		ws:     ws,
		id:     id,
		filter: filter,
	})
}
O(1) append operation
// listener.go, lines 64-99
// removeListenerId cancels and removes the subscription identified by id for
// the given client. It swap-deletes from both the client's spec list and the
// owning (sub)relay's flat listeners array, then patches the index stored in
// the spec of whichever listener was moved into the freed slot.
func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
	rl.clientsMutex.Lock()
	defer rl.clientsMutex.Unlock()
	if specs, ok := rl.clients[ws]; ok {
		// Iterate backwards so swap-deleting the current element never
		// skips an element we have not yet visited.
		for s := len(specs) - 1; s >= 0; s-- {
			spec := specs[s]
			if spec.id == id {
				// Cancel subscription context
				spec.cancel(ErrSubscriptionClosedByClient)
				// Swap-delete from specs array
				specs[s] = specs[len(specs)-1]
				specs = specs[0 : len(specs)-1]
				rl.clients[ws] = specs
				// Remove from listener list in subrelay
				srl := spec.subrelay
				// If not last element, swap with last
				if spec.index != len(srl.listeners)-1 {
					movedFromIndex := len(srl.listeners) - 1
					moved := srl.listeners[movedFromIndex]
					srl.listeners[spec.index] = moved
					// Update the moved listener's spec index so future
					// removals still find it at its new position.
					movedSpecs := rl.clients[moved.ws]
					idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
						return ls.index == movedFromIndex && ls.subrelay == srl
					})
					movedSpecs[idx].index = spec.index
					rl.clients[moved.ws] = movedSpecs
				}
				// Truncate listeners array
				srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
			}
		}
	}
}
Swap-delete pattern:
Why not just delete?
append(arr[:i], arr[i+1:]...) is O(n) — it shifts every subsequent element, while swap-delete is O(1).

// listener.go, lines 101-133
// removeClientAndListeners tears down every subscription belonging to ws and
// drops the client from the tracking map. Called on disconnect.
func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
	rl.clientsMutex.Lock()
	defer rl.clientsMutex.Unlock()
	if specs, ok := rl.clients[ws]; ok {
		// Remove each subscription
		for s, spec := range specs {
			srl := spec.subrelay
			// Swap-delete from the subrelay's flat listeners array.
			if spec.index != len(srl.listeners)-1 {
				movedFromIndex := len(srl.listeners) - 1
				moved := srl.listeners[movedFromIndex]
				srl.listeners[spec.index] = moved
				// Mark current spec as invalid so the IndexFunc below cannot
				// accidentally match it when moved.ws is this same client.
				rl.clients[ws][s].index = -1
				// Update the moved listener's spec to its new position.
				movedSpecs := rl.clients[moved.ws]
				idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
					return ls.index == movedFromIndex && ls.subrelay == srl
				})
				movedSpecs[idx].index = spec.index
				rl.clients[moved.ws] = movedSpecs
			}
			// Truncate listeners array
			srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
		}
	}
	// Remove client from map
	delete(rl.clients, ws)
}
Called when client disconnects: Removes all subscriptions for that client
// listener.go, lines 136-151
// notifyListeners delivers event to every live subscription whose filter
// matches, unless a PreventBroadcast hook vetoes that particular subscriber.
// It returns the number of subscribers the event was written to.
func (rl *Relay) notifyListeners(event *nostr.Event) int {
	delivered := 0
	for _, l := range rl.listeners {
		if !l.filter.Matches(event) {
			continue
		}
		// Give every PreventBroadcast hook a chance to veto this delivery.
		vetoed := false
		for _, prevent := range rl.PreventBroadcast {
			if prevent(l.ws, event) {
				vetoed = true
				break
			}
		}
		if vetoed {
			continue
		}
		// Send the event to this subscriber.
		l.ws.WriteJSON(nostr.EventEnvelope{
			SubscriptionID: &l.id,
			Event:          *event,
		})
		delivered++
	}
	return delivered
}
Performance characteristics:
Optimization opportunity: for relays with thousands of subscriptions, consider indexing listeners by filter attributes (e.g. kind or author) so each event checks only plausible matches instead of scanning the whole flat array.
// utils.go
// contextKey is a private type for the context values this package stores.
// Using a dedicated unexported type (rather than bare iota ints) guarantees
// these keys can never collide with integer keys used by any other package,
// as recommended by the context.WithValue documentation.
type contextKey int

const (
	wsKey              contextKey = iota // *WebSocket for this connection
	subscriptionIdKey                    // current subscription ID (string)
	nip86HeaderAuthKey                   // NIP-86 authorization header pubkey
	internalCallKey                      // marks internal calls
)
Pattern: Use iota for compile-time context key uniqueness
// GetConnection retrieves the *WebSocket associated with ctx, or nil when
// the context was not created by the WebSocket handler (e.g. internal or
// NIP-86 HTTP calls). The comma-ok assertion also avoids a panic if the
// key were ever to hold an unexpected type.
func GetConnection(ctx context.Context) *WebSocket {
	ws, _ := ctx.Value(wsKey).(*WebSocket)
	return ws
}
Usage: Retrieve WebSocket in hooks and handlers
// GetAuthed reports the authenticated public key for this context. It checks
// NIP-42 WebSocket authentication first, then NIP-86 header-based
// authentication, and returns "" when neither is present.
func GetAuthed(ctx context.Context) string {
	// NIP-42: pubkey established via the AUTH message on this connection.
	if conn := GetConnection(ctx); conn != nil {
		return conn.AuthedPublicKey
	}
	// NIP-86: pubkey carried in the management API's Authorization header.
	if hdrAuth := ctx.Value(nip86HeaderAuthKey); hdrAuth != nil {
		return hdrAuth.(string)
	}
	return ""
}
Supports two auth mechanisms:
// RequestAuth sends a NIP-42 AUTH challenge to the client behind ctx and
// (re)creates the Authed channel that WaitForAuth blocks on.
//
// Fix: GetConnection explicitly returns nil for non-WebSocket contexts, so
// dereferencing its result unguarded would panic; bail out instead.
func RequestAuth(ctx context.Context) {
	ws := GetConnection(ctx)
	if ws == nil {
		// Not a WebSocket-backed context (e.g. internal call); nothing to challenge.
		return
	}
	ws.authLock.Lock()
	if ws.Authed == nil {
		ws.Authed = make(chan struct{})
	}
	ws.authLock.Unlock()
	ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
}
Sends AUTH challenge to client
// WaitForAuth blocks until the client completes NIP-42 authentication or
// the timeout elapses, reporting whether authentication succeeded. A nil
// Authed channel means AUTH already completed (the AUTH handler closes and
// clears it), so there is nothing to wait for.
func (ws *WebSocket) WaitForAuth(timeout time.Duration) bool {
	ws.authLock.Lock()
	ch := ws.Authed
	ws.authLock.Unlock()

	if ch == nil {
		return true // Already authenticated
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-ch:
		return true // Channel closed by the AUTH handler
	case <-timer.C:
		return false // Timed out waiting for AUTH
	}
}
Pattern: Use closed channel as signal
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
When safe:
msgb is newly allocated by ReadMessage() and never reused afterwards. Savings: avoids copying the message buffer (up to MaxMessageSize, 512 KB) on every message.
go func(message string) {
handleMessage(message)
}(message)
Benefits:
Trade-off: Goroutine creation overhead (typically <1μs)
// O(1) deletion
arr[i] = arr[len(arr)-1]
arr = arr[:len(arr)-1]
// vs. O(n) deletion
arr = append(arr[:i], arr[i+1:]...)
When appropriate:
negentropySessions *xsync.MapOf[string, *NegentropySession]
vs. standard map with mutex:
sessions map[string]*NegentropySession
mutex sync.RWMutex
Benefits of xsync.MapOf:
Trade-off: Slightly more memory per entry
// TestWebSocketConnection is an end-to-end smoke test: dial the relay over
// a real WebSocket, send a REQ, and expect an EOSE (end of stored events),
// since a freshly created relay has no events to return first.
func TestWebSocketConnection(t *testing.T) {
	relay := khatru.NewRelay()
	// Start server
	server := httptest.NewServer(relay)
	defer server.Close()
	// Convert http:// to ws://
	wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
	// Connect client
	ws, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
	if err != nil {
		t.Fatalf("Dial failed: %v", err)
	}
	defer ws.Close()
	// Send REQ
	req := `["REQ","test",{"kinds":[1]}]`
	if err := ws.WriteMessage(websocket.TextMessage, []byte(req)); err != nil {
		t.Fatalf("WriteMessage failed: %v", err)
	}
	// Read EOSE.
	// NOTE(review): no read deadline is set, so a broken relay would hang
	// this test until the framework timeout — consider ws.SetReadDeadline.
	_, msg, err := ws.ReadMessage()
	if err != nil {
		t.Fatalf("ReadMessage failed: %v", err)
	}
	if !strings.Contains(string(msg), "EOSE") {
		t.Errorf("Expected EOSE, got: %s", msg)
	}
}
// TestRejectConnection verifies that a RejectConnection hook refuses the
// WebSocket upgrade with HTTP 429 before the connection is established.
//
// Fixes: the original hook only rejected RemoteAddr "192.0.2.1:12345",
// which never matches the httptest client's 127.0.0.1 address, so the
// connection was accepted and the test failed. The hook now rejects every
// connection. Also, Dial can return a nil *http.Response on pure network
// errors, so resp is checked before StatusCode is read.
func TestRejectConnection(t *testing.T) {
	relay := khatru.NewRelay()
	// Reject every connection so the hook is guaranteed to fire for the
	// test client (which always connects from 127.0.0.1).
	relay.RejectConnection = append(relay.RejectConnection,
		func(r *http.Request) bool {
			return true
		},
	)
	server := httptest.NewServer(relay)
	defer server.Close()
	wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
	// Should fail to connect
	ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil)
	if err == nil {
		ws.Close()
		t.Fatal("Expected connection to be rejected")
	}
	if resp == nil {
		t.Fatal("Expected an HTTP response carrying the rejection status")
	}
	if resp.StatusCode != 429 {
		t.Errorf("Expected 429, got %d", resp.StatusCode)
	}
}
relay := khatru.NewRelay()
relay.ServiceURL = "wss://relay.example.com"
relay.WriteWait = 10 * time.Second
relay.PongWait = 60 * time.Second
relay.PingPeriod = 30 * time.Second
relay.MaxMessageSize = 512000 // 512 KB
relay.upgrader.EnableCompression = true
relay.upgrader.CheckOrigin = func(r *http.Request) bool {
// For public relays: return true
// For private relays: validate origin
return true
}
import "golang.org/x/time/rate"
// RateLimiter keeps one token-bucket limiter per client IP.
// NOTE(review): the limiters map grows without bound — one entry per unique
// IP ever seen. For a long-running public relay, add eviction (e.g. track a
// last-seen timestamp and sweep stale entries periodically).
type RateLimiter struct {
	limiters map[string]*rate.Limiter
	mu       sync.Mutex
}

// getLimiter returns the limiter for ip, creating one on first sight.
func (rl *RateLimiter) getLimiter(ip string) *rate.Limiter {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	limiter, exists := rl.limiters[ip]
	if !exists {
		limiter = rate.NewLimiter(10, 20) // 10 events/sec, burst of 20
		rl.limiters[ip] = limiter
	}
	return limiter
}
rateLimiter := &RateLimiter{limiters: make(map[string]*rate.Limiter)}
relay.RejectConnection = append(relay.RejectConnection,
func(r *http.Request) bool {
ip := getIP(r)
return !rateLimiter.getLimiter(ip).Allow()
},
)
relay.OnConnect = append(relay.OnConnect,
func(ctx context.Context) {
ws := khatru.GetConnection(ctx)
log.Printf("connection from %s", khatru.GetIP(ctx))
metrics.ActiveConnections.Inc()
},
)
relay.OnDisconnect = append(relay.OnDisconnect,
func(ctx context.Context) {
log.Printf("disconnection from %s", khatru.GetIP(ctx))
metrics.ActiveConnections.Dec()
},
)
server := &http.Server{
Addr: ":8080",
Handler: relay,
}
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
<-sigChan
log.Println("Shutting down...")
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Printf("Shutdown error: %v", err)
}
Key architectural decisions:
When to use khatru patterns:
Performance characteristics:
Further reading: