xref: /plan9-contrib/sys/src/libthread/channel.c (revision 59cc4ca53493a3c6d2349fe2b7f7c40f7dce7294)
1 #include <u.h>
2 #include <libc.h>
3 #include "assert.h"
4 #include "threadimpl.h"
5 
6 static Lock chanlock;		// Single global lock: every function in this file takes it before reading or changing any channel's state
7 
8 void
9 chanfree(Channel *c) {
10 
11 	lock(&chanlock);
12 	if (c->qused == 0) {
13 		free(c);
14 	} else {
15 		c->freed = 1;
16 	}
17 	unlock(&chanlock);
18 }
19 
20 int
21 chaninit(Channel *c, int elemsize, int elemcnt) {
22 	if(elemcnt < 0 || elemsize <= 0 || c == nil)
23 		return -1;
24 	c->f = 0;
25 	c->n = 0;
26 	c->freed = 0;
27 	c->qused = 0;
28 	c->s = elemcnt;
29 	c->e = elemsize;
30 	_threaddebug(DBGCHAN, "chaninit %lux", c);
31 	return 1;
32 }
33 
34 Channel *
35 chancreate(int elemsize, int elemcnt) {
36 	Channel *c;
37 
38 	if(elemcnt < 0 || elemsize <= 0)
39 		return nil;
40 	c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize);
41 	if(c == nil)
42 		return c;
43 	c->f = 0;
44 	c->n = 0;
45 	c->freed = 0;
46 	c->qused = 0;
47 	c->s = elemcnt;
48 	c->e = elemsize;
49 	_threaddebug(DBGCHAN, "chancreate %lux", c);
50 	return c;
51 }
52 
53 int
54 alt(Alt *alts) {
55 	Alt *a, *xa;
56 	Channel *c;
57 	uchar *v;
58 	int i, n, entry;
59 	Proc *p;
60 	Thread *t;
61 
62 	lock(&chanlock);
63 
64 	(*procp)->curthread->alt = alts;
65 	(*procp)->curthread->call = Callalt;
66 repeat:
67 
68 	// Test which channels can proceed
69 	n = 1;
70 	a = nil;
71 	entry = -1;
72 	for (xa = alts; xa->op; xa++) {
73 		if (xa->op == CHANNOP) continue;
74 		if (xa->op == CHANNOBLK) {
75 			if (a == nil) {
76 				(*procp)->curthread->call = Callnil;
77 				unlock(&chanlock);
78 				return xa - alts;
79 			} else
80 				break;
81 		}
82 
83 		c = xa->c;
84 		if ((xa->op == CHANSND && c->n < c->s) ||
85 			(xa->op == CHANRCV && c->n)) {
86 				// There's room to send in the channel
87 				if (nrand(n) == 0) {
88 					a = xa;
89 					entry = -1;
90 				}
91 				n++;
92 		} else {
93 			// Test for blocked senders or receivers
94 			for (i = 0; i < 32; i++) {
95 				// Is it claimed?
96 				if (
97 					(c->qused & (1 << i))
98 					&& xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op
99 							// complementary op
100 					&& *c->qentry[i]->tag == nil
101 				) {
102 					// No
103 					if (nrand(n) == 0) {
104 						a = xa;
105 						entry = i;
106 					}
107 					n++;
108 					break;
109 				}
110 			}
111 		}
112 	}
113 
114 	if (a == nil) {
115 		// Nothing can proceed, enqueue on all channels
116 		c = nil;
117 		for (a = alts; a->op; a++) {
118 			Channel *cp;
119 
120 			if (a->op == CHANNOP || a->op == CHANNOBLK) continue;
121 			cp = a->c;
122 			a->tag = &c;
123 			for (i = 0; i < 32; i++) {
124 				if ((cp->qused & (1 << i)) == 0) {
125 					cp->qentry[i] = a;
126 					cp->qused |= a->q = 1 << i;
127 					break;
128 				}
129 			}
130 			threadassert(i != 32);
131 		}
132 
133 		// And wait for the rendez vous
134 		unlock(&chanlock);
135 		if (_threadrendezvous((ulong)&c, 0) == ~0) {
136 			(*procp)->curthread->call = Callnil;
137 			return -1;
138 		}
139 
140 		lock(&chanlock);
141 
142 		/* We rendezed-vous on channel c, dequeue from all channels
143 		 * and find the Alt struct to which c belongs
144 		 */
145 		a = nil;
146 		for (xa = alts; xa->op; xa++) {
147 			Channel *xc;
148 
149 			if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue;
150 			xc = xa->c;
151 			xc->qused &= ~xa->q;
152 			if (xc == c)
153 				a = xa;
154 
155 		}
156 
157 		if (c->s) {
158 			// Buffered channel, try again
159 			sleep(0);
160 			goto repeat;
161 		}
162 
163 		unlock(&chanlock);
164 
165 		if (c->freed) chanfree(c);
166 
167 		p = *procp;
168 		t = p->curthread;
169 		if (t->exiting)
170 			threadexits(nil);
171 		(*procp)->curthread->call = Callnil;
172 		return a - alts;
173 	}
174 
175 	c = a->c;
176 	// Channel c can proceed
177 
178 	if (c->s) {
179 		// Send or receive via the buffered channel
180 		if (a->op == CHANSND) {
181 			v = c->v + ((c->f + c->n) % c->s) * c->e;
182 			if (a->v)
183 				memmove(v, a->v, c->e);
184 			else
185 				memset(v, 0, c->e);
186 			c->n++;
187 		} else {
188 			if (a->v) {
189 				v = c->v + (c->f % c->s) * c->e;
190 				memmove(a->v, v, c->e);
191 			}
192 			c->n--;
193 			c->f++;
194 		}
195 	}
196 	if (entry < 0)
197 		for (i = 0; i < 32; i++) {
198 			if (
199 				(c->qused & (1 << i))
200 				&& c->qentry[i]->op == (CHANSND+CHANRCV) - a->op
201 				&& *c->qentry[i]->tag == nil
202 			) {
203 				// Unblock peer process
204 				xa = c->qentry[i];
205 				*xa->tag = c;
206 
207 				unlock(&chanlock);
208 				if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
209 					(*procp)->curthread->call = Callnil;
210 					return -1;
211 				}
212 				(*procp)->curthread->call = Callnil;
213 				return a - alts;
214 			}
215 		}
216 	if (entry >= 0) {
217 		xa = c->qentry[entry];
218 		if (a->op == CHANSND) {
219 			if (xa->v) {
220 				if (a->v)
221 					memmove(xa->v, a->v, c->e);
222 				else
223 					memset(xa->v, 0, c->e);
224 			}
225 		} else {
226 			if (a->v) {
227 				if (xa->v)
228 					memmove(a->v, xa->v, c->e);
229 				else
230 					memset(a->v, 0, c->e);
231 			}
232 		}
233 		*xa->tag = c;
234 
235 		unlock(&chanlock);
236 		if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
237 			(*procp)->curthread->call = Callnil;
238 			return -1;
239 		}
240 		(*procp)->curthread->call = Callnil;
241 		return a - alts;
242 	}
243 	unlock(&chanlock);
244 	yield();
245 	(*procp)->curthread->call = Callnil;
246 	return a - alts;
247 }
248 
249 int
250 nbrecv(Channel *c, void *v) {
251 	Alt *a;
252 	int i;
253 
254 	lock(&chanlock);
255 	if (c->qused) // There's somebody waiting
256 		for (i = 0; i < 32; i++) {
257 			a = c->qentry[i];
258 			if (
259 				(c->qused & (1 << i))
260 				&& a->op == CHANSND
261 				&& *a->tag == nil
262 			) {
263 				*a->tag = c;
264 				if (c->n) {
265 					// There's an item to receive in the buffered channel
266 					if (v)
267 						memmove(v, c->v + (c->f % c->s) * c->e, c->e);
268 					c->n--;
269 					c->f++;
270 				} else {
271 					if (v) {
272 						if (a->v)
273 							memmove(v, a->v, c->e);
274 						else
275 							memset(v, 0, c->e);
276 					}
277 				}
278 
279 				unlock(&chanlock);
280 				if (_threadrendezvous((ulong)a->tag, 0) == ~0)
281 					return -1;
282 				return 1;
283 			}
284 		}
285 	if (c->n) {
286 		// There's an item to receive in the buffered channel
287 		if (v)
288 			memmove(v, c->v + (c->f % c->s) * c->e, c->e);
289 		c->n--;
290 		c->f++;
291 		unlock(&chanlock);
292 		yield();
293 		return 1;
294 	}
295 	unlock(&chanlock);
296 	return 0;
297 }
298 
299 int
300 recv(Channel *c, void *v) {
301 	Alt a, *xa;
302 	Channel *tag;
303 	int i;
304 	Proc *p;
305 	Thread *t;
306 
307 
308 	lock(&chanlock);
309 retry:
310 	if (c->qused) // There's somebody waiting
311 		for (i = 0; i < 32; i++) {
312 			xa = c->qentry[i];
313 			if (
314 				(c->qused & (1 << i))
315 				&& xa->op == CHANSND
316 				&& *xa->tag == nil
317 			) {
318 				*xa->tag = c;
319 				if (c->n) {
320 					// There's an item to receive in the buffered channel
321 					if (v)
322 						memmove(v, c->v + (c->f % c->s) * c->e, c->e);
323 					c->n--;
324 					c->f++;
325 				} else {
326 					if (v) {
327 						if (xa->v)
328 							memmove(v, xa->v, c->e);
329 						else
330 							memset(v, 0, c->e);
331 					}
332 				}
333 
334 				unlock(&chanlock);
335 				if (_threadrendezvous((ulong)xa->tag, 0) == ~0)
336 					return -1;
337 				return 1;
338 			}
339 	}
340 	if (c->n) {
341 		// There's an item to receive in the buffered channel
342 		if (v)
343 			memmove(v, c->v + (c->f % c->s) * c->e, c->e);
344 		c->n--;
345 		c->f++;
346 		unlock(&chanlock);
347 		yield();
348 		return 1;
349 	}
350 	// Unbuffered, or buffered but empty
351 	tag = nil;
352 	a.c = c;
353 	a.v = v;
354 	a.tag = &tag;
355 	a.op = CHANRCV;
356 	p = *procp;
357 	t = p->curthread;
358 	t->alt = &a;
359 	t->call = Callrcv;
360 
361 	// enqueue on the channel
362 	for (i = 0; i < 32; i++)
363 		if ((c->qused & (1 << i)) == 0) {
364 			c->qentry[i] = &a;
365 			c->qused |= a.q = 1 << i;
366 			break;
367 		}
368 
369 	unlock(&chanlock);
370 	if (_threadrendezvous((ulong)&tag, 0) == ~0) {
371 		t->call = Callnil;
372 		return -1;
373 	}
374 	lock(&chanlock);
375 
376 	// dequeue from the channel
377 	c->qused &= ~a.q;
378 	if (c->s) goto retry;	// Buffered channels: try the queue again
379 	unlock(&chanlock);
380 	if (c->freed) chanfree(c);
381 	t->call = Callnil;
382 	if (t->exiting)
383 		threadexits(nil);
384 	return 1;
385 }
386 
387 int
388 nbsend(Channel *c, void *v) {
389 	Alt *a;
390 	int i;
391 
392 	lock(&chanlock);
393 	if (c->qused)	// Anybody waiting?
394 		for (i = 0; i < 32; i++) {
395 			a = c->qentry[i];
396 			if (
397 				(c->qused & (1 << i))
398 				&& a->op == CHANRCV
399 				&& *a->tag == nil
400 			) {
401 				*a->tag = c;
402 				if (c->n < c->s) {
403 					// There's room to send in the buffered channel
404 					if (v)
405 						memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
406 					else
407 						memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
408 					c->n++;
409 				} else {
410 					if (a->v) {
411 						if (v)
412 							memmove(a->v, v, c->e);
413 						else
414 							memset(a->v, 0, c->e);
415 					}
416 				}
417 
418 				unlock(&chanlock);
419 				if (_threadrendezvous((ulong)a->tag, 0) == ~0)
420 					return -1;
421 				return 1;
422 			}
423 	}
424 	if (c->n < c->s) {
425 		// There's room to send in the buffered channel
426 		if (v)
427 			memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
428 		else
429 			memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
430 		c->n++;
431 		unlock(&chanlock);
432 		yield();
433 		return 1;
434 	}
435 	unlock(&chanlock);
436 	return 0;
437 }
438 
439 int
440 send(Channel *c, void *v) {
441 	Alt a, *xa;
442 	Channel *tag;
443 	int i;
444 	Proc *p;
445 	Thread *t;
446 
447 	lock(&chanlock);
448 retry:
449 	if (c->qused)	// Anybody waiting?
450 		for (i = 0; i < 32; i++) {
451 			xa = c->qentry[i];
452 			threadassert(!(c->qused & (1 << i)) || xa != nil);
453 			if (
454 				(c->qused & (1 << i))
455 				&& xa->op == CHANRCV
456 				&& *xa->tag == nil
457 			) {
458 				*xa->tag = c;
459 				if (c->n < c->s) {
460 					// There's room to send in the buffered channel
461 					if (v)
462 						memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
463 					else
464 						memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
465 					c->n++;
466 				} else {
467 					if (xa->v) {
468 						if (v)
469 							memmove(xa->v, v, c->e);
470 						else
471 							memset(xa->v, 0, c->e);
472 					}
473 				}
474 
475 				unlock(&chanlock);
476 				if (_threadrendezvous((ulong)xa->tag, 0) == ~0)
477 					return -1;
478 				return 1;
479 			}
480 	}
481 	if (c->n < c->s) {
482 		// There's room to send in the buffered channel
483 		if (v)
484 			memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
485 		else
486 			memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
487 		c->n++;
488 		unlock(&chanlock);
489 		yield();
490 		return 1;
491 	}
492 	tag = nil;
493 	a.c = c;
494 	a.v = v;
495 	a.tag = &tag;
496 	a.op = CHANSND;
497 	p = *procp;
498 	t = p->curthread;
499 	t->alt = &a;
500 	t->call = Callsnd;
501 
502 	// enqueue on the channel
503 	for (i = 0; i < 32; i++)
504 		if ((c->qused & (1 << i)) == 0) {
505 			c->qentry[i] = &a;
506 			c->qused |= a.q = 1 << i;
507 			break;
508 		}
509 	unlock(&chanlock);
510 	if (_threadrendezvous((ulong)&tag, 0) == ~0) {
511 		t->call = Callnil;
512 		return -1;
513 	}
514 	// Unbuffered channels: data was transferred; dequeue
515 	lock(&chanlock);
516 	// dequeue from the channel
517 	c->qused &= ~a.q;
518 	if (c->s) goto retry;	// Buffered channels: try the queue again
519 	unlock(&chanlock);
520 	if (c->freed) chanfree(c);
521 	t->call = Callnil;
522 	if (t->exiting)
523 		threadexits(nil);
524 	return 1;
525 }
526 
527 int
528 sendul(Channel *c, ulong v) {
529 	threadassert(c->e == sizeof(ulong));
530 	return send(c, &v);
531 }
532 
533 ulong
534 recvul(Channel *c) {
535 	ulong v;
536 
537 	threadassert(c->e == sizeof(ulong));
538 	recv(c, &v);
539 	return v;
540 }
541 
542 int
543 sendp(Channel *c, void *v) {
544 	threadassert(c->e == sizeof(void *));
545 	return send(c, &v);
546 }
547 
548 void *
549 recvp(Channel *c) {
550 	void * v;
551 
552 	threadassert(c->e == sizeof(void *));
553 	recv(c, &v);
554 	return v;
555 }
556 
557 int
558 nbsendul(Channel *c, ulong v) {
559 	threadassert(c->e == sizeof(ulong));
560 	return nbsend(c, &v);
561 }
562 
563 ulong
564 nbrecvul(Channel *c) {
565 	ulong v;
566 
567 	threadassert(c->e == sizeof(ulong));
568 	if (nbrecv(c, &v) == 0)
569 		return 0;
570 	return v;
571 }
572 
573 int
574 nbsendp(Channel *c, void *v) {
575 	threadassert(c->e == sizeof(void *));
576 	return nbsend(c, &v);
577 }
578 
579 void *
580 nbrecvp(Channel *c) {
581 	void * v;
582 
583 	threadassert(c->e == sizeof(void *));
584 	if (nbrecv(c, &v) == 0)
585 		return nil;
586 	return v;
587 }
588