This reference documents high-performance WebSocket patterns from the strfry Nostr relay implementation in C++.
strfry uses 6 specialized thread pools for different operations (WebSocket I/O, Ingester, Writer, ReqWorker, ReqMonitor, and Negentropy):
┌─────────────────────────────────────────────────────────────┐
│ Main Thread (I/O) │
│ - epoll event loop │
│ - WebSocket message reception │
│ - Connection management │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
┌────▼────┐ ┌───▼────┐ ┌───▼────┐
│Ingester │ │ReqWorker│ │Negentropy│
│ (3) │ │ (3) │ │ (2) │
└─────────┘ └─────────┘ └─────────┘
│ │ │
┌────▼────┐ ┌───▼────┐
│ Writer │ │ReqMonitor│
│ (1) │ │ (3) │
└─────────┘ └─────────┘
Thread Pool Responsibilities:
Deterministic thread assignment:
// Reduce the connection ID modulo the thread count: the same connId always selects the same thread.
int threadId = connId % numThreads;
Benefits:
// Per-connection user data attached to each WebSocket.
// Every scalar/pointer member carries a default so a freshly constructed state
// never holds indeterminate values (the close handler hands secpCtx to
// secp256k1_context_destroy, so it must not be garbage).
struct ConnectionState {
    uint64_t connId = 0;                    // Unique connection identifier
    std::string remoteAddr;                 // Client IP address

    // Subscription state
    flat_str subId;                         // Current subscription ID
    std::shared_ptr<Subscription> sub;      // Subscription filter
    uint64_t latestEventSent = 0;           // Latest event ID sent

    // Compression state (per-message deflate)
    PerMessageDeflate pmd;

    // Parsing state (reused buffer)
    std::string parseBuffer;

