1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 #include "error.h" 5 6 static void diskThread(void *a); 7 8 enum { 9 /* 10 * disable measurement since it gets alignment faults on BG 11 * and the guts used to be commented out. 12 */ 13 Timing = 0, /* flag */ 14 QueueSize = 100, /* maximum block to queue */ 15 }; 16 17 struct Disk { 18 VtLock *lk; 19 int ref; 20 21 int fd; 22 Header h; 23 24 VtRendez *flow; 25 VtRendez *starve; 26 VtRendez *flush; 27 VtRendez *die; 28 29 int nqueue; 30 31 Block *cur; /* block to do on current scan */ 32 Block *next; /* blocks to do next scan */ 33 }; 34 35 /* keep in sync with Part* enum in dat.h */ 36 static char *partname[] = { 37 [PartError] "error", 38 [PartSuper] "super", 39 [PartLabel] "label", 40 [PartData] "data", 41 [PartVenti] "venti", 42 }; 43 44 Disk * 45 diskAlloc(int fd) 46 { 47 u8int buf[HeaderSize]; 48 Header h; 49 Disk *disk; 50 51 if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){ 52 vtSetError("short read: %r"); 53 vtOSError(); 54 return nil; 55 } 56 57 if(!headerUnpack(&h, buf)){ 58 vtSetError("bad disk header"); 59 return nil; 60 } 61 disk = vtMemAllocZ(sizeof(Disk)); 62 disk->lk = vtLockAlloc(); 63 disk->starve = vtRendezAlloc(disk->lk); 64 disk->flow = vtRendezAlloc(disk->lk); 65 disk->flush = vtRendezAlloc(disk->lk); 66 disk->fd = fd; 67 disk->h = h; 68 69 disk->ref = 2; 70 vtThread(diskThread, disk); 71 72 return disk; 73 } 74 75 void 76 diskFree(Disk *disk) 77 { 78 diskFlush(disk); 79 80 /* kill slave */ 81 vtLock(disk->lk); 82 disk->die = vtRendezAlloc(disk->lk); 83 vtWakeup(disk->starve); 84 while(disk->ref > 1) 85 vtSleep(disk->die); 86 vtUnlock(disk->lk); 87 vtRendezFree(disk->flow); 88 vtRendezFree(disk->starve); 89 vtRendezFree(disk->die); 90 vtLockFree(disk->lk); 91 close(disk->fd); 92 vtMemFree(disk); 93 } 94 95 static u32int 96 partStart(Disk *disk, int part) 97 { 98 switch(part){ 99 default: 100 assert(0); 101 case PartSuper: 102 return disk->h.super; 103 case PartLabel: 104 return disk->h.label; 105 case PartData: 106 return disk->h.data; 107 } 108 } 109 110 111 static u32int 112 partEnd(Disk *disk, int part) 113 { 114 switch(part){ 115 default: 116 assert(0); 117 case PartSuper: 118 return disk->h.super+1; 119 case PartLabel: 120 return disk->h.data; 121 case PartData: 122 return disk->h.end; 123 } 124 } 125 126 int 127 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf) 128 { 129 ulong start, end; 130 u64int offset; 131 int n, nn; 132 133 start = partStart(disk, part); 134 end = partEnd(disk, part); 135 136 if(addr >= end-start){ 137 vtSetError(EBadAddr); 138 return 0; 139 } 140 141 offset = ((u64int)(addr + start))*disk->h.blockSize; 142 n = disk->h.blockSize; 143 while(n > 0){ 144 nn = pread(disk->fd, buf, n, offset); 145 if(nn < 0){ 146 vtOSError(); 147 return 0; 148 } 149 if(nn == 0){ 150 vtSetError("eof reading disk"); 151 return 0; 152 } 153 n -= nn; 154 offset += nn; 155 buf += nn; 156 } 157 return 1; 158 } 159 160 int 161 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf) 162 { 163 ulong start, end; 164 u64int offset; 165 int n; 166 167 start = partStart(disk, part); 168 end = partEnd(disk, part); 169 170 if(addr >= end - start){ 171 vtSetError(EBadAddr); 172 return 0; 173 } 174 175 offset = ((u64int)(addr + start))*disk->h.blockSize; 176 n = pwrite(disk->fd, buf, disk->h.blockSize, offset); 177 if(n < 0){ 178 vtOSError(); 179 return 0; 180 } 181 if(n < disk->h.blockSize) { 182 vtSetError("short write"); 183 return 0; 184 } 185 186 return 1; 187 } 188 189 static void 190 diskQueue(Disk *disk, Block *b) 191 { 192 Block **bp, *bb; 193 194 vtLock(disk->lk); 195 while(disk->nqueue >= QueueSize) 196 vtSleep(disk->flow); 197 if(disk->cur == nil || b->addr > disk->cur->addr) 198 bp = &disk->cur; 199 else 200 bp = &disk->next; 201 202 for(bb=*bp; bb; bb=*bp){ 203 if(b->addr < bb->addr) 204 break; 205 bp = &bb->ionext; 206 } 207 b->ionext = bb; 208 *bp = b; 209 if(disk->nqueue == 0) 210 vtWakeup(disk->starve); 211 disk->nqueue++; 212 vtUnlock(disk->lk); 213 } 214 215 216 void 217 diskRead(Disk *disk, Block *b) 218 { 219 assert(b->iostate == BioEmpty || b->iostate == BioLabel); 220 blockSetIOState(b, BioReading); 221 diskQueue(disk, b); 222 } 223 224 void 225 diskWrite(Disk *disk, Block *b) 226 { 227 assert(b->nlock == 1); 228 assert(b->iostate == BioDirty); 229 blockSetIOState(b, BioWriting); 230 diskQueue(disk, b); 231 } 232 233 void 234 diskWriteAndWait(Disk *disk, Block *b) 235 { 236 int nlock; 237 238 /* 239 * If b->nlock > 1, the block is aliased within 240 * a single thread. That thread is us. 241 * DiskWrite does some funny stuff with VtLock 242 * and blockPut that basically assumes b->nlock==1. 243 * We humor diskWrite by temporarily setting 244 * nlock to 1. This needs to be revisited. 245 */ 246 nlock = b->nlock; 247 if(nlock > 1) 248 b->nlock = 1; 249 diskWrite(disk, b); 250 while(b->iostate != BioClean) 251 vtSleep(b->ioready); 252 b->nlock = nlock; 253 } 254 255 int 256 diskBlockSize(Disk *disk) 257 { 258 return disk->h.blockSize; /* immuttable */ 259 } 260 261 int 262 diskFlush(Disk *disk) 263 { 264 Dir dir; 265 266 vtLock(disk->lk); 267 while(disk->nqueue > 0) 268 vtSleep(disk->flush); 269 vtUnlock(disk->lk); 270 271 /* there really should be a cleaner interface to flush an fd */ 272 nulldir(&dir); 273 if(dirfwstat(disk->fd, &dir) < 0){ 274 vtOSError(); 275 return 0; 276 } 277 return 1; 278 } 279 280 u32int 281 diskSize(Disk *disk, int part) 282 { 283 return partEnd(disk, part) - partStart(disk, part); 284 } 285 286 static uintptr 287 mypc(int x) 288 { 289 return getcallerpc(&x); 290 } 291 292 static char * 293 disk2file(Disk *disk) 294 { 295 static char buf[256]; 296 297 if (fd2path(disk->fd, buf, sizeof buf) < 0) 298 strncpy(buf, "GOK", sizeof buf); 299 return buf; 300 } 301 302 static void 303 diskThread(void *a) 304 { 305 Disk *disk = a; 306 Block *b; 307 uchar *buf, *p; 308 double t; 309 int nio; 310 311 vtThreadSetName("disk"); 312 313 //fprint(2, "diskThread %d\n", getpid()); 314 315 buf = vtMemAlloc(disk->h.blockSize); 316 317 vtLock(disk->lk); 318 if (Timing) { 319 nio = 0; 320 t = -nsec(); 321 } 322 for(;;){ 323 while(disk->nqueue == 0){ 324 if (Timing) { 325 t += nsec(); 326 if(nio >= 10000){ 327 fprint(2, "disk: io=%d at %.3fms\n", 328 nio, t*1e-6/nio); 329 nio = 0; 330 t = 0; 331 } 332 } 333 if(disk->die != nil) 334 goto Done; 335 vtSleep(disk->starve); 336 if (Timing) 337 t -= nsec(); 338 } 339 assert(disk->cur != nil || disk->next != nil); 340 341 if(disk->cur == nil){ 342 disk->cur = disk->next; 343 disk->next = nil; 344 } 345 b = disk->cur; 346 disk->cur = b->ionext; 347 vtUnlock(disk->lk); 348 349 /* 350 * no one should hold onto blocking in the 351 * reading or writing state, so this lock should 352 * not cause deadlock. 353 */ 354 if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr); 355 bwatchLock(b); 356 vtLock(b->lk); 357 b->pc = mypc(0); 358 assert(b->nlock == 1); 359 switch(b->iostate){ 360 default: 361 abort(); 362 case BioReading: 363 if(!diskReadRaw(disk, b->part, b->addr, b->data)){ 364 fprint(2, "fossil: diskReadRaw failed: %s: " 365 "score %V: part=%s block %ud: %r\n", 366 disk2file(disk), b->score, 367 partname[b->part], b->addr); 368 blockSetIOState(b, BioReadError); 369 }else 370 blockSetIOState(b, BioClean); 371 break; 372 case BioWriting: 373 p = blockRollback(b, buf); 374 /* NB: ctime result ends with a newline */ 375 if(!diskWriteRaw(disk, b->part, b->addr, p)){ 376 fprint(2, "fossil: diskWriteRaw failed: %s: " 377 "score %V: date %s part=%s block %ud: %r\n", 378 disk2file(disk), b->score, 379 ctime(time(0)), 380 partname[b->part], b->addr); 381 break; 382 } 383 if(p != buf) 384 blockSetIOState(b, BioClean); 385 else 386 blockSetIOState(b, BioDirty); 387 break; 388 } 389 390 blockPut(b); /* remove extra reference, unlock */ 391 vtLock(disk->lk); 392 disk->nqueue--; 393 if(disk->nqueue == QueueSize-1) 394 vtWakeup(disk->flow); 395 if(disk->nqueue == 0) 396 vtWakeup(disk->flush); 397 if(Timing) 398 nio++; 399 } 400 Done: 401 //fprint(2, "diskThread done\n"); 402 disk->ref--; 403 vtWakeup(disk->die); 404 vtUnlock(disk->lk); 405 vtMemFree(buf); 406 } 407