// channel.mjs
//
// TinyJS Runtime — Channels
// Go-style channels with buffered/unbuffered semantics, close, and select.
   4  export class Channel {
   5    constructor(bufferSize = 0) {
   6      this.bufferSize = bufferSize;
   7      this.buffer = [];
   8      this.closed = false;
   9      this.sendWaiters = []; // [{value, resolve, reject}]
  10      this.recvWaiters = []; // [{resolve, reject}]
  11    }
  12  
  13    // Send a value. Blocks (awaits) if buffer full and no receiver waiting.
  14    async send(value) {
  15      if (this.closed) {
  16        throw new Error('send on closed channel');
  17      }
  18  
  19      // If a receiver is waiting, hand off directly.
  20      if (this.recvWaiters.length > 0) {
  21        const waiter = this.recvWaiters.shift();
  22        waiter.resolve({ value, ok: true });
  23        return;
  24      }
  25  
  26      // If buffer has space, enqueue.
  27      if (this.buffer.length < this.bufferSize) {
  28        this.buffer.push(value);
  29        return;
  30      }
  31  
  32      // Block until a receiver arrives.
  33      return new Promise((resolve, reject) => {
  34        this.sendWaiters.push({ value, resolve, reject });
  35      });
  36    }
  37  
  38    // Receive a value. Blocks (awaits) if buffer empty and no sender waiting.
  39    async recv() {
  40      // If buffer has data, return it. Then unblock a sender if waiting.
  41      if (this.buffer.length > 0) {
  42        const value = this.buffer.shift();
  43        if (this.sendWaiters.length > 0) {
  44          const sender = this.sendWaiters.shift();
  45          this.buffer.push(sender.value);
  46          sender.resolve();
  47        }
  48        return { value, ok: true };
  49      }
  50  
  51      // If a sender is waiting (unbuffered or buffer was empty), take directly.
  52      if (this.sendWaiters.length > 0) {
  53        const sender = this.sendWaiters.shift();
  54        sender.resolve();
  55        return { value: sender.value, ok: true };
  56      }
  57  
  58      // If closed and nothing buffered, return zero value.
  59      if (this.closed) {
  60        return { value: undefined, ok: false };
  61      }
  62  
  63      // Block until a sender arrives or channel closes.
  64      return new Promise((resolve) => {
  65        this.recvWaiters.push({ resolve });
  66      });
  67    }
  68  
  69    // Close the channel.
  70    close() {
  71      if (this.closed) {
  72        throw new Error('close of closed channel');
  73      }
  74      this.closed = true;
  75  
  76      // Wake all waiting receivers with zero value.
  77      while (this.recvWaiters.length > 0) {
  78        const waiter = this.recvWaiters.shift();
  79        waiter.resolve({ value: undefined, ok: false });
  80      }
  81  
  82      // Panic on waiting senders.
  83      while (this.sendWaiters.length > 0) {
  84        const sender = this.sendWaiters.shift();
  85        sender.reject(new Error('send on closed channel'));
  86      }
  87    }
  88  
  89    // Non-blocking try-send. Returns true if sent.
  90    trySend(value) {
  91      if (this.closed) throw new Error('send on closed channel');
  92  
  93      if (this.recvWaiters.length > 0) {
  94        const waiter = this.recvWaiters.shift();
  95        waiter.resolve({ value, ok: true });
  96        return true;
  97      }
  98  
  99      if (this.buffer.length < this.bufferSize) {
 100        this.buffer.push(value);
 101        return true;
 102      }
 103  
 104      return false;
 105    }
 106  
 107    // Non-blocking try-recv. Returns {value, ok, received}.
 108    tryRecv() {
 109      if (this.buffer.length > 0) {
 110        const value = this.buffer.shift();
 111        if (this.sendWaiters.length > 0) {
 112          const sender = this.sendWaiters.shift();
 113          this.buffer.push(sender.value);
 114          sender.resolve();
 115        }
 116        return { value, ok: true, received: true };
 117      }
 118  
 119      if (this.sendWaiters.length > 0) {
 120        const sender = this.sendWaiters.shift();
 121        sender.resolve();
 122        return { value: sender.value, ok: true, received: true };
 123      }
 124  
 125      if (this.closed) {
 126        return { value: undefined, ok: false, received: true };
 127      }
 128  
 129      return { value: undefined, ok: false, received: false };
 130    }
 131  }
 132  
 133  // Select statement implementation.
 134  // cases: [{ch, dir: 'send'|'recv', value?, id}]
 135  // hasDefault: boolean
 136  // Returns: {id, value, ok}
 137  export async function select(cases, hasDefault) {
 138    // Shuffle cases for fairness (Go spec requires pseudo-random selection).
 139    const shuffled = cases.map((c, i) => ({ ...c, origIndex: i }));
 140    for (let i = shuffled.length - 1; i > 0; i--) {
 141      const j = Math.floor(Math.random() * (i + 1));
 142      [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
 143    }
 144  
 145    // Try non-blocking first.
 146    for (const c of shuffled) {
 147      if (c.dir === 'send') {
 148        if (c.ch.trySend(c.value)) {
 149          return { id: c.id, value: undefined, ok: true };
 150        }
 151      } else {
 152        const result = c.ch.tryRecv();
 153        if (result.received) {
 154          return { id: c.id, value: result.value, ok: result.ok };
 155        }
 156      }
 157    }
 158  
 159    // Default case.
 160    if (hasDefault) {
 161      return { id: -1, value: undefined, ok: false };
 162    }
 163  
 164    // Block: race all cases.
 165    return new Promise((resolve) => {
 166      const cleanup = [];
 167      let resolved = false;
 168  
 169      for (const c of shuffled) {
 170        if (c.dir === 'recv') {
 171          const waiter = {
 172            resolve: (result) => {
 173              if (resolved) return;
 174              resolved = true;
 175              // Remove other waiters.
 176              for (const fn of cleanup) fn();
 177              resolve({ id: c.id, value: result.value, ok: result.ok });
 178            }
 179          };
 180          c.ch.recvWaiters.push(waiter);
 181          cleanup.push(() => {
 182            const idx = c.ch.recvWaiters.indexOf(waiter);
 183            if (idx >= 0) c.ch.recvWaiters.splice(idx, 1);
 184          });
 185        } else {
 186          const waiter = {
 187            value: c.value,
 188            resolve: () => {
 189              if (resolved) return;
 190              resolved = true;
 191              for (const fn of cleanup) fn();
 192              resolve({ id: c.id, value: undefined, ok: true });
 193            },
 194            reject: (err) => {
 195              if (resolved) return;
 196              resolved = true;
 197              for (const fn of cleanup) fn();
 198              // Propagate error (send on closed channel).
 199              throw err;
 200            }
 201          };
 202          c.ch.sendWaiters.push(waiter);
 203          cleanup.push(() => {
 204            const idx = c.ch.sendWaiters.indexOf(waiter);
 205            if (idx >= 0) c.ch.sendWaiters.splice(idx, 1);
 206          });
 207        }
 208      }
 209    });
 210  }
 211