    // Signature verification context (reused); created in the open handler,
    // destroyed in the close handler. nullptr until then.
    secp256k1_context *secpCtx = nullptr;
};
Key design decisions:
// Pseudocode representation of strfry's I/O loop.
// Registers the WebSocket behaviour (limits plus open/message/close handlers),
// starts listening on port 8080, then runs the event loop.
uWS::App app;
app.ws<ConnectionState>("/*", {
    .compression = uWS::SHARED_COMPRESSOR,
    .maxPayloadLength = 16 * 1024 * 1024,
    .idleTimeout = 120,
    .maxBackpressure = 1 * 1024 * 1024,
    .upgrade = nullptr,
    // open: initialise the per-connection state.
    .open = [](auto *ws) {
        auto *state = ws->getUserData();
        state->connId = nextConnId++;
        state->remoteAddr = getRemoteAddress(ws);
        state->secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
        LI << "New connection: " << state->connId << " from " << state->remoteAddr;
    },
    // message: parse the frame as JSON and route it by its first element.
    .message = [](auto *ws, std::string_view message, uWS::OpCode opCode) {
        auto *state = ws->getUserData();
        // Reuse parseBuffer to avoid allocation
        state->parseBuffer.assign(message.data(), message.size());
        try {
            // Parse JSON (nlohmann::json)
            auto json = nlohmann::json::parse(state->parseBuffer);
            // Extract command type
            auto cmdStr = json[0].get<std::string>();
            if (cmdStr == "EVENT") {
                handleEventMessage(ws, std::move(json));
            }
            else if (cmdStr == "REQ") {
                handleReqMessage(ws, std::move(json));
            }
            else if (cmdStr == "CLOSE") {
                handleCloseMessage(ws, std::move(json));
            }
            else if (cmdStr == "NEG-OPEN") {
                handleNegentropyOpen(ws, std::move(json));
            }
            else {
                sendNotice(ws, "unknown command: " + cmdStr);
            }
        }
        // Catch by const reference: the handler only reads the exception.
        catch (const std::exception &e) {
            sendNotice(ws, "Error: " + std::string(e.what()));
        }
    },
    // close: log and release everything the open handler acquired.
    .close = [](auto *ws, int code, std::string_view message) {
        auto *state = ws->getUserData();
        LI << "Connection closed: " << state->connId
           << " code=" << code
           << " msg=" << std::string(message);
        // Cleanup
        secp256k1_context_destroy(state->secpCtx);
        state->secpCtx = nullptr;  // do not leave a dangling pointer in the state
        cleanupSubscription(state->connId);
    },
});
app.listen(8080, [](auto *token) {
    if (token) {
        LI << "Listening on port 8080";
    }
    else {
        // Without this branch a bind failure would be completely silent.
        LI << "Failed to listen on port 8080";
    }
});
app.run();
Key patterns:
- state->parseBuffer avoids allocation per message
- std::move(json) transfers ownership to handler
void handleEventMessage(auto *ws, nlohmann::json &&json) {
auto *state = ws->getUserData();
// Pack message with connection ID
auto msg = MsgIngester{
.connId = state->connId,
.payload = std::move(json),
};
// Dispatch to Ingester thread pool (deterministic assignment)
tpIngester->dispatchToThread(state->connId, std::move(msg));
}
// Wraps a parsed REQ frame together with the sender's connection ID and hands
// it to the ReqWorker pool; the JSON is moved, not copied.
void handleReqMessage(auto *ws, nlohmann::json &&json) {
    const auto connId = ws->getUserData()->connId;
    // The connection ID doubles as the routing key for the pool.
    tpReqWorker->dispatchToThread(connId, MsgReq{
        .connId = connId,
        .payload = std::move(json),
    });
}
Message passing pattern:
// ThreadPool::dispatchToThread
// Selects a worker from connId (modulo the pool size) and moves msg onto that
// worker's queue.
void dispatchToThread(uint64_t connId, Message &&msg) {
    auto &worker = *threads[connId % threads.size()];
    worker.queue.push(std::move(msg));
}
Benefits:
- std::move transfers ownership without copying
void IngesterThread::run() {
while (running) {
Message msg;
if (!queue.pop(msg, 100ms)) continue;
// Extract event from JSON
auto event = parseEvent(msg.payload);
// Validate event ID
if (!validateEventId(event)) {
sendOK(msg.connId, event.id, false, "invalid: id mismatch");
continue;
}
// Verify signature (using thread-local secp256k1 context)
if (!verifySignature(event, secpCtx)) {
sendOK(msg.connId, event.id, false, "invalid: signature verification failed");
continue;
}
// Check for duplicate (bloom filter + database)
if (isDuplicate(event.id)) {
sendOK(msg.connId, event.id, true, "duplicate: already have this event");
continue;
}
// Send to Writer thread
auto writerMsg = MsgWriter{
.connId = msg.connId,
.event = std::move(event),
};
tpWriter->dispatch(std::move(writerMsg));
}
}
Validation sequence:
// Writer loop: the single consumer of the write queue. Each event is inserted
// into the database, the originating client is told the outcome, and
// successfully stored events are broadcast to subscribers.
void WriterThread::run() {
    while (running) {
        Message msg;
        // Wake up every 100ms so a cleared `running` flag is noticed.
        if (queue.pop(msg, 100ms)) {
            const bool stored = db.insertEvent(msg.event);
            const char *reason = stored ? "" : "error: failed to store";
            sendOK(msg.connId, msg.event.id, stored, reason);
            if (stored) {
                broadcastEvent(msg.event);
            }
        }
    }
}
Single-writer pattern:
// Sends a newly stored event to every active subscription that wants it.
// The event is serialized exactly once; each recipient gets a copy of that
// string rather than a fresh serialization.
void broadcastEvent(const Event &event) {
    // Serialize event JSON once
    std::string eventJson = serializeEvent(event);
    // Iterate all active subscriptions
    for (auto &[connId, sub] : activeSubscriptions) {
        // A subscription carries a list of filters (see ReqWorkerThread::run,
        // which fills sub->filters); the event matches if ANY of them matches.
        bool matches = false;
        for (const auto &filter : sub->filters) {
            if (filter.matches(event)) {
                matches = true;
                break;
            }
        }
        if (!matches) continue;
        // Check if event newer than last sent
        // NOTE(review): this orders events by id; confirm ids are monotonic
        // in this context.
        if (event.id <= sub->latestEventSent) continue;
        // Send to connection
        auto msg = MsgWebSocket{
            .connId = connId,
            .payload = eventJson,  // Reuse serialized JSON
        };
        tpWebSocket->dispatch(std::move(msg));
        // Update latest sent
        sub->latestEventSent = event.id;
    }
}
Critical optimization: Serialize event JSON once, send to N subscribers
Performance impact: For 1000 subscribers, reduces:
void ReqWorkerThread::run() {
while (running) {
MsgReq msg;
if (!queue.pop(msg, 100ms)) continue;
// Parse REQ message: ["REQ", subId, filter1, filter2, ...]
std::string subId = msg.payload[1];
// Create subscription object
auto sub = std::make_shared<Subscription>();
sub->subId = subId;
// Parse filters
for (size_t i = 2; i < msg.payload.size(); i++) {
Filter filter = parseFilter(msg.payload[i]);
sub->filters.push_back(filter);
}
// Store subscription
activeSubscriptions[msg.connId] = sub;
// Query stored events
std::vector<Event> events = db.queryEvents(sub->filters);
// Send matching events
for (const auto &event : events) {
sendEvent(msg.connId, subId, event);
}
// Send EOSE
sendEOSE(msg.connId, subId);
// Notify ReqMonitor to watch for real-time events
auto monitorMsg = MsgReqMonitor{
.connId = msg.connId,
.subId = subId,
};
tpReqMonitor->dispatchToThread(msg.connId, std::move(monitorMsg));
}
}
Query optimization:
std::vector<Event> Database::queryEvents(const std::vector<Filter> &filters) {
// Combine filters with OR logic
std::string sql = "SELECT * FROM events WHERE ";
for (size_t i = 0; i < filters.size(); i++) {
if (i > 0) sql += " OR ";
sql += buildFilterSQL(filters[i]);
}
sql += " ORDER BY created_at DESC LIMIT 1000";
return executeQuery(sql);
}
Filter SQL generation:
// Builds the parenthesised SQL predicate for one filter. Every populated field
// contributes one condition; conditions are AND-ed together. A filter with no
// populated fields yields "(1=1)" (matches everything) rather than "()",
// which would be a syntax error.
//
// SECURITY: filter contents come from clients. The tag name is embedded in a
// quoted literal, so its single quotes are doubled here.
// NOTE(review): ids/authors/tag values rely on joinQuoted() for escaping —
// confirm it escapes quotes; bound parameters would be preferable to string
// building altogether.
std::string buildFilterSQL(const Filter &filter) {
    // Doubles each single quote so the value cannot terminate its SQL literal.
    const auto escapeQuotes = [](const std::string &raw) {
        std::string escaped;
        escaped.reserve(raw.size());
        for (const char c : raw) {
            if (c == '\'') escaped += '\'';
            escaped += c;
        }
        return escaped;
    };

    std::vector<std::string> conditions;
    // Event IDs
    if (!filter.ids.empty()) {
        conditions.push_back("id IN (" + joinQuoted(filter.ids) + ")");
    }
    // Authors
    if (!filter.authors.empty()) {
        conditions.push_back("pubkey IN (" + joinQuoted(filter.authors) + ")");
    }
    // Kinds
    if (!filter.kinds.empty()) {
        conditions.push_back("kind IN (" + join(filter.kinds) + ")");
    }
    // Time range
    if (filter.since) {
        conditions.push_back("created_at >= " + std::to_string(*filter.since));
    }
    if (filter.until) {
        conditions.push_back("created_at <= " + std::to_string(*filter.until));
    }
    // Tags (requires JOIN with tags table); one EXISTS sub-query per tag name
    for (const auto &[tagName, tagValues] : filter.tags) {
        conditions.push_back(
            "EXISTS (SELECT 1 FROM tags WHERE tags.event_id = events.id "
            "AND tags.name = '" + escapeQuotes(tagName) + "' "
            "AND tags.value IN (" + joinQuoted(tagValues) + "))"
        );
    }
    if (conditions.empty()) {
        return "(1=1)";
    }
    return "(" + join(conditions, " AND ") + ")";
}
void ReqMonitorThread::run() {
// Subscribe to event broadcast channel
auto eventSubscription = subscribeToEvents();
while (running) {
Event event;
if (!eventSubscription.receive(event, 100ms)) continue;
// Check all subscriptions assigned to this thread
for (auto &[connId, sub] : mySubscriptions) {
// Only process subscriptions for this thread
if (connId % numThreads != threadId) continue;
// Check if filter matches
bool matches = false;
for (const auto &filter : sub->filters) {
if (filter.matches(event)) {
matches = true;
break;
}
}
if (matches) {
sendEvent(connId, sub->subId, event);
}
}
}
}
Pattern: Monitor thread watches event stream, sends to matching subscriptions
// Handles ["CLOSE", subId]: drops the subscription registered for this
// connection and logs which subscription ID the client named.
void handleCloseMessage(auto *ws, nlohmann::json &&json) {
    const auto connId = ws->getUserData()->connId;
    // Second array element is the subscription ID.
    std::string subId = json[1];
    // Subscriptions are keyed by connection, so the connection ID is the erase key.
    activeSubscriptions.erase(connId);
    LI << "Subscription closed: connId=" << connId << " subId=" << subId;
}
Problem: Serializing same event 1000× for 1000 subscribers is wasteful
Solution: Serialize once, send to all
// BAD: Serialize for each subscriber
for (auto &subscriber : subscriptions) {
    std::string perSubscriberJson = serializeEvent(event);  // Repeated!
    send(subscriber.connId, perSubscriberJson);
}
// GOOD: Serialize once
std::string sharedJson = serializeEvent(event);
for (auto &subscriber : subscriptions) {
    send(subscriber.connId, sharedJson);  // Reuse!
}
Measurement: For 1000 subscribers, reduces broadcast time from 100ms to 1ms
Problem: Copying large JSON objects is expensive
Solution: Transfer ownership with std::move
// BAD: Copies JSON object
// Takes the message by value and then passes the named parameter (an lvalue)
// to queue.push.
void dispatch(Message msg) {
queue.push(msg); // Copy
}
// GOOD: Moves JSON object
// Accepts only rvalues; std::move turns the named parameter back into an
// rvalue before it is passed to queue.push.
void dispatch(Message &&msg) {
queue.push(std::move(msg)); // Move
}
Benefit: Zero-copy message passing between threads
Problem: Allocating buffer for each message
Solution: Reuse buffer per connection
// Per-connection state, abridged to the one field this example is about.
struct ConnectionState {
std::string parseBuffer; // Reused for all messages
};
// Copies the incoming frame into the connection's parseBuffer and parses the
// JSON from that buffer.
// NOTE(review): `state` is not declared in this snippet — presumably the
// ConnectionState of the connection that received msg; confirm against the caller.
void handleMessage(std::string_view msg) {
// assign() replaces the previous contents of the buffer
state->parseBuffer.assign(msg.data(), msg.size());
auto json = nlohmann::json::parse(state->parseBuffer);
// ...
}
Benefit: Eliminates 10,000+ allocations/second per connection
Problem: Virtual function calls for polymorphic messages
Solution: std::variant with std::visit
// BAD: Virtual function (pointer indirection, vtable lookup)
// Abstract message interface: handle() is pure virtual, so it is resolved at
// run time through whichever concrete type overrides it.
struct Message {
virtual void handle() = 0;
};
// GOOD: std::variant (no indirection, inlined)
// Closed set of message types held by value in a single variant.
using Message = std::variant<MsgIngester, MsgReq, MsgWriter, MsgWebSocket>;

