This document explains how ORLY handles concurrent writes to WebSocket connections safely and efficiently.
ORLY uses a single-writer pattern with channel-based coordination to multiplex writes from multiple goroutines to each WebSocket connection. This prevents concurrent write panics and ensures message ordering.
Each WebSocket connection has exactly ONE dedicated writer goroutine, but MANY producer goroutines can safely queue messages through a buffered channel. This is the standard Go solution for the "multiple producers, single consumer" concurrency pattern.
The gorilla/websocket library (and WebSockets in general) don't allow concurrent writes - attempting to write from multiple goroutines causes panics. ORLY's channel-based approach elegantly serializes all writes while maintaining high throughput.
Each Listener (WebSocket connection) has a dedicated write channel defined in `app/listener.go:35`:
type Listener struct {
writeChan chan publish.WriteRequest // Buffered channel (capacity: 100)
writeDone chan struct{} // Signals writer exit
// ... other fields
}
Created during connection setup in `app/handle-websocket.go:94`:
listener := &Listener{
writeChan: make(chan publish.WriteRequest, 100),
writeDone: make(chan struct{}),
// ...
}
The writeWorker() defined in `app/listener.go:133-201` is the ONLY goroutine allowed to call conn.WriteMessage():
func (l *Listener) writeWorker() {
defer close(l.writeDone)
for {
select {
case <-l.ctx.Done():
return
case req, ok := <-l.writeChan:
if !ok {
return // Channel closed
}
if req.IsPing {
// Send ping control frame
l.conn.WriteControl(websocket.PingMessage, nil, deadline)
} else if req.IsControl {
// Send control message
l.conn.WriteControl(req.MsgType, req.Data, req.Deadline)
} else {
// Send regular message
l.conn.SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
l.conn.WriteMessage(req.MsgType, req.Data)
}
}
}
}
Started once per connection in `app/handle-websocket.go:102`:
go listener.writeWorker()
All write operations are wrapped in a WriteRequest defined in `pkg/protocol/publish/publisher.go:13-19`:
type WriteRequest struct {
Data []byte
MsgType int // websocket.TextMessage, PingMessage, etc.
IsControl bool // Control frame?
Deadline time.Time // For control messages
IsPing bool // Special ping handling
}
Several goroutines send write requests to the channel:
Used by protocol handlers (EVENT, REQ, COUNT, etc.) in `app/listener.go:88-108`:
func (l *Listener) Write(p []byte) (n int, err error) {
select {
case l.writeChan <- publish.WriteRequest{
Data: p,
MsgType: websocket.TextMessage,
}:
return len(p), nil
case <-time.After(DefaultWriteTimeout):
return 0, errorf.E("write channel timeout")
}
}
Each active subscription runs a goroutine that receives events from the publisher and forwards them in `app/handle-req.go:696-736`:
// Subscription goroutine (one per REQ)
go func() {
for {
select {
case ev := <-evC: // Receive from publisher
res := eventenvelope.NewFrom(subID, ev)
if err = res.Write(l); err != nil { // ← Sends to writeChan
log.E.F("failed to write event")
}
}
}
}()
Sends periodic pings in `app/handle-websocket.go:252-283`:
func (s *Server) Pinger(ctx context.Context, listener *Listener, ticker *time.Ticker) {
for {
select {
case <-ticker.C:
// Send ping with special flag
listener.writeChan <- publish.WriteRequest{
IsPing: true,
MsgType: pingCount,
}
}
}
}
┌─────────────────────────────────────────────────────────────┐
│ WebSocket Connection │
└─────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────┐
│ Listener (per conn) │
│ writeChan: chan WriteRequest (100) │
└────────────────────────────────────────┘
▲ ▲ ▲ ▲
│ │ │ │
┌─────────────┼───┼───┼───┼─────────────┐
│ PRODUCERS (Multiple Goroutines) │
├─────────────────────────────────────────┤
│ 1. Handler goroutine │
│ └─> Write(okMsg) ───────────────┐ │
│ │ │
│ 2. Subscription goroutine (REQ1) │ │
│ └─> Write(event1) ──────────────┼──┐ │
│ │ │ │
│ 3. Subscription goroutine (REQ2) │ │ │
│ └─> Write(event2) ──────────────┼──┼─┤
│ │ │ │
│ 4. Pinger goroutine │ │ │
│ └─> writeChan <- PING ──────────┼──┼─┼┐
└─────────────────────────────────────┼──┼─┼┤
▼ ▼ ▼▼
┌──────────────────────────────┐
│ writeChan (buffered) │
│ [req1][req2][ping][req3] │
└──────────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ CONSUMER (Single Writer Goroutine) │
├─────────────────────────────────────────────┤
│ writeWorker() ─── ONLY goroutine allowed │
│ to call WriteMessage() │
└─────────────────────────────────────────────┘
│
▼
conn.WriteMessage(msgType, data)
│
▼
┌─────────────────┐
│ Client Browser │
└─────────────────┘
The publisher system also uses the write channel map defined in `app/publisher.go:25-26`:
type WriteChanMap map[*websocket.Conn]chan publish.WriteRequest
type P struct {
WriteChans WriteChanMap // Maps conn → write channel
// ...
}
When an event is published (see `app/publisher.go:153-268`):
Write(l) which enqueues to writeChanORLY uses TWO channel layers:
This separation provides:
This architecture matches patterns used in production relays like khatru and enables ORLY to handle thousands of concurrent subscriptions efficiently.
Multiple goroutines can safely queue messages without any mutexes - the channel provides synchronization.
Writes use a timeout (see `app/listener.go:104`):
case <-time.After(DefaultWriteTimeout):
return 0, errorf.E("write channel timeout")
If the channel is full (100 messages buffered), writes timeout rather than blocking indefinitely.
Connection cleanup in `app/handle-websocket.go:184-187`:
// Close write channel to signal worker to exit
close(listener.writeChan)
// Wait for write worker to finish
<-listener.writeDone
Ensures all queued messages are sent before closing the connection.
Pings use a special IsPing flag so the write worker can prioritize them during heavy traffic, preventing timeout disconnections.
Defined in `app/handle-websocket.go:19-28`:
const (
DefaultWriteWait = 10 * time.Second // Write deadline for normal messages
DefaultPongWait = 60 * time.Second // Time to wait for pong response
DefaultPingWait = 30 * time.Second // Interval between pings
DefaultWriteTimeout = 3 * time.Second // Timeout for write channel send
DefaultMaxMessageSize = 512000 // Max incoming message size (512KB)
ClientMessageSizeLimit = 100 * 1024 * 1024 // Max client message size (100MB)
)
✅ No concurrent write panics - single writer guarantee ✅ High throughput - buffered channel (100 messages) ✅ Fair ordering - FIFO queue semantics ✅ Simple producer code - just send to channel ✅ Backpressure management - timeout on full queue ✅ Clean shutdown - channel close signals completion ✅ Priority handling - pings can be prioritized
This pattern is the standard Go idiom for serializing operations and is used throughout high-performance network services.