sab-channel.mjs raw
1 // sab-channel.mjs — SPSC ring buffer channel over SharedArrayBuffer.
2 //
3 // SAB layout (bytes):
4 // [0..3] write_idx u32 (sender advances; receiver waits here)
5 // [4..7] read_idx u32 (receiver advances; sender waits here)
6 // [8..11] slot_size u32 const, power-of-two
7 // [12..15] slot_count u32 const, power-of-two
8 // [16..19] flags u32 bit0=closed
9 // [20..31] reserved
10 // [32..] slots slot_count * slot_size bytes
11 //
12 // Slot layout (slot_size bytes):
13 // [0] is_final u8
14 // [1..3] reserved
15 // [4..7] total_len u32 LE (total message bytes; only in first slot; 0 in subsequent)
16 // [8..11] chunk_len u32 LE (payload bytes in this slot)
17 // [12..] payload slot_size - 12 bytes
18 //
19 // SPSC: one sender, one receiver. No CAS. slot_count must be power-of-two.
20
21 const HDR = 32; // SAB header bytes before slots
22 const SLOT_HDR = 12; // per-slot header bytes
23
24 export function create(slotSize, slotCount) {
25 if ((slotSize & (slotSize - 1)) !== 0) throw new Error('slot_size must be power-of-two');
26 if ((slotCount & (slotCount - 1)) !== 0) throw new Error('slot_count must be power-of-two');
27 if (slotSize < 64) throw new Error('slot_size minimum 64');
28 const sab = new SharedArrayBuffer(HDR + slotCount * slotSize);
29 const u32 = new Uint32Array(sab);
30 u32[2] = slotSize;
31 u32[3] = slotCount;
32 return sab;
33 }
34
35 // send blocks until a slot is free, then writes bytes as chunked slots.
36 // Throws if the channel is closed or bytes.length exceeds channel capacity.
37 export function send(sab, bytes) {
38 const u32 = new Uint32Array(sab);
39 const slotSize = u32[2];
40 const slotCount = u32[3];
41 const payload_capacity = slotSize - SLOT_HDR;
42 const max_msg = (slotCount - 1) * payload_capacity;
43 if (bytes.length > max_msg) {
44 throw new Error('channel_send: message size ' + bytes.length + ' exceeds channel capacity ' + max_msg);
45 }
46
47 const total_len = bytes.length;
48 let offset = 0;
49 let first = true;
50
51 while (offset <= total_len) {
52 const chunk_len = Math.min(total_len - offset, payload_capacity);
53 const is_final = (offset + chunk_len >= total_len) ? 1 : 0;
54
55 // Wait for a free slot: write_idx - read_idx < slotCount.
56 // Re-check closed flag after each wakeup so a dead receiver unblocks the sender.
57 let wi = Atomics.load(u32, 0);
58 while (wi - Atomics.load(u32, 1) >= slotCount) {
59 Atomics.wait(u32, 1, Atomics.load(u32, 1));
60 wi = Atomics.load(u32, 0);
61 if (Atomics.load(u32, 4) & 1) throw new Error('channel_send: send on closed channel');
62 }
63 if (Atomics.load(u32, 4) & 1) throw new Error('channel_send: send on closed channel');
64
65 const slot_idx = wi & (slotCount - 1);
66 const base = HDR + slot_idx * slotSize;
67 const slot = new DataView(sab, base, slotSize);
68 const u8 = new Uint8Array(sab, base, slotSize);
69
70 slot.setUint8(0, is_final);
71 slot.setUint8(1, 0); slot.setUint8(2, 0); slot.setUint8(3, 0);
72 slot.setUint32(4, first ? total_len : 0, true);
73 slot.setUint32(8, chunk_len, true);
74 if (chunk_len > 0) u8.set(bytes.subarray(offset, offset + chunk_len), SLOT_HDR);
75
76 Atomics.store(u32, 0, wi + 1);
77 Atomics.notify(u32, 0, 1);
78
79 offset += chunk_len;
80 first = false;
81 if (is_final) break;
82 }
83 }
84
85 // recv blocks until a message is available.
86 // Returns byte count (0 for zero-byte message), or -1 if channel is closed.
87 // dstBuf must be at least (slotCount - 1) * (slotSize - SLOT_HDR) bytes.
88 export function recv(sab, dstBuf) {
89 const u32 = new Uint32Array(sab);
90 const slotSize = u32[2];
91 const slotCount = u32[3];
92 const payload_capacity = slotSize - SLOT_HDR;
93
94 let total_len = -1;
95 let dst_offset = 0;
96 let awaiting_first = true;
97
98 for (;;) {
99 // Wait for a slot. Re-check closed flag after each wakeup so a dead sender
100 // (forceClose) unblocks the receiver even with an empty ring.
101 let ri = Atomics.load(u32, 1);
102 while (Atomics.load(u32, 0) === ri) {
103 Atomics.wait(u32, 0, ri);
104 ri = Atomics.load(u32, 1);
105 if (Atomics.load(u32, 4) & 1) return -1; // closed while waiting
106 }
107 // Also check before reading: forceClose may have set the flag without
108 // writing a sentinel (if the ring was full).
109 if (awaiting_first && ri === Atomics.load(u32, 0) && (Atomics.load(u32, 4) & 1)) return -1;
110
111 const slot_idx = ri & (slotCount - 1);
112 const base = HDR + slot_idx * slotSize;
113 const slot = new DataView(sab, base, slotSize);
114 const u8 = new Uint8Array(sab, base, slotSize);
115
116 const is_final = slot.getUint8(0);
117 const slot_total_len = slot.getUint32(4, true);
118 const chunk_len = slot.getUint32(8, true);
119
120 // Advance read index before processing (allows sender to reuse slot).
121 Atomics.store(u32, 1, ri + 1);
122 Atomics.notify(u32, 1, 1);
123
124 if (awaiting_first) {
125 if (slot_total_len === 0) {
126 if (Atomics.load(u32, 4) & 1) return -1; // closed sentinel
127 return 0; // zero-byte send
128 }
129 if (slot_total_len > dstBuf.length) {
130 throw new Error('channel_recv: message size ' + slot_total_len + ' exceeds receiver buffer ' + dstBuf.length);
131 }
132 total_len = slot_total_len;
133 awaiting_first = false;
134 }
135
136 if (chunk_len > 0) {
137 dstBuf.set(u8.subarray(SLOT_HDR, SLOT_HDR + chunk_len), dst_offset);
138 dst_offset += chunk_len;
139 }
140
141 if (is_final) return total_len;
142 }
143 }
144
145 // close sets the closed flag, writes a sentinel zero-len slot, notifies receiver.
146 // Use when the sender side closes cleanly (knows the ring is not full).
147 export function close(sab) {
148 const u32 = new Uint32Array(sab);
149 const slotCount = u32[3];
150
151 Atomics.or(u32, 4, 1);
152
153 // Write sentinel slot (blocks if ring is full — clean close path).
154 let wi = Atomics.load(u32, 0);
155 while (wi - Atomics.load(u32, 1) >= slotCount) {
156 Atomics.wait(u32, 1, Atomics.load(u32, 1));
157 wi = Atomics.load(u32, 0);
158 }
159 const slotSize = u32[2];
160 const slot_idx = wi & (slotCount - 1);
161 const base = HDR + slot_idx * slotSize;
162 const slot = new DataView(sab, base, slotSize);
163 slot.setUint8(0, 1); // is_final
164 slot.setUint32(4, 0, true); // total_len = 0
165 slot.setUint32(8, 0, true); // chunk_len = 0
166 Atomics.store(u32, 0, wi + 1);
167 Atomics.notify(u32, 0, 1);
168 }
169
170 // forceClose sets the closed flag and wakes all blocked send/recv waiters
171 // without writing a sentinel slot. Used when a Worker dies unexpectedly.
172 // No role information needed: flag + notify both ends is sufficient.
173 // - Blocked recv: wakes in wait loop, sees bit0, returns -1.
174 // - Blocked send: wakes in wait loop, sees bit0, throws "send on closed channel".
175 export function forceClose(sab) {
176 const u32 = new Uint32Array(sab);
177 Atomics.or(u32, 4, 1);
178 Atomics.notify(u32, 0, Atomics.MAX_VALUE); // wake blocked receivers
179 Atomics.notify(u32, 1, Atomics.MAX_VALUE); // wake blocked senders
180 }
181