// sab-channel.mjs — SPSC ring buffer channel over SharedArrayBuffer.
//
// SAB layout (bytes):
//   [0..3]   write_idx  u32  (sender advances; receiver waits here)
//   [4..7]   read_idx   u32  (receiver advances; sender waits here)
//   [8..11]  slot_size  u32  const, power-of-two
//   [12..15] slot_count u32  const, power-of-two
//   [16..19] flags      u32  bit0=closed
//   [20..31] reserved
//   [32..]   slots      slot_count * slot_size bytes
//
// Slot layout (slot_size bytes):
//   [0]     is_final   u8
//   [1..3]  reserved
//   [4..7]  total_len  u32 LE  (total message bytes; only in first slot; 0 in subsequent)
//   [8..11] chunk_len  u32 LE  (payload bytes in this slot)
//   [12..]  payload    slot_size - 12 bytes
//
// SPSC: one sender, one receiver. No CAS. slot_count must be power-of-two.

  21  const HDR = 32;      // SAB header bytes before slots
  22  const SLOT_HDR = 12; // per-slot header bytes
  23  
  24  export function create(slotSize, slotCount) {
  25    if ((slotSize & (slotSize - 1)) !== 0) throw new Error('slot_size must be power-of-two');
  26    if ((slotCount & (slotCount - 1)) !== 0) throw new Error('slot_count must be power-of-two');
  27    if (slotSize < 64) throw new Error('slot_size minimum 64');
  28    const sab = new SharedArrayBuffer(HDR + slotCount * slotSize);
  29    const u32 = new Uint32Array(sab);
  30    u32[2] = slotSize;
  31    u32[3] = slotCount;
  32    return sab;
  33  }
  34  
  35  // send blocks until a slot is free, then writes bytes as chunked slots.
  36  // Throws if the channel is closed or bytes.length exceeds channel capacity.
  37  export function send(sab, bytes) {
  38    const u32 = new Uint32Array(sab);
  39    const slotSize = u32[2];
  40    const slotCount = u32[3];
  41    const payload_capacity = slotSize - SLOT_HDR;
  42    const max_msg = (slotCount - 1) * payload_capacity;
  43    if (bytes.length > max_msg) {
  44      throw new Error('channel_send: message size ' + bytes.length + ' exceeds channel capacity ' + max_msg);
  45    }
  46  
  47    const total_len = bytes.length;
  48    let offset = 0;
  49    let first = true;
  50  
  51    while (offset <= total_len) {
  52      const chunk_len = Math.min(total_len - offset, payload_capacity);
  53      const is_final = (offset + chunk_len >= total_len) ? 1 : 0;
  54  
  55      // Wait for a free slot: write_idx - read_idx < slotCount.
  56      // Re-check closed flag after each wakeup so a dead receiver unblocks the sender.
  57      let wi = Atomics.load(u32, 0);
  58      while (wi - Atomics.load(u32, 1) >= slotCount) {
  59        Atomics.wait(u32, 1, Atomics.load(u32, 1));
  60        wi = Atomics.load(u32, 0);
  61        if (Atomics.load(u32, 4) & 1) throw new Error('channel_send: send on closed channel');
  62      }
  63      if (Atomics.load(u32, 4) & 1) throw new Error('channel_send: send on closed channel');
  64  
  65      const slot_idx = wi & (slotCount - 1);
  66      const base = HDR + slot_idx * slotSize;
  67      const slot = new DataView(sab, base, slotSize);
  68      const u8 = new Uint8Array(sab, base, slotSize);
  69  
  70      slot.setUint8(0, is_final);
  71      slot.setUint8(1, 0); slot.setUint8(2, 0); slot.setUint8(3, 0);
  72      slot.setUint32(4, first ? total_len : 0, true);
  73      slot.setUint32(8, chunk_len, true);
  74      if (chunk_len > 0) u8.set(bytes.subarray(offset, offset + chunk_len), SLOT_HDR);
  75  
  76      Atomics.store(u32, 0, wi + 1);
  77      Atomics.notify(u32, 0, 1);
  78  
  79      offset += chunk_len;
  80      first = false;
  81      if (is_final) break;
  82    }
  83  }
  84  
  85  // recv blocks until a message is available.
  86  // Returns byte count (0 for zero-byte message), or -1 if channel is closed.
  87  // dstBuf must be at least (slotCount - 1) * (slotSize - SLOT_HDR) bytes.
  88  export function recv(sab, dstBuf) {
  89    const u32 = new Uint32Array(sab);
  90    const slotSize = u32[2];
  91    const slotCount = u32[3];
  92    const payload_capacity = slotSize - SLOT_HDR;
  93  
  94    let total_len = -1;
  95    let dst_offset = 0;
  96    let awaiting_first = true;
  97  
  98    for (;;) {
  99      // Wait for a slot. Re-check closed flag after each wakeup so a dead sender
 100      // (forceClose) unblocks the receiver even with an empty ring.
 101      let ri = Atomics.load(u32, 1);
 102      while (Atomics.load(u32, 0) === ri) {
 103        Atomics.wait(u32, 0, ri);
 104        ri = Atomics.load(u32, 1);
 105        if (Atomics.load(u32, 4) & 1) return -1; // closed while waiting
 106      }
 107      // Also check before reading: forceClose may have set the flag without
 108      // writing a sentinel (if the ring was full).
 109      if (awaiting_first && ri === Atomics.load(u32, 0) && (Atomics.load(u32, 4) & 1)) return -1;
 110  
 111      const slot_idx = ri & (slotCount - 1);
 112      const base = HDR + slot_idx * slotSize;
 113      const slot = new DataView(sab, base, slotSize);
 114      const u8 = new Uint8Array(sab, base, slotSize);
 115  
 116      const is_final = slot.getUint8(0);
 117      const slot_total_len = slot.getUint32(4, true);
 118      const chunk_len = slot.getUint32(8, true);
 119  
 120      // Advance read index before processing (allows sender to reuse slot).
 121      Atomics.store(u32, 1, ri + 1);
 122      Atomics.notify(u32, 1, 1);
 123  
 124      if (awaiting_first) {
 125        if (slot_total_len === 0) {
 126          if (Atomics.load(u32, 4) & 1) return -1; // closed sentinel
 127          return 0; // zero-byte send
 128        }
 129        if (slot_total_len > dstBuf.length) {
 130          throw new Error('channel_recv: message size ' + slot_total_len + ' exceeds receiver buffer ' + dstBuf.length);
 131        }
 132        total_len = slot_total_len;
 133        awaiting_first = false;
 134      }
 135  
 136      if (chunk_len > 0) {
 137        dstBuf.set(u8.subarray(SLOT_HDR, SLOT_HDR + chunk_len), dst_offset);
 138        dst_offset += chunk_len;
 139      }
 140  
 141      if (is_final) return total_len;
 142    }
 143  }
 144  
 145  // close sets the closed flag, writes a sentinel zero-len slot, notifies receiver.
 146  // Use when the sender side closes cleanly (knows the ring is not full).
 147  export function close(sab) {
 148    const u32 = new Uint32Array(sab);
 149    const slotCount = u32[3];
 150  
 151    Atomics.or(u32, 4, 1);
 152  
 153    // Write sentinel slot (blocks if ring is full — clean close path).
 154    let wi = Atomics.load(u32, 0);
 155    while (wi - Atomics.load(u32, 1) >= slotCount) {
 156      Atomics.wait(u32, 1, Atomics.load(u32, 1));
 157      wi = Atomics.load(u32, 0);
 158    }
 159    const slotSize = u32[2];
 160    const slot_idx = wi & (slotCount - 1);
 161    const base = HDR + slot_idx * slotSize;
 162    const slot = new DataView(sab, base, slotSize);
 163    slot.setUint8(0, 1);       // is_final
 164    slot.setUint32(4, 0, true); // total_len = 0
 165    slot.setUint32(8, 0, true); // chunk_len = 0
 166    Atomics.store(u32, 0, wi + 1);
 167    Atomics.notify(u32, 0, 1);
 168  }
 169  
 170  // forceClose sets the closed flag and wakes all blocked send/recv waiters
 171  // without writing a sentinel slot. Used when a Worker dies unexpectedly.
 172  // No role information needed: flag + notify both ends is sufficient.
 173  //   - Blocked recv: wakes in wait loop, sees bit0, returns -1.
 174  //   - Blocked send: wakes in wait loop, sees bit0, throws "send on closed channel".
 175  export function forceClose(sab) {
 176    const u32 = new Uint32Array(sab);
 177    Atomics.or(u32, 4, 1);
 178    Atomics.notify(u32, 0, Atomics.MAX_VALUE); // wake blocked receivers
 179    Atomics.notify(u32, 1, Atomics.MAX_VALUE); // wake blocked senders
 180  }
 181