xref: /plan9-contrib/sys/src/libthread/channel.c (revision a6a9e07217f318acf170f99684a55fba5200524f)
1 #include <u.h>
2 #include <libc.h>
3 #include <thread.h>
4 #include "threadimpl.h"
5 
6 static Lock chanlock;		/* central channel access lock */
7 
8 static void enqueue(Alt*, Channel**);
9 static void dequeue(Alt*);
10 static int canexec(Alt*);
11 static int altexec(Alt*, int);
12 
13 static void
14 _chanfree(Channel *c)
15 {
16 	int i, inuse;
17 
18 	inuse = 0;
19 	for(i = 0; i < c->nentry; i++)
20 		if(c->qentry[i])
21 			inuse = 1;
22 	if(inuse)
23 		c->freed = 1;
24 	else{
25 		if(c->qentry)
26 			free(c->qentry);
27 		free(c);
28 	}
29 }
30 
31 void
32 chanfree(Channel *c)
33 {
34 	lock(&chanlock);
35 	_chanfree(c);
36 	unlock(&chanlock);
37 }
38 
39 int
40 chaninit(Channel *c, int elemsize, int elemcnt)
41 {
42 	if(elemcnt < 0 || elemsize <= 0 || c == nil)
43 		return -1;
44 	c->f = 0;
45 	c->n = 0;
46 	c->freed = 0;
47 	c->e = elemsize;
48 	c->s = elemcnt;
49 	_threaddebug(DBGCHAN, "chaninit %p", c);
50 	return 1;
51 }
52 
53 Channel*
54 chancreate(int elemsize, int elemcnt)
55 {
56 	Channel *c;
57 
58 	if(elemcnt < 0 || elemsize <= 0)
59 		return nil;
60 	c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
61 	c->e = elemsize;
62 	c->s = elemcnt;
63 	_threaddebug(DBGCHAN, "chancreate %p", c);
64 	return c;
65 }
66 
67 int
68 alt(Alt *alts)
69 {
70 	Alt *a, *xa;
71 	Channel volatile *c;
72 	int n, s;
73 	ulong r;
74 	Thread *t;
75 
76 	/*
77 	 * The point of going splhi here is that note handlers
78 	 * might reasonably want to use channel operations,
79 	 * but that will hang if the note comes while we hold the
80 	 * chanlock.  Instead, we delay the note until we've dropped
81 	 * the lock.
82 	 */
83 	t = _threadgetproc()->thread;
84 	if(t->moribund || _threadexitsallstatus)
85 		yield();	/* won't return */
86 	s = _procsplhi();
87 	lock(&chanlock);
88 	t->alt = alts;
89 	t->chan = Chanalt;
90 
91 	/* test whether any channels can proceed */
92 	n = 0;
93 	a = nil;
94 
95 	for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
96 		xa->entryno = -1;
97 		if(xa->op == CHANNOP)
98 			continue;
99 
100 		c = xa->c;
101 		if(c==nil){
102 			unlock(&chanlock);
103 			_procsplx(s);
104 			t->chan = Channone;
105 			return -1;
106 		}
107 
108 		if(canexec(xa))
109 			if(nrand(++n) == 0)
110 				a = xa;
111 	}
112 
113 	if(a==nil){
114 		/* nothing can proceed */
115 		if(xa->op == CHANNOBLK){
116 			unlock(&chanlock);
117 			_procsplx(s);
118 			t->chan = Channone;
119 			return xa - alts;
120 		}
121 
122 		/* enqueue on all channels. */
123 		c = nil;
124 		for(xa=alts; xa->op!=CHANEND; xa++){
125 			if(xa->op==CHANNOP)
126 				continue;
127 			enqueue(xa, &c);
128 		}
129 
130 		/*
131 		 * wait for successful rendezvous.
132 		 * we can't just give up if the rendezvous
133 		 * is interrupted -- someone else might come
134 		 * along and try to rendezvous with us, so
135 		 * we need to be here.
136 		 */
137 	    Again:
138 		unlock(&chanlock);
139 		_procsplx(s);
140 		r = _threadrendezvous((ulong)&c, 0);
141 		s = _procsplhi();
142 		lock(&chanlock);
143 
144 		if(r==~0){		/* interrupted */
145 			if(c!=nil)		/* someone will meet us; go back */
146 				goto Again;
147 			c = (Channel*)~0;	/* so no one tries to meet us */
148 		}
149 
150 		/* dequeue from channels, find selected one */
151 		a = nil;
152 		for(xa=alts; xa->op!=CHANEND; xa++){
153 			if(xa->op==CHANNOP)
154 				continue;
155 			if(xa->c == c)
156 				a = xa;
157 			dequeue(xa);
158 		}
159 		unlock(&chanlock);
160 		_procsplx(s);
161 		if(a == nil){	/* we were interrupted */
162 			assert(c==(Channel*)~0);
163 			return -1;
164 		}
165 	}else{
166 		altexec(a, s);	/* unlocks chanlock, does splx */
167 	}
168 	_sched();
169 	t->chan = Channone;
170 	return a - alts;
171 }
172 
173 static int
174 runop(int op, Channel *c, void *v, int nb)
175 {
176 	int r;
177 	Alt a[2];
178 
179 	/*
180 	 * we could do this without calling alt,
181 	 * but the only reason would be performance,
182 	 * and i'm not convinced it matters.
183 	 */
184 	a[0].op = op;
185 	a[0].c = c;
186 	a[0].v = v;
187 	a[1].op = CHANEND;
188 	if(nb)
189 		a[1].op = CHANNOBLK;
190 	switch(r=alt(a)){
191 	case -1:	/* interrupted */
192 		return -1;
193 	case 1:	/* nonblocking, didn't accomplish anything */
194 		assert(nb);
195 		return 0;
196 	case 0:
197 		return 1;
198 	default:
199 		fprint(2, "ERROR: channel alt returned %d\n", r);
200 		abort();
201 		return -1;
202 	}
203 }
204 
205 int
206 recv(Channel *c, void *v)
207 {
208 	return runop(CHANRCV, c, v, 0);
209 }
210 
211 int
212 nbrecv(Channel *c, void *v)
213 {
214 	return runop(CHANRCV, c, v, 1);
215 }
216 
217 int
218 send(Channel *c, void *v)
219 {
220 	return runop(CHANSND, c, v, 0);
221 }
222 
223 int
224 nbsend(Channel *c, void *v)
225 {
226 	return runop(CHANSND, c, v, 1);
227 }
228 
229 static void
230 channelsize(Channel *c, int sz)
231 {
232 	if(c->e != sz){
233 		fprint(2, "expected channel with elements of size %d, got size %d",
234 			sz, c->e);
235 		abort();
236 	}
237 }
238 
239 int
240 sendul(Channel *c, ulong v)
241 {
242 	channelsize(c, sizeof(ulong));
243 	return send(c, &v);
244 }
245 
246 ulong
247 recvul(Channel *c)
248 {
249 	ulong v;
250 
251 	channelsize(c, sizeof(ulong));
252 	if(recv(c, &v) < 0)
253 		return ~0;
254 	return v;
255 }
256 
257 int
258 sendp(Channel *c, void *v)
259 {
260 	channelsize(c, sizeof(void*));
261 	return send(c, &v);
262 }
263 
264 void*
265 recvp(Channel *c)
266 {
267 	void *v;
268 
269 	channelsize(c, sizeof(void*));
270 	if(recv(c, &v) < 0)
271 		return nil;
272 	return v;
273 }
274 
275 int
276 nbsendul(Channel *c, ulong v)
277 {
278 	channelsize(c, sizeof(ulong));
279 	return nbsend(c, &v);
280 }
281 
282 ulong
283 nbrecvul(Channel *c)
284 {
285 	ulong v;
286 
287 	channelsize(c, sizeof(ulong));
288 	if(nbrecv(c, &v) == 0)
289 		return 0;
290 	return v;
291 }
292 
293 int
294 nbsendp(Channel *c, void *v)
295 {
296 	channelsize(c, sizeof(void*));
297 	return nbsend(c, &v);
298 }
299 
300 void*
301 nbrecvp(Channel *c)
302 {
303 	void *v;
304 
305 	channelsize(c, sizeof(void*));
306 	if(nbrecv(c, &v) == 0)
307 		return nil;
308 	return v;
309 }
310 
311 static int
312 emptyentry(Channel *c)
313 {
314 	int i, extra;
315 
316 	assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
317 
318 	for(i=0; i<c->nentry; i++)
319 		if(c->qentry[i]==nil)
320 			return i;
321 
322 	extra = 16;
323 	c->nentry += extra;
324 	c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
325 	if(c->qentry == nil)
326 		sysfatal("realloc channel entries: %r");
327 	memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
328 	return i;
329 }
330 
331 static void
332 enqueue(Alt *a, Channel **c)
333 {
334 	int i;
335 
336 	_threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
337 	a->tag = c;
338 	i = emptyentry(a->c);
339 	a->c->qentry[i] = a;
340 }
341 
342 static void
343 dequeue(Alt *a)
344 {
345 	int i;
346 	Channel *c;
347 
348 	c = a->c;
349 	for(i=0; i<c->nentry; i++)
350 		if(c->qentry[i]==a){
351 			_threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
352 			c->qentry[i] = nil;
353 			if(c->freed)
354 				_chanfree(c);
355 			return;
356 		}
357 }
358 
359 static int
360 canexec(Alt *a)
361 {
362 	int i, otherop;
363 	Channel *c;
364 
365 	c = a->c;
366 	/* are there senders or receivers blocked? */
367 	otherop = (CHANSND+CHANRCV) - a->op;
368 	for(i=0; i<c->nentry; i++)
369 		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
370 			_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
371 			return 1;
372 		}
373 
374 	/* is there room in the channel? */
375 	if((a->op==CHANSND && c->n < c->s)
376 	|| (a->op==CHANRCV && c->n > 0)){
377 		_threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
378 		return 1;
379 	}
380 
381 	return 0;
382 }
383 
384 static void*
385 altexecbuffered(Alt *a, int willreplace)
386 {
387 	uchar *v;
388 	Channel *c;
389 
390 	c = a->c;
391 	/* use buffered channel queue */
392 	if(a->op==CHANRCV && c->n > 0){
393 		_threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
394 		v = c->v + c->e*(c->f%c->s);
395 		if(!willreplace)
396 			c->n--;
397 		c->f++;
398 		return v;
399 	}
400 	if(a->op==CHANSND && c->n < c->s){
401 		_threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
402 		v = c->v + c->e*((c->f+c->n)%c->s);
403 		if(!willreplace)
404 			c->n++;
405 		return v;
406 	}
407 	abort();
408 	return nil;
409 }
410 
/*
 * Move one channel element of sz bytes from src to dst.
 * A null dst means the value is not wanted: nothing is done.
 * A null src means there is no value to give: dst is zero-filled.
 */
static void
altcopy(void *dst, void *src, int sz)
{
	if(!dst)
		return;
	if(!src){
		memset(dst, 0, sz);
		return;
	}
	memmove(dst, src, sz);
}
421 
422 static int
423 altexec(Alt *a, int spl)
424 {
425 	volatile Alt *b;
426 	int i, n, otherop;
427 	Channel *c;
428 	void *me, *waiter, *buf;
429 
430 	c = a->c;
431 
432 	/* rendezvous with others */
433 	otherop = (CHANSND+CHANRCV) - a->op;
434 	n = 0;
435 	b = nil;
436 	me = a->v;
437 	for(i=0; i<c->nentry; i++)
438 		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
439 			if(nrand(++n) == 0)
440 				b = c->qentry[i];
441 	if(b != nil){
442 		_threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
443 		waiter = b->v;
444 		if(c->s && c->n){
445 			/*
446 			 * if buffer is full and there are waiters
447 			 * and we're meeting a waiter,
448 			 * we must be receiving.
449 			 *
450 			 * we use the value in the channel buffer,
451 			 * copy the waiter's value into the channel buffer
452 			 * on behalf of the waiter, and then wake the waiter.
453 			 */
454 			if(a->op!=CHANRCV)
455 				abort();
456 			buf = altexecbuffered(a, 1);
457 			altcopy(me, buf, c->e);
458 			altcopy(buf, waiter, c->e);
459 		}else{
460 			if(a->op==CHANRCV)
461 				altcopy(me, waiter, c->e);
462 			else
463 				altcopy(waiter, me, c->e);
464 		}
465 		*b->tag = c;	/* commits us to rendezvous */
466 		_threaddebug(DBGCHAN, "unlocking the chanlock");
467 		unlock(&chanlock);
468 		_procsplx(spl);
469 		_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
470 		while(_threadrendezvous((ulong)b->tag, 0) == ~0)
471 			;
472 		return 1;
473 	}
474 
475 	buf = altexecbuffered(a, 0);
476 	if(a->op==CHANRCV)
477 		altcopy(me, buf, c->e);
478 	else
479 		altcopy(buf, me, c->e);
480 
481 	unlock(&chanlock);
482 	_procsplx(spl);
483 	return 1;
484 }
485