channel.mjs raw
1 // TinyJS Runtime — Channels
2 // Go channels with buffered/unbuffered semantics, close, and select.
3
4 export class Channel {
5 constructor(bufferSize = 0) {
6 this.bufferSize = bufferSize;
7 this.buffer = [];
8 this.closed = false;
9 this.sendWaiters = []; // [{value, resolve, reject}]
10 this.recvWaiters = []; // [{resolve, reject}]
11 }
12
13 // Skip cancelled waiters (from resolved selects).
14 _shiftRecv() {
15 while (this.recvWaiters.length > 0) {
16 const w = this.recvWaiters.shift();
17 if (!w.cancelled) return w;
18 }
19 return null;
20 }
21
22 _shiftSend() {
23 while (this.sendWaiters.length > 0) {
24 const w = this.sendWaiters.shift();
25 if (!w.cancelled) return w;
26 }
27 return null;
28 }
29
30 // Send a value. Blocks (awaits) if buffer full and no receiver waiting.
31 async send(value) {
32 if (this.closed) {
33 throw new Error('send on closed channel');
34 }
35
36 // If a receiver is waiting, hand off directly.
37 const waiter = this._shiftRecv();
38 if (waiter) {
39 waiter.resolve({ value, ok: true });
40 return;
41 }
42
43 // If buffer has space, enqueue.
44 if (this.buffer.length < this.bufferSize) {
45 this.buffer.push(value);
46 return;
47 }
48
49 // Block until a receiver arrives.
50 return new Promise((resolve, reject) => {
51 this.sendWaiters.push({ value, resolve, reject });
52 });
53 }
54
55 // Receive a value. Blocks (awaits) if buffer empty and no sender waiting.
56 async recv() {
57 // If buffer has data, return it. Then unblock a sender if waiting.
58 if (this.buffer.length > 0) {
59 const value = this.buffer.shift();
60 const sender = this._shiftSend();
61 if (sender) {
62 this.buffer.push(sender.value);
63 sender.resolve();
64 }
65 return { value, ok: true };
66 }
67
68 // If a sender is waiting (unbuffered or buffer was empty), take directly.
69 const sender = this._shiftSend();
70 if (sender) {
71 sender.resolve();
72 return { value: sender.value, ok: true };
73 }
74
75 // If closed and nothing buffered, return zero value.
76 if (this.closed) {
77 return { value: undefined, ok: false };
78 }
79
80 // Block until a sender arrives or channel closes.
81 return new Promise((resolve) => {
82 this.recvWaiters.push({ resolve });
83 });
84 }
85
86 // Close the channel.
87 close() {
88 if (this.closed) {
89 throw new Error('close of closed channel');
90 }
91 this.closed = true;
92
93 // Wake all waiting receivers with zero value.
94 let w;
95 while ((w = this._shiftRecv())) {
96 w.resolve({ value: undefined, ok: false });
97 }
98
99 // Panic on waiting senders.
100 while ((w = this._shiftSend())) {
101 w.reject(new Error('send on closed channel'));
102 }
103 }
104
105 // Non-blocking try-send. Returns true if sent.
106 trySend(value) {
107 if (this.closed) throw new Error('send on closed channel');
108
109 const waiter = this._shiftRecv();
110 if (waiter) {
111 waiter.resolve({ value, ok: true });
112 return true;
113 }
114
115 if (this.buffer.length < this.bufferSize) {
116 this.buffer.push(value);
117 return true;
118 }
119
120 return false;
121 }
122
123 // Non-blocking try-recv. Returns {value, ok, received}.
124 tryRecv() {
125 if (this.buffer.length > 0) {
126 const value = this.buffer.shift();
127 const sender = this._shiftSend();
128 if (sender) {
129 this.buffer.push(sender.value);
130 sender.resolve();
131 }
132 return { value, ok: true, received: true };
133 }
134
135 const sender = this._shiftSend();
136 if (sender) {
137 sender.resolve();
138 return { value: sender.value, ok: true, received: true };
139 }
140
141 if (this.closed) {
142 return { value: undefined, ok: false, received: true };
143 }
144
145 return { value: undefined, ok: false, received: false };
146 }
147 }
148
149 // Select statement implementation.
150 // cases: [{ch, dir: 'send'|'recv', value?, id}]
151 // hasDefault: boolean
152 // Returns: {id, value, ok}
153 export async function select(cases, hasDefault) {
154 // Shuffle cases for fairness (Go spec requires pseudo-random selection).
155 const shuffled = cases.map((c, i) => ({ ...c, origIndex: i }));
156 for (let i = shuffled.length - 1; i > 0; i--) {
157 const j = Math.floor(Math.random() * (i + 1));
158 [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
159 }
160
161 // Try non-blocking first.
162 for (const c of shuffled) {
163 if (c.dir === 'send') {
164 if (c.ch.trySend(c.value)) {
165 return { id: c.id, value: undefined, ok: true };
166 }
167 } else {
168 const result = c.ch.tryRecv();
169 if (result.received) {
170 return { id: c.id, value: result.value, ok: result.ok };
171 }
172 }
173 }
174
175 // Default case.
176 if (hasDefault) {
177 return { id: -1, value: undefined, ok: false };
178 }
179
180 // Block: race all cases. Use cancel flags instead of splice for O(1) cleanup.
181 return new Promise((resolve, reject) => {
182 const waiters = [];
183 let resolved = false;
184
185 function cancelOthers() {
186 for (const w of waiters) w.cancelled = true;
187 }
188
189 for (const c of shuffled) {
190 if (c.dir === 'recv') {
191 const waiter = {
192 cancelled: false,
193 resolve: (result) => {
194 if (resolved) return;
195 resolved = true;
196 cancelOthers();
197 resolve({ id: c.id, value: result.value, ok: result.ok });
198 }
199 };
200 waiters.push(waiter);
201 c.ch.recvWaiters.push(waiter);
202 } else {
203 const waiter = {
204 cancelled: false,
205 value: c.value,
206 resolve: () => {
207 if (resolved) return;
208 resolved = true;
209 cancelOthers();
210 resolve({ id: c.id, value: undefined, ok: true });
211 },
212 reject: (err) => {
213 if (resolved) return;
214 resolved = true;
215 cancelOthers();
216 reject(err);
217 }
218 };
219 waiters.push(waiter);
220 c.ch.sendWaiters.push(waiter);
221 }
222 }
223 });
224 }
225