manager.go raw
1 // Package cluster provides cluster replication with persistent state
2 package cluster
3
import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
	gosync "sync"
	"time"

	"github.com/dgraph-io/badger/v4"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/database/indexes/types"
	"next.orly.dev/pkg/lol/log"
	"next.orly.dev/pkg/nostr/crypto/keys"
	"next.orly.dev/pkg/nostr/encoders/event"
	"next.orly.dev/pkg/nostr/encoders/hex"
	"next.orly.dev/pkg/nostr/encoders/kind"
	"next.orly.dev/pkg/sync/common"
)
23
24 // EventPublisher is an interface for publishing events
25 type EventPublisher interface {
26 Deliver(*event.E)
27 }
28
29 // Manager handles cluster replication between relay instances
30 type Manager struct {
31 ctx context.Context
32 cancel context.CancelFunc
33 db *database.D
34 adminNpubs []string
35 relayIdentityPubkey string // Our relay's identity pubkey (hex)
36 selfURLs map[string]bool // URLs discovered to be ourselves (for fast lookups)
37 members map[string]*Member // keyed by relay URL
38 membersMux gosync.RWMutex
39 pollTicker *time.Ticker
40 pollDone chan struct{}
41 httpClient *http.Client
42 propagatePrivilegedEvents bool
43 publisher EventPublisher
44 nip11Cache *common.NIP11Cache
45 }
46
47 // Member represents a cluster member
48 type Member struct {
49 HTTPURL string
50 WebSocketURL string
51 LastSerial uint64
52 LastPoll time.Time
53 Status string // "active", "error", "unknown"
54 ErrorCount int
55 }
56
57 // LatestSerialResponse returns the latest serial
58 type LatestSerialResponse struct {
59 Serial uint64 `json:"serial"`
60 Timestamp int64 `json:"timestamp"`
61 }
62
63 // EventsRangeResponse contains events in a range
64 type EventsRangeResponse struct {
65 Events []EventInfo `json:"events"`
66 HasMore bool `json:"has_more"`
67 NextFrom uint64 `json:"next_from,omitempty"`
68 }
69
70 // EventInfo contains metadata about an event
71 type EventInfo struct {
72 Serial uint64 `json:"serial"`
73 ID string `json:"id"`
74 Timestamp int64 `json:"timestamp"`
75 }
76
77 // Config holds configuration for the cluster manager
78 type Config struct {
79 AdminNpubs []string
80 PropagatePrivilegedEvents bool
81 PollInterval time.Duration
82 NIP11CacheTTL time.Duration
83 }
84
85 // DefaultConfig returns default configuration
86 func DefaultConfig() *Config {
87 return &Config{
88 PropagatePrivilegedEvents: true,
89 PollInterval: 5 * time.Second,
90 NIP11CacheTTL: 30 * time.Minute,
91 }
92 }
93
94 // NewManager creates a new cluster manager
95 func NewManager(ctx context.Context, db *database.D, cfg *Config, publisher EventPublisher) *Manager {
96 ctx, cancel := context.WithCancel(ctx)
97
98 if cfg == nil {
99 cfg = DefaultConfig()
100 }
101
102 // Get our relay identity pubkey
103 var relayPubkey string
104 if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
105 if pk, err := keys.SecretBytesToPubKeyHex(skb); err == nil {
106 relayPubkey = pk
107 }
108 }
109
110 cm := &Manager{
111 ctx: ctx,
112 cancel: cancel,
113 db: db,
114 adminNpubs: cfg.AdminNpubs,
115 relayIdentityPubkey: relayPubkey,
116 selfURLs: make(map[string]bool),
117 members: make(map[string]*Member),
118 pollDone: make(chan struct{}),
119 propagatePrivilegedEvents: cfg.PropagatePrivilegedEvents,
120 publisher: publisher,
121 httpClient: &http.Client{
122 Timeout: 30 * time.Second,
123 },
124 nip11Cache: common.NewNIP11Cache(cfg.NIP11CacheTTL),
125 }
126
127 return cm
128 }
129
130 // Start starts the cluster polling loop
131 func (cm *Manager) Start() {
132 log.I.Ln("starting cluster replication manager")
133
134 // Load persisted peer state from database
135 if err := cm.loadPeerState(); err != nil {
136 log.W.F("failed to load cluster peer state: %v", err)
137 }
138
139 cm.pollTicker = time.NewTicker(5 * time.Second)
140 go cm.pollingLoop()
141 }
142
143 // Stop stops the cluster polling loop
144 func (cm *Manager) Stop() {
145 log.I.Ln("stopping cluster replication manager")
146 cm.cancel()
147 if cm.pollTicker != nil {
148 cm.pollTicker.Stop()
149 }
150 <-cm.pollDone
151 }
152
153 // GetRelayIdentityPubkey returns the relay's identity pubkey
154 func (cm *Manager) GetRelayIdentityPubkey() string {
155 return cm.relayIdentityPubkey
156 }
157
158 // GetMembers returns a copy of the current members
159 func (cm *Manager) GetMembers() []*Member {
160 cm.membersMux.RLock()
161 defer cm.membersMux.RUnlock()
162 members := make([]*Member, 0, len(cm.members))
163 for _, m := range cm.members {
164 memberCopy := *m
165 members = append(members, &memberCopy)
166 }
167 return members
168 }
169
170 // GetMember returns a specific member by URL or nil if not found
171 func (cm *Manager) GetMember(httpURL string) *Member {
172 cm.membersMux.RLock()
173 defer cm.membersMux.RUnlock()
174 if m, ok := cm.members[httpURL]; ok {
175 memberCopy := *m
176 return &memberCopy
177 }
178 return nil
179 }
180
181 // GetLatestSerial returns the latest serial and timestamp from the database
182 func (cm *Manager) GetLatestSerial() (uint64, int64) {
183 serial, err := cm.getLatestSerialFromDB()
184 if err != nil {
185 return 0, time.Now().Unix()
186 }
187 return serial, time.Now().Unix()
188 }
189
190 // PropagatePrivilegedEvents returns whether privileged events should be propagated
191 func (cm *Manager) PropagatePrivilegedEvents() bool {
192 return cm.propagatePrivilegedEvents
193 }
194
195 // GetEventsInRange returns events in a serial range with pagination
196 func (cm *Manager) GetEventsInRange(from, to uint64, limit int) ([]EventInfo, bool, uint64) {
197 events, hasMore, nextFrom, err := cm.getEventsInRangeFromDB(from, to, limit)
198 if err != nil {
199 return nil, false, 0
200 }
201 return events, hasMore, nextFrom
202 }
203
204 // IsSelfURL checks if a URL is our own relay
205 func (cm *Manager) IsSelfURL(url string) bool {
206 cm.membersMux.RLock()
207 result := cm.selfURLs[url]
208 cm.membersMux.RUnlock()
209 return result
210 }
211
212 // MarkSelfURL marks a URL as belonging to us
213 func (cm *Manager) MarkSelfURL(url string) {
214 cm.membersMux.Lock()
215 cm.selfURLs[url] = true
216 cm.membersMux.Unlock()
217 }
218
219 func (cm *Manager) pollingLoop() {
220 defer close(cm.pollDone)
221
222 for {
223 select {
224 case <-cm.ctx.Done():
225 return
226 case <-cm.pollTicker.C:
227 cm.pollAllMembers()
228 }
229 }
230 }
231
232 func (cm *Manager) pollAllMembers() {
233 cm.membersMux.RLock()
234 members := make([]*Member, 0, len(cm.members))
235 for _, member := range cm.members {
236 members = append(members, member)
237 }
238 cm.membersMux.RUnlock()
239
240 for _, member := range members {
241 go cm.pollMember(member)
242 }
243 }
244
245 func (cm *Manager) pollMember(member *Member) {
246 // Get latest serial from peer
247 latestResp, err := cm.getLatestSerial(member.HTTPURL)
248 if err != nil {
249 log.W.F("failed to get latest serial from %s: %v", member.HTTPURL, err)
250 cm.updateMemberStatus(member, "error")
251 return
252 }
253
254 cm.updateMemberStatus(member, "active")
255 member.LastPoll = time.Now()
256
257 // Check if we need to fetch new events
258 if latestResp.Serial <= member.LastSerial {
259 return // No new events
260 }
261
262 // Fetch events in range
263 from := member.LastSerial + 1
264 to := latestResp.Serial
265
266 eventsResp, err := cm.getEventsInRange(member.HTTPURL, from, to, 1000)
267 if err != nil {
268 log.W.F("failed to get events from %s: %v", member.HTTPURL, err)
269 return
270 }
271
272 // Process fetched events
273 for _, eventInfo := range eventsResp.Events {
274 if cm.shouldFetchEvent(eventInfo) {
275 // Fetch full event via WebSocket and store it
276 if err := cm.fetchAndStoreEvent(member.WebSocketURL, eventInfo.ID, cm.publisher); err != nil {
277 log.W.F("failed to fetch/store event %s from %s: %v", eventInfo.ID, member.HTTPURL, err)
278 } else {
279 log.D.F("successfully replicated event %s from %s", eventInfo.ID, member.HTTPURL)
280 }
281 }
282 }
283
284 // Update last serial if we processed all events
285 if !eventsResp.HasMore && member.LastSerial != to {
286 member.LastSerial = to
287 // Persist the updated serial to database
288 if err := cm.savePeerState(member.HTTPURL, to); err != nil {
289 log.W.F("failed to persist serial %d for peer %s: %v", to, member.HTTPURL, err)
290 }
291 }
292 }
293
294 func (cm *Manager) getLatestSerial(peerURL string) (*LatestSerialResponse, error) {
295 url := fmt.Sprintf("%s/cluster/latest", peerURL)
296 resp, err := cm.httpClient.Get(url)
297 if err != nil {
298 return nil, err
299 }
300 defer resp.Body.Close()
301
302 if resp.StatusCode != http.StatusOK {
303 return nil, fmt.Errorf("HTTP %d", resp.StatusCode)
304 }
305
306 var result LatestSerialResponse
307 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
308 return nil, err
309 }
310
311 return &result, nil
312 }
313
314 func (cm *Manager) getEventsInRange(peerURL string, from, to uint64, limit int) (*EventsRangeResponse, error) {
315 url := fmt.Sprintf("%s/cluster/events?from=%d&to=%d&limit=%d", peerURL, from, to, limit)
316 resp, err := cm.httpClient.Get(url)
317 if err != nil {
318 return nil, err
319 }
320 defer resp.Body.Close()
321
322 if resp.StatusCode != http.StatusOK {
323 return nil, fmt.Errorf("HTTP %d", resp.StatusCode)
324 }
325
326 var result EventsRangeResponse
327 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
328 return nil, err
329 }
330
331 return &result, nil
332 }
333
334 func (cm *Manager) shouldFetchEvent(eventInfo EventInfo) bool {
335 // Relays MAY choose not to store every event they receive
336 // For now, accept all events
337 return true
338 }
339
340 func (cm *Manager) updateMemberStatus(member *Member, status string) {
341 member.Status = status
342 if status == "error" {
343 member.ErrorCount++
344 } else {
345 member.ErrorCount = 0
346 }
347 }
348
349 // UpdateMembership updates the cluster membership
350 func (cm *Manager) UpdateMembership(relayURLs []string) {
351 cm.membersMux.Lock()
352 defer cm.membersMux.Unlock()
353
354 // Remove members not in the new list
355 for url := range cm.members {
356 found := false
357 for _, newURL := range relayURLs {
358 if newURL == url {
359 found = true
360 break
361 }
362 }
363 if !found {
364 delete(cm.members, url)
365 // Remove persisted state for removed peer
366 if err := cm.removePeerState(url); err != nil {
367 log.W.F("failed to remove persisted state for peer %s: %v", url, err)
368 }
369 log.D.F("removed cluster member: %s", url)
370 }
371 }
372
373 // Add new members (filter out self once at this point)
374 for _, url := range relayURLs {
375 // Skip if already exists
376 if _, exists := cm.members[url]; exists {
377 continue
378 }
379
380 // Fast path: check if we already know this URL is ours
381 if cm.selfURLs[url] {
382 log.D.F("removed self from cluster members (known URL): %s", url)
383 continue
384 }
385
386 // Slow path: check via NIP-11 pubkey
387 if cm.relayIdentityPubkey != "" {
388 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
389 peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, url)
390 cancel()
391
392 if err != nil {
393 log.D.F("couldn't fetch NIP-11 for %s, adding to cluster anyway: %v", url, err)
394 } else if peerPubkey == cm.relayIdentityPubkey {
395 log.D.F("removed self from cluster members (discovered): %s (pubkey: %s)", url, cm.relayIdentityPubkey)
396 // Cache this URL as ours for future fast lookups
397 cm.selfURLs[url] = true
398 continue
399 }
400 }
401
402 // Add member
403 member := &Member{
404 HTTPURL: url,
405 WebSocketURL: url, // TODO: Convert to WebSocket URL
406 LastSerial: 0,
407 Status: "unknown",
408 }
409 cm.members[url] = member
410 log.D.F("added cluster member: %s", url)
411 }
412 }
413
414 // HandleMembershipEvent processes a cluster membership event (Kind 39108)
415 func (cm *Manager) HandleMembershipEvent(ev *event.E) error {
416 // Verify the event is signed by a cluster admin
417 adminFound := false
418 for _, adminNpub := range cm.adminNpubs {
419 // TODO: Convert adminNpub to pubkey and verify signature
420 // For now, accept all events (this should be properly validated)
421 _ = adminNpub // Mark as used to avoid compiler warning
422 adminFound = true
423 break
424 }
425
426 if !adminFound {
427 return fmt.Errorf("event not signed by cluster admin")
428 }
429
430 // Parse the relay URLs from the tags
431 var relayURLs []string
432 for _, tag := range *ev.Tags {
433 if len(tag.T) >= 2 && string(tag.T[0]) == "relay" {
434 relayURLs = append(relayURLs, string(tag.T[1]))
435 }
436 }
437
438 if len(relayURLs) == 0 {
439 return fmt.Errorf("no relay URLs found in membership event")
440 }
441
442 // Update cluster membership
443 cm.UpdateMembership(relayURLs)
444
445 log.D.F("updated cluster membership with %d relays from event %x", len(relayURLs), ev.ID)
446
447 return nil
448 }
449
450 // HTTP Handlers
451
452 // HandleLatestSerial handles GET /cluster/latest
453 func (cm *Manager) HandleLatestSerial(w http.ResponseWriter, r *http.Request) {
454 if r.Method != http.MethodGet {
455 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
456 return
457 }
458
459 // Check if request is from ourselves by examining the Referer or Origin header
460 origin := r.Header.Get("Origin")
461 referer := r.Header.Get("Referer")
462
463 if cm.relayIdentityPubkey != "" && (origin != "" || referer != "") {
464 checkURL := origin
465 if checkURL == "" {
466 checkURL = referer
467 }
468
469 // Fast path: check known self-URLs
470 if cm.selfURLs[checkURL] {
471 log.D.F("rejecting cluster latest request from self (known URL): %s", checkURL)
472 http.Error(w, "Cannot sync with self", http.StatusBadRequest)
473 return
474 }
475
476 // Slow path: verify via NIP-11
477 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
478 peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, checkURL)
479 cancel()
480
481 if err == nil && peerPubkey == cm.relayIdentityPubkey {
482 log.D.F("rejecting cluster latest request from self (discovered): %s", checkURL)
483 // Cache for future fast lookups
484 cm.membersMux.Lock()
485 cm.selfURLs[checkURL] = true
486 cm.membersMux.Unlock()
487 http.Error(w, "Cannot sync with self", http.StatusBadRequest)
488 return
489 }
490 }
491
492 // Get the latest serial from database
493 latestSerial, err := cm.getLatestSerialFromDB()
494 if err != nil {
495 log.W.F("failed to get latest serial: %v", err)
496 http.Error(w, "Internal server error", http.StatusInternalServerError)
497 return
498 }
499
500 response := LatestSerialResponse{
501 Serial: latestSerial,
502 Timestamp: time.Now().Unix(),
503 }
504
505 w.Header().Set("Content-Type", "application/json")
506 json.NewEncoder(w).Encode(response)
507 }
508
509 // HandleEventsRange handles GET /cluster/events
510 func (cm *Manager) HandleEventsRange(w http.ResponseWriter, r *http.Request) {
511 if r.Method != http.MethodGet {
512 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
513 return
514 }
515
516 // Check if request is from ourselves
517 origin := r.Header.Get("Origin")
518 referer := r.Header.Get("Referer")
519
520 if cm.relayIdentityPubkey != "" && (origin != "" || referer != "") {
521 checkURL := origin
522 if checkURL == "" {
523 checkURL = referer
524 }
525
526 // Fast path: check known self-URLs
527 if cm.selfURLs[checkURL] {
528 log.D.F("rejecting cluster events request from self (known URL): %s", checkURL)
529 http.Error(w, "Cannot sync with self", http.StatusBadRequest)
530 return
531 }
532
533 // Slow path: verify via NIP-11
534 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
535 peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, checkURL)
536 cancel()
537
538 if err == nil && peerPubkey == cm.relayIdentityPubkey {
539 log.D.F("rejecting cluster events request from self (discovered): %s", checkURL)
540 cm.membersMux.Lock()
541 cm.selfURLs[checkURL] = true
542 cm.membersMux.Unlock()
543 http.Error(w, "Cannot sync with self", http.StatusBadRequest)
544 return
545 }
546 }
547
548 // Parse query parameters
549 fromStr := r.URL.Query().Get("from")
550 toStr := r.URL.Query().Get("to")
551 limitStr := r.URL.Query().Get("limit")
552
553 from := uint64(0)
554 to := uint64(0)
555 limit := 1000
556
557 if fromStr != "" {
558 fmt.Sscanf(fromStr, "%d", &from)
559 }
560 if toStr != "" {
561 fmt.Sscanf(toStr, "%d", &to)
562 }
563 if limitStr != "" {
564 fmt.Sscanf(limitStr, "%d", &limit)
565 if limit > 10000 {
566 limit = 10000
567 }
568 }
569
570 // Get events in range
571 events, hasMore, nextFrom, err := cm.getEventsInRangeFromDB(from, to, limit)
572 if err != nil {
573 log.W.F("failed to get events in range: %v", err)
574 http.Error(w, "Internal server error", http.StatusInternalServerError)
575 return
576 }
577
578 response := EventsRangeResponse{
579 Events: events,
580 HasMore: hasMore,
581 NextFrom: nextFrom,
582 }
583
584 w.Header().Set("Content-Type", "application/json")
585 json.NewEncoder(w).Encode(response)
586 }
587
588 func (cm *Manager) getLatestSerialFromDB() (uint64, error) {
589 var maxSerial uint64 = 0
590
591 err := cm.db.View(func(txn *badger.Txn) error {
592 it := txn.NewIterator(badger.IteratorOptions{
593 Reverse: true,
594 Prefix: []byte{0},
595 })
596 defer it.Close()
597
598 it.Seek([]byte{0})
599 if it.Valid() {
600 key := it.Item().Key()
601 if len(key) >= 5 {
602 serial := binary.BigEndian.Uint64(key[len(key)-8:]) >> 24
603 if serial > maxSerial {
604 maxSerial = serial
605 }
606 }
607 }
608
609 return nil
610 })
611
612 return maxSerial, err
613 }
614
615 func (cm *Manager) getEventsInRangeFromDB(from, to uint64, limit int) ([]EventInfo, bool, uint64, error) {
616 var events []EventInfo
617 var hasMore bool
618 var nextFrom uint64
619
620 fromSerial := &types.Uint40{}
621 toSerial := &types.Uint40{}
622
623 if err := fromSerial.Set(from); err != nil {
624 return nil, false, 0, err
625 }
626 if err := toSerial.Set(to); err != nil {
627 return nil, false, 0, err
628 }
629
630 err := cm.db.View(func(txn *badger.Txn) error {
631 it := txn.NewIterator(badger.IteratorOptions{
632 Prefix: []byte{0},
633 })
634 defer it.Close()
635
636 count := 0
637 it.Seek([]byte{0})
638
639 for it.Valid() && count < limit {
640 key := it.Item().Key()
641
642 if len(key) >= 8 && key[0] == 0 && key[1] == 0 && key[2] == 0 {
643 if len(key) >= 8 {
644 serial := binary.BigEndian.Uint64(key[len(key)-8:]) >> 24
645
646 if serial >= from && serial <= to {
647 serial40 := &types.Uint40{}
648 if err := serial40.Set(serial); err != nil {
649 continue
650 }
651
652 ev, err := cm.db.FetchEventBySerial(serial40)
653 if err != nil {
654 continue
655 }
656
657 shouldPropagate := true
658 if !cm.propagatePrivilegedEvents && kind.IsPrivileged(ev.Kind) {
659 shouldPropagate = false
660 }
661
662 if shouldPropagate {
663 events = append(events, EventInfo{
664 Serial: serial,
665 ID: hex.Enc(ev.ID),
666 Timestamp: ev.CreatedAt,
667 })
668 count++
669 }
670
671 ev.Free()
672 }
673 }
674 }
675
676 it.Next()
677 }
678
679 if it.Valid() {
680 hasMore = true
681 nextKey := it.Item().Key()
682 if len(nextKey) >= 8 && nextKey[0] == 0 && nextKey[1] == 0 && nextKey[2] == 0 {
683 nextSerial := binary.BigEndian.Uint64(nextKey[len(nextKey)-8:]) >> 24
684 nextFrom = nextSerial
685 }
686 }
687
688 return nil
689 })
690
691 return events, hasMore, nextFrom, err
692 }
693
694 func (cm *Manager) fetchAndStoreEvent(wsURL, eventID string, publisher EventPublisher) error {
695 // TODO: Implement WebSocket connection and event fetching
696 log.D.F("fetchAndStoreEvent called for %s from %s (placeholder implementation)", eventID, wsURL)
697 return nil
698 }
699
// clusterPeerStatePrefix is the database key prefix under which the per-peer
// replication state is persisted; the peer URL is appended to it.
const clusterPeerStatePrefix = "cluster:peer:"
704
705 func (cm *Manager) loadPeerState() error {
706 cm.membersMux.Lock()
707 defer cm.membersMux.Unlock()
708
709 prefix := []byte(clusterPeerStatePrefix)
710 return cm.db.View(func(txn *badger.Txn) error {
711 it := txn.NewIterator(badger.IteratorOptions{
712 Prefix: prefix,
713 })
714 defer it.Close()
715
716 for it.Rewind(); it.Valid(); it.Next() {
717 item := it.Item()
718 key := item.Key()
719
720 peerURL := string(key[len(prefix):])
721
722 var serial uint64
723 err := item.Value(func(val []byte) error {
724 if len(val) == 8 {
725 serial = binary.BigEndian.Uint64(val)
726 }
727 return nil
728 })
729 if err != nil {
730 log.W.F("failed to read peer state for %s: %v", peerURL, err)
731 continue
732 }
733
734 if member, exists := cm.members[peerURL]; exists {
735 member.LastSerial = serial
736 log.D.F("loaded persisted serial %d for existing peer %s", serial, peerURL)
737 } else {
738 member := &Member{
739 HTTPURL: peerURL,
740 WebSocketURL: peerURL,
741 LastSerial: serial,
742 Status: "unknown",
743 }
744 cm.members[peerURL] = member
745 log.D.F("loaded persisted serial %d for new peer %s", serial, peerURL)
746 }
747 }
748 return nil
749 })
750 }
751
752 func (cm *Manager) savePeerState(peerURL string, serial uint64) error {
753 key := []byte(clusterPeerStatePrefix + peerURL)
754 value := make([]byte, 8)
755 binary.BigEndian.PutUint64(value, serial)
756
757 return cm.db.Update(func(txn *badger.Txn) error {
758 return txn.Set(key, value)
759 })
760 }
761
762 func (cm *Manager) removePeerState(peerURL string) error {
763 key := []byte(clusterPeerStatePrefix + peerURL)
764
765 return cm.db.Update(func(txn *badger.Txn) error {
766 return txn.Delete(key)
767 })
768 }
769