xref: /plan9-contrib/sys/src/cmd/fossil/9proc.c (revision 2ec6491f4460014c01f3ea5c06b30b1a392b0bb2)
1 #include "stdinc.h"
2 
3 #include "9.h"
4 #include "dat.h"
5 #include "fns.h"
6 
7 enum {
8 	NConInit	= 128,
9 	NMsgInit	= 384,
10 	NMsgProcInit	= 64,
11 	NMsizeInit	= 16*1024+IOHDRSZ,
12 };
13 
14 static struct {
15 	QLock	alock;			/* alloc */
16 	Msg*	ahead;
17 	Rendez	arendez;
18 
19 	int	maxmsg;
20 	int	nmsg;
21 	int	nmsgstarve;
22 
23 	QLock	rlock;			/* read */
24 	Msg*	rhead;
25 	Msg*	rtail;
26 	Rendez	rrendez;
27 
28 	int	maxproc;
29 	int	nproc;
30 	int	nprocstarve;
31 
32 	u32int	msize;			/* immutable */
33 } mbox;
34 
35 static struct {
36 	QLock	alock;			/* alloc */
37 	Con*	ahead;
38 	Rendez	arendez;
39 
40 	RWLock	clock;
41 	Con*	chead;
42 	Con*	ctail;
43 
44 	int	maxcon;
45 	int	ncon;
46 	int	nconstarve;
47 
48 	u32int	msize;
49 } cbox;
50 
51 static void
conFree(Con * con)52 conFree(Con* con)
53 {
54 	assert(con->version == nil);
55 	assert(con->mhead == nil);
56 	assert(con->whead == nil);
57 	assert(con->nfid == 0);
58 	assert(con->state == ConMoribund);
59 
60 	if(con->fd >= 0){
61 		close(con->fd);
62 		con->fd = -1;
63 	}
64 	con->state = ConDead;
65 	con->aok = 0;
66 	con->flags = 0;
67 	con->isconsole = 0;
68 
69 	qlock(&cbox.alock);
70 	if(con->cprev != nil)
71 		con->cprev->cnext = con->cnext;
72 	else
73 		cbox.chead = con->cnext;
74 	if(con->cnext != nil)
75 		con->cnext->cprev = con->cprev;
76 	else
77 		cbox.ctail = con->cprev;
78 	con->cprev = con->cnext = nil;
79 
80 	if(cbox.ncon > cbox.maxcon){
81 		if(con->name != nil)
82 			vtfree(con->name);
83 		vtfree(con->data);
84 		vtfree(con);
85 		cbox.ncon--;
86 		qunlock(&cbox.alock);
87 		return;
88 	}
89 	con->anext = cbox.ahead;
90 	cbox.ahead = con;
91 	if(con->anext == nil)
92 		rwakeup(&cbox.arendez);
93 	qunlock(&cbox.alock);
94 }
95 
96 static void
msgFree(Msg * m)97 msgFree(Msg* m)
98 {
99 	assert(m->rwnext == nil);
100 	assert(m->flush == nil);
101 
102 	qlock(&mbox.alock);
103 	if(mbox.nmsg > mbox.maxmsg){
104 		vtfree(m->data);
105 		vtfree(m);
106 		mbox.nmsg--;
107 		qunlock(&mbox.alock);
108 		return;
109 	}
110 	m->anext = mbox.ahead;
111 	mbox.ahead = m;
112 	if(m->anext == nil)
113 		rwakeup(&mbox.arendez);
114 	qunlock(&mbox.alock);
115 }
116 
117 static Msg*
msgAlloc(Con * con)118 msgAlloc(Con* con)
119 {
120 	Msg *m;
121 
122 	qlock(&mbox.alock);
123 	while(mbox.ahead == nil){
124 		if(mbox.nmsg >= mbox.maxmsg){
125 			mbox.nmsgstarve++;
126 			rsleep(&mbox.arendez);
127 			continue;
128 		}
129 		m = vtmallocz(sizeof(Msg));
130 		m->data = vtmalloc(mbox.msize);
131 		m->msize = mbox.msize;
132 		mbox.nmsg++;
133 		mbox.ahead = m;
134 		break;
135 	}
136 	m = mbox.ahead;
137 	mbox.ahead = m->anext;
138 	m->anext = nil;
139 	qunlock(&mbox.alock);
140 
141 	m->con = con;
142 	m->state = MsgR;
143 	m->nowq = 0;
144 
145 	return m;
146 }
147 
148 static void
msgMunlink(Msg * m)149 msgMunlink(Msg* m)
150 {
151 	Con *con;
152 
153 	con = m->con;
154 
155 	if(m->mprev != nil)
156 		m->mprev->mnext = m->mnext;
157 	else
158 		con->mhead = m->mnext;
159 	if(m->mnext != nil)
160 		m->mnext->mprev = m->mprev;
161 	else
162 		con->mtail = m->mprev;
163 	m->mprev = m->mnext = nil;
164 }
165 
166 void
msgFlush(Msg * m)167 msgFlush(Msg* m)
168 {
169 	Con *con;
170 	Msg *flush, *old;
171 
172 	con = m->con;
173 
174 	if(Dflag)
175 		fprint(2, "msgFlush %F\n", &m->t);
176 
177 	/*
178 	 * If this Tflush has been flushed, nothing to do.
179 	 * Look for the message to be flushed in the
180 	 * queue of all messages still on this connection.
181 	 * If it's not found must assume Elvis has already
182 	 * left the building and reply normally.
183 	 */
184 	qlock(&con->mlock);
185 	if(m->state == MsgF){
186 		qunlock(&con->mlock);
187 		return;
188 	}
189 	for(old = con->mhead; old != nil; old = old->mnext)
190 		if(old->t.tag == m->t.oldtag)
191 			break;
192 	if(old == nil){
193 		if(Dflag)
194 			fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
195 		qunlock(&con->mlock);
196 		return;
197 	}
198 
199 	if(Dflag)
200 		fprint(2, "\tmsgFlush found %F\n", &old->t);
201 
202 	/*
203 	 * Found it.
204 	 * There are two cases where the old message can be
205 	 * truly flushed and no reply to the original message given.
206 	 * The first is when the old message is in MsgR state; no
207 	 * processing has been done yet and it is still on the read
208 	 * queue. The second is if old is a Tflush, which doesn't
209 	 * affect the server state. In both cases, put the old
210 	 * message into MsgF state and let MsgWrite toss it after
211 	 * pulling it off the queue.
212 	 */
213 	if(old->state == MsgR || old->t.type == Tflush){
214 		old->state = MsgF;
215 		if(Dflag)
216 			fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
217 				m->t.oldtag);
218 	}
219 
220 	/*
221 	 * Link this flush message and the old message
222 	 * so multiple flushes can be coalesced (if there are
223 	 * multiple Tflush messages for a particular pending
224 	 * request, it is only necessary to respond to the last
225 	 * one, so any previous can be removed) and to be
226 	 * sure flushes wait for their corresponding old
227 	 * message to go out first.
228 	 * Waiting flush messages do not go on the write queue,
229 	 * they are processed after the old message is dealt
230 	 * with. There's no real need to protect the setting of
231 	 * Msg.nowq, the only code to check it runs in this
232 	 * process after this routine returns.
233 	 */
234 	if((flush = old->flush) != nil){
235 		if(Dflag)
236 			fprint(2, "msgFlush: remove %d from %d list\n",
237 				old->flush->t.tag, old->t.tag);
238 		m->flush = flush->flush;
239 		flush->flush = nil;
240 		msgMunlink(flush);
241 		msgFree(flush);
242 	}
243 	old->flush = m;
244 	m->nowq = 1;
245 
246 	if(Dflag)
247 		fprint(2, "msgFlush: add %d to %d queue\n",
248 			m->t.tag, old->t.tag);
249 	qunlock(&con->mlock);
250 }
251 
252 static void
msgProc(void *)253 msgProc(void*)
254 {
255 	Msg *m;
256 	char e[ERRMAX];
257 	Con *con;
258 
259 	threadsetname("msgProc");
260 
261 	for(;;){
262 		/*
263 		 * If surplus to requirements, exit.
264 		 * If not, wait for and pull a message off
265 		 * the read queue.
266 		 */
267 		qlock(&mbox.rlock);
268 		if(mbox.nproc > mbox.maxproc){
269 			mbox.nproc--;
270 			qunlock(&mbox.rlock);
271 			break;
272 		}
273 		while(mbox.rhead == nil)
274 			rsleep(&mbox.rrendez);
275 		m = mbox.rhead;
276 		mbox.rhead = m->rwnext;
277 		m->rwnext = nil;
278 		qunlock(&mbox.rlock);
279 
280 		con = m->con;
281 		*e = 0;
282 
283 		/*
284 		 * If the message has been flushed before
285 		 * any 9P processing has started, mark it so
286 		 * none will be attempted.
287 		 */
288 		qlock(&con->mlock);
289 		if(m->state == MsgF)
290 			strcpy(e, "flushed");
291 		else
292 			m->state = Msg9;
293 		qunlock(&con->mlock);
294 
295 		if(*e == 0){
296 			/*
297 			 * explain this
298 			 */
299 			qlock(&con->lock);
300 			if(m->t.type == Tversion){
301 				con->version = m;
302 				con->state = ConDown;
303 				while(con->mhead != m)
304 					rsleep(&con->rendez);
305 				assert(con->state == ConDown);
306 				if(con->version == m){
307 					con->version = nil;
308 					con->state = ConInit;
309 				}
310 				else
311 					strcpy(e, "Tversion aborted");
312 			}
313 			else if(con->state != ConUp)
314 				strcpy(e, "connection not ready");
315 			qunlock(&con->lock);
316 		}
317 
318 		/*
319 		 * Dispatch if not error already.
320 		 */
321 		m->r.tag = m->t.tag;
322 		if(*e == 0 && !(*rFcall[m->t.type])(m))
323 			rerrstr(e, sizeof e);
324 		if(*e != 0){
325 			m->r.type = Rerror;
326 			m->r.ename = e;
327 		}
328 		else
329 			m->r.type = m->t.type+1;
330 
331 		/*
332 		 * Put the message (with reply) on the
333 		 * write queue and wakeup the write process.
334 		 */
335 		if(!m->nowq){
336 			qlock(&con->wlock);
337 			if(con->whead == nil)
338 				con->whead = m;
339 			else
340 				con->wtail->rwnext = m;
341 			con->wtail = m;
342 			rwakeup(&con->wrendez);
343 			qunlock(&con->wlock);
344 		}
345 	}
346 }
347 
348 static void
msgRead(void * v)349 msgRead(void* v)
350 {
351 	Msg *m;
352 	Con *con;
353 	int eof, fd, n;
354 
355 	threadsetname("msgRead");
356 
357 	con = v;
358 	fd = con->fd;
359 	eof = 0;
360 
361 	while(!eof){
362 		m = msgAlloc(con);
363 
364 		while((n = read9pmsg(fd, m->data, con->msize)) == 0)
365 			;
366 		if(n < 0){
367 			m->t.type = Tversion;
368 			m->t.fid = NOFID;
369 			m->t.tag = NOTAG;
370 			m->t.msize = con->msize;
371 			m->t.version = "9PEoF";
372 			eof = 1;
373 		}
374 		else if(convM2S(m->data, n, &m->t) != n){
375 			if(Dflag)
376 				fprint(2, "msgRead: convM2S error: %s\n",
377 					con->name);
378 			msgFree(m);
379 			continue;
380 		}
381 		if(Dflag)
382 			fprint(2, "msgRead %p: t %F\n", con, &m->t);
383 
384 		qlock(&con->mlock);
385 		if(con->mtail != nil){
386 			m->mprev = con->mtail;
387 			con->mtail->mnext = m;
388 		}
389 		else{
390 			con->mhead = m;
391 			m->mprev = nil;
392 		}
393 		con->mtail = m;
394 		qunlock(&con->mlock);
395 
396 		qlock(&mbox.rlock);
397 		if(mbox.rhead == nil){
398 			mbox.rhead = m;
399 			if(!rwakeup(&mbox.rrendez)){
400 				if(mbox.nproc < mbox.maxproc){
401 					if(proccreate(msgProc, nil, STACK) > 0)
402 						mbox.nproc++;
403 				}
404 				else
405 					mbox.nprocstarve++;
406 			}
407 			/*
408 			 * don't need this surely?
409 			rwakeup(&mbox.rrendez);
410 			 */
411 		}
412 		else
413 			mbox.rtail->rwnext = m;
414 		mbox.rtail = m;
415 		qunlock(&mbox.rlock);
416 	}
417 }
418 
419 static void
msgWrite(void * v)420 msgWrite(void* v)
421 {
422 	Con *con;
423 	int eof, n;
424 	Msg *flush, *m;
425 
426 	threadsetname("msgWrite");
427 
428 	con = v;
429 	if(proccreate(msgRead, con, STACK) < 0){
430 		conFree(con);
431 		return;
432 	}
433 
434 	for(;;){
435 		/*
436 		 * Wait for and pull a message off the write queue.
437 		 */
438 		qlock(&con->wlock);
439 		while(con->whead == nil)
440 			rsleep(&con->wrendez);
441 		m = con->whead;
442 		con->whead = m->rwnext;
443 		m->rwnext = nil;
444 		assert(!m->nowq);
445 		qunlock(&con->wlock);
446 
447 		eof = 0;
448 
449 		/*
450 		 * Write each message (if it hasn't been flushed)
451 		 * followed by any messages waiting for it to complete.
452 		 */
453 		qlock(&con->mlock);
454 		while(m != nil){
455 			msgMunlink(m);
456 
457 			if(Dflag)
458 				fprint(2, "msgWrite %d: r %F\n",
459 					m->state, &m->r);
460 
461 			if(m->state != MsgF){
462 				m->state = MsgW;
463 				qunlock(&con->mlock);
464 
465 				n = convS2M(&m->r, con->data, con->msize);
466 				if(write(con->fd, con->data, n) != n)
467 					eof = 1;
468 
469 				qlock(&con->mlock);
470 			}
471 
472 			if((flush = m->flush) != nil){
473 				assert(flush->nowq);
474 				m->flush = nil;
475 			}
476 			msgFree(m);
477 			m = flush;
478 		}
479 		qunlock(&con->mlock);
480 
481 		qlock(&con->lock);
482 		if(eof && con->fd >= 0){
483 			close(con->fd);
484 			con->fd = -1;
485 		}
486 		if(con->state == ConDown)
487 			rwakeup(&con->rendez);
488 		if(con->state == ConMoribund && con->mhead == nil){
489 			qunlock(&con->lock);
490 			conFree(con);
491 			break;
492 		}
493 		qunlock(&con->lock);
494 	}
495 }
496 
497 Con*
conAlloc(int fd,char * name,int flags)498 conAlloc(int fd, char* name, int flags)
499 {
500 	Con *con;
501 	char buf[128], *p;
502 	int rfd, n;
503 
504 	qlock(&cbox.alock);
505 	while(cbox.ahead == nil){
506 		if(cbox.ncon >= cbox.maxcon){
507 			cbox.nconstarve++;
508 			rsleep(&cbox.arendez);
509 			continue;
510 		}
511 		con = vtmallocz(sizeof(Con));
512 		con->rendez.l = &con->lock;
513 		con->data = vtmalloc(cbox.msize);
514 		con->msize = cbox.msize;
515 		con->mrendez.l = &con->mlock;
516 		con->wrendez.l = &con->wlock;
517 
518 		cbox.ncon++;
519 		cbox.ahead = con;
520 		break;
521 	}
522 	con = cbox.ahead;
523 	cbox.ahead = con->anext;
524 	con->anext = nil;
525 
526 	if(cbox.ctail != nil){
527 		con->cprev = cbox.ctail;
528 		cbox.ctail->cnext = con;
529 	}
530 	else{
531 		cbox.chead = con;
532 		con->cprev = nil;
533 	}
534 	cbox.ctail = con;
535 
536 	assert(con->mhead == nil);
537 	assert(con->whead == nil);
538 	assert(con->fhead == nil);
539 	assert(con->nfid == 0);
540 
541 	con->state = ConNew;
542 	con->fd = fd;
543 	if(con->name != nil){
544 		vtfree(con->name);
545 		con->name = nil;
546 	}
547 	if(name != nil)
548 		con->name = vtstrdup(name);
549 	else
550 		con->name = vtstrdup("unknown");
551 	con->remote[0] = 0;
552 	snprint(buf, sizeof buf, "%s/remote", con->name);
553 	if((rfd = open(buf, OREAD)) >= 0){
554 		n = read(rfd, buf, sizeof buf-1);
555 		close(rfd);
556 		if(n > 0){
557 			buf[n] = 0;
558 			if((p = strchr(buf, '\n')) != nil)
559 				*p = 0;
560 			strecpy(con->remote, con->remote+sizeof con->remote, buf);
561 		}
562 	}
563 	con->flags = flags;
564 	con->isconsole = 0;
565 	qunlock(&cbox.alock);
566 
567 	if(proccreate(msgWrite, con, STACK) < 0){
568 		conFree(con);
569 		return nil;
570 	}
571 
572 	return con;
573 }
574 
575 static int
cmdMsg(int argc,char * argv[])576 cmdMsg(int argc, char* argv[])
577 {
578 	char *p;
579 	char *usage = "usage: msg [-m nmsg] [-p nproc]";
580 	int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
581 
582 	maxmsg = maxproc = 0;
583 
584 	ARGBEGIN{
585 	default:
586 		return cliError(usage);
587 	case 'm':
588 		p = ARGF();
589 		if(p == nil)
590 			return cliError(usage);
591 		maxmsg = strtol(argv[0], &p, 0);
592 		if(maxmsg <= 0 || p == argv[0] || *p != '\0')
593 			return cliError(usage);
594 		break;
595 	case 'p':
596 		p = ARGF();
597 		if(p == nil)
598 			return cliError(usage);
599 		maxproc = strtol(argv[0], &p, 0);
600 		if(maxproc <= 0 || p == argv[0] || *p != '\0')
601 			return cliError(usage);
602 		break;
603 	}ARGEND
604 	if(argc)
605 		return cliError(usage);
606 
607 	qlock(&mbox.alock);
608 	if(maxmsg)
609 		mbox.maxmsg = maxmsg;
610 	maxmsg = mbox.maxmsg;
611 	nmsg = mbox.nmsg;
612 	nmsgstarve = mbox.nmsgstarve;
613 	qunlock(&mbox.alock);
614 
615 	qlock(&mbox.rlock);
616 	if(maxproc)
617 		mbox.maxproc = maxproc;
618 	maxproc = mbox.maxproc;
619 	nproc = mbox.nproc;
620 	nprocstarve = mbox.nprocstarve;
621 	qunlock(&mbox.rlock);
622 
623 	consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
624 	consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
625 		nmsg, nmsgstarve, nproc, nprocstarve);
626 
627 	return 1;
628 }
629 
630 static int
scmp(Fid * a,Fid * b)631 scmp(Fid *a, Fid *b)
632 {
633 	if(a == 0)
634 		return 1;
635 	if(b == 0)
636 		return -1;
637 	return strcmp(a->uname, b->uname);
638 }
639 
640 static Fid*
fidMerge(Fid * a,Fid * b)641 fidMerge(Fid *a, Fid *b)
642 {
643 	Fid *s, **l;
644 
645 	l = &s;
646 	while(a || b){
647 		if(scmp(a, b) < 0){
648 			*l = a;
649 			l = &a->sort;
650 			a = a->sort;
651 		}else{
652 			*l = b;
653 			l = &b->sort;
654 			b = b->sort;
655 		}
656 	}
657 	*l = 0;
658 	return s;
659 }
660 
661 static Fid*
fidMergeSort(Fid * f)662 fidMergeSort(Fid *f)
663 {
664 	int delay;
665 	Fid *a, *b;
666 
667 	if(f == nil)
668 		return nil;
669 	if(f->sort == nil)
670 		return f;
671 
672 	a = b = f;
673 	delay = 1;
674 	while(a && b){
675 		if(delay)	/* easy way to handle 2-element list */
676 			delay = 0;
677 		else
678 			a = a->sort;
679 		if(b = b->sort)
680 			b = b->sort;
681 	}
682 
683 	b = a->sort;
684 	a->sort = nil;
685 
686 	a = fidMergeSort(f);
687 	b = fidMergeSort(b);
688 
689 	return fidMerge(a, b);
690 }
691 
692 static int
cmdWho(int argc,char * argv[])693 cmdWho(int argc, char* argv[])
694 {
695 	char *usage = "usage: who";
696 	int i, l1, l2, l;
697 	Con *con;
698 	Fid *fid, *last;
699 
700 	ARGBEGIN{
701 	default:
702 		return cliError(usage);
703 	}ARGEND
704 
705 	if(argc > 0)
706 		return cliError(usage);
707 
708 	rlock(&cbox.clock);
709 	l1 = 0;
710 	l2 = 0;
711 	for(con=cbox.chead; con; con=con->cnext){
712 		if((l = strlen(con->name)) > l1)
713 			l1 = l;
714 		if((l = strlen(con->remote)) > l2)
715 			l2 = l;
716 	}
717 	for(con=cbox.chead; con; con=con->cnext){
718 		consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote);
719 		qlock(&con->fidlock);
720 		last = nil;
721 		for(i=0; i<NFidHash; i++)
722 			for(fid=con->fidhash[i]; fid; fid=fid->hash)
723 				if(fid->fidno != NOFID && fid->uname){
724 					fid->sort = last;
725 					last = fid;
726 				}
727 		fid = fidMergeSort(last);
728 		last = nil;
729 		for(; fid; last=fid, fid=fid->sort)
730 			if(last==nil || strcmp(fid->uname, last->uname) != 0)
731 				consPrint(" %q", fid->uname);
732 		qunlock(&con->fidlock);
733 		consPrint("\n");
734 	}
735 	runlock(&cbox.clock);
736 	return 1;
737 }
738 
739 void
msgInit(void)740 msgInit(void)
741 {
742 	mbox.arendez.l = &mbox.alock;
743 
744 	mbox.rrendez.l = &mbox.rlock;
745 
746 	mbox.maxmsg = NMsgInit;
747 	mbox.maxproc = NMsgProcInit;
748 	mbox.msize = NMsizeInit;
749 
750 	cliAddCmd("msg", cmdMsg);
751 }
752 
753 static int
cmdCon(int argc,char * argv[])754 cmdCon(int argc, char* argv[])
755 {
756 	char *p;
757 	Con *con;
758 	char *usage = "usage: con [-m ncon]";
759 	int maxcon, ncon, nconstarve;
760 
761 	maxcon = 0;
762 
763 	ARGBEGIN{
764 	default:
765 		return cliError(usage);
766 	case 'm':
767 		p = ARGF();
768 		if(p == nil)
769 			return cliError(usage);
770 		maxcon = strtol(argv[0], &p, 0);
771 		if(maxcon <= 0 || p == argv[0] || *p != '\0')
772 			return cliError(usage);
773 		break;
774 	}ARGEND
775 	if(argc)
776 		return cliError(usage);
777 
778 	wlock(&cbox.clock);
779 	if(maxcon)
780 		cbox.maxcon = maxcon;
781 	maxcon = cbox.maxcon;
782 	ncon = cbox.ncon;
783 	nconstarve = cbox.nconstarve;
784 	wunlock(&cbox.clock);
785 
786 	consPrint("\tcon -m %d\n", maxcon);
787 	consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
788 
789 	rlock(&cbox.clock);
790 	for(con = cbox.chead; con != nil; con = con->cnext){
791 		consPrint("\t%s\n", con->name);
792 	}
793 	runlock(&cbox.clock);
794 
795 	return 1;
796 }
797 
798 void
conInit(void)799 conInit(void)
800 {
801 	cbox.arendez.l = &cbox.alock;
802 
803 	cbox.maxcon = NConInit;
804 	cbox.msize = NMsizeInit;
805 
806 	cliAddCmd("con", cmdCon);
807 	cliAddCmd("who", cmdWho);
808 }
809