xref: /plan9/sys/src/libventi/server.c (revision 4f81ea256c7fac55da779894b772fbf5203568fa)
1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include <thread.h>
5 #include "queue.h"
6 
7 enum
8 {
9 	STACK = 8192
10 };
11 
12 typedef struct VtSconn VtSconn;
13 struct VtSconn
14 {
15 	int ctl;
16 	int ref;
17 	QLock lk;
18 	char dir[NETPATHLEN];
19 	VtSrv *srv;
20 	VtConn *c;
21 };
22 
23 struct VtSrv
24 {
25 	int afd;
26 	int dead;
27 	char adir[NETPATHLEN];
28 	Queue *q;	/* Queue(VtReq*) */
29 };
30 
31 static void listenproc(void*);
32 static void connproc(void*);
33 
34 static void
scincref(VtSconn * sc)35 scincref(VtSconn *sc)
36 {
37 	qlock(&sc->lk);
38 	sc->ref++;
39 	qunlock(&sc->lk);
40 }
41 
42 static void
scdecref(VtSconn * sc)43 scdecref(VtSconn *sc)
44 {
45 	qlock(&sc->lk);
46 	if(--sc->ref > 0){
47 		qunlock(&sc->lk);
48 		return;
49 	}
50 	if(sc->c)
51 		vtfreeconn(sc->c);
52 	vtfree(sc);
53 }
54 
55 VtSrv*
vtlisten(char * addr)56 vtlisten(char *addr)
57 {
58 	VtSrv *s;
59 
60 	s = vtmallocz(sizeof(VtSrv));
61 	s->afd = announce(addr, s->adir);
62 	if(s->afd < 0){
63 		free(s);
64 		return nil;
65 	}
66 	s->q = _vtqalloc();
67 	proccreate(listenproc, s, STACK);
68 	return s;
69 }
70 
71 static void
listenproc(void * v)72 listenproc(void *v)
73 {
74 	int ctl;
75 	char dir[NETPATHLEN];
76 	VtSrv *srv;
77 	VtSconn *sc;
78 
79 	srv = v;
80 	for(;;){
81 		ctl = listen(srv->adir, dir);
82 		if(ctl < 0){
83 			srv->dead = 1;
84 			break;
85 		}
86 		sc = vtmallocz(sizeof(VtSconn));
87 		sc->ref = 1;
88 		sc->ctl = ctl;
89 		sc->srv = srv;
90 		strcpy(sc->dir, dir);
91 		proccreate(connproc, sc, STACK);
92 	}
93 
94 	/* hangup */
95 }
96 
97 static void
connproc(void * v)98 connproc(void *v)
99 {
100 	VtSconn *sc;
101 	VtConn *c;
102 	Packet *p;
103 	VtReq *r;
104 	int fd;
105 static int first=1;
106 
107 if(first && chattyventi){
108 	first=0;
109 	fmtinstall('F', vtfcallfmt);
110 }
111 	r = nil;
112 	sc = v;
113 	sc->c = nil;
114 	if(0) fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);
115 	fd = accept(sc->ctl, sc->dir);
116 	close(sc->ctl);
117 	if(fd < 0){
118 		fprint(2, "accept %s: %r\n", sc->dir);
119 		goto out;
120 	}
121 
122 	c = vtconn(fd, fd);
123 	sc->c = c;
124 	if(vtversion(c) < 0){
125 		fprint(2, "vtversion %s: %r\n", sc->dir);
126 		goto out;
127 	}
128 	if(vtsrvhello(c) < 0){
129 		fprint(2, "vtsrvhello %s: %r\n", sc->dir);
130 		goto out;
131 	}
132 
133 	if(0) fprint(2, "new proc %s\n", sc->dir);
134 	proccreate(vtsendproc, c, STACK);
135 	qlock(&c->lk);
136 	while(!c->writeq)
137 		rsleep(&c->rpcfork);
138 	qunlock(&c->lk);
139 
140 	while((p = vtrecv(c)) != nil){
141 		r = vtmallocz(sizeof(VtReq));
142 		if(vtfcallunpack(&r->tx, p) < 0){
143 			vtlog(VtServerLog, "<font size=-1>%T %s:</font> recv bad packet %p: %r<br>\n", c->addr, p);
144 			fprint(2, "bad packet on %s: %r\n", sc->dir);
145 			packetfree(p);
146 			continue;
147 		}
148 		vtlog(VtServerLog, "<font size=-1>%T %s:</font> recv packet %p (%F)<br>\n", c->addr, p, &r->tx);
149 		if(chattyventi)
150 			fprint(2, "%s <- %F\n", argv0, &r->tx);
151 		packetfree(p);
152 		if(r->tx.msgtype == VtTgoodbye)
153 			break;
154 		r->rx.tag = r->tx.tag;
155 		r->sc = sc;
156 		scincref(sc);
157 		if(_vtqsend(sc->srv->q, r) < 0){
158 			scdecref(sc);
159 			fprint(2, "hungup queue\n");
160 			break;
161 		}
162 		r = nil;
163 	}
164 
165 	if(0) fprint(2, "eof on %s\n", sc->dir);
166 
167 out:
168 	if(r){
169 		vtfcallclear(&r->tx);
170 		vtfree(r);
171 	}
172 	if(0) fprint(2, "freed %s\n", sc->dir);
173 	scdecref(sc);
174 	return;
175 }
176 
177 VtReq*
vtgetreq(VtSrv * srv)178 vtgetreq(VtSrv *srv)
179 {
180 	VtReq *r;
181 
182 	r = _vtqrecv(srv->q);
183 	if (r != nil)
184 		vtlog(VtServerLog, "<font size=-1>%T %s:</font> vtgetreq %F<br>\n",
185 			((VtSconn*)r->sc)->c->addr, &r->tx);
186 	return r;
187 }
188 
189 void
vtrespond(VtReq * r)190 vtrespond(VtReq *r)
191 {
192 	Packet *p;
193 	VtSconn *sc;
194 
195 	sc = r->sc;
196 	if(r->rx.tag != r->tx.tag)
197 		abort();
198 	if(r->rx.msgtype != r->tx.msgtype+1 && r->rx.msgtype != VtRerror)
199 		abort();
200 	if(chattyventi)
201 		fprint(2, "%s -> %F\n", argv0, &r->rx);
202 	if((p = vtfcallpack(&r->rx)) == nil){
203 		vtlog(VtServerLog, "%s: vtfcallpack %F: %r<br>\n", sc->c->addr, &r->rx);
204 		fprint(2, "fcallpack on %s: %r\n", sc->dir);
205 		packetfree(p);
206 		vtfcallclear(&r->rx);
207 		return;
208 	}
209 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> send packet %p (%F)<br>\n", sc->c->addr, p, &r->rx);
210 	if(vtsend(sc->c, p) < 0)
211 		fprint(2, "vtsend %F: %r\n", &r->rx);
212 	scdecref(sc);
213 	vtfcallclear(&r->tx);
214 	vtfcallclear(&r->rx);
215 	vtfree(r);
216 }
217 
218