session.go raw
1 package nrc
2
3 import (
4 "context"
5 "encoding/json"
6 "sync"
7 "time"
8 )
9
10 const (
11 // DefaultSessionTimeout is the default inactivity timeout for sessions.
12 DefaultSessionTimeout = 30 * time.Minute
13 // DefaultMaxSubscriptions is the default maximum subscriptions per session.
14 DefaultMaxSubscriptions = 100
15 )
16
17 // Session represents an NRC client session through the tunnel.
18 type Session struct {
19 // ID is the unique session identifier.
20 ID string
21 // ClientPubkey is the public key of the connected client.
22 ClientPubkey []byte
23 // ConversationKey is the NIP-44 conversation key for this session.
24 ConversationKey []byte
25 // DeviceName is the optional device identifier.
26 DeviceName string
27 // AuthMode is the authentication mode used.
28 AuthMode AuthMode
29
30 // CreatedAt is when the session was created.
31 CreatedAt time.Time
32 // LastActivity is the timestamp of the last activity.
33 LastActivity time.Time
34
35 // subscriptions maps client subscription IDs to internal subscription state.
36 subscriptions map[string]*Subscription
37 // subMu protects the subscriptions map.
38 subMu sync.RWMutex
39
40 // ctx is the session context.
41 ctx context.Context
42 // cancel cancels the session context.
43 cancel context.CancelFunc
44
45 // eventCh receives events from the local relay for this session.
46 eventCh chan *SessionEvent
47 }
48
49 // Subscription represents a tunneled subscription.
50 type Subscription struct {
51 // ID is the client's subscription ID.
52 ID string
53 // CreatedAt is when the subscription was created.
54 CreatedAt time.Time
55 // EventCount tracks how many events have been sent.
56 EventCount int64
57 // EOSESent indicates whether EOSE has been sent.
58 EOSESent bool
59 }
60
61 // SessionEvent wraps a relay response for delivery to the client.
62 type SessionEvent struct {
63 // Type is the response type (EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH).
64 Type string
65 // Payload is the response payload array.
66 Payload []any
67 // RequestEventID is the ID of the request event this responds to (if applicable).
68 RequestEventID string
69 }
70
71 // NewSession creates a new session.
72 func NewSession(id string, clientPubkey, conversationKey []byte, authMode AuthMode, deviceName string) *Session {
73 ctx, cancel := context.WithCancel(context.Background())
74 now := time.Now()
75 return &Session{
76 ID: id,
77 ClientPubkey: clientPubkey,
78 ConversationKey: conversationKey,
79 DeviceName: deviceName,
80 AuthMode: authMode,
81 CreatedAt: now,
82 LastActivity: now,
83 subscriptions: make(map[string]*Subscription),
84 ctx: ctx,
85 cancel: cancel,
86 eventCh: make(chan *SessionEvent, 100),
87 }
88 }
89
90 // Context returns the session's context.
91 func (s *Session) Context() context.Context {
92 return s.ctx
93 }
94
95 // Close closes the session and cleans up resources.
96 func (s *Session) Close() {
97 s.cancel()
98 close(s.eventCh)
99 }
100
101 // Events returns the channel for receiving events destined for this session.
102 func (s *Session) Events() <-chan *SessionEvent {
103 return s.eventCh
104 }
105
106 // SendEvent sends an event to the session's event channel.
107 func (s *Session) SendEvent(ev *SessionEvent) bool {
108 select {
109 case s.eventCh <- ev:
110 return true
111 case <-s.ctx.Done():
112 return false
113 default:
114 // Channel full, drop event
115 return false
116 }
117 }
118
119 // Touch updates the last activity timestamp.
120 func (s *Session) Touch() {
121 s.LastActivity = time.Now()
122 }
123
124 // IsExpired checks if the session has been inactive too long.
125 func (s *Session) IsExpired(timeout time.Duration) bool {
126 return time.Since(s.LastActivity) > timeout
127 }
128
129 // AddSubscription adds a new subscription to the session.
130 func (s *Session) AddSubscription(subID string) error {
131 s.subMu.Lock()
132 defer s.subMu.Unlock()
133
134 if len(s.subscriptions) >= DefaultMaxSubscriptions {
135 return ErrTooManySubscriptions
136 }
137
138 s.subscriptions[subID] = &Subscription{
139 ID: subID,
140 CreatedAt: time.Now(),
141 }
142 return nil
143 }
144
145 // RemoveSubscription removes a subscription from the session.
146 func (s *Session) RemoveSubscription(subID string) {
147 s.subMu.Lock()
148 defer s.subMu.Unlock()
149 delete(s.subscriptions, subID)
150 }
151
152 // GetSubscription returns a subscription by ID.
153 func (s *Session) GetSubscription(subID string) *Subscription {
154 s.subMu.RLock()
155 defer s.subMu.RUnlock()
156 return s.subscriptions[subID]
157 }
158
159 // HasSubscription checks if a subscription exists.
160 func (s *Session) HasSubscription(subID string) bool {
161 s.subMu.RLock()
162 defer s.subMu.RUnlock()
163 _, ok := s.subscriptions[subID]
164 return ok
165 }
166
167 // SubscriptionCount returns the number of active subscriptions.
168 func (s *Session) SubscriptionCount() int {
169 s.subMu.RLock()
170 defer s.subMu.RUnlock()
171 return len(s.subscriptions)
172 }
173
174 // MarkEOSE marks a subscription as having sent EOSE.
175 func (s *Session) MarkEOSE(subID string) {
176 s.subMu.Lock()
177 defer s.subMu.Unlock()
178 if sub, ok := s.subscriptions[subID]; ok {
179 sub.EOSESent = true
180 }
181 }
182
183 // IncrementEventCount increments the event count for a subscription.
184 func (s *Session) IncrementEventCount(subID string) {
185 s.subMu.Lock()
186 defer s.subMu.Unlock()
187 if sub, ok := s.subscriptions[subID]; ok {
188 sub.EventCount++
189 }
190 }
191
192 // SessionManager manages multiple NRC sessions.
193 type SessionManager struct {
194 sessions map[string]*Session
195 mu sync.RWMutex
196 timeout time.Duration
197 }
198
199 // NewSessionManager creates a new session manager.
200 func NewSessionManager(timeout time.Duration) *SessionManager {
201 if timeout == 0 {
202 timeout = DefaultSessionTimeout
203 }
204 return &SessionManager{
205 sessions: make(map[string]*Session),
206 timeout: timeout,
207 }
208 }
209
210 // Get returns a session by ID.
211 func (m *SessionManager) Get(sessionID string) *Session {
212 m.mu.RLock()
213 defer m.mu.RUnlock()
214 return m.sessions[sessionID]
215 }
216
217 // GetOrCreate gets an existing session or creates a new one.
218 func (m *SessionManager) GetOrCreate(sessionID string, clientPubkey, conversationKey []byte, authMode AuthMode, deviceName string) *Session {
219 m.mu.Lock()
220 defer m.mu.Unlock()
221
222 if session, ok := m.sessions[sessionID]; ok {
223 session.Touch()
224 return session
225 }
226
227 session := NewSession(sessionID, clientPubkey, conversationKey, authMode, deviceName)
228 m.sessions[sessionID] = session
229 return session
230 }
231
232 // Remove removes a session.
233 func (m *SessionManager) Remove(sessionID string) {
234 m.mu.Lock()
235 defer m.mu.Unlock()
236
237 if session, ok := m.sessions[sessionID]; ok {
238 session.Close()
239 delete(m.sessions, sessionID)
240 }
241 }
242
243 // CleanupExpired removes expired sessions.
244 func (m *SessionManager) CleanupExpired() int {
245 m.mu.Lock()
246 defer m.mu.Unlock()
247
248 var removed int
249 for id, session := range m.sessions {
250 if session.IsExpired(m.timeout) {
251 session.Close()
252 delete(m.sessions, id)
253 removed++
254 }
255 }
256 return removed
257 }
258
259 // Count returns the number of active sessions.
260 func (m *SessionManager) Count() int {
261 m.mu.RLock()
262 defer m.mu.RUnlock()
263 return len(m.sessions)
264 }
265
266 // Close closes all sessions.
267 func (m *SessionManager) Close() {
268 m.mu.Lock()
269 defer m.mu.Unlock()
270
271 for _, session := range m.sessions {
272 session.Close()
273 }
274 m.sessions = make(map[string]*Session)
275 }
276
277 // RequestMessage represents a parsed NRC request message.
278 type RequestMessage struct {
279 Type string // EVENT, REQ, CLOSE, AUTH, COUNT, IDS
280 Payload []any
281 }
282
283 // ResponseMessage represents an NRC response message to be sent.
284 type ResponseMessage struct {
285 Type string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH, IDS, CHUNK
286 Payload []any
287 }
288
289 // EventManifestEntry describes an event for manifest diffing (used by IDS).
290 type EventManifestEntry struct {
291 Kind int `json:"kind"`
292 ID string `json:"id"`
293 CreatedAt int64 `json:"created_at"`
294 D string `json:"d,omitempty"` // For parameterized replaceable events (kinds 30000-39999)
295 }
296
297 // ChunkMessage represents a chunk of a large message.
298 type ChunkMessage struct {
299 Type string `json:"type"` // Always "CHUNK"
300 MessageID string `json:"messageId"` // Unique ID for the chunked message
301 Index int `json:"index"` // 0-based chunk index
302 Total int `json:"total"` // Total number of chunks
303 Data string `json:"data"` // Base64 encoded chunk data
304 }
305
306 // ParseRequestContent parses the decrypted content of an NRC request.
307 func ParseRequestContent(content []byte) (*RequestMessage, error) {
308 // Content format: {"type": "EVENT|REQ|...", "payload": [...]}
309 // Parse as generic JSON
310 var msg struct {
311 Type string `json:"type"`
312 Payload []any `json:"payload"`
313 }
314
315 if err := json.Unmarshal(content, &msg); err != nil {
316 return nil, err
317 }
318
319 if msg.Type == "" {
320 return nil, ErrInvalidMessageType
321 }
322
323 return &RequestMessage{
324 Type: msg.Type,
325 Payload: msg.Payload,
326 }, nil
327 }
328
329 // MarshalResponseContent marshals an NRC response for encryption.
330 func MarshalResponseContent(resp *ResponseMessage) ([]byte, error) {
331 msg := struct {
332 Type string `json:"type"`
333 Payload []any `json:"payload"`
334 }{
335 Type: resp.Type,
336 Payload: resp.Payload,
337 }
338 return json.Marshal(msg)
339 }
340