1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 #include "error.h" 5 6 static void diskThread(void *a); 7 8 enum { 9 QueueSize = 100, /* maximum block to queue */ 10 }; 11 12 struct Disk { 13 VtLock *lk; 14 int ref; 15 16 int fd; 17 Header h; 18 19 VtRendez *flow; 20 VtRendez *starve; 21 VtRendez *flush; 22 VtRendez *die; 23 24 int nqueue; 25 26 Block *cur; /* block to do on current scan */ 27 Block *next; /* blocks to do next scan */ 28 }; 29 30 31 Disk * 32 diskAlloc(int fd) 33 { 34 u8int buf[HeaderSize]; 35 Header h; 36 Disk *disk; 37 38 if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){ 39 vtOSError(); 40 return nil; 41 } 42 43 if(!headerUnpack(&h, buf)) 44 return nil; 45 disk = vtMemAllocZ(sizeof(Disk)); 46 disk->lk = vtLockAlloc(); 47 disk->starve = vtRendezAlloc(disk->lk); 48 disk->flow = vtRendezAlloc(disk->lk); 49 disk->flush = vtRendezAlloc(disk->lk); 50 disk->fd = fd; 51 disk->h = h; 52 53 disk->ref = 2; 54 vtThread(diskThread, disk); 55 56 return disk; 57 } 58 59 void 60 diskFree(Disk *disk) 61 { 62 diskFlush(disk); 63 64 /* kill slave */ 65 vtLock(disk->lk); 66 disk->die = vtRendezAlloc(disk->lk); 67 vtWakeup(disk->starve); 68 while(disk->ref > 1) 69 vtSleep(disk->die); 70 vtUnlock(disk->lk); 71 vtRendezFree(disk->flow); 72 vtRendezFree(disk->starve); 73 vtRendezFree(disk->die); 74 vtLockFree(disk->lk); 75 close(disk->fd); 76 vtMemFree(disk); 77 } 78 79 static u32int 80 partStart(Disk *disk, int part) 81 { 82 switch(part){ 83 default: 84 assert(0); 85 case PartSuper: 86 return disk->h.super; 87 case PartLabel: 88 return disk->h.label; 89 case PartData: 90 return disk->h.data; 91 } 92 } 93 94 95 static u32int 96 partEnd(Disk *disk, int part) 97 { 98 switch(part){ 99 default: 100 assert(0); 101 case PartSuper: 102 return disk->h.super+1; 103 case PartLabel: 104 return disk->h.data; 105 case PartData: 106 return disk->h.end; 107 } 108 } 109 110 int 111 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf) 112 { 113 ulong start, end; 114 u64int offset; 115 int n, nn; 116 117 start = partStart(disk, part); 118 end = partEnd(disk, part); 119 120 if(addr >= end-start){ 121 vtSetError(EBadAddr); 122 return 0; 123 } 124 125 offset = ((u64int)(addr + start))*disk->h.blockSize; 126 n = disk->h.blockSize; 127 while(n > 0){ 128 nn = pread(disk->fd, buf, n, offset); 129 if(nn < 0){ 130 vtOSError(); 131 return 0; 132 } 133 if(nn == 0){ 134 vtSetError(EIO); 135 return 0; 136 } 137 n -= nn; 138 offset += nn; 139 buf += nn; 140 } 141 return 1; 142 } 143 144 int 145 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf) 146 { 147 ulong start, end; 148 u64int offset; 149 int n; 150 151 start = partStart(disk, part); 152 end = partEnd(disk, part); 153 154 if(addr >= end-start){ 155 vtSetError(EBadAddr); 156 return 0; 157 } 158 159 offset = ((u64int)(addr + start))*disk->h.blockSize; 160 n = pwrite(disk->fd, buf, disk->h.blockSize, offset); 161 if(n < 0){ 162 vtOSError(); 163 return 0; 164 } 165 if(n < disk->h.blockSize) { 166 vtSetError("short write"); 167 return 0; 168 } 169 170 return 1; 171 } 172 173 static void 174 diskQueue(Disk *disk, Block *b) 175 { 176 Block **bp, *bb; 177 178 vtLock(disk->lk); 179 while(disk->nqueue >= QueueSize) 180 vtSleep(disk->flow); 181 if(disk->cur == nil || b->addr > disk->cur->addr) 182 bp = &disk->cur; 183 else 184 bp = &disk->next; 185 186 for(bb=*bp; bb; bb=*bp){ 187 if(b->addr < bb->addr) 188 break; 189 bp = &bb->ionext; 190 } 191 b->ionext = bb; 192 *bp = b; 193 if(disk->nqueue == 0) 194 vtWakeup(disk->starve); 195 disk->nqueue++; 196 vtUnlock(disk->lk); 197 } 198 199 200 void 201 diskRead(Disk *disk, Block *b) 202 { 203 assert(b->iostate == BioEmpty || b->iostate == BioLabel); 204 blockSetIOState(b, BioReading); 205 diskQueue(disk, b); 206 } 207 208 void 209 diskWrite(Disk *disk, Block *b) 210 { 211 assert(b->iostate == BioDirty); 212 blockSetIOState(b, BioWriting); 213 diskQueue(disk, b); 214 } 215 216 int 217 diskBlockSize(Disk *disk) 218 { 219 return disk->h.blockSize; /* immuttable */ 220 } 221 222 int 223 diskFlush(Disk *disk) 224 { 225 Dir dir; 226 227 vtLock(disk->lk); 228 while(disk->nqueue > 0) 229 vtSleep(disk->flush); 230 vtUnlock(disk->lk); 231 232 /* there really should be a cleaner interface to flush an fd */ 233 nulldir(&dir); 234 if(dirfwstat(disk->fd, &dir) < 0){ 235 vtOSError(); 236 return 0; 237 } 238 return 1; 239 } 240 241 u32int 242 diskSize(Disk *disk, int part) 243 { 244 return partEnd(disk, part) - partStart(disk, part); 245 } 246 247 static void 248 diskThread(void *a) 249 { 250 Disk *disk = a; 251 Block *b; 252 uchar *buf, *p; 253 double t; 254 int nio; 255 256 vtThreadSetName("disk"); 257 258 fprint(2, "diskThread %d\n", getpid()); 259 260 buf = vtMemAlloc(disk->h.blockSize); 261 262 vtLock(disk->lk); 263 nio = 0; 264 t = -nsec(); 265 for(;;){ 266 while(disk->nqueue == 0){ 267 t += nsec(); 268 if(nio >= 10000){ 269 fprint(2, "disk: io=%d at %.3fms\n", nio, t*1e-6/nio); 270 nio = 0; 271 t = 0.; 272 } 273 if(disk->die != nil) 274 goto Done; 275 vtSleep(disk->starve); 276 t -= nsec(); 277 } 278 assert(disk->cur != nil || disk->next != nil); 279 280 if(disk->cur == nil){ 281 disk->cur = disk->next; 282 disk->next = nil; 283 } 284 b = disk->cur; 285 disk->cur = b->ionext; 286 vtUnlock(disk->lk); 287 288 /* 289 * no one should hold onto blocking in the 290 * reading or writing state, so this lock should 291 * not cause deadlock. 292 */ 293 if(0)fprint(2, "diskThread: %d:%d %x\n", getpid(), b->part, b->addr); 294 bwatchLock(b); 295 vtLock(b->lk); 296 assert(b->nlock == 1); 297 298 switch(b->iostate){ 299 default: 300 abort(); 301 case BioReading: 302 if(!diskReadRaw(disk, b->part, b->addr, b->data)){ 303 fprint(2, "diskReadRaw failed: part=%d addr=%ux: %r\n", b->part, b->addr); 304 blockSetIOState(b, BioReadError); 305 }else 306 blockSetIOState(b, BioClean); 307 break; 308 case BioWriting: 309 p = blockRollback(b, buf); 310 if(!diskWriteRaw(disk, b->part, b->addr, p)){ 311 fprint(2, "diskWriteRaw failed: date=%s part=%d addr=%ux: %r\n", ctime(times(0)), b->part, b->addr); 312 break; 313 } 314 if(p != buf) 315 blockSetIOState(b, BioClean); 316 else 317 blockSetIOState(b, BioDirty); 318 break; 319 } 320 321 blockPut(b); /* remove extra reference, unlock */ 322 vtLock(disk->lk); 323 disk->nqueue--; 324 if(disk->nqueue == QueueSize-1) 325 vtWakeup(disk->flow); 326 if(disk->nqueue == 0) 327 vtWakeup(disk->flush); 328 nio++; 329 } 330 Done: 331 fprint(2, "diskThread done\n"); 332 disk->ref--; 333 vtWakeup(disk->die); 334 vtUnlock(disk->lk); 335 vtMemFree(buf); 336 } 337