1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 #include "error.h" 5 6 static void diskThread(void *a); 7 8 enum { 9 QueueSize = 100, /* maximum block to queue */ 10 }; 11 12 struct Disk { 13 VtLock *lk; 14 int ref; 15 16 int fd; 17 Header h; 18 19 VtRendez *flow; 20 VtRendez *starve; 21 VtRendez *flush; 22 VtRendez *die; 23 24 int nqueue; 25 26 Block *cur; /* block to do on current scan */ 27 Block *next; /* blocks to do next scan */ 28 }; 29 30 31 Disk * 32 diskAlloc(int fd) 33 { 34 u8int buf[HeaderSize]; 35 Header h; 36 Disk *disk; 37 38 if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){ 39 vtSetError("short read: %r"); 40 vtOSError(); 41 return nil; 42 } 43 44 if(!headerUnpack(&h, buf)){ 45 vtSetError("bad disk header"); 46 return nil; 47 } 48 disk = vtMemAllocZ(sizeof(Disk)); 49 disk->lk = vtLockAlloc(); 50 disk->starve = vtRendezAlloc(disk->lk); 51 disk->flow = vtRendezAlloc(disk->lk); 52 disk->flush = vtRendezAlloc(disk->lk); 53 disk->fd = fd; 54 disk->h = h; 55 56 disk->ref = 2; 57 vtThread(diskThread, disk); 58 59 return disk; 60 } 61 62 void 63 diskFree(Disk *disk) 64 { 65 diskFlush(disk); 66 67 /* kill slave */ 68 vtLock(disk->lk); 69 disk->die = vtRendezAlloc(disk->lk); 70 vtWakeup(disk->starve); 71 while(disk->ref > 1) 72 vtSleep(disk->die); 73 vtUnlock(disk->lk); 74 vtRendezFree(disk->flow); 75 vtRendezFree(disk->starve); 76 vtRendezFree(disk->die); 77 vtLockFree(disk->lk); 78 close(disk->fd); 79 vtMemFree(disk); 80 } 81 82 static u32int 83 partStart(Disk *disk, int part) 84 { 85 switch(part){ 86 default: 87 assert(0); 88 case PartSuper: 89 return disk->h.super; 90 case PartLabel: 91 return disk->h.label; 92 case PartData: 93 return disk->h.data; 94 } 95 } 96 97 98 static u32int 99 partEnd(Disk *disk, int part) 100 { 101 switch(part){ 102 default: 103 assert(0); 104 case PartSuper: 105 return disk->h.super+1; 106 case PartLabel: 107 return disk->h.data; 108 case PartData: 109 return disk->h.end; 110 } 111 } 112 113 int 114 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf) 115 { 116 ulong start, end; 117 u64int offset; 118 int n, nn; 119 120 start = partStart(disk, part); 121 end = partEnd(disk, part); 122 123 if(addr >= end-start){ 124 vtSetError(EBadAddr); 125 return 0; 126 } 127 128 offset = ((u64int)(addr + start))*disk->h.blockSize; 129 n = disk->h.blockSize; 130 while(n > 0){ 131 nn = pread(disk->fd, buf, n, offset); 132 if(nn < 0){ 133 vtOSError(); 134 return 0; 135 } 136 if(nn == 0){ 137 vtSetError("eof reading disk"); 138 return 0; 139 } 140 n -= nn; 141 offset += nn; 142 buf += nn; 143 } 144 return 1; 145 } 146 147 int 148 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf) 149 { 150 ulong start, end; 151 u64int offset; 152 int n; 153 154 start = partStart(disk, part); 155 end = partEnd(disk, part); 156 157 if(addr >= end-start){ 158 vtSetError(EBadAddr); 159 return 0; 160 } 161 162 offset = ((u64int)(addr + start))*disk->h.blockSize; 163 n = pwrite(disk->fd, buf, disk->h.blockSize, offset); 164 if(n < 0){ 165 vtOSError(); 166 return 0; 167 } 168 if(n < disk->h.blockSize) { 169 vtSetError("short write"); 170 return 0; 171 } 172 173 return 1; 174 } 175 176 static void 177 diskQueue(Disk *disk, Block *b) 178 { 179 Block **bp, *bb; 180 181 vtLock(disk->lk); 182 while(disk->nqueue >= QueueSize) 183 vtSleep(disk->flow); 184 if(disk->cur == nil || b->addr > disk->cur->addr) 185 bp = &disk->cur; 186 else 187 bp = &disk->next; 188 189 for(bb=*bp; bb; bb=*bp){ 190 if(b->addr < bb->addr) 191 break; 192 bp = &bb->ionext; 193 } 194 b->ionext = bb; 195 *bp = b; 196 if(disk->nqueue == 0) 197 vtWakeup(disk->starve); 198 disk->nqueue++; 199 vtUnlock(disk->lk); 200 } 201 202 203 void 204 diskRead(Disk *disk, Block *b) 205 { 206 assert(b->iostate == BioEmpty || b->iostate == BioLabel); 207 blockSetIOState(b, BioReading); 208 diskQueue(disk, b); 209 } 210 211 void 212 diskWrite(Disk *disk, Block *b) 213 { 214 assert(b->nlock == 1); 215 assert(b->iostate == BioDirty); 216 blockSetIOState(b, BioWriting); 217 diskQueue(disk, b); 218 } 219 220 void 221 diskWriteAndWait(Disk *disk, Block *b) 222 { 223 int nlock; 224 225 /* 226 * If b->nlock > 1, the block is aliased within 227 * a single thread. That thread is us. 228 * DiskWrite does some funny stuff with VtLock 229 * and blockPut that basically assumes b->nlock==1. 230 * We humor diskWrite by temporarily setting 231 * nlock to 1. This needs to be revisited. 232 */ 233 nlock = b->nlock; 234 if(nlock > 1) 235 b->nlock = 1; 236 diskWrite(disk, b); 237 while(b->iostate != BioClean) 238 vtSleep(b->ioready); 239 b->nlock = nlock; 240 } 241 242 int 243 diskBlockSize(Disk *disk) 244 { 245 return disk->h.blockSize; /* immuttable */ 246 } 247 248 int 249 diskFlush(Disk *disk) 250 { 251 Dir dir; 252 253 vtLock(disk->lk); 254 while(disk->nqueue > 0) 255 vtSleep(disk->flush); 256 vtUnlock(disk->lk); 257 258 /* there really should be a cleaner interface to flush an fd */ 259 nulldir(&dir); 260 if(dirfwstat(disk->fd, &dir) < 0){ 261 vtOSError(); 262 return 0; 263 } 264 return 1; 265 } 266 267 u32int 268 diskSize(Disk *disk, int part) 269 { 270 return partEnd(disk, part) - partStart(disk, part); 271 } 272 273 static ulong 274 mypc(int x) 275 { 276 return getcallerpc(&x); 277 } 278 279 static void 280 diskThread(void *a) 281 { 282 Disk *disk = a; 283 Block *b; 284 uchar *buf, *p; 285 double t; 286 int nio; 287 288 vtThreadSetName("disk"); 289 290 //fprint(2, "diskThread %d\n", getpid()); 291 292 buf = vtMemAlloc(disk->h.blockSize); 293 294 vtLock(disk->lk); 295 nio = 0; 296 t = -nsec(); 297 for(;;){ 298 while(disk->nqueue == 0){ 299 t += nsec(); 300 //if(nio >= 10000){ 301 //fprint(2, "disk: io=%d at %.3fms\n", nio, t*1e-6/nio); 302 //nio = 0; 303 //t = 0.; 304 //} 305 if(disk->die != nil) 306 goto Done; 307 vtSleep(disk->starve); 308 t -= nsec(); 309 } 310 assert(disk->cur != nil || disk->next != nil); 311 312 if(disk->cur == nil){ 313 disk->cur = disk->next; 314 disk->next = nil; 315 } 316 b = disk->cur; 317 disk->cur = b->ionext; 318 vtUnlock(disk->lk); 319 320 /* 321 * no one should hold onto blocking in the 322 * reading or writing state, so this lock should 323 * not cause deadlock. 324 */ 325 if(0)fprint(2, "diskThread: %d:%d %x\n", getpid(), b->part, b->addr); 326 bwatchLock(b); 327 vtLock(b->lk); 328 b->pc = mypc(0); 329 assert(b->nlock == 1); 330 switch(b->iostate){ 331 default: 332 abort(); 333 case BioReading: 334 if(!diskReadRaw(disk, b->part, b->addr, b->data)){ 335 fprint(2, "diskReadRaw failed: part=%d addr=%ux: %r\n", b->part, b->addr); 336 blockSetIOState(b, BioReadError); 337 }else 338 blockSetIOState(b, BioClean); 339 break; 340 case BioWriting: 341 p = blockRollback(b, buf); 342 if(!diskWriteRaw(disk, b->part, b->addr, p)){ 343 fprint(2, "diskWriteRaw failed: date=%s part=%d addr=%ux: %r\n", ctime(times(0)), b->part, b->addr); 344 break; 345 } 346 if(p != buf) 347 blockSetIOState(b, BioClean); 348 else 349 blockSetIOState(b, BioDirty); 350 break; 351 } 352 353 blockPut(b); /* remove extra reference, unlock */ 354 vtLock(disk->lk); 355 disk->nqueue--; 356 if(disk->nqueue == QueueSize-1) 357 vtWakeup(disk->flow); 358 if(disk->nqueue == 0) 359 vtWakeup(disk->flush); 360 nio++; 361 } 362 Done: 363 //fprint(2, "diskThread done\n"); 364 disk->ref--; 365 vtWakeup(disk->die); 366 vtUnlock(disk->lk); 367 vtMemFree(buf); 368 } 369