// Invokes handle() on whichever alternative the variant currently holds.
void handle(Message &&msg) {
    const auto invokeHandler = [](auto &&alternative) { alternative.handle(); };
    std::visit(invokeHandler, msg);
}
Benefit: Compiler inlines visit, eliminates virtual call overhead
Problem: Database query for every event to check duplicate
Solution: In-memory bloom filter for fast negative
// Two-tier duplicate check: an in-memory Bloom filter answers the common
// "never seen" case; the database is consulted only when the filter reports a
// possible hit.
// NOTE(review): `db` is not declared in this snippet — presumably the relay's
// event database.
// NOTE(review): a filter miss is treated as "not a duplicate" without asking
// the database; that is only correct if the filter already covers every
// stored event (e.g. it is seeded from the database at startup) — confirm.
class DuplicateDetector {
    BloomFilter bloom;  // Fast probabilistic check

public:  // class members default to private; without this nobody could call isDuplicate()
    // Returns true if eventId is already stored. As a side effect, records
    // eventId in the Bloom filter when it was not a duplicate.
    bool isDuplicate(const std::string &eventId) {
        // Fast negative (definitely not seen)
        if (!bloom.contains(eventId)) {
            bloom.insert(eventId);
            return false;
        }
        // Possible positive (maybe seen, check database)
        if (db.eventExists(eventId)) {
            return true;
        }
        // False positive
        bloom.insert(eventId);
        return false;
    }
};
Benefit: 99% of duplicate checks avoid database query
Problem: Lock contention on message queue
Solution: Batch multiple pushes with single lock
// Mutex-protected FIFO that accepts whole batches, so a producer takes the
// lock once per batch instead of once per message.
class MessageQueue {
    std::mutex mutex;
    std::deque<Message> queue;

public:  // class members default to private; without this pushBatch() is uncallable
    // Appends every message in `messages` to the queue under a single lock.
    // The messages are moved out: on return the vector still has the same
    // number of elements, but each one is in a moved-from state.
    void pushBatch(std::vector<Message> &messages) {
        std::lock_guard lock(mutex);
        for (auto &msg : messages) {
            queue.push_back(std::move(msg));
        }
    }
};
Benefit: Reduces lock acquisitions by 10-100×
Problem: WebSocket compression slower than desired
Solution: Train ZSTD dictionary on typical Nostr messages
// Train dictionary on corpus of Nostr events
// NOTE(review): ZSTD_createCDict() digests the buffer it is handed as the
// dictionary content; it does not itself run a training pass. If a trained
// dictionary is intended, the corpus would presumably go through
// ZDICT_trainFromBuffer() first — confirm.
std::string corpus = collectTypicalEvents();
ZSTD_CDict *dict = ZSTD_createCDict(
corpus.data(), corpus.size(),
compressionLevel
);
// NOTE(review): dict is not released in this snippet; presumably
// ZSTD_freeCDict() is called elsewhere — confirm.
// Use dictionary for compression
// cctx, dst/dstSize and src/srcSize are declared outside this snippet.
size_t compressedSize = ZSTD_compress_usingCDict(
cctx, dst, dstSize,
src, srcSize, dict
);
// NOTE(review): the result is not checked here; zstd reports failure through
// this return value (see ZSTD_isError()).
Benefit: 10-20% better compression ratio, 2× faster decompression
Problem: Unnecessary string copies when parsing
Solution: Use std::string_view for zero-copy
// BAD: Copies substring
// Returns a freshly allocated string holding at most the first 5 characters.
std::string extractCommand(const std::string &msg) {
    std::string prefix(msg, 0, 5);  // Copy
    return prefix;
}
// GOOD: View into original string
// Returns a view of at most the first 5 characters; the view points into the
// caller's buffer and is valid only while that buffer is.
std::string_view extractCommand(std::string_view msg) {
    constexpr std::size_t kPrefixLength = 5;
    if (msg.size() > kPrefixLength) {
        msg.remove_suffix(msg.size() - kPrefixLength);
    }
    return msg;  // No copy
}
Benefit: Eliminates allocations during parsing
struct PerMessageDeflate {
z_stream deflate_stream;
z_stream inflate_stream;
// Sliding window for compression history
static constexpr int WINDOW_BITS = 15;
static constexpr int MEM_LEVEL = 8;
void init() {
// Initialize deflate (compression)
deflate_stream.zalloc = Z_NULL;
deflate_stream.zfree = Z_NULL;
deflate_stream.opaque = Z_NULL;
deflateInit2(&deflate_stream,
Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
-WINDOW_BITS, // Negative = no zlib header
MEM_LEVEL,
Z_DEFAULT_STRATEGY);
// Initialize inflate (decompression)
inflate_stream.zalloc = Z_NULL;
inflate_stream.zfree = Z_NULL;
inflate_stream.opaque = Z_NULL;
inflateInit2(&inflate_stream, -WINDOW_BITS);
}
std::string compress(std::string_view data) {
// Compress with sliding window
deflate_stream.next_in = (Bytef*)data.data();
deflate_stream.avail_in = data.size();
std::string compressed;
compressed.resize(deflateBound(&deflate_stream, data.size()));
deflate_stream.next_out = (Bytef*)compressed.data();
deflate_stream.avail_out = compressed.size();
deflate(&deflate_stream, Z_SYNC_FLUSH);
compressed.resize(compressed.size() - deflate_stream.avail_out);
return compressed;
}
};
Typical compression ratios:
strfry uses LMDB (Lightning Memory-Mapped Database) for event storage:
// Key-value stores
// One LMDB handle per table: a primary store keyed by event ID, plus
// secondary indexes whose values are event IDs.
struct EventDB {
// Primary event storage (key: event ID, value: event data)
lmdb::dbi eventsDB;
// Index by pubkey (key: pubkey + created_at, value: event ID)
lmdb::dbi pubkeyDB;
// Index by kind (key: kind + created_at, value: event ID)
lmdb::dbi kindDB;
// Index by tags (key: tag_name + tag_value + created_at, value: event ID)
lmdb::dbi tagsDB;
// Deletion index (key: event ID, value: deletion event ID)
lmdb::dbi deletionsDB;
};
Why LMDB?
// Relay-wide counters. Every counter is a std::atomic updated with relaxed
// ordering: each increment is atomic on its own, but no ordering is implied
// between different counters.
struct RelayStats {
    std::atomic<uint64_t> totalConnections{0};   // connections ever opened
    std::atomic<uint64_t> activeConnections{0};  // currently open
    std::atomic<uint64_t> eventsReceived{0};
    std::atomic<uint64_t> eventsSent{0};
    std::atomic<uint64_t> bytesReceived{0};
    std::atomic<uint64_t> bytesSent{0};

