channel.mjs raw

   1  // TinyJS Runtime — Channels
   2  // Go channels with buffered/unbuffered semantics, close, and select.
   3  
   4  export class Channel {
   5    constructor(bufferSize = 0) {
   6      this.bufferSize = bufferSize;
   7      this.buffer = [];
   8      this.closed = false;
   9      this.sendWaiters = []; // [{value, resolve, reject}]
  10      this.recvWaiters = []; // [{resolve, reject}]
  11    }
  12  
  13    // Skip cancelled waiters (from resolved selects).
  14    _shiftRecv() {
  15      while (this.recvWaiters.length > 0) {
  16        const w = this.recvWaiters.shift();
  17        if (!w.cancelled) return w;
  18      }
  19      return null;
  20    }
  21  
  22    _shiftSend() {
  23      while (this.sendWaiters.length > 0) {
  24        const w = this.sendWaiters.shift();
  25        if (!w.cancelled) return w;
  26      }
  27      return null;
  28    }
  29  
  30    // Send a value. Blocks (awaits) if buffer full and no receiver waiting.
  31    async send(value) {
  32      if (this.closed) {
  33        throw new Error('send on closed channel');
  34      }
  35  
  36      // If a receiver is waiting, hand off directly.
  37      const waiter = this._shiftRecv();
  38      if (waiter) {
  39        waiter.resolve({ value, ok: true });
  40        return;
  41      }
  42  
  43      // If buffer has space, enqueue.
  44      if (this.buffer.length < this.bufferSize) {
  45        this.buffer.push(value);
  46        return;
  47      }
  48  
  49      // Block until a receiver arrives.
  50      return new Promise((resolve, reject) => {
  51        this.sendWaiters.push({ value, resolve, reject });
  52      });
  53    }
  54  
  55    // Receive a value. Blocks (awaits) if buffer empty and no sender waiting.
  56    async recv() {
  57      // If buffer has data, return it. Then unblock a sender if waiting.
  58      if (this.buffer.length > 0) {
  59        const value = this.buffer.shift();
  60        const sender = this._shiftSend();
  61        if (sender) {
  62          this.buffer.push(sender.value);
  63          sender.resolve();
  64        }
  65        return { value, ok: true };
  66      }
  67  
  68      // If a sender is waiting (unbuffered or buffer was empty), take directly.
  69      const sender = this._shiftSend();
  70      if (sender) {
  71        sender.resolve();
  72        return { value: sender.value, ok: true };
  73      }
  74  
  75      // If closed and nothing buffered, return zero value.
  76      if (this.closed) {
  77        return { value: undefined, ok: false };
  78      }
  79  
  80      // Block until a sender arrives or channel closes.
  81      return new Promise((resolve) => {
  82        this.recvWaiters.push({ resolve });
  83      });
  84    }
  85  
  86    // Close the channel.
  87    close() {
  88      if (this.closed) {
  89        throw new Error('close of closed channel');
  90      }
  91      this.closed = true;
  92  
  93      // Wake all waiting receivers with zero value.
  94      let w;
  95      while ((w = this._shiftRecv())) {
  96        w.resolve({ value: undefined, ok: false });
  97      }
  98  
  99      // Panic on waiting senders.
 100      while ((w = this._shiftSend())) {
 101        w.reject(new Error('send on closed channel'));
 102      }
 103    }
 104  
 105    // Non-blocking try-send. Returns true if sent.
 106    trySend(value) {
 107      if (this.closed) throw new Error('send on closed channel');
 108  
 109      const waiter = this._shiftRecv();
 110      if (waiter) {
 111        waiter.resolve({ value, ok: true });
 112        return true;
 113      }
 114  
 115      if (this.buffer.length < this.bufferSize) {
 116        this.buffer.push(value);
 117        return true;
 118      }
 119  
 120      return false;
 121    }
 122  
 123    // Non-blocking try-recv. Returns {value, ok, received}.
 124    tryRecv() {
 125      if (this.buffer.length > 0) {
 126        const value = this.buffer.shift();
 127        const sender = this._shiftSend();
 128        if (sender) {
 129          this.buffer.push(sender.value);
 130          sender.resolve();
 131        }
 132        return { value, ok: true, received: true };
 133      }
 134  
 135      const sender = this._shiftSend();
 136      if (sender) {
 137        sender.resolve();
 138        return { value: sender.value, ok: true, received: true };
 139      }
 140  
 141      if (this.closed) {
 142        return { value: undefined, ok: false, received: true };
 143      }
 144  
 145      return { value: undefined, ok: false, received: false };
 146    }
 147  }
 148  
 149  // Select statement implementation.
 150  // cases: [{ch, dir: 'send'|'recv', value?, id}]
 151  // hasDefault: boolean
 152  // Returns: {id, value, ok}
 153  export async function select(cases, hasDefault) {
 154    // Shuffle cases for fairness (Go spec requires pseudo-random selection).
 155    const shuffled = cases.map((c, i) => ({ ...c, origIndex: i }));
 156    for (let i = shuffled.length - 1; i > 0; i--) {
 157      const j = Math.floor(Math.random() * (i + 1));
 158      [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
 159    }
 160  
 161    // Try non-blocking first.
 162    for (const c of shuffled) {
 163      if (c.dir === 'send') {
 164        if (c.ch.trySend(c.value)) {
 165          return { id: c.id, value: undefined, ok: true };
 166        }
 167      } else {
 168        const result = c.ch.tryRecv();
 169        if (result.received) {
 170          return { id: c.id, value: result.value, ok: result.ok };
 171        }
 172      }
 173    }
 174  
 175    // Default case.
 176    if (hasDefault) {
 177      return { id: -1, value: undefined, ok: false };
 178    }
 179  
 180    // Block: race all cases. Use cancel flags instead of splice for O(1) cleanup.
 181    return new Promise((resolve, reject) => {
 182      const waiters = [];
 183      let resolved = false;
 184  
 185      function cancelOthers() {
 186        for (const w of waiters) w.cancelled = true;
 187      }
 188  
 189      for (const c of shuffled) {
 190        if (c.dir === 'recv') {
 191          const waiter = {
 192            cancelled: false,
 193            resolve: (result) => {
 194              if (resolved) return;
 195              resolved = true;
 196              cancelOthers();
 197              resolve({ id: c.id, value: result.value, ok: result.ok });
 198            }
 199          };
 200          waiters.push(waiter);
 201          c.ch.recvWaiters.push(waiter);
 202        } else {
 203          const waiter = {
 204            cancelled: false,
 205            value: c.value,
 206            resolve: () => {
 207              if (resolved) return;
 208              resolved = true;
 209              cancelOthers();
 210              resolve({ id: c.id, value: undefined, ok: true });
 211            },
 212            reject: (err) => {
 213              if (resolved) return;
 214              resolved = true;
 215              cancelOthers();
 216              reject(err);
 217            }
 218          };
 219          waiters.push(waiter);
 220          c.ch.sendWaiters.push(waiter);
 221        }
 222      }
 223    });
 224  }
 225