xref: /plan9-contrib/sys/src/libthread/channel.c (revision 7dd7cddf99dd7472612f1413b4da293630e6b1bc)
1 #include <u.h>
2 #include <libc.h>
3 #include "assert.h"
4 #include "threadimpl.h"
5 
6 /* Channel structure.  S is the size of the buffer.  For unbuffered channels
7  * s is zero.  v is an array of s values.  If s is zero, v is unused.
8  * f and n represent the state of the queue pointed to by v.
9  * rcvrs and sndrs must be initialized to nil and should not be touched
10  * by code outside channel.c
11  */
12 
13 struct Channel {
14 	int	s;		// Size of the channel (may be zero)
15 	uint	f;		// Extraction point (insertion pt: (f + n) % s)
16 	uint	n;		// Number of values in the channel
17 	int	e;		// Element size
18 	int	freed;		// Set when channel is being deleted
19 	ulong	qused;		// Bitmap of used entries in rcvrs
20 	Alt	*qentry[32];	// Receivers/senders waiting
21 	uchar	v[1];		// Array of max(1, s) values in the channel
22 };
23 
24 static Lock chanlock;		// Central channel access lock
25 
26 ulong rendezvouses;
27 
28 void
29 chanfree(Channel *c) {
30 
31 	lock(&chanlock);
32 	if (c->qused == 0) {
33 		free(c);
34 	} else {
35 		c->freed = 1;
36 	}
37 	unlock(&chanlock);
38 }
39 
40 Channel *
41 chancreate(int elemsize, int elemcnt) {
42 	Channel *c;
43 
44 	if(elemcnt < 0 || elemsize <= 0)
45 		return nil;
46 	c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize);
47 	if(c == nil)
48 		return c;
49 	c->f = 0;
50 	c->n = 0;
51 	c->freed = 0;
52 	c->qused = 0;
53 	c->s = elemcnt;
54 	c->e = elemsize;
55 	_threaddebug(DBGCHAN, "chancreate %lux", c);
56 	return c;
57 }
58 
59 int
60 alt(Alt *alts) {
61 	Alt *a, *xa;
62 	Channel *c;
63 	uchar *v;
64 	int i, n, entry;
65 	Proc *p;
66 	Thread *t;
67 
68 	lock(&chanlock);
69 
70 	(*procp)->curthread->alt = alts;
71 	(*procp)->curthread->call = Callalt;
72 repeat:
73 
74 	// Test which channels can proceed
75 	n = 1;
76 	a = nil;
77 	entry = -1;
78 	for (xa = alts; xa->op; xa++) {
79 		if (xa->op == CHANNOP) continue;
80 		if (xa->op == CHANNOBLK) {
81 			if (a == nil) {
82 				(*procp)->curthread->call = Callnil;
83 				unlock(&chanlock);
84 				return xa - alts;
85 			} else
86 				continue;
87 		}
88 
89 		c = xa->c;
90 		if ((xa->op == CHANSND && c->n < c->s) ||
91 			(xa->op == CHANRCV && c->n)) {
92 				// There's room to send in the channel
93 				if (nrand(n) == 0) {
94 					a = xa;
95 					entry = -1;
96 				}
97 				n++;
98 		} else {
99 			// Test for blocked senders or receivers
100 			for (i = 0; i < 32; i++) {
101 				// Is it claimed?
102 				if (
103 					(c->qused & (1 << i))
104 					&& xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op
105 							// complementary op
106 					&& *c->qentry[i]->tag == nil
107 				) {
108 					// No
109 					if (nrand(n) == 0) {
110 						a = xa;
111 						entry = i;
112 					}
113 					n++;
114 					break;
115 				}
116 			}
117 		}
118 	}
119 
120 	if (a == nil) {
121 		// Nothing can proceed, enqueue on all channels
122 		c = nil;
123 		for (a = alts; a->op; a++) {
124 			Channel *cp;
125 
126 			if (a->op == CHANNOP || a->op == CHANNOBLK) continue;
127 			cp = a->c;
128 			a->tag = &c;
129 			for (i = 0; i < 32; i++) {
130 				if ((cp->qused & (1 << i)) == 0) {
131 					cp->qentry[i] = a;
132 					cp->qused |= a->q = 1 << i;
133 					break;
134 				}
135 			}
136 			threadassert(i != 32);
137 		}
138 
139 		// And wait for the rendez vous
140 		rendezvouses++;
141 		unlock(&chanlock);
142 		if (threadrendezvous((ulong)&c, 0) == ~0) {
143 			(*procp)->curthread->call = Callnil;
144 			return -1;
145 		}
146 
147 		lock(&chanlock);
148 
149 		/* We rendezed-vous on channel c, dequeue from all channels
150 		 * and find the Alt struct to which c belongs
151 		 */
152 		a = nil;
153 		for (xa = alts; xa->op; xa++) {
154 			Channel *xc;
155 
156 			if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue;
157 			xc = xa->c;
158 			xc->qused &= ~xa->q;
159 			if (xc == c)
160 				a = xa;
161 
162 		}
163 
164 		if (c->s) {
165 			// Buffered channel, try again
166 			sleep(0);
167 			goto repeat;
168 		}
169 
170 		unlock(&chanlock);
171 
172 		if (c->freed) chanfree(c);
173 
174 		p = *procp;
175 		t = p->curthread;
176 		if (t->exiting)
177 			threadexits(nil);
178 		(*procp)->curthread->call = Callnil;
179 		return a - alts;
180 	}
181 
182 	c = a->c;
183 	// Channel c can proceed
184 
185 	if (c->s) {
186 		// Send or receive via the buffered channel
187 		if (a->op == CHANSND) {
188 			v = c->v + ((c->f + c->n) % c->s) * c->e;
189 			if (a->v)
190 				memmove(v, a->v, c->e);
191 			else
192 				memset(v, 0, c->e);
193 			c->n++;
194 		} else {
195 			if (a->v) {
196 				v = c->v + (c->f % c->s) * c->e;
197 				memmove(a->v, v, c->e);
198 			}
199 			c->n--;
200 			c->f++;
201 		}
202 	}
203 	if (entry < 0)
204 		for (i = 0; i < 32; i++) {
205 			if (
206 				(c->qused & (1 << i))
207 				&& c->qentry[i]->op == (CHANSND+CHANRCV) - a->op
208 				&& *c->qentry[i]->tag == nil
209 			) {
210 				// Unblock peer process
211 				xa = c->qentry[i];
212 				*xa->tag = c;
213 
214 				rendezvouses++;
215 				unlock(&chanlock);
216 				if (threadrendezvous((ulong)xa->tag, 0) == ~0) {
217 					(*procp)->curthread->call = Callnil;
218 					return -1;
219 				}
220 				(*procp)->curthread->call = Callnil;
221 				return a - alts;
222 			}
223 		}
224 	if (entry >= 0) {
225 		xa = c->qentry[entry];
226 		if (a->op == CHANSND) {
227 			if (xa->v) {
228 				if (a->v)
229 					memmove(xa->v, a->v, c->e);
230 				else
231 					memset(xa->v, 0, c->e);
232 			}
233 		} else {
234 			if (a->v) {
235 				if (xa->v)
236 					memmove(a->v, xa->v, c->e);
237 				else
238 					memset(a->v, 0, c->e);
239 			}
240 		}
241 		*xa->tag = c;
242 
243 		rendezvouses++;
244 		unlock(&chanlock);
245 		if (threadrendezvous((ulong)xa->tag, 0) == ~0) {
246 			(*procp)->curthread->call = Callnil;
247 			return -1;
248 		}
249 		(*procp)->curthread->call = Callnil;
250 		return a - alts;
251 	}
252 	unlock(&chanlock);
253 	yield();
254 	(*procp)->curthread->call = Callnil;
255 	return a - alts;
256 }
257 
258 int
259 nbrecv(Channel *c, void *v) {
260 	Alt *a;
261 	int i;
262 
263 	lock(&chanlock);
264 	if (c->qused) // There's somebody waiting
265 		for (i = 0; i < 32; i++) {
266 			a = c->qentry[i];
267 			if (
268 				(c->qused & (1 << i))
269 				&& a->op == CHANSND
270 				&& *a->tag == nil
271 			) {
272 				*a->tag = c;
273 				if (c->n) {
274 					// There's an item to receive in the buffered channel
275 					if (v)
276 						memmove(v, c->v + (c->f % c->s) * c->e, c->e);
277 					c->n--;
278 					c->f++;
279 				} else {
280 					if (v) {
281 						if (a->v)
282 							memmove(v, a->v, c->e);
283 						else
284 							memset(v, 0, c->e);
285 					}
286 				}
287 
288 				rendezvouses++;
289 				unlock(&chanlock);
290 				if (threadrendezvous((ulong)a->tag, 0) == ~0)
291 					return -1;
292 				return 1;
293 			}
294 		}
295 	if (c->n) {
296 		// There's an item to receive in the buffered channel
297 		if (v)
298 			memmove(v, c->v + (c->f % c->s) * c->e, c->e);
299 		c->n--;
300 		c->f++;
301 		unlock(&chanlock);
302 		yield();
303 		return 1;
304 	}
305 	unlock(&chanlock);
306 	return 0;
307 }
308 
309 int
310 recv(Channel *c, void *v) {
311 	Alt a, *xa;
312 	Channel *tag;
313 	int i;
314 	Proc *p;
315 	Thread *t;
316 
317 
318 	lock(&chanlock);
319 retry:
320 	if (c->qused) // There's somebody waiting
321 		for (i = 0; i < 32; i++) {
322 			xa = c->qentry[i];
323 			if (
324 				(c->qused & (1 << i))
325 				&& xa->op == CHANSND
326 				&& *xa->tag == nil
327 			) {
328 				*xa->tag = c;
329 				if (c->n) {
330 					// There's an item to receive in the buffered channel
331 					if (v)
332 						memmove(v, c->v + (c->f % c->s) * c->e, c->e);
333 					c->n--;
334 					c->f++;
335 				} else {
336 					if (v) {
337 						if (xa->v)
338 							memmove(v, xa->v, c->e);
339 						else
340 							memset(v, 0, c->e);
341 					}
342 				}
343 
344 				rendezvouses++;
345 				unlock(&chanlock);
346 				if (threadrendezvous((ulong)xa->tag, 0) == ~0)
347 					return -1;
348 				return 1;
349 			}
350 	}
351 	if (c->n) {
352 		// There's an item to receive in the buffered channel
353 		if (v)
354 			memmove(v, c->v + (c->f % c->s) * c->e, c->e);
355 		c->n--;
356 		c->f++;
357 		unlock(&chanlock);
358 		yield();
359 		return 1;
360 	}
361 	// Unbuffered, or buffered but empty
362 	tag = nil;
363 	a.c = c;
364 	a.v = v;
365 	a.tag = &tag;
366 	a.op = CHANRCV;
367 	p = *procp;
368 	t = p->curthread;
369 	t->alt = &a;
370 	t->call = Callrcv;
371 
372 	// enqueue on the channel
373 	for (i = 0; i < 32; i++)
374 		if ((c->qused & (1 << i)) == 0) {
375 			c->qentry[i] = &a;
376 			c->qused |= a.q = 1 << i;
377 			break;
378 		}
379 
380 	rendezvouses++;
381 	unlock(&chanlock);
382 	if (threadrendezvous((ulong)&tag, 0) == ~0) {
383 		t->call = Callnil;
384 		return -1;
385 	}
386 	lock(&chanlock);
387 
388 	// dequeue from the channel
389 	c->qused &= ~a.q;
390 	if (c->s) goto retry;	// Buffered channels: try the queue again
391 	unlock(&chanlock);
392 	if (c->freed) chanfree(c);
393 	t->call = Callnil;
394 	if (t->exiting)
395 		threadexits(nil);
396 	return 1;
397 }
398 
399 int
400 nbsend(Channel *c, void *v) {
401 	Alt *a;
402 	int i;
403 
404 	lock(&chanlock);
405 	if (c->qused)	// Anybody waiting?
406 		for (i = 0; i < 32; i++) {
407 			a = c->qentry[i];
408 			if (
409 				(c->qused & (1 << i))
410 				&& a->op == CHANRCV
411 				&& *a->tag == nil
412 			) {
413 				*a->tag = c;
414 				if (c->n < c->s) {
415 					// There's room to send in the buffered channel
416 					if (v)
417 						memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
418 					else
419 						memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
420 					c->n++;
421 				} else {
422 					if (a->v) {
423 						if (v)
424 							memmove(a->v, v, c->e);
425 						else
426 							memset(a->v, 0, c->e);
427 					}
428 				}
429 
430 				rendezvouses++;
431 				unlock(&chanlock);
432 				if (threadrendezvous((ulong)a->tag, 0) == ~0)
433 					return -1;
434 				return 1;
435 			}
436 	}
437 	if (c->n < c->s) {
438 		// There's room to send in the buffered channel
439 		if (v)
440 			memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
441 		else
442 			memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
443 		c->n++;
444 		unlock(&chanlock);
445 		yield();
446 		return 1;
447 	}
448 	unlock(&chanlock);
449 	return 0;
450 }
451 
452 int
453 send(Channel *c, void *v) {
454 	Alt a, *xa;
455 	Channel *tag;
456 	int i;
457 	Proc *p;
458 	Thread *t;
459 
460 	lock(&chanlock);
461 retry:
462 	if (c->qused)	// Anybody waiting?
463 		for (i = 0; i < 32; i++) {
464 			xa = c->qentry[i];
465 			threadassert(!(c->qused & (1 << i)) || xa != nil);
466 			if (
467 				(c->qused & (1 << i))
468 				&& xa->op == CHANRCV
469 				&& *xa->tag == nil
470 			) {
471 				*xa->tag = c;
472 				if (c->n < c->s) {
473 					// There's room to send in the buffered channel
474 					if (v)
475 						memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
476 					else
477 						memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
478 					c->n++;
479 				} else {
480 					if (xa->v) {
481 						if (v)
482 							memmove(xa->v, v, c->e);
483 						else
484 							memset(xa->v, 0, c->e);
485 					}
486 				}
487 
488 				rendezvouses++;
489 				unlock(&chanlock);
490 				if (threadrendezvous((ulong)xa->tag, 0) == ~0)
491 					return -1;
492 				return 1;
493 			}
494 	}
495 	if (c->n < c->s) {
496 		// There's room to send in the buffered channel
497 		if (v)
498 			memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
499 		else
500 			memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
501 		c->n++;
502 		unlock(&chanlock);
503 		yield();
504 		return 1;
505 	}
506 	tag = nil;
507 	a.c = c;
508 	a.v = v;
509 	a.tag = &tag;
510 	a.op = CHANSND;
511 	p = *procp;
512 	t = p->curthread;
513 	t->alt = &a;
514 	t->call = Callsnd;
515 
516 	// enqueue on the channel
517 	for (i = 0; i < 32; i++)
518 		if ((c->qused & (1 << i)) == 0) {
519 			c->qentry[i] = &a;
520 			c->qused |= a.q = 1 << i;
521 			break;
522 		}
523 	rendezvouses++;
524 	unlock(&chanlock);
525 	if (threadrendezvous((ulong)&tag, 0) == ~0) {
526 		t->call = Callnil;
527 		return -1;
528 	}
529 	// Unbuffered channels: data was transferred; dequeue
530 	lock(&chanlock);
531 	// dequeue from the channel
532 	c->qused &= ~a.q;
533 	if (c->s) goto retry;	// Buffered channels: try the queue again
534 	unlock(&chanlock);
535 	if (c->freed) chanfree(c);
536 	t->call = Callnil;
537 	if (t->exiting)
538 		threadexits(nil);
539 	return 1;
540 }
541 
542 int
543 sendul(Channel *c, ulong v) {
544 	threadassert(c->e == sizeof(ulong));
545 	return send(c, &v);
546 }
547 
548 ulong
549 recvul(Channel *c) {
550 	ulong v;
551 
552 	threadassert(c->e == sizeof(ulong));
553 	recv(c, &v);
554 	return v;
555 }
556 
557 int
558 sendp(Channel *c, void *v) {
559 	threadassert(c->e == sizeof(void *));
560 	return send(c, &v);
561 }
562 
563 void *
564 recvp(Channel *c) {
565 	void * v;
566 
567 	threadassert(c->e == sizeof(void *));
568 	recv(c, &v);
569 	return v;
570 }
571 
572 int
573 nbsendul(Channel *c, ulong v) {
574 	threadassert(c->e == sizeof(ulong));
575 	return nbsend(c, &v);
576 }
577 
578 ulong
579 nbrecvul(Channel *c) {
580 	ulong v;
581 
582 	threadassert(c->e == sizeof(ulong));
583 	if (nbrecv(c, &v) == 0)
584 		return 0;
585 	return v;
586 }
587 
588 int
589 nbsendp(Channel *c, void *v) {
590 	threadassert(c->e == sizeof(void *));
591 	return nbsend(c, &v);
592 }
593 
594 void *
595 nbrecvp(Channel *c) {
596 	void * v;
597 
598 	threadassert(c->e == sizeof(void *));
599 	if (nbrecv(c, &v) == 0)
600 		return nil;
601 	return v;
602 }
603