    // A connection opened: bumps both the lifetime total and the active count.
    void recordConnection() {
        totalConnections.fetch_add(1, std::memory_order_relaxed);
        activeConnections.fetch_add(1, std::memory_order_relaxed);
    }

    // A connection closed: lowers the active count only.
    void recordDisconnection() {
        activeConnections.fetch_sub(1, std::memory_order_relaxed);
    }

    // One inbound event of `bytes` bytes.
    void recordEventReceived(size_t bytes) {
        eventsReceived.fetch_add(1, std::memory_order_relaxed);
        bytesReceived.fetch_add(bytes, std::memory_order_relaxed);
    }

    // One outbound event of `bytes` bytes. Counterpart of recordEventReceived;
    // without it eventsSent and bytesSent would have no way to change.
    void recordEventSent(size_t bytes) {
        eventsSent.fetch_add(1, std::memory_order_relaxed);
        bytesSent.fetch_add(bytes, std::memory_order_relaxed);
    }
};
Atomic operations: Lock-free updates from multiple threads
// Latency histograms and queue-depth gauges for the relay's pipeline stages.
struct PerformanceMetrics {
// Latency histograms
Histogram eventIngestionLatency;
Histogram subscriptionQueryLatency;
Histogram eventBroadcastLatency;
// Thread pool queue depths
std::atomic<size_t> ingesterQueueDepth{0};
std::atomic<size_t> writerQueueDepth{0};
std::atomic<size_t> reqWorkerQueueDepth{0};
// Records one ingestion latency sample; the histogram receives the raw
// microsecond count.
void recordIngestion(std::chrono::microseconds duration) {
eventIngestionLatency.record(duration.count());
}
};
[relay]
bind = 0.0.0.0
port = 8080
maxConnections = 10000
maxMessageSize = 16777216 # 16 MB
[ingester]
threads = 3
queueSize = 10000
[writer]
threads = 1
queueSize = 1000
batchSize = 100
[reqWorker]
threads = 3
queueSize = 10000
[db]
path = /var/lib/strfry/events.lmdb
maxSizeGB = 100
# Increase file descriptor limit
ulimit -n 65536
# Increase maximum socket connections
sysctl -w net.core.somaxconn=4096
# TCP tuning
sysctl -w net.ipv4.tcp_fin_timeout=15
sysctl -w net.ipv4.tcp_tw_reuse=1
Per connection:
Total: ~433 KB per connection
For 10,000 connections: ~4.3 GB
Single-core can handle:
Recommended:
Key architectural patterns:
Performance characteristics:
When to use strfry patterns:
Trade-offs:
Further reading: