1 #include <u.h> 2 #include <libc.h> 3 #include <venti.h> 4 #include "queue.h" 5 6 long ventisendbytes, ventisendpackets; 7 long ventirecvbytes, ventirecvpackets; 8 9 static int 10 _vtsend(VtConn *z, Packet *p) 11 { 12 IOchunk ioc; 13 int n, tot; 14 uchar buf[2]; 15 16 if(z->state != VtStateConnected) { 17 werrstr("session not connected"); 18 return -1; 19 } 20 21 /* add framing */ 22 n = packetsize(p); 23 if(n >= (1<<16)) { 24 werrstr("packet too large"); 25 packetfree(p); 26 return -1; 27 } 28 buf[0] = n>>8; 29 buf[1] = n; 30 packetprefix(p, buf, 2); 31 ventisendbytes += n+2; 32 ventisendpackets++; 33 34 tot = 0; 35 for(;;){ 36 n = packetfragments(p, &ioc, 1, 0); 37 if(n == 0) 38 break; 39 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){ 40 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p); 41 packetfree(p); 42 return 0; 43 } 44 packetconsume(p, nil, ioc.len); 45 tot += ioc.len; 46 } 47 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot); 48 packetfree(p); 49 return 1; 50 } 51 52 static int 53 interrupted(void) 54 { 55 char e[ERRMAX]; 56 57 rerrstr(e, sizeof e); 58 return strstr(e, "interrupted") != nil; 59 } 60 61 62 static Packet* 63 _vtrecv(VtConn *z) 64 { 65 uchar buf[10], *b; 66 int n; 67 Packet *p; 68 int size, len; 69 70 if(z->state != VtStateConnected) { 71 werrstr("session not connected"); 72 return nil; 73 } 74 75 p = z->part; 76 /* get enough for head size */ 77 size = packetsize(p); 78 while(size < 2) { 79 b = packettrailer(p, 2); 80 assert(b != nil); 81 if(0) fprint(2, "%d read hdr\n", getpid()); 82 n = read(z->infd, b, 2); 83 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); 84 if(n==0 || (n<0 && !interrupted())) 85 goto Err; 86 size += n; 87 packettrim(p, 0, size); 88 } 89 90 if(packetconsume(p, buf, 2) < 0) 91 goto Err; 92 len = (buf[0] << 8) | buf[1]; 93 size -= 2; 94 95 while(size < len) { 96 n = len - size; 97 if(n > MaxFragSize) 98 n = MaxFragSize; 99 b = packettrailer(p, n); 100 if(0) fprint(2, "%d read body %d\n", getpid(), n); 101 n = read(z->infd, b, n); 102 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); 103 if(n > 0) 104 size += n; 105 packettrim(p, 0, size); 106 if(n==0 || (n<0 && !interrupted())) 107 goto Err; 108 } 109 ventirecvbytes += len; 110 ventirecvpackets++; 111 p = packetsplit(p, len); 112 vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len); 113 return p; 114 Err: 115 vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr); 116 return nil; 117 } 118 119 /* 120 * If you fork off two procs running vtrecvproc and vtsendproc, 121 * then vtrecv/vtsend (and thus vtrpc) will never block except on 122 * rendevouses, which is nice when it's running in one thread of many. 123 */ 124 void 125 vtrecvproc(void *v) 126 { 127 Packet *p; 128 VtConn *z; 129 Queue *q; 130 131 z = v; 132 q = _vtqalloc(); 133 134 qlock(&z->lk); 135 z->readq = q; 136 qlock(&z->inlk); 137 rwakeup(&z->rpcfork); 138 qunlock(&z->lk); 139 140 while((p = _vtrecv(z)) != nil) 141 if(_vtqsend(q, p) < 0){ 142 packetfree(p); 143 break; 144 } 145 qunlock(&z->inlk); 146 qlock(&z->lk); 147 _vtqhangup(q); 148 while((p = _vtnbqrecv(q)) != nil) 149 packetfree(p); 150 _vtqdecref(q); 151 z->readq = nil; 152 rwakeup(&z->rpcfork); 153 qunlock(&z->lk); 154 vthangup(z); 155 } 156 157 void 158 vtsendproc(void *v) 159 { 160 Queue *q; 161 Packet *p; 162 VtConn *z; 163 164 z = v; 165 q = _vtqalloc(); 166 167 qlock(&z->lk); 168 z->writeq = q; 169 qlock(&z->outlk); 170 rwakeup(&z->rpcfork); 171 qunlock(&z->lk); 172 173 while((p = _vtqrecv(q)) != nil) 174 if(_vtsend(z, p) < 0) 175 break; 176 qunlock(&z->outlk); 177 qlock(&z->lk); 178 _vtqhangup(q); 179 while((p = _vtnbqrecv(q)) != nil) 180 packetfree(p); 181 _vtqdecref(q); 182 z->writeq = nil; 183 rwakeup(&z->rpcfork); 184 qunlock(&z->lk); 185 return; 186 } 187 188 Packet* 189 vtrecv(VtConn *z) 190 { 191 Packet *p; 192 Queue *q; 193 194 qlock(&z->lk); 195 if(z->state != VtStateConnected){ 196 werrstr("not connected"); 197 qunlock(&z->lk); 198 return nil; 199 } 200 if(z->readq){ 201 q = _vtqincref(z->readq); 202 qunlock(&z->lk); 203 p = _vtqrecv(q); 204 _vtqdecref(q); 205 return p; 206 } 207 208 qlock(&z->inlk); 209 qunlock(&z->lk); 210 p = _vtrecv(z); 211 qunlock(&z->inlk); 212 if(!p) 213 vthangup(z); 214 return p; 215 } 216 217 int 218 vtsend(VtConn *z, Packet *p) 219 { 220 Queue *q; 221 222 qlock(&z->lk); 223 if(z->state != VtStateConnected){ 224 packetfree(p); 225 werrstr("not connected"); 226 qunlock(&z->lk); 227 return -1; 228 } 229 if(z->writeq){ 230 q = _vtqincref(z->writeq); 231 qunlock(&z->lk); 232 if(_vtqsend(q, p) < 0){ 233 _vtqdecref(q); 234 packetfree(p); 235 return -1; 236 } 237 _vtqdecref(q); 238 return 0; 239 } 240 241 qlock(&z->outlk); 242 qunlock(&z->lk); 243 if(_vtsend(z, p) < 0){ 244 qunlock(&z->outlk); 245 vthangup(z); 246 return -1; 247 } 248 qunlock(&z->outlk); 249 return 0; 250 } 251 252