xref: /plan9-contrib/sys/src/cmd/fossil/9proc.c (revision a84536681645e23c630ce4ef2e5c3b284d4c590b)
1 #include "stdinc.h"
2 
3 #include "9.h"
4 #include "dat.h"
5 #include "fns.h"
6 
7 enum {
8 	NConInit	= 128,
9 	NMsgInit	= 384,
10 	NMsgProcInit	= 64,
11 	NMsizeInit	= 8192+IOHDRSZ,
12 };
13 
14 static struct {
15 	VtLock*	alock;			/* alloc */
16 	Msg*	ahead;
17 	VtRendez* arendez;
18 
19 	int	maxmsg;
20 	int	nmsg;
21 	int	nmsgstarve;
22 
23 	VtLock*	rlock;			/* read */
24 	Msg*	rhead;
25 	Msg*	rtail;
26 	VtRendez* rrendez;
27 
28 	int	maxproc;
29 	int	nproc;
30 	int	nprocstarve;
31 
32 	u32int	msize;			/* immutable */
33 } mbox;
34 
35 static struct {
36 	VtLock*	alock;			/* alloc */
37 	Con*	ahead;
38 	VtRendez* arendez;
39 
40 	VtLock*	clock;
41 	Con*	chead;
42 	Con*	ctail;
43 
44 	int	maxcon;
45 	int	ncon;
46 	int	nconstarve;
47 
48 	u32int	msize;
49 } cbox;
50 
51 static void
52 conFree(Con* con)
53 {
54 	assert(con->version == nil);
55 	assert(con->mhead == nil);
56 	assert(con->whead == nil);
57 	assert(con->nfid == 0);
58 	assert(con->state == ConMoribund);
59 
60 	if(con->fd >= 0){
61 		close(con->fd);
62 		con->fd = -1;
63 	}
64 	con->state = ConDead;
65 	con->aok = 0;
66 	con->noauth = con->noperm = con->wstatallow = 0;
67 	con->isconsole = 0;
68 
69 	vtLock(cbox.alock);
70 	if(con->cprev != nil)
71 		con->cprev->cnext = con->cnext;
72 	else
73 		cbox.chead = con->cnext;
74 	if(con->cnext != nil)
75 		con->cnext->cprev = con->cprev;
76 	else
77 		cbox.ctail = con->cprev;
78 	con->cprev = con->cnext = nil;
79 
80 	if(cbox.ncon > cbox.maxcon){
81 		if(con->name != nil)
82 			vtMemFree(con->name);
83 		vtLockFree(con->fidlock);
84 		vtMemFree(con->data);
85 		vtRendezFree(con->wrendez);
86 		vtLockFree(con->wlock);
87 		vtRendezFree(con->mrendez);
88 		vtLockFree(con->mlock);
89 		vtRendezFree(con->rendez);
90 		vtLockFree(con->lock);
91 		vtMemFree(con);
92 		cbox.ncon--;
93 		vtUnlock(cbox.alock);
94 		return;
95 	}
96 	con->anext = cbox.ahead;
97 	cbox.ahead = con;
98 	if(con->anext == nil)
99 		vtWakeup(cbox.arendez);
100 	vtUnlock(cbox.alock);
101 }
102 
103 static void
104 msgFree(Msg* m)
105 {
106 	assert(m->rwnext == nil);
107 	assert(m->flush == nil);
108 
109 	vtLock(mbox.alock);
110 	if(mbox.nmsg > mbox.maxmsg){
111 		vtMemFree(m->data);
112 		vtMemFree(m);
113 		mbox.nmsg--;
114 		vtUnlock(mbox.alock);
115 		return;
116 	}
117 	m->anext = mbox.ahead;
118 	mbox.ahead = m;
119 	if(m->anext == nil)
120 		vtWakeup(mbox.arendez);
121 	vtUnlock(mbox.alock);
122 }
123 
124 static Msg*
125 msgAlloc(Con* con)
126 {
127 	Msg *m;
128 
129 	vtLock(mbox.alock);
130 	while(mbox.ahead == nil){
131 		if(mbox.nmsg >= mbox.maxmsg){
132 			mbox.nmsgstarve++;
133 			vtSleep(mbox.arendez);
134 			continue;
135 		}
136 		m = vtMemAllocZ(sizeof(Msg));
137 		m->data = vtMemAlloc(mbox.msize);
138 		m->msize = mbox.msize;
139 		mbox.nmsg++;
140 		mbox.ahead = m;
141 		break;
142 	}
143 	m = mbox.ahead;
144 	mbox.ahead = m->anext;
145 	m->anext = nil;
146 	vtUnlock(mbox.alock);
147 
148 	m->con = con;
149 	m->state = MsgR;
150 	m->nowq = 0;
151 
152 	return m;
153 }
154 
155 static void
156 msgMunlink(Msg* m)
157 {
158 	Con *con;
159 
160 	con = m->con;
161 
162 	if(m->mprev != nil)
163 		m->mprev->mnext = m->mnext;
164 	else
165 		con->mhead = m->mnext;
166 	if(m->mnext != nil)
167 		m->mnext->mprev = m->mprev;
168 	else
169 		con->mtail = m->mprev;
170 	m->mprev = m->mnext = nil;
171 }
172 
173 void
174 msgFlush(Msg* m)
175 {
176 	Con *con;
177 	Msg *flush, *old;
178 
179 	con = m->con;
180 
181 	if(Dflag)
182 		fprint(2, "msgFlush %F\n", &m->t);
183 
184 	/*
185 	 * If this Tflush has been flushed, nothing to do.
186 	 * Look for the message to be flushed in the
187 	 * queue of all messages still on this connection.
188 	 * If it's not found must assume Elvis has already
189 	 * left the building and reply normally.
190 	 */
191 	vtLock(con->mlock);
192 	if(m->state == MsgF){
193 		vtUnlock(con->mlock);
194 		return;
195 	}
196 	for(old = con->mhead; old != nil; old = old->mnext)
197 		if(old->t.tag == m->t.oldtag)
198 			break;
199 	if(old == nil){
200 		if(Dflag)
201 			fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
202 		vtUnlock(con->mlock);
203 		return;
204 	}
205 
206 	if(Dflag)
207 		fprint(2, "\tmsgFlush found %F\n", &old->t);
208 
209 	/*
210 	 * Found it.
211 	 * There are two cases where the old message can be
212 	 * truly flushed and no reply to the original message given.
213 	 * The first is when the old message is in MsgR state; no
214 	 * processing has been done yet and it is still on the read
215 	 * queue. The second is if old is a Tflush, which doesn't
216 	 * affect the server state. In both cases, put the old
217 	 * message into MsgF state and let MsgWrite toss it after
218 	 * pulling it off the queue.
219 	 */
220 	if(old->state == MsgR || old->t.type == Tflush){
221 		old->state = MsgF;
222 		if(Dflag)
223 			fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
224 				m->t.oldtag);
225 	}
226 
227 	/*
228 	 * Link this flush message and the old message
229 	 * so multiple flushes can be coalesced (if there are
230 	 * multiple Tflush messages for a particular pending
231 	 * request, it is only necessary to respond to the last
232 	 * one, so any previous can be removed) and to be
233 	 * sure flushes wait for their corresponding old
234 	 * message to go out first.
235 	 * Waiting flush messages do not go on the write queue,
236 	 * they are processed after the old message is dealt
237 	 * with. There's no real need to protect the setting of
238 	 * Msg.nowq, the only code to check it runs in this
239 	 * process after this routine returns.
240 	 */
241 	if((flush = old->flush) != nil){
242 		if(Dflag)
243 			fprint(2, "msgFlush: remove %d from %d list\n",
244 				old->flush->t.tag, old->t.tag);
245 		m->flush = flush->flush;
246 		flush->flush = nil;
247 		msgMunlink(flush);
248 		msgFree(flush);
249 	}
250 	old->flush = m;
251 	m->nowq = 1;
252 
253 	if(Dflag)
254 		fprint(2, "msgFlush: add %d to %d queue\n",
255 			m->t.tag, old->t.tag);
256 	vtUnlock(con->mlock);
257 }
258 
259 static void
260 msgProc(void*)
261 {
262 	Msg *m;
263 	char *e;
264 	Con *con;
265 
266 	vtThreadSetName("msgProc");
267 
268 	for(;;){
269 		/*
270 		 * If surplus to requirements, exit.
271 		 * If not, wait for and pull a message off
272 		 * the read queue.
273 		 */
274 		vtLock(mbox.rlock);
275 		if(mbox.nproc > mbox.maxproc){
276 			mbox.nproc--;
277 			vtUnlock(mbox.rlock);
278 			break;
279 		}
280 		while(mbox.rhead == nil)
281 			vtSleep(mbox.rrendez);
282 		m = mbox.rhead;
283 		mbox.rhead = m->rwnext;
284 		m->rwnext = nil;
285 		vtUnlock(mbox.rlock);
286 
287 		con = m->con;
288 		e = nil;
289 
290 		/*
291 		 * If the message has been flushed before
292 		 * any 9P processing has started, mark it so
293 		 * none will be attempted.
294 		 */
295 		vtLock(con->mlock);
296 		if(m->state == MsgF)
297 			e = "flushed";
298 		else
299 			m->state = Msg9;
300 		vtUnlock(con->mlock);
301 
302 		if(e == nil){
303 			/*
304 			 * explain this
305 			 */
306 			vtLock(con->lock);
307 			if(m->t.type == Tversion){
308 				con->version = m;
309 				con->state = ConDown;
310 				while(con->mhead != m)
311 					vtSleep(con->rendez);
312 				assert(con->state == ConDown);
313 				if(con->version == m){
314 					con->version = nil;
315 					con->state = ConInit;
316 				}
317 				else
318 					e = "Tversion aborted";
319 			}
320 			else if(con->state != ConUp)
321 				e = "connection not ready";
322 			vtUnlock(con->lock);
323 		}
324 
325 		/*
326 		 * Dispatch if not error already.
327 		 */
328 		m->r.tag = m->t.tag;
329 		if(e == nil && !(*rFcall[m->t.type])(m))
330 			e = vtGetError();
331 		if(e != nil){
332 			m->r.type = Rerror;
333 			m->r.ename = e;
334 		}
335 		else
336 			m->r.type = m->t.type+1;
337 
338 		/*
339 		 * Put the message (with reply) on the
340 		 * write queue and wakeup the write process.
341 		 */
342 		if(!m->nowq){
343 			vtLock(con->wlock);
344 			if(con->whead == nil)
345 				con->whead = m;
346 			else
347 				con->wtail->rwnext = m;
348 			con->wtail = m;
349 			vtWakeup(con->wrendez);
350 			vtUnlock(con->wlock);
351 		}
352 	}
353 }
354 
355 static void
356 msgRead(void* v)
357 {
358 	Msg *m;
359 	Con *con;
360 	int eof, fd, n;
361 
362 	vtThreadSetName("msgRead");
363 
364 	con = v;
365 	fd = con->fd;
366 	eof = 0;
367 
368 	while(!eof){
369 		m = msgAlloc(con);
370 
371 		while((n = read9pmsg(fd, m->data, con->msize)) == 0)
372 			;
373 		if(n < 0){
374 			m->t.type = Tversion;
375 			m->t.fid = NOFID;
376 			m->t.tag = NOTAG;
377 			m->t.msize = con->msize;
378 			m->t.version = "9PEoF";
379 			eof = 1;
380 		}
381 		else if(convM2S(m->data, n, &m->t) != n){
382 			if(Dflag)
383 				fprint(2, "msgRead: convM2S error: %s\n",
384 					con->name);
385 			msgFree(m);
386 			continue;
387 		}
388 		if(Dflag)
389 			fprint(2, "msgRead %p: t %F\n", con, &m->t);
390 
391 		vtLock(con->mlock);
392 		if(con->mtail != nil){
393 			m->mprev = con->mtail;
394 			con->mtail->mnext = m;
395 		}
396 		else{
397 			con->mhead = m;
398 			m->mprev = nil;
399 		}
400 		con->mtail = m;
401 		vtUnlock(con->mlock);
402 
403 		vtLock(mbox.rlock);
404 		if(mbox.rhead == nil){
405 			mbox.rhead = m;
406 			if(!vtWakeup(mbox.rrendez)){
407 				if(mbox.nproc < mbox.maxproc){
408 					if(vtThread(msgProc, nil) > 0)
409 						mbox.nproc++;
410 				}
411 				else
412 					mbox.nprocstarve++;
413 			}
414 			/*
415 			 * don't need this surely?
416 			vtWakeup(mbox.rrendez);
417 			 */
418 		}
419 		else
420 			mbox.rtail->rwnext = m;
421 		mbox.rtail = m;
422 		vtUnlock(mbox.rlock);
423 	}
424 }
425 
426 static void
427 msgWrite(void* v)
428 {
429 	Con *con;
430 	int eof, n;
431 	Msg *flush, *m;
432 
433 	vtThreadSetName("msgWrite");
434 
435 	con = v;
436 	if(vtThread(msgRead, con) < 0){
437 		conFree(con);
438 		return;
439 	}
440 
441 	for(;;){
442 		/*
443 		 * Wait for and pull a message off the write queue.
444 		 */
445 		vtLock(con->wlock);
446 		while(con->whead == nil)
447 			vtSleep(con->wrendez);
448 		m = con->whead;
449 		con->whead = m->rwnext;
450 		m->rwnext = nil;
451 		assert(!m->nowq);
452 		vtUnlock(con->wlock);
453 
454 		eof = 0;
455 
456 		/*
457 		 * Write each message (if it hasn't been flushed)
458 		 * followed by any messages waiting for it to complete.
459 		 */
460 		vtLock(con->mlock);
461 		while(m != nil){
462 			msgMunlink(m);
463 
464 			if(Dflag)
465 				fprint(2, "msgWrite %d: r %F\n",
466 					m->state, &m->r);
467 
468 			if(m->state != MsgF){
469 				m->state = MsgW;
470 				vtUnlock(con->mlock);
471 
472 				n = convS2M(&m->r, con->data, con->msize);
473 				if(write(con->fd, con->data, n) != n)
474 					eof = 1;
475 
476 				vtLock(con->mlock);
477 			}
478 
479 			if((flush = m->flush) != nil){
480 				assert(flush->nowq);
481 				m->flush = nil;
482 			}
483 			msgFree(m);
484 			m = flush;
485 		}
486 		vtUnlock(con->mlock);
487 
488 		vtLock(con->lock);
489 		if(eof && con->fd >= 0){
490 			close(con->fd);
491 			con->fd = -1;
492 		}
493 		if(con->state == ConDown)
494 			vtWakeup(con->rendez);
495 		if(con->state == ConMoribund && con->mhead == nil){
496 			vtUnlock(con->lock);
497 			conFree(con);
498 			break;
499 		}
500 		vtUnlock(con->lock);
501 	}
502 }
503 
504 Con*
505 conAlloc(int fd, char* name)
506 {
507 	Con *con;
508 
509 	vtLock(cbox.alock);
510 	while(cbox.ahead == nil){
511 		if(cbox.ncon >= cbox.maxcon){
512 			cbox.nconstarve++;
513 			vtSleep(cbox.arendez);
514 			continue;
515 		}
516 		con = vtMemAllocZ(sizeof(Con));
517 		con->lock = vtLockAlloc();
518 		con->rendez = vtRendezAlloc(con->lock);
519 		con->data = vtMemAlloc(cbox.msize);
520 		con->msize = cbox.msize;
521 		con->alock = vtLockAlloc();
522 		con->mlock = vtLockAlloc();
523 		con->mrendez = vtRendezAlloc(con->mlock);
524 		con->wlock = vtLockAlloc();
525 		con->wrendez = vtRendezAlloc(con->wlock);
526 		con->fidlock = vtLockAlloc();
527 
528 		cbox.ncon++;
529 		cbox.ahead = con;
530 		break;
531 	}
532 	con = cbox.ahead;
533 	cbox.ahead = con->anext;
534 	con->anext = nil;
535 
536 	if(cbox.ctail != nil){
537 		con->cprev = cbox.ctail;
538 		cbox.ctail->cnext = con;
539 	}
540 	else{
541 		cbox.chead = con;
542 		con->cprev = nil;
543 	}
544 	cbox.ctail = con;
545 
546 	assert(con->mhead == nil);
547 	assert(con->whead == nil);
548 	assert(con->fhead == nil);
549 	assert(con->nfid == 0);
550 
551 	con->state = ConNew;
552 	con->fd = fd;
553 	if(con->name != nil){
554 		vtMemFree(con->name);
555 		con->name = nil;
556 	}
557 	if(name != nil)
558 		con->name = vtStrDup(name);
559 	else
560 		con->name = vtStrDup("unknown");
561 	con->aok = 0;
562 	con->noauth = con->noperm = con->wstatallow = 0;
563 	con->isconsole = 0;
564 	vtUnlock(cbox.alock);
565 
566 	if(vtThread(msgWrite, con) < 0){
567 		conFree(con);
568 		return nil;
569 	}
570 
571 	return con;
572 }
573 
574 static int
575 cmdMsg(int argc, char* argv[])
576 {
577 	char *p;
578 	char *usage = "usage: msg [-m nmsg] [-p nproc]";
579 	int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
580 
581 	maxmsg = maxproc = 0;
582 
583 	ARGBEGIN{
584 	default:
585 		return cliError(usage);
586 	case 'm':
587 		p = ARGF();
588 		if(p == nil)
589 			return cliError(usage);
590 		maxmsg = strtol(argv[0], &p, 0);
591 		if(maxmsg <= 0 || p == argv[0] || *p != '\0')
592 			return cliError(usage);
593 		break;
594 	case 'p':
595 		p = ARGF();
596 		if(p == nil)
597 			return cliError(usage);
598 		maxproc = strtol(argv[0], &p, 0);
599 		if(maxproc <= 0 || p == argv[0] || *p != '\0')
600 			return cliError(usage);
601 		break;
602 	}ARGEND
603 	if(argc)
604 		return cliError(usage);
605 
606 	vtLock(mbox.alock);
607 	if(maxmsg)
608 		mbox.maxmsg = maxmsg;
609 	maxmsg = mbox.maxmsg;
610 	nmsg = mbox.nmsg;
611 	nmsgstarve = mbox.nmsgstarve;
612 	vtUnlock(mbox.alock);
613 
614 	vtLock(mbox.rlock);
615 	if(maxproc)
616 		mbox.maxproc = maxproc;
617 	maxproc = mbox.maxproc;
618 	nproc = mbox.nproc;
619 	nprocstarve = mbox.nprocstarve;
620 	vtUnlock(mbox.rlock);
621 
622 	consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
623 	consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
624 		nmsg, nmsgstarve, nproc, nprocstarve);
625 
626 	return 1;
627 }
628 
629 static int
630 scmp(Fid *a, Fid *b)
631 {
632 	if(a == 0)
633 		return 1;
634 	if(b == 0)
635 		return -1;
636 	return strcmp(a->uname, b->uname);
637 }
638 
639 static Fid*
640 fidMerge(Fid *a, Fid *b)
641 {
642 	Fid *s, **l;
643 
644 	l = &s;
645 	while(a || b){
646 		if(scmp(a, b) < 0){
647 			*l = a;
648 			l = &a->sort;
649 			a = a->sort;
650 		}else{
651 			*l = b;
652 			l = &b->sort;
653 			b = b->sort;
654 		}
655 	}
656 	*l = 0;
657 	return s;
658 }
659 
660 static Fid*
661 fidMergeSort(Fid *f)
662 {
663 	int delay;
664 	Fid *a, *b;
665 
666 	if(f == nil)
667 		return nil;
668 	if(f->sort == nil)
669 		return f;
670 
671 	a = b = f;
672 	delay = 1;
673 	while(a && b){
674 		if(delay)	/* easy way to handle 2-element list */
675 			delay = 0;
676 		else
677 			a = a->sort;
678 		if(b = b->sort)
679 			b = b->sort;
680 	}
681 
682 	b = a->sort;
683 	a->sort = nil;
684 
685 	a = fidMergeSort(f);
686 	b = fidMergeSort(b);
687 
688 	return fidMerge(a, b);
689 }
690 
691 static int
692 cmdWho(int argc, char* argv[])
693 {
694 	char *usage = "usage: who";
695 	int i;
696 	Con *con;
697 	Fid *fid, *last;
698 
699 	ARGBEGIN{
700 	default:
701 		return cliError(usage);
702 	}ARGEND
703 
704 	if(argc > 0)
705 		return cliError(usage);
706 
707 	vtRLock(cbox.clock);
708 	for(con=cbox.chead; con; con=con->cnext){
709 		consPrint("\t%q:", con->name);
710 		vtLock(con->fidlock);
711 		last = nil;
712 		for(i=0; i<NFidHash; i++)
713 			for(fid=con->fidhash[i]; fid; fid=fid->hash)
714 				if(fid->fidno != NOFID && fid->uname){
715 					fid->sort = last;
716 					last = fid;
717 				}
718 		fid = fidMergeSort(last);
719 		last = nil;
720 		for(; fid; last=fid, fid=fid->sort)
721 			if(last==nil || strcmp(fid->uname, last->uname) != 0)
722 				consPrint(" %q", fid->uname);
723 		vtUnlock(con->fidlock);
724 		consPrint("\n");
725 	}
726 	vtRUnlock(cbox.clock);
727 	return 1;
728 }
729 
730 void
731 msgInit(void)
732 {
733 	mbox.alock = vtLockAlloc();
734 	mbox.arendez = vtRendezAlloc(mbox.alock);
735 
736 	mbox.rlock = vtLockAlloc();
737 	mbox.rrendez = vtRendezAlloc(mbox.rlock);
738 
739 	mbox.maxmsg = NMsgInit;
740 	mbox.maxproc = NMsgProcInit;
741 	mbox.msize = NMsizeInit;
742 
743 	cliAddCmd("msg", cmdMsg);
744 }
745 
746 static int
747 cmdCon(int argc, char* argv[])
748 {
749 	char *p;
750 	Con *con;
751 	char *usage = "usage: con [-m ncon]";
752 	int maxcon, ncon, nconstarve;
753 
754 	maxcon = 0;
755 
756 	ARGBEGIN{
757 	default:
758 		return cliError(usage);
759 	case 'm':
760 		p = ARGF();
761 		if(p == nil)
762 			return cliError(usage);
763 		maxcon = strtol(argv[0], &p, 0);
764 		if(maxcon <= 0 || p == argv[0] || *p != '\0')
765 			return cliError(usage);
766 		break;
767 	}ARGEND
768 	if(argc)
769 		return cliError(usage);
770 
771 	vtLock(cbox.clock);
772 	if(maxcon)
773 		cbox.maxcon = maxcon;
774 	maxcon = cbox.maxcon;
775 	ncon = cbox.ncon;
776 	nconstarve = cbox.nconstarve;
777 	vtUnlock(cbox.clock);
778 
779 	consPrint("\tcon -m %d\n", maxcon);
780 	consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
781 
782 	vtRLock(cbox.clock);
783 	for(con = cbox.chead; con != nil; con = con->cnext){
784 		consPrint("\t%s\n", con->name);
785 	}
786 	vtRUnlock(cbox.clock);
787 
788 	return 1;
789 }
790 
791 void
792 conInit(void)
793 {
794 	cbox.alock = vtLockAlloc();
795 	cbox.arendez = vtRendezAlloc(cbox.alock);
796 
797 	cbox.clock = vtLockAlloc();
798 
799 	cbox.maxcon = NConInit;
800 	cbox.msize = NMsizeInit;
801 
802 	cliAddCmd("con", cmdCon);
803 	cliAddCmd("who", cmdWho);
804 }
805