1*5184Sek110237 /* 2*5184Sek110237 * CDDL HEADER START 3*5184Sek110237 * 4*5184Sek110237 * The contents of this file are subject to the terms of the 5*5184Sek110237 * Common Development and Distribution License (the "License"). 6*5184Sek110237 * You may not use this file except in compliance with the License. 7*5184Sek110237 * 8*5184Sek110237 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9*5184Sek110237 * or http://www.opensolaris.org/os/licensing. 10*5184Sek110237 * See the License for the specific language governing permissions 11*5184Sek110237 * and limitations under the License. 12*5184Sek110237 * 13*5184Sek110237 * When distributing Covered Code, include this CDDL HEADER in each 14*5184Sek110237 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15*5184Sek110237 * If applicable, add the following below this CDDL HEADER, with the 16*5184Sek110237 * fields enclosed by brackets "[]" replaced with your own identifying 17*5184Sek110237 * information: Portions Copyright [yyyy] [name of copyright owner] 18*5184Sek110237 * 19*5184Sek110237 * CDDL HEADER END 20*5184Sek110237 */ 21*5184Sek110237 /* 22*5184Sek110237 * Copyright 2007 Sun Microsystems, Inc. All rights reserved. 23*5184Sek110237 * Use is subject to license terms. 
24*5184Sek110237 */ 25*5184Sek110237 26*5184Sek110237 #pragma ident "%Z%%M% %I% %E% SMI" 27*5184Sek110237 28*5184Sek110237 #include "config.h" 29*5184Sek110237 30*5184Sek110237 #include <sys/types.h> 31*5184Sek110237 #ifdef HAVE_SYS_ASYNCH_H 32*5184Sek110237 #include <sys/asynch.h> 33*5184Sek110237 #endif 34*5184Sek110237 #include <sys/ipc.h> 35*5184Sek110237 #include <sys/sem.h> 36*5184Sek110237 #include <sys/errno.h> 37*5184Sek110237 #include <sys/time.h> 38*5184Sek110237 #include <inttypes.h> 39*5184Sek110237 #include <fcntl.h> 40*5184Sek110237 41*5184Sek110237 #ifdef HAVE_UTILITY_H 42*5184Sek110237 #include <utility.h> 43*5184Sek110237 #endif /* HAVE_UTILITY_H */ 44*5184Sek110237 45*5184Sek110237 #ifdef HAVE_AIO 46*5184Sek110237 #include <aio.h> 47*5184Sek110237 #endif /* HAVE_AIO */ 48*5184Sek110237 49*5184Sek110237 #ifdef HAVE_LIBAIO_H 50*5184Sek110237 #include <libaio.h> 51*5184Sek110237 #endif /* HAVE_LIBAIO_H */ 52*5184Sek110237 53*5184Sek110237 #ifdef HAVE_SYS_ASYNC_H 54*5184Sek110237 #include <sys/asynch.h> 55*5184Sek110237 #endif /* HAVE_SYS_ASYNC_H */ 56*5184Sek110237 57*5184Sek110237 #ifdef HAVE_AIO_H 58*5184Sek110237 #include <aio.h> 59*5184Sek110237 #endif /* HAVE_AIO_H */ 60*5184Sek110237 61*5184Sek110237 #ifndef HAVE_UINT_T 62*5184Sek110237 #define uint_t unsigned int 63*5184Sek110237 #endif /* HAVE_UINT_T */ 64*5184Sek110237 65*5184Sek110237 #ifndef HAVE_AIOCB64_T 66*5184Sek110237 #define aiocb64 aiocb 67*5184Sek110237 #endif /* HAVE_AIOCB64_T */ 68*5184Sek110237 69*5184Sek110237 #ifndef HAVE_SYSV_SEM 70*5184Sek110237 #include <semaphore.h> 71*5184Sek110237 #endif /* HAVE_SYSV_SEM */ 72*5184Sek110237 73*5184Sek110237 #include "filebench.h" 74*5184Sek110237 #include "flowop.h" 75*5184Sek110237 #include "fileset.h" 76*5184Sek110237 77*5184Sek110237 /* 78*5184Sek110237 * These routines implement the flowops from the f language. 
 * Each flowop has a name such as "read", and a set of function pointers
 * to call for initialization, execution and destruction of the flowop.
 * The table flowoplib_funcs[] contains a flowoplib struct for each
 * implemented flowop. Most flowops use a generic initialization function
 * and all but semblock currently use a generic destruction function. All
 * flowop functions referenced from the table are in this file, though, of
 * course, they often call functions from other files.
 *
 * The flowoplib_init() routine uses the flowoplib_funcs[] table to
 * create an initial set of "instance 0" flowops, one for each type of
 * flowop, from which all other flowops are derived. These "instance 0"
 * flowops are initialized with information from the table including
 * pointers for their fo_init, fo_func and fo_destruct functions. When
 * a flowop definition is encountered in an f language script, the
 * "type" of flowop, such as "read", is used to search for the
 * "instance 0" flowop named "read", then a new flowop is allocated
 * which inherits its function pointers and other initial properties
 * from the instance 0 flowop, and is given a new name as specified
 * by the "name=" attribute.
98*5184Sek110237 */ 99*5184Sek110237 100*5184Sek110237 static int flowoplib_init_generic(flowop_t *flowop); 101*5184Sek110237 static void flowoplib_destruct_generic(flowop_t *flowop); 102*5184Sek110237 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop); 103*5184Sek110237 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop); 104*5184Sek110237 #ifdef HAVE_AIO 105*5184Sek110237 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop); 106*5184Sek110237 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop); 107*5184Sek110237 #endif 108*5184Sek110237 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop); 109*5184Sek110237 static int flowoplib_block_init(flowop_t *flowop); 110*5184Sek110237 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop); 111*5184Sek110237 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop); 112*5184Sek110237 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop); 113*5184Sek110237 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop); 114*5184Sek110237 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop); 115*5184Sek110237 static int flowoplib_sempost_init(flowop_t *flowop); 116*5184Sek110237 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop); 117*5184Sek110237 static int flowoplib_semblock_init(flowop_t *flowop); 118*5184Sek110237 static void flowoplib_semblock_destruct(flowop_t *flowop); 119*5184Sek110237 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop); 120*5184Sek110237 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop); 121*5184Sek110237 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop); 122*5184Sek110237 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop); 123*5184Sek110237 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop); 124*5184Sek110237 static int 
flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd); 125*5184Sek110237 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop); 126*5184Sek110237 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop); 127*5184Sek110237 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop); 128*5184Sek110237 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop); 129*5184Sek110237 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop); 130*5184Sek110237 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop); 131*5184Sek110237 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop); 132*5184Sek110237 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop); 133*5184Sek110237 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop); 134*5184Sek110237 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop); 135*5184Sek110237 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop); 136*5184Sek110237 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop); 137*5184Sek110237 138*5184Sek110237 typedef struct flowoplib { 139*5184Sek110237 int fl_type; 140*5184Sek110237 int fl_attrs; 141*5184Sek110237 char *fl_name; 142*5184Sek110237 int (*fl_init)(); 143*5184Sek110237 int (*fl_func)(); 144*5184Sek110237 void (*fl_destruct)(); 145*5184Sek110237 } flowoplib_t; 146*5184Sek110237 147*5184Sek110237 static flowoplib_t flowoplib_funcs[] = { 148*5184Sek110237 FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic, 149*5184Sek110237 flowoplib_write, flowoplib_destruct_generic, 150*5184Sek110237 FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic, 151*5184Sek110237 flowoplib_read, flowoplib_destruct_generic, 152*5184Sek110237 #ifdef HAVE_AIO 153*5184Sek110237 FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic, 154*5184Sek110237 flowoplib_aiowrite, 
flowoplib_destruct_generic, 155*5184Sek110237 FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic, 156*5184Sek110237 flowoplib_aiowait, flowoplib_destruct_generic, 157*5184Sek110237 #endif 158*5184Sek110237 FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init, 159*5184Sek110237 flowoplib_block, flowoplib_destruct_generic, 160*5184Sek110237 FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic, 161*5184Sek110237 flowoplib_wakeup, flowoplib_destruct_generic, 162*5184Sek110237 FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init, 163*5184Sek110237 flowoplib_semblock, flowoplib_semblock_destruct, 164*5184Sek110237 FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init, 165*5184Sek110237 flowoplib_sempost, flowoplib_destruct_generic, 166*5184Sek110237 FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic, 167*5184Sek110237 flowoplib_hog, flowoplib_destruct_generic, 168*5184Sek110237 FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic, 169*5184Sek110237 flowoplib_delay, flowoplib_destruct_generic, 170*5184Sek110237 FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic, 171*5184Sek110237 flowoplib_eventlimit, flowoplib_destruct_generic, 172*5184Sek110237 FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic, 173*5184Sek110237 flowoplib_bwlimit, flowoplib_destruct_generic, 174*5184Sek110237 FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic, 175*5184Sek110237 flowoplib_iopslimit, flowoplib_destruct_generic, 176*5184Sek110237 FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic, 177*5184Sek110237 flowoplib_opslimit, flowoplib_destruct_generic, 178*5184Sek110237 FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic, 179*5184Sek110237 flowoplib_finishoncount, flowoplib_destruct_generic, 180*5184Sek110237 FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic, 181*5184Sek110237 flowoplib_finishonbytes, flowoplib_destruct_generic, 182*5184Sek110237 FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic, 183*5184Sek110237 flowoplib_openfile, flowoplib_destruct_generic, 
184*5184Sek110237 FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic, 185*5184Sek110237 flowoplib_createfile, flowoplib_destruct_generic, 186*5184Sek110237 FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic, 187*5184Sek110237 flowoplib_closefile, flowoplib_destruct_generic, 188*5184Sek110237 FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic, 189*5184Sek110237 flowoplib_fsync, flowoplib_destruct_generic, 190*5184Sek110237 FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic, 191*5184Sek110237 flowoplib_fsyncset, flowoplib_destruct_generic, 192*5184Sek110237 FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic, 193*5184Sek110237 flowoplib_statfile, flowoplib_destruct_generic, 194*5184Sek110237 FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic, 195*5184Sek110237 flowoplib_readwholefile, flowoplib_destruct_generic, 196*5184Sek110237 FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic, 197*5184Sek110237 flowoplib_appendfile, flowoplib_destruct_generic, 198*5184Sek110237 FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic, 199*5184Sek110237 flowoplib_appendfilerand, flowoplib_destruct_generic, 200*5184Sek110237 FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic, 201*5184Sek110237 flowoplib_deletefile, flowoplib_destruct_generic, 202*5184Sek110237 FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic, 203*5184Sek110237 flowoplib_writewholefile, flowoplib_destruct_generic 204*5184Sek110237 }; 205*5184Sek110237 206*5184Sek110237 /* 207*5184Sek110237 * Loops through the master list of flowops defined in this 208*5184Sek110237 * module, and creates and initializes a flowop for each one 209*5184Sek110237 * by calling flowop_define. As a side effect of calling 210*5184Sek110237 * flowop define, the created flowops are placed on the 211*5184Sek110237 * master flowop list. All created flowops are set to 212*5184Sek110237 * instance "0". 
213*5184Sek110237 */ 214*5184Sek110237 void 215*5184Sek110237 flowoplib_init() 216*5184Sek110237 { 217*5184Sek110237 int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t); 218*5184Sek110237 int i; 219*5184Sek110237 220*5184Sek110237 for (i = 0; i < nops; i++) { 221*5184Sek110237 flowop_t *flowop; 222*5184Sek110237 flowoplib_t *fl; 223*5184Sek110237 224*5184Sek110237 fl = &flowoplib_funcs[i]; 225*5184Sek110237 226*5184Sek110237 if ((flowop = flowop_define(NULL, 227*5184Sek110237 fl->fl_name, NULL, 0, fl->fl_type)) == 0) { 228*5184Sek110237 filebench_log(LOG_ERROR, 229*5184Sek110237 "failed to create flowop %s\n", 230*5184Sek110237 fl->fl_name); 231*5184Sek110237 filebench_shutdown(1); 232*5184Sek110237 } 233*5184Sek110237 234*5184Sek110237 flowop->fo_func = fl->fl_func; 235*5184Sek110237 flowop->fo_init = fl->fl_init; 236*5184Sek110237 flowop->fo_destruct = fl->fl_destruct; 237*5184Sek110237 flowop->fo_attrs = fl->fl_attrs; 238*5184Sek110237 } 239*5184Sek110237 } 240*5184Sek110237 241*5184Sek110237 static int 242*5184Sek110237 flowoplib_init_generic(flowop_t *flowop) 243*5184Sek110237 { 244*5184Sek110237 (void) ipc_mutex_unlock(&flowop->fo_lock); 245*5184Sek110237 return (0); 246*5184Sek110237 } 247*5184Sek110237 248*5184Sek110237 /* ARGSUSED */ 249*5184Sek110237 static void 250*5184Sek110237 flowoplib_destruct_generic(flowop_t *flowop) 251*5184Sek110237 { 252*5184Sek110237 } 253*5184Sek110237 254*5184Sek110237 /* 255*5184Sek110237 * Generates a file attribute from flags in the supplied flowop. 256*5184Sek110237 * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed. 
257*5184Sek110237 */ 258*5184Sek110237 static int 259*5184Sek110237 flowoplib_fileattrs(flowop_t *flowop) 260*5184Sek110237 { 261*5184Sek110237 int attrs = 0; 262*5184Sek110237 263*5184Sek110237 if (*flowop->fo_directio) 264*5184Sek110237 attrs |= FLOW_ATTR_DIRECTIO; 265*5184Sek110237 266*5184Sek110237 if (*flowop->fo_dsync) 267*5184Sek110237 attrs |= FLOW_ATTR_DSYNC; 268*5184Sek110237 269*5184Sek110237 return (attrs); 270*5184Sek110237 } 271*5184Sek110237 272*5184Sek110237 /* 273*5184Sek110237 * Searches for a file descriptor. Tries the flowop's 274*5184Sek110237 * fo_fdnumber first and returns with it if it has been 275*5184Sek110237 * explicitly set (greater than 0). It next checks to 276*5184Sek110237 * see if a rotating file descriptor policy is in effect, 277*5184Sek110237 * and if not returns the fdnumber regardless of what 278*5184Sek110237 * it is. (note that if it is 0, it just selects to the 279*5184Sek110237 * default file descriptor in the threadflow's tf_fd 280*5184Sek110237 * array). If the rotating fd policy is in effect, it 281*5184Sek110237 * cycles from the end of the tf_fd array to one location 282*5184Sek110237 * beyond the maximum needed by the number of entries in 283*5184Sek110237 * the associated fileset on each invocation, then starts 284*5184Sek110237 * over from the end. 285*5184Sek110237 * 286*5184Sek110237 * The routine returns an index into the threadflow's 287*5184Sek110237 * tf_fd table where the actual file descriptor will be 288*5184Sek110237 * found. Note: the calling routine must not call this 289*5184Sek110237 * routine if the flowop does not have a fileset, and the 290*5184Sek110237 * flowop's fo_fdnumber is zero and fo_rotatefd is 291*5184Sek110237 * asserted, or an addressing fault may occur. 
292*5184Sek110237 */ 293*5184Sek110237 int 294*5184Sek110237 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop) 295*5184Sek110237 { 296*5184Sek110237 /* If the script sets the fd explicitly */ 297*5184Sek110237 if (flowop->fo_fdnumber > 0) 298*5184Sek110237 return (flowop->fo_fdnumber); 299*5184Sek110237 300*5184Sek110237 /* If the flowop defaults to persistent fd */ 301*5184Sek110237 if (!integer_isset(flowop->fo_rotatefd)) 302*5184Sek110237 return (flowop->fo_fdnumber); 303*5184Sek110237 304*5184Sek110237 /* Rotate the fd on each flowop invocation */ 305*5184Sek110237 if (*(flowop->fo_fileset->fs_entries) > (THREADFLOW_MAXFD / 2)) { 306*5184Sek110237 filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s" 307*5184Sek110237 " (too many files : %d", flowop->fo_name, 308*5184Sek110237 *(flowop->fo_fileset->fs_entries)); 309*5184Sek110237 return (-1); 310*5184Sek110237 } 311*5184Sek110237 312*5184Sek110237 /* First time around */ 313*5184Sek110237 if (threadflow->tf_fdrotor == 0) 314*5184Sek110237 threadflow->tf_fdrotor = THREADFLOW_MAXFD; 315*5184Sek110237 316*5184Sek110237 /* One fd for every file in the set */ 317*5184Sek110237 if (*(flowop->fo_fileset->fs_entries) == 318*5184Sek110237 (THREADFLOW_MAXFD - threadflow->tf_fdrotor)) 319*5184Sek110237 threadflow->tf_fdrotor = THREADFLOW_MAXFD; 320*5184Sek110237 321*5184Sek110237 322*5184Sek110237 threadflow->tf_fdrotor--; 323*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "selected fd = %d", 324*5184Sek110237 threadflow->tf_fdrotor); 325*5184Sek110237 return (threadflow->tf_fdrotor); 326*5184Sek110237 } 327*5184Sek110237 328*5184Sek110237 /* 329*5184Sek110237 * Emulate posix read / pread. If the flowop has a fileset, 330*5184Sek110237 * a file descriptor number index is fetched, otherwise a 331*5184Sek110237 * supplied fileobj file is used. In either case the specified 332*5184Sek110237 * file will be opened if not already open. 
If the flowop has 333*5184Sek110237 * neither a fileset or fileobj, an error is logged and -1 334*5184Sek110237 * returned. 335*5184Sek110237 * 336*5184Sek110237 * The actual read is done to a random offset in the 337*5184Sek110237 * threadflow's thread memory (tf_mem), with a size set by 338*5184Sek110237 * fo_iosize and at either a random disk offset within the 339*5184Sek110237 * working set size, or at the next sequential location. If 340*5184Sek110237 * any errors are encountered, -1 is returned, if successful, 341*5184Sek110237 * 0 is returned. 342*5184Sek110237 */ 343*5184Sek110237 static int 344*5184Sek110237 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop) 345*5184Sek110237 { 346*5184Sek110237 size_t memoffset; 347*5184Sek110237 vinteger_t wss; 348*5184Sek110237 long memsize, round; 349*5184Sek110237 int filedesc; 350*5184Sek110237 int ret; 351*5184Sek110237 352*5184Sek110237 if (flowop->fo_fileset || flowop->fo_fdnumber) { 353*5184Sek110237 int fd = flowoplib_fdnum(threadflow, flowop); 354*5184Sek110237 355*5184Sek110237 if (fd == -1) 356*5184Sek110237 return (-1); 357*5184Sek110237 358*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 359*5184Sek110237 (void) flowoplib_openfile_common(threadflow, 360*5184Sek110237 flowop, fd); 361*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "read opened file %s", 362*5184Sek110237 threadflow->tf_fse[fd]->fse_path); 363*5184Sek110237 } 364*5184Sek110237 filedesc = threadflow->tf_fd[fd]; 365*5184Sek110237 if (*flowop->fo_wss == 0) 366*5184Sek110237 wss = threadflow->tf_fse[fd]->fse_size; 367*5184Sek110237 else 368*5184Sek110237 wss = *flowop->fo_wss; 369*5184Sek110237 } else { 370*5184Sek110237 if (flowop->fo_file == NULL) { 371*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 372*5184Sek110237 return (-1); 373*5184Sek110237 } 374*5184Sek110237 if (flowop->fo_fd < 0) 375*5184Sek110237 flowop->fo_fd = fileobj_open(flowop->fo_file, 376*5184Sek110237 flowoplib_fileattrs(flowop)); 377*5184Sek110237 
378*5184Sek110237 if (flowop->fo_fd < 0) { 379*5184Sek110237 filebench_log(LOG_ERROR, "failed to open file %s", 380*5184Sek110237 flowop->fo_file->fo_name); 381*5184Sek110237 return (-1); 382*5184Sek110237 } 383*5184Sek110237 filedesc = flowop->fo_fd; 384*5184Sek110237 if (*flowop->fo_wss == 0) 385*5184Sek110237 wss = *flowop->fo_file->fo_size; 386*5184Sek110237 else 387*5184Sek110237 wss = *flowop->fo_wss; 388*5184Sek110237 } 389*5184Sek110237 390*5184Sek110237 if (*flowop->fo_iosize == 0) { 391*5184Sek110237 filebench_log(LOG_ERROR, "zero iosize for thread %s", 392*5184Sek110237 flowop->fo_name); 393*5184Sek110237 return (-1); 394*5184Sek110237 } 395*5184Sek110237 396*5184Sek110237 memsize = *threadflow->tf_memsize; 397*5184Sek110237 round = *flowop->fo_iosize; 398*5184Sek110237 399*5184Sek110237 if (filebench_randomno(&memoffset, memsize, round) == -1) { 400*5184Sek110237 filebench_log(LOG_ERROR, 401*5184Sek110237 "tf_memsize smaller than IO size for thread %s", 402*5184Sek110237 flowop->fo_name); 403*5184Sek110237 return (-1); 404*5184Sek110237 } 405*5184Sek110237 406*5184Sek110237 if (*flowop->fo_random) { 407*5184Sek110237 uint64_t fileoffset; 408*5184Sek110237 409*5184Sek110237 if (filebench_randomno64(&fileoffset, wss, 410*5184Sek110237 *flowop->fo_iosize) == -1) { 411*5184Sek110237 filebench_log(LOG_ERROR, 412*5184Sek110237 "file size smaller than IO size for thread %s", 413*5184Sek110237 flowop->fo_name); 414*5184Sek110237 return (-1); 415*5184Sek110237 } 416*5184Sek110237 417*5184Sek110237 (void) flowop_beginop(threadflow, flowop); 418*5184Sek110237 if ((ret = pread64(filedesc, threadflow->tf_mem + memoffset, 419*5184Sek110237 *flowop->fo_iosize, (off64_t)fileoffset)) == -1) { 420*5184Sek110237 (void) flowop_endop(threadflow, flowop); 421*5184Sek110237 filebench_log(LOG_ERROR, 422*5184Sek110237 "read file %s failed, offset %lld " 423*5184Sek110237 "memoffset %zd: %s", 424*5184Sek110237 flowop->fo_file->fo_name, 425*5184Sek110237 fileoffset, memoffset, 
strerror(errno)); 426*5184Sek110237 flowop_endop(threadflow, flowop); 427*5184Sek110237 return (-1); 428*5184Sek110237 } 429*5184Sek110237 (void) flowop_endop(threadflow, flowop); 430*5184Sek110237 431*5184Sek110237 if ((ret == 0)) 432*5184Sek110237 (void) lseek64(filedesc, 0, SEEK_SET); 433*5184Sek110237 434*5184Sek110237 } else { 435*5184Sek110237 (void) flowop_beginop(threadflow, flowop); 436*5184Sek110237 if ((ret = read(filedesc, threadflow->tf_mem + memoffset, 437*5184Sek110237 *flowop->fo_iosize)) == -1) { 438*5184Sek110237 filebench_log(LOG_ERROR, 439*5184Sek110237 "read file %s failed, memoffset %zd: %s", 440*5184Sek110237 flowop->fo_file->fo_name, 441*5184Sek110237 memoffset, strerror(errno)); 442*5184Sek110237 (void) flowop_endop(threadflow, flowop); 443*5184Sek110237 return (-1); 444*5184Sek110237 } 445*5184Sek110237 (void) flowop_endop(threadflow, flowop); 446*5184Sek110237 447*5184Sek110237 if ((ret == 0)) 448*5184Sek110237 (void) lseek64(filedesc, 0, SEEK_SET); 449*5184Sek110237 } 450*5184Sek110237 451*5184Sek110237 return (0); 452*5184Sek110237 } 453*5184Sek110237 454*5184Sek110237 #ifdef HAVE_AIO 455*5184Sek110237 456*5184Sek110237 /* 457*5184Sek110237 * Asynchronous write section. An Asynchronous IO element 458*5184Sek110237 * (aiolist_t) is used to associate the asynchronous write request with 459*5184Sek110237 * its subsequent completion. This element includes a aiocb64 struct 460*5184Sek110237 * that is used by posix aio_xxx calls to track the asynchronous writes. 461*5184Sek110237 * The flowops aiowrite and aiowait result in calls to these posix 462*5184Sek110237 * aio_xxx system routines to do the actual asynchronous write IO 463*5184Sek110237 * operations. 464*5184Sek110237 */ 465*5184Sek110237 466*5184Sek110237 467*5184Sek110237 /* 468*5184Sek110237 * Allocates an asynchronous I/O list (aio, of type 469*5184Sek110237 * aiolist_t) element. Adds it to the flowop thread's 470*5184Sek110237 * threadflow aio list. 
Returns a pointer to the element. 471*5184Sek110237 */ 472*5184Sek110237 static aiolist_t * 473*5184Sek110237 aio_allocate(flowop_t *flowop) 474*5184Sek110237 { 475*5184Sek110237 aiolist_t *aiolist; 476*5184Sek110237 477*5184Sek110237 if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) { 478*5184Sek110237 filebench_log(LOG_ERROR, "malloc aiolist failed"); 479*5184Sek110237 filebench_shutdown(1); 480*5184Sek110237 } 481*5184Sek110237 482*5184Sek110237 /* Add to list */ 483*5184Sek110237 if (flowop->fo_thread->tf_aiolist == NULL) { 484*5184Sek110237 flowop->fo_thread->tf_aiolist = aiolist; 485*5184Sek110237 aiolist->al_next = NULL; 486*5184Sek110237 } else { 487*5184Sek110237 aiolist->al_next = flowop->fo_thread->tf_aiolist; 488*5184Sek110237 flowop->fo_thread->tf_aiolist = aiolist; 489*5184Sek110237 } 490*5184Sek110237 return (aiolist); 491*5184Sek110237 } 492*5184Sek110237 493*5184Sek110237 /* 494*5184Sek110237 * Searches for the aiolist element that has a matching 495*5184Sek110237 * completion block, aiocb. If none found returns -1. If 496*5184Sek110237 * found, removes the aiolist element from flowop thread's 497*5184Sek110237 * list and returns 0. 
498*5184Sek110237 */ 499*5184Sek110237 static int 500*5184Sek110237 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb) 501*5184Sek110237 { 502*5184Sek110237 aiolist_t *aiolist = flowop->fo_thread->tf_aiolist; 503*5184Sek110237 aiolist_t *previous = NULL; 504*5184Sek110237 aiolist_t *match = NULL; 505*5184Sek110237 506*5184Sek110237 if (aiocb == NULL) { 507*5184Sek110237 filebench_log(LOG_ERROR, "null aiocb deallocate"); 508*5184Sek110237 return (0); 509*5184Sek110237 } 510*5184Sek110237 511*5184Sek110237 while (aiolist) { 512*5184Sek110237 if (aiocb == &(aiolist->al_aiocb)) { 513*5184Sek110237 match = aiolist; 514*5184Sek110237 break; 515*5184Sek110237 } 516*5184Sek110237 previous = aiolist; 517*5184Sek110237 aiolist = aiolist->al_next; 518*5184Sek110237 } 519*5184Sek110237 520*5184Sek110237 if (match == NULL) 521*5184Sek110237 return (-1); 522*5184Sek110237 523*5184Sek110237 /* Remove from the list */ 524*5184Sek110237 if (previous) 525*5184Sek110237 previous->al_next = match->al_next; 526*5184Sek110237 else 527*5184Sek110237 flowop->fo_thread->tf_aiolist = match->al_next; 528*5184Sek110237 529*5184Sek110237 return (0); 530*5184Sek110237 } 531*5184Sek110237 532*5184Sek110237 /* 533*5184Sek110237 * Emulate posix aiowrite(). Determines which file to use, 534*5184Sek110237 * either one file of a fileset, or the file associated 535*5184Sek110237 * with a fileobj, allocates and fills an aiolist_t element 536*5184Sek110237 * for the write, and issues the asynchronous write. This 537*5184Sek110237 * operation is only valid for random IO, and returns an 538*5184Sek110237 * error if the flowop is set for sequential IO. Returns 0 539*5184Sek110237 * on success, -1 on any encountered error. 
540*5184Sek110237 */ 541*5184Sek110237 static int 542*5184Sek110237 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop) 543*5184Sek110237 { 544*5184Sek110237 size_t memoffset; 545*5184Sek110237 vinteger_t wss; 546*5184Sek110237 long memsize, round; 547*5184Sek110237 int filedesc; 548*5184Sek110237 549*5184Sek110237 if (flowop->fo_fileset || flowop->fo_fdnumber) { 550*5184Sek110237 int fd = flowoplib_fdnum(threadflow, flowop); 551*5184Sek110237 552*5184Sek110237 if (fd == -1) 553*5184Sek110237 return (-1); 554*5184Sek110237 555*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 556*5184Sek110237 (void) flowoplib_openfile_common(threadflow, 557*5184Sek110237 flowop, fd); 558*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 559*5184Sek110237 "writefile opened file %s", 560*5184Sek110237 threadflow->tf_fse[fd]->fse_path); 561*5184Sek110237 } 562*5184Sek110237 filedesc = threadflow->tf_fd[fd]; 563*5184Sek110237 if (*flowop->fo_wss == 0) 564*5184Sek110237 wss = threadflow->tf_fse[fd]->fse_size; 565*5184Sek110237 else 566*5184Sek110237 wss = *flowop->fo_wss; 567*5184Sek110237 } else { 568*5184Sek110237 if (flowop->fo_file == NULL) { 569*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 570*5184Sek110237 return (-1); 571*5184Sek110237 } 572*5184Sek110237 if (flowop->fo_fd < 0) 573*5184Sek110237 flowop->fo_fd = fileobj_open(flowop->fo_file, 574*5184Sek110237 flowoplib_fileattrs(flowop)); 575*5184Sek110237 576*5184Sek110237 if (flowop->fo_fd < 0) { 577*5184Sek110237 filebench_log(LOG_ERROR, "failed to open file %s", 578*5184Sek110237 flowop->fo_file->fo_name); 579*5184Sek110237 return (-1); 580*5184Sek110237 } 581*5184Sek110237 filedesc = flowop->fo_fd; 582*5184Sek110237 if (*flowop->fo_wss == 0) 583*5184Sek110237 wss = *flowop->fo_file->fo_size; 584*5184Sek110237 else 585*5184Sek110237 wss = *flowop->fo_wss; 586*5184Sek110237 } 587*5184Sek110237 588*5184Sek110237 if (*flowop->fo_iosize == 0) { 589*5184Sek110237 filebench_log(LOG_ERROR, "zero iosize for thread 
%s", 590*5184Sek110237 flowop->fo_name); 591*5184Sek110237 return (-1); 592*5184Sek110237 } 593*5184Sek110237 594*5184Sek110237 memsize = *threadflow->tf_memsize; 595*5184Sek110237 round = *flowop->fo_iosize; 596*5184Sek110237 597*5184Sek110237 /* Select memory offset for IO */ 598*5184Sek110237 if (filebench_randomno(&memoffset, memsize, round) == -1) { 599*5184Sek110237 filebench_log(LOG_ERROR, 600*5184Sek110237 "tf_memsize smaller than IO size for thread %s", 601*5184Sek110237 flowop->fo_name); 602*5184Sek110237 return (-1); 603*5184Sek110237 } 604*5184Sek110237 605*5184Sek110237 if (*flowop->fo_random) { 606*5184Sek110237 uint64_t fileoffset; 607*5184Sek110237 struct aiocb64 *aiocb; 608*5184Sek110237 aiolist_t *aiolist; 609*5184Sek110237 610*5184Sek110237 if (filebench_randomno64(&fileoffset, 611*5184Sek110237 wss, *flowop->fo_iosize) == -1) { 612*5184Sek110237 filebench_log(LOG_ERROR, 613*5184Sek110237 "file size smaller than IO size for thread %s", 614*5184Sek110237 flowop->fo_name); 615*5184Sek110237 return (-1); 616*5184Sek110237 } 617*5184Sek110237 618*5184Sek110237 aiolist = aio_allocate(flowop); 619*5184Sek110237 aiolist->al_type = AL_WRITE; 620*5184Sek110237 aiocb = &aiolist->al_aiocb; 621*5184Sek110237 622*5184Sek110237 aiocb->aio_fildes = filedesc; 623*5184Sek110237 aiocb->aio_buf = threadflow->tf_mem + memoffset; 624*5184Sek110237 aiocb->aio_nbytes = *flowop->fo_iosize; 625*5184Sek110237 aiocb->aio_offset = (off64_t)fileoffset; 626*5184Sek110237 aiocb->aio_reqprio = 0; 627*5184Sek110237 628*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 629*5184Sek110237 "aio fd=%d, bytes=%lld, offset=%lld", 630*5184Sek110237 filedesc, *flowop->fo_iosize, fileoffset); 631*5184Sek110237 632*5184Sek110237 flowop_beginop(threadflow, flowop); 633*5184Sek110237 if (aio_write64(aiocb) < 0) { 634*5184Sek110237 filebench_log(LOG_ERROR, "aiowrite failed: %s", 635*5184Sek110237 strerror(errno)); 636*5184Sek110237 filebench_shutdown(1); 637*5184Sek110237 } 638*5184Sek110237 
flowop_endop(threadflow, flowop); 639*5184Sek110237 } else { 640*5184Sek110237 return (-1); 641*5184Sek110237 } 642*5184Sek110237 643*5184Sek110237 return (0); 644*5184Sek110237 } 645*5184Sek110237 646*5184Sek110237 647*5184Sek110237 648*5184Sek110237 #define MAXREAP 4096 649*5184Sek110237 650*5184Sek110237 /* 651*5184Sek110237 * Emulate posix aiowait(). Waits for the completion of half the 652*5184Sek110237 * outstanding asynchronous IOs, or a single IO, which ever is 653*5184Sek110237 * larger. The routine will return after a sufficient number of 654*5184Sek110237 * completed calls issued by any thread in the procflow have 655*5184Sek110237 * completed, or a 1 second timout elapses. All completed 656*5184Sek110237 * IO operations are deleted from the thread's aiolist. 657*5184Sek110237 */ 658*5184Sek110237 static int 659*5184Sek110237 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop) 660*5184Sek110237 { 661*5184Sek110237 struct aiocb64 **worklist; 662*5184Sek110237 aiolist_t *aio = flowop->fo_thread->tf_aiolist; 663*5184Sek110237 int uncompleted = 0; 664*5184Sek110237 665*5184Sek110237 worklist = calloc(MAXREAP, sizeof (struct aiocb64 *)); 666*5184Sek110237 667*5184Sek110237 /* Count the list of pending aios */ 668*5184Sek110237 while (aio) { 669*5184Sek110237 uncompleted++; 670*5184Sek110237 aio = aio->al_next; 671*5184Sek110237 } 672*5184Sek110237 673*5184Sek110237 do { 674*5184Sek110237 uint_t ncompleted = 0; 675*5184Sek110237 uint_t todo; 676*5184Sek110237 struct timespec timeout; 677*5184Sek110237 int inprogress; 678*5184Sek110237 int i; 679*5184Sek110237 680*5184Sek110237 /* Wait for half of the outstanding requests */ 681*5184Sek110237 timeout.tv_sec = 1; 682*5184Sek110237 timeout.tv_nsec = 0; 683*5184Sek110237 684*5184Sek110237 if (uncompleted > MAXREAP) 685*5184Sek110237 todo = MAXREAP; 686*5184Sek110237 else 687*5184Sek110237 todo = uncompleted / 2; 688*5184Sek110237 689*5184Sek110237 if (todo == 0) 690*5184Sek110237 todo = 1; 
691*5184Sek110237 692*5184Sek110237 flowop_beginop(threadflow, flowop); 693*5184Sek110237 694*5184Sek110237 #ifdef HAVE_AIOWAITN 695*5184Sek110237 if ((aio_waitn64((struct aiocb64 **)worklist, 696*5184Sek110237 MAXREAP, &todo, &timeout) == -1) && 697*5184Sek110237 errno && (errno != ETIME)) { 698*5184Sek110237 filebench_log(LOG_ERROR, 699*5184Sek110237 "aiowait failed: %s, outstanding = %d, " 700*5184Sek110237 "ncompleted = %d ", 701*5184Sek110237 strerror(errno), uncompleted, todo); 702*5184Sek110237 } 703*5184Sek110237 704*5184Sek110237 ncompleted = todo; 705*5184Sek110237 /* Take the completed I/Os from the list */ 706*5184Sek110237 inprogress = 0; 707*5184Sek110237 for (i = 0; i < ncompleted; i++) { 708*5184Sek110237 if ((aio_return64(worklist[i]) == -1) && 709*5184Sek110237 (errno == EINPROGRESS)) { 710*5184Sek110237 inprogress++; 711*5184Sek110237 continue; 712*5184Sek110237 } 713*5184Sek110237 if (aio_deallocate(flowop, worklist[i]) < 0) { 714*5184Sek110237 filebench_log(LOG_ERROR, "Could not remove " 715*5184Sek110237 "aio from list "); 716*5184Sek110237 flowop_endop(threadflow, flowop); 717*5184Sek110237 return (-1); 718*5184Sek110237 } 719*5184Sek110237 } 720*5184Sek110237 721*5184Sek110237 uncompleted -= ncompleted; 722*5184Sek110237 uncompleted += inprogress; 723*5184Sek110237 724*5184Sek110237 #else 725*5184Sek110237 726*5184Sek110237 for (ncompleted = 0, inprogress = 0, 727*5184Sek110237 aio = flowop->fo_thread->tf_aiolist; 728*5184Sek110237 ncompleted < todo, aio != NULL; aio = aio->al_next) { 729*5184Sek110237 730*5184Sek110237 result = aio_error64(&aio->al_aiocb); 731*5184Sek110237 732*5184Sek110237 if (result == EINPROGRESS) { 733*5184Sek110237 inprogress++; 734*5184Sek110237 continue; 735*5184Sek110237 } 736*5184Sek110237 737*5184Sek110237 if ((aio_return64(&aio->al_aiocb) == -1) || result) { 738*5184Sek110237 filebench_log(LOG_ERROR, "aio failed: %s", 739*5184Sek110237 strerror(result)); 740*5184Sek110237 continue; 741*5184Sek110237 } 
742*5184Sek110237 743*5184Sek110237 ncompleted++; 744*5184Sek110237 745*5184Sek110237 if (aio_deallocate(flowop, &aio->al_aiocb) < 0) { 746*5184Sek110237 filebench_log(LOG_ERROR, "Could not remove aio " 747*5184Sek110237 "from list "); 748*5184Sek110237 flowop_endop(threadflow, flowop); 749*5184Sek110237 return (-1); 750*5184Sek110237 } 751*5184Sek110237 } 752*5184Sek110237 753*5184Sek110237 uncompleted -= ncompleted; 754*5184Sek110237 755*5184Sek110237 #endif 756*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, 757*5184Sek110237 "aio2 completed %d ios, uncompleted = %d, inprogress = %d", 758*5184Sek110237 ncompleted, uncompleted, inprogress); 759*5184Sek110237 760*5184Sek110237 } while (uncompleted > MAXREAP); 761*5184Sek110237 762*5184Sek110237 flowop_endop(threadflow, flowop); 763*5184Sek110237 764*5184Sek110237 free(worklist); 765*5184Sek110237 766*5184Sek110237 return (0); 767*5184Sek110237 } 768*5184Sek110237 769*5184Sek110237 #endif /* HAVE_AIO */ 770*5184Sek110237 771*5184Sek110237 /* 772*5184Sek110237 * Initializes a "flowop_block" flowop. Specifically, it 773*5184Sek110237 * initializes the flowop's fo_cv and unlocks the fo_lock. 774*5184Sek110237 */ 775*5184Sek110237 static int 776*5184Sek110237 flowoplib_block_init(flowop_t *flowop) 777*5184Sek110237 { 778*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx", 779*5184Sek110237 flowop->fo_name, flowop->fo_instance, &flowop->fo_cv); 780*5184Sek110237 (void) pthread_cond_init(&flowop->fo_cv, ipc_condattr()); 781*5184Sek110237 (void) ipc_mutex_unlock(&flowop->fo_lock); 782*5184Sek110237 783*5184Sek110237 return (0); 784*5184Sek110237 } 785*5184Sek110237 786*5184Sek110237 /* 787*5184Sek110237 * Blocks the threadflow until woken up by flowoplib_wakeup. 788*5184Sek110237 * The routine blocks on the flowop's fo_cv condition variable. 
789*5184Sek110237 */ 790*5184Sek110237 static int 791*5184Sek110237 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop) 792*5184Sek110237 { 793*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx", 794*5184Sek110237 flowop->fo_name, flowop->fo_instance, &flowop->fo_cv); 795*5184Sek110237 (void) ipc_mutex_lock(&flowop->fo_lock); 796*5184Sek110237 797*5184Sek110237 flowop_beginop(threadflow, flowop); 798*5184Sek110237 (void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock); 799*5184Sek110237 flowop_endop(threadflow, flowop); 800*5184Sek110237 801*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking", 802*5184Sek110237 flowop->fo_name, flowop->fo_instance); 803*5184Sek110237 804*5184Sek110237 (void) ipc_mutex_unlock(&flowop->fo_lock); 805*5184Sek110237 806*5184Sek110237 return (0); 807*5184Sek110237 } 808*5184Sek110237 809*5184Sek110237 /* 810*5184Sek110237 * Wakes up one or more target blocking flowops. 811*5184Sek110237 * Sends broadcasts on the fo_cv condition variables of all 812*5184Sek110237 * flowops on the target list, except those that are 813*5184Sek110237 * FLOW_MASTER flowops. The target list consists of all 814*5184Sek110237 * flowops whose name matches this flowop's "fo_targetname" 815*5184Sek110237 * attribute. The target list is generated on the first 816*5184Sek110237 * invocation, and the run will be shutdown if no targets 817*5184Sek110237 * are found. Otherwise the routine always returns 0. 
818*5184Sek110237 */ 819*5184Sek110237 static int 820*5184Sek110237 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop) 821*5184Sek110237 { 822*5184Sek110237 flowop_t *target; 823*5184Sek110237 824*5184Sek110237 /* if this is the first wakeup, create the wakeup list */ 825*5184Sek110237 if (flowop->fo_targets == NULL) { 826*5184Sek110237 flowop_t *result = flowop_find(flowop->fo_targetname); 827*5184Sek110237 828*5184Sek110237 flowop->fo_targets = result; 829*5184Sek110237 if (result == NULL) { 830*5184Sek110237 filebench_log(LOG_ERROR, 831*5184Sek110237 "wakeup: could not find op %s for thread %s", 832*5184Sek110237 flowop->fo_targetname, 833*5184Sek110237 threadflow->tf_name); 834*5184Sek110237 filebench_shutdown(1); 835*5184Sek110237 } 836*5184Sek110237 while (result) { 837*5184Sek110237 result->fo_targetnext = 838*5184Sek110237 result->fo_resultnext; 839*5184Sek110237 result = result->fo_resultnext; 840*5184Sek110237 } 841*5184Sek110237 } 842*5184Sek110237 843*5184Sek110237 target = flowop->fo_targets; 844*5184Sek110237 845*5184Sek110237 /* wakeup the targets */ 846*5184Sek110237 while (target) { 847*5184Sek110237 if (target->fo_instance == FLOW_MASTER) { 848*5184Sek110237 target = target->fo_targetnext; 849*5184Sek110237 continue; 850*5184Sek110237 } 851*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 852*5184Sek110237 "wakeup flow %s-%d at address %zx", 853*5184Sek110237 target->fo_name, 854*5184Sek110237 target->fo_instance, 855*5184Sek110237 &target->fo_cv); 856*5184Sek110237 857*5184Sek110237 flowop_beginop(threadflow, flowop); 858*5184Sek110237 (void) ipc_mutex_lock(&target->fo_lock); 859*5184Sek110237 (void) pthread_cond_broadcast(&target->fo_cv); 860*5184Sek110237 (void) ipc_mutex_unlock(&target->fo_lock); 861*5184Sek110237 flowop_endop(threadflow, flowop); 862*5184Sek110237 863*5184Sek110237 target = target->fo_targetnext; 864*5184Sek110237 } 865*5184Sek110237 866*5184Sek110237 return (0); 867*5184Sek110237 } 868*5184Sek110237 869*5184Sek110237 /* 
870*5184Sek110237 * "think time" routines. the "hog" routine consumes cpu cycles as 871*5184Sek110237 * it "thinks", while the "delay" flowop simply calls sleep() to delay 872*5184Sek110237 * for a given number of seconds without consuming cpu cycles. 873*5184Sek110237 */ 874*5184Sek110237 875*5184Sek110237 876*5184Sek110237 /* 877*5184Sek110237 * Consumes CPU cycles and memory bandwidth by looping for 878*5184Sek110237 * flowop->fo_value times. With each loop sets memory location 879*5184Sek110237 * threadflow->tf_mem to 1. 880*5184Sek110237 */ 881*5184Sek110237 static int 882*5184Sek110237 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop) 883*5184Sek110237 { 884*5184Sek110237 uint64_t value = *flowop->fo_value; 885*5184Sek110237 int i; 886*5184Sek110237 887*5184Sek110237 flowop_beginop(threadflow, flowop); 888*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "hog enter"); 889*5184Sek110237 for (i = 0; i < value; i++) 890*5184Sek110237 *(threadflow->tf_mem) = 1; 891*5184Sek110237 flowop_endop(threadflow, flowop); 892*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "hog exit"); 893*5184Sek110237 return (0); 894*5184Sek110237 } 895*5184Sek110237 896*5184Sek110237 897*5184Sek110237 /* 898*5184Sek110237 * Delays for fo_value seconds. 899*5184Sek110237 */ 900*5184Sek110237 static int 901*5184Sek110237 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop) 902*5184Sek110237 { 903*5184Sek110237 int value = *flowop->fo_value; 904*5184Sek110237 905*5184Sek110237 flowop_beginop(threadflow, flowop); 906*5184Sek110237 (void) sleep(value); 907*5184Sek110237 flowop_endop(threadflow, flowop); 908*5184Sek110237 return (0); 909*5184Sek110237 } 910*5184Sek110237 911*5184Sek110237 /* 912*5184Sek110237 * Rate limiting routines. This is the event consuming half of the 913*5184Sek110237 * event system. 
Each of the four following routines will limit the rate 914*5184Sek110237 * to one unit of either calls, issued I/O operations, issued filebench 915*5184Sek110237 * operations, or I/O bandwidth. Since there is only one event generator, 916*5184Sek110237 * the events will be divided amoung multiple instances of an event 917*5184Sek110237 * consumer, and further divided among different consumers if more than 918*5184Sek110237 * one has been defined. There is no mechanism to enforce equal sharing 919*5184Sek110237 * of events. 920*5184Sek110237 */ 921*5184Sek110237 922*5184Sek110237 /* 923*5184Sek110237 * Completes one invocation per posted event. If eventgen_q 924*5184Sek110237 * has an event count greater than zero, one will be removed 925*5184Sek110237 * (count decremented), otherwise the calling thread will 926*5184Sek110237 * block until another event has been posted. Always returns 0 927*5184Sek110237 */ 928*5184Sek110237 static int 929*5184Sek110237 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop) 930*5184Sek110237 { 931*5184Sek110237 /* Immediately bail if not set/enabled */ 932*5184Sek110237 if (filebench_shm->eventgen_hz == 0) 933*5184Sek110237 return (0); 934*5184Sek110237 935*5184Sek110237 if (flowop->fo_initted == 0) { 936*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking", 937*5184Sek110237 flowop, threadflow->tf_name, threadflow->tf_instance); 938*5184Sek110237 flowop->fo_initted = 1; 939*5184Sek110237 } 940*5184Sek110237 941*5184Sek110237 flowop_beginop(threadflow, flowop); 942*5184Sek110237 while (filebench_shm->eventgen_hz) { 943*5184Sek110237 (void) ipc_mutex_lock(&filebench_shm->eventgen_lock); 944*5184Sek110237 if (filebench_shm->eventgen_q > 0) { 945*5184Sek110237 filebench_shm->eventgen_q--; 946*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 947*5184Sek110237 break; 948*5184Sek110237 } 949*5184Sek110237 (void) pthread_cond_wait(&filebench_shm->eventgen_cv, 950*5184Sek110237 
&filebench_shm->eventgen_lock); 951*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 952*5184Sek110237 } 953*5184Sek110237 flowop_endop(threadflow, flowop); 954*5184Sek110237 return (0); 955*5184Sek110237 } 956*5184Sek110237 957*5184Sek110237 /* 958*5184Sek110237 * Blocks the calling thread if the number of issued I/O 959*5184Sek110237 * operations exceeds the number of posted events, thus 960*5184Sek110237 * limiting the average I/O operation rate to the rate 961*5184Sek110237 * specified by eventgen_hz. Always returns 0. 962*5184Sek110237 */ 963*5184Sek110237 static int 964*5184Sek110237 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop) 965*5184Sek110237 { 966*5184Sek110237 uint64_t iops; 967*5184Sek110237 uint64_t delta; 968*5184Sek110237 int events; 969*5184Sek110237 970*5184Sek110237 /* Immediately bail if not set/enabled */ 971*5184Sek110237 if (filebench_shm->eventgen_hz == 0) 972*5184Sek110237 return (0); 973*5184Sek110237 974*5184Sek110237 if (flowop->fo_initted == 0) { 975*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking", 976*5184Sek110237 flowop, threadflow->tf_name, threadflow->tf_instance); 977*5184Sek110237 flowop->fo_initted = 1; 978*5184Sek110237 } 979*5184Sek110237 980*5184Sek110237 iops = (controlstats.fs_rcount + 981*5184Sek110237 controlstats.fs_wcount); 982*5184Sek110237 983*5184Sek110237 /* Is this the first time around */ 984*5184Sek110237 if (flowop->fo_tputlast == 0) { 985*5184Sek110237 flowop->fo_tputlast = iops; 986*5184Sek110237 return (0); 987*5184Sek110237 } 988*5184Sek110237 989*5184Sek110237 delta = iops - flowop->fo_tputlast; 990*5184Sek110237 flowop->fo_tputbucket -= delta; 991*5184Sek110237 flowop->fo_tputlast = iops; 992*5184Sek110237 993*5184Sek110237 /* No need to block if the q isn't empty */ 994*5184Sek110237 if (flowop->fo_tputbucket >= 0LL) { 995*5184Sek110237 flowop_endop(threadflow, flowop); 996*5184Sek110237 return (0); 997*5184Sek110237 } 998*5184Sek110237 
999*5184Sek110237 iops = flowop->fo_tputbucket * -1; 1000*5184Sek110237 events = iops; 1001*5184Sek110237 1002*5184Sek110237 flowop_beginop(threadflow, flowop); 1003*5184Sek110237 while (filebench_shm->eventgen_hz) { 1004*5184Sek110237 1005*5184Sek110237 (void) ipc_mutex_lock(&filebench_shm->eventgen_lock); 1006*5184Sek110237 if (filebench_shm->eventgen_q >= events) { 1007*5184Sek110237 filebench_shm->eventgen_q -= events; 1008*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 1009*5184Sek110237 flowop->fo_tputbucket += events; 1010*5184Sek110237 break; 1011*5184Sek110237 } 1012*5184Sek110237 (void) pthread_cond_wait(&filebench_shm->eventgen_cv, 1013*5184Sek110237 &filebench_shm->eventgen_lock); 1014*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 1015*5184Sek110237 } 1016*5184Sek110237 flowop_endop(threadflow, flowop); 1017*5184Sek110237 1018*5184Sek110237 return (0); 1019*5184Sek110237 } 1020*5184Sek110237 1021*5184Sek110237 /* 1022*5184Sek110237 * Blocks the calling thread if the number of issued filebench 1023*5184Sek110237 * operations exceeds the number of posted events, thus limiting 1024*5184Sek110237 * the average filebench operation rate to the rate specified by 1025*5184Sek110237 * eventgen_hz. Always returns 0. 
1026*5184Sek110237 */ 1027*5184Sek110237 static int 1028*5184Sek110237 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop) 1029*5184Sek110237 { 1030*5184Sek110237 uint64_t ops; 1031*5184Sek110237 uint64_t delta; 1032*5184Sek110237 int events; 1033*5184Sek110237 1034*5184Sek110237 /* Immediately bail if not set/enabled */ 1035*5184Sek110237 if (filebench_shm->eventgen_hz == 0) 1036*5184Sek110237 return (0); 1037*5184Sek110237 1038*5184Sek110237 if (flowop->fo_initted == 0) { 1039*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking", 1040*5184Sek110237 flowop, threadflow->tf_name, threadflow->tf_instance); 1041*5184Sek110237 flowop->fo_initted = 1; 1042*5184Sek110237 } 1043*5184Sek110237 1044*5184Sek110237 ops = controlstats.fs_count; 1045*5184Sek110237 1046*5184Sek110237 /* Is this the first time around */ 1047*5184Sek110237 if (flowop->fo_tputlast == 0) { 1048*5184Sek110237 flowop->fo_tputlast = ops; 1049*5184Sek110237 return (0); 1050*5184Sek110237 } 1051*5184Sek110237 1052*5184Sek110237 delta = ops - flowop->fo_tputlast; 1053*5184Sek110237 flowop->fo_tputbucket -= delta; 1054*5184Sek110237 flowop->fo_tputlast = ops; 1055*5184Sek110237 1056*5184Sek110237 /* No need to block if the q isn't empty */ 1057*5184Sek110237 if (flowop->fo_tputbucket >= 0LL) { 1058*5184Sek110237 flowop_endop(threadflow, flowop); 1059*5184Sek110237 return (0); 1060*5184Sek110237 } 1061*5184Sek110237 1062*5184Sek110237 ops = flowop->fo_tputbucket * -1; 1063*5184Sek110237 events = ops; 1064*5184Sek110237 1065*5184Sek110237 flowop_beginop(threadflow, flowop); 1066*5184Sek110237 while (filebench_shm->eventgen_hz) { 1067*5184Sek110237 (void) ipc_mutex_lock(&filebench_shm->eventgen_lock); 1068*5184Sek110237 if (filebench_shm->eventgen_q >= events) { 1069*5184Sek110237 filebench_shm->eventgen_q -= events; 1070*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 1071*5184Sek110237 flowop->fo_tputbucket += events; 1072*5184Sek110237 break; 
1073*5184Sek110237 } 1074*5184Sek110237 (void) pthread_cond_wait(&filebench_shm->eventgen_cv, 1075*5184Sek110237 &filebench_shm->eventgen_lock); 1076*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 1077*5184Sek110237 } 1078*5184Sek110237 flowop_endop(threadflow, flowop); 1079*5184Sek110237 1080*5184Sek110237 return (0); 1081*5184Sek110237 } 1082*5184Sek110237 1083*5184Sek110237 1084*5184Sek110237 /* 1085*5184Sek110237 * Blocks the calling thread if the number of bytes of I/O 1086*5184Sek110237 * issued exceeds one megabyte times the number of posted 1087*5184Sek110237 * events, thus limiting the average I/O byte rate to one 1088*5184Sek110237 * megabyte times the event rate as set by eventgen_hz. 1089*5184Sek110237 * Always retuns 0. 1090*5184Sek110237 */ 1091*5184Sek110237 static int 1092*5184Sek110237 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop) 1093*5184Sek110237 { 1094*5184Sek110237 uint64_t bytes; 1095*5184Sek110237 uint64_t delta; 1096*5184Sek110237 int events; 1097*5184Sek110237 1098*5184Sek110237 /* Immediately bail if not set/enabled */ 1099*5184Sek110237 if (filebench_shm->eventgen_hz == 0) 1100*5184Sek110237 return (0); 1101*5184Sek110237 1102*5184Sek110237 if (flowop->fo_initted == 0) { 1103*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking", 1104*5184Sek110237 flowop, threadflow->tf_name, threadflow->tf_instance); 1105*5184Sek110237 flowop->fo_initted = 1; 1106*5184Sek110237 } 1107*5184Sek110237 1108*5184Sek110237 bytes = (controlstats.fs_rbytes + 1109*5184Sek110237 controlstats.fs_wbytes); 1110*5184Sek110237 1111*5184Sek110237 /* Is this the first time around */ 1112*5184Sek110237 if (flowop->fo_tputlast == 0) { 1113*5184Sek110237 flowop->fo_tputlast = bytes; 1114*5184Sek110237 return (0); 1115*5184Sek110237 } 1116*5184Sek110237 1117*5184Sek110237 delta = bytes - flowop->fo_tputlast; 1118*5184Sek110237 flowop->fo_tputbucket -= delta; 1119*5184Sek110237 flowop->fo_tputlast = bytes; 
1120*5184Sek110237 1121*5184Sek110237 /* No need to block if the q isn't empty */ 1122*5184Sek110237 if (flowop->fo_tputbucket >= 0LL) { 1123*5184Sek110237 flowop_endop(threadflow, flowop); 1124*5184Sek110237 return (0); 1125*5184Sek110237 } 1126*5184Sek110237 1127*5184Sek110237 bytes = flowop->fo_tputbucket * -1; 1128*5184Sek110237 events = (bytes / MB) + 1; 1129*5184Sek110237 1130*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "%lld bytes, %lld events", 1131*5184Sek110237 bytes, events); 1132*5184Sek110237 1133*5184Sek110237 flowop_beginop(threadflow, flowop); 1134*5184Sek110237 while (filebench_shm->eventgen_hz) { 1135*5184Sek110237 (void) ipc_mutex_lock(&filebench_shm->eventgen_lock); 1136*5184Sek110237 if (filebench_shm->eventgen_q >= events) { 1137*5184Sek110237 filebench_shm->eventgen_q -= events; 1138*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 1139*5184Sek110237 flowop->fo_tputbucket += (events * MB); 1140*5184Sek110237 break; 1141*5184Sek110237 } 1142*5184Sek110237 (void) pthread_cond_wait(&filebench_shm->eventgen_cv, 1143*5184Sek110237 &filebench_shm->eventgen_lock); 1144*5184Sek110237 (void) ipc_mutex_unlock(&filebench_shm->eventgen_lock); 1145*5184Sek110237 } 1146*5184Sek110237 flowop_endop(threadflow, flowop); 1147*5184Sek110237 1148*5184Sek110237 return (0); 1149*5184Sek110237 } 1150*5184Sek110237 1151*5184Sek110237 /* 1152*5184Sek110237 * These flowops terminate a benchmark run when either the specified 1153*5184Sek110237 * number of bytes of I/O (flowoplib_finishonbytes) or the specified 1154*5184Sek110237 * number of I/O operations (flowoplib_finishoncount) have been generated. 1155*5184Sek110237 */ 1156*5184Sek110237 1157*5184Sek110237 1158*5184Sek110237 /* 1159*5184Sek110237 * Stop filebench run when specified number of I/O bytes have been 1160*5184Sek110237 * transferred. 
Compares controlstats.fs_bytes with *flowop->value, 1161*5184Sek110237 * and if greater returns 1, stopping the run, if not, returns 0 1162*5184Sek110237 * to continue running. 1163*5184Sek110237 */ 1164*5184Sek110237 static int 1165*5184Sek110237 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop) 1166*5184Sek110237 { 1167*5184Sek110237 uint64_t b; 1168*5184Sek110237 uint64_t bytes = *flowop->fo_value; 1169*5184Sek110237 1170*5184Sek110237 b = controlstats.fs_bytes; 1171*5184Sek110237 1172*5184Sek110237 flowop_beginop(threadflow, flowop); 1173*5184Sek110237 if (b > bytes) { 1174*5184Sek110237 flowop_endop(threadflow, flowop); 1175*5184Sek110237 return (1); 1176*5184Sek110237 } 1177*5184Sek110237 flowop_endop(threadflow, flowop); 1178*5184Sek110237 1179*5184Sek110237 return (0); 1180*5184Sek110237 } 1181*5184Sek110237 1182*5184Sek110237 /* 1183*5184Sek110237 * Stop filebench run when specified number of I/O operations have 1184*5184Sek110237 * been performed. Compares controlstats.fs_count with *flowop->value, 1185*5184Sek110237 * and if greater returns 1, stopping the run, if not, returns 0 to 1186*5184Sek110237 * continue running. 1187*5184Sek110237 */ 1188*5184Sek110237 static int 1189*5184Sek110237 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop) 1190*5184Sek110237 { 1191*5184Sek110237 uint64_t ops; 1192*5184Sek110237 uint64_t count = *flowop->fo_value; 1193*5184Sek110237 1194*5184Sek110237 ops = controlstats.fs_count; 1195*5184Sek110237 1196*5184Sek110237 flowop_beginop(threadflow, flowop); 1197*5184Sek110237 if (ops > count) { 1198*5184Sek110237 flowop_endop(threadflow, flowop); 1199*5184Sek110237 return (1); 1200*5184Sek110237 } 1201*5184Sek110237 flowop_endop(threadflow, flowop); 1202*5184Sek110237 1203*5184Sek110237 return (0); 1204*5184Sek110237 } 1205*5184Sek110237 1206*5184Sek110237 /* 1207*5184Sek110237 * Semaphore synchronization using either System V semaphores or 1208*5184Sek110237 * posix semaphores. 
If System V semaphores are available, they will be 1209*5184Sek110237 * used, otherwise posix semaphores will be used. 1210*5184Sek110237 */ 1211*5184Sek110237 1212*5184Sek110237 1213*5184Sek110237 /* 1214*5184Sek110237 * Initializes the filebench "block on semaphore" flowop. 1215*5184Sek110237 * If System V semaphores are implemented, the routine 1216*5184Sek110237 * initializes the System V semaphore subsystem if it hasn't 1217*5184Sek110237 * already been initialized, also allocates a pair of semids 1218*5184Sek110237 * and initializes the highwater System V semaphore. 1219*5184Sek110237 * If no System V semaphores, then does nothing special. 1220*5184Sek110237 * Returns -1 if it cannot acquire a set of System V semphores 1221*5184Sek110237 * or if the initial post to the semaphore set fails. Returns 0 1222*5184Sek110237 * on success. 1223*5184Sek110237 */ 1224*5184Sek110237 static int 1225*5184Sek110237 flowoplib_semblock_init(flowop_t *flowop) 1226*5184Sek110237 { 1227*5184Sek110237 1228*5184Sek110237 #ifdef HAVE_SYSV_SEM 1229*5184Sek110237 int semid; 1230*5184Sek110237 struct sembuf sbuf[2]; 1231*5184Sek110237 int highwater; 1232*5184Sek110237 1233*5184Sek110237 ipc_seminit(); 1234*5184Sek110237 1235*5184Sek110237 flowop->fo_semid_lw = ipc_semidalloc(); 1236*5184Sek110237 flowop->fo_semid_hw = ipc_semidalloc(); 1237*5184Sek110237 1238*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x", 1239*5184Sek110237 flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw); 1240*5184Sek110237 1241*5184Sek110237 /* 1242*5184Sek110237 * Raise the number of the hw queue, causing the posting side to 1243*5184Sek110237 * block if queue is > 2 x blocking value 1244*5184Sek110237 */ 1245*5184Sek110237 if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) { 1246*5184Sek110237 filebench_log(LOG_ERROR, "semblock init lookup %x failed: %s", 1247*5184Sek110237 filebench_shm->semkey, 1248*5184Sek110237 strerror(errno)); 
1249*5184Sek110237 return (-1); 1250*5184Sek110237 } 1251*5184Sek110237 1252*5184Sek110237 if ((highwater = flowop->fo_semid_hw) == 0) 1253*5184Sek110237 highwater = *flowop->fo_value; 1254*5184Sek110237 1255*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater); 1256*5184Sek110237 1257*5184Sek110237 sbuf[0].sem_num = highwater; 1258*5184Sek110237 sbuf[0].sem_op = *flowop->fo_highwater; 1259*5184Sek110237 sbuf[0].sem_flg = 0; 1260*5184Sek110237 if ((semop(semid, &sbuf[0], 1) == -1) && errno) { 1261*5184Sek110237 filebench_log(LOG_ERROR, "semblock init post failed: %s (%d," 1262*5184Sek110237 "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op); 1263*5184Sek110237 return (-1); 1264*5184Sek110237 } 1265*5184Sek110237 #else 1266*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 1267*5184Sek110237 "flow %s-%d semblock init with posix semaphore", 1268*5184Sek110237 flowop->fo_name, flowop->fo_instance); 1269*5184Sek110237 1270*5184Sek110237 sem_init(&flowop->fo_sem, 1, 0); 1271*5184Sek110237 #endif /* HAVE_SYSV_SEM */ 1272*5184Sek110237 1273*5184Sek110237 if (!(*flowop->fo_blocking)) 1274*5184Sek110237 (void) ipc_mutex_unlock(&flowop->fo_lock); 1275*5184Sek110237 1276*5184Sek110237 return (0); 1277*5184Sek110237 } 1278*5184Sek110237 1279*5184Sek110237 /* 1280*5184Sek110237 * Releases the semids for the System V semaphore allocated 1281*5184Sek110237 * to this flowop. If not using System V semaphores, then 1282*5184Sek110237 * it is effectively just a no-op. Always returns 0. 
1283*5184Sek110237 */ 1284*5184Sek110237 static void 1285*5184Sek110237 flowoplib_semblock_destruct(flowop_t *flowop) 1286*5184Sek110237 { 1287*5184Sek110237 #ifdef HAVE_SYSV_SEM 1288*5184Sek110237 ipc_semidfree(flowop->fo_semid_lw); 1289*5184Sek110237 ipc_semidfree(flowop->fo_semid_hw); 1290*5184Sek110237 #else 1291*5184Sek110237 sem_destroy(&flowop->fo_sem); 1292*5184Sek110237 #endif /* HAVE_SYSV_SEM */ 1293*5184Sek110237 } 1294*5184Sek110237 1295*5184Sek110237 /* 1296*5184Sek110237 * Attempts to pass a System V or posix semaphore as appropriate, 1297*5184Sek110237 * and blocks if necessary. Returns -1 if a set of System V 1298*5184Sek110237 * semphores is not available or cannot be acquired, or if the initial 1299*5184Sek110237 * post to the semaphore set fails. Returns 0 on success. 1300*5184Sek110237 */ 1301*5184Sek110237 static int 1302*5184Sek110237 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop) 1303*5184Sek110237 { 1304*5184Sek110237 1305*5184Sek110237 #ifdef HAVE_SYSV_SEM 1306*5184Sek110237 struct sembuf sbuf[2]; 1307*5184Sek110237 int value = *flowop->fo_value; 1308*5184Sek110237 int semid; 1309*5184Sek110237 struct timespec timeout; 1310*5184Sek110237 1311*5184Sek110237 if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) { 1312*5184Sek110237 filebench_log(LOG_ERROR, "lookup semop %x failed: %s", 1313*5184Sek110237 filebench_shm->semkey, 1314*5184Sek110237 strerror(errno)); 1315*5184Sek110237 return (-1); 1316*5184Sek110237 } 1317*5184Sek110237 1318*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 1319*5184Sek110237 "flow %s-%d sem blocking on id %x num %x value %d", 1320*5184Sek110237 flowop->fo_name, flowop->fo_instance, semid, 1321*5184Sek110237 flowop->fo_semid_hw, value); 1322*5184Sek110237 1323*5184Sek110237 /* Post, decrement the increment the hw queue */ 1324*5184Sek110237 sbuf[0].sem_num = flowop->fo_semid_hw; 1325*5184Sek110237 sbuf[0].sem_op = value; 1326*5184Sek110237 sbuf[0].sem_flg = 0; 1327*5184Sek110237 
sbuf[1].sem_num = flowop->fo_semid_lw; 1328*5184Sek110237 sbuf[1].sem_op = value * -1; 1329*5184Sek110237 sbuf[1].sem_flg = 0; 1330*5184Sek110237 timeout.tv_sec = 600; 1331*5184Sek110237 timeout.tv_nsec = 0; 1332*5184Sek110237 1333*5184Sek110237 if (*flowop->fo_blocking) 1334*5184Sek110237 (void) ipc_mutex_unlock(&flowop->fo_lock); 1335*5184Sek110237 1336*5184Sek110237 flowop_beginop(threadflow, flowop); 1337*5184Sek110237 1338*5184Sek110237 #ifdef HAVE_SEMTIMEDOP 1339*5184Sek110237 (void) semtimedop(semid, &sbuf[0], 1, &timeout); 1340*5184Sek110237 (void) semtimedop(semid, &sbuf[1], 1, &timeout); 1341*5184Sek110237 #else 1342*5184Sek110237 (void) semop(semid, &sbuf[0], 1); 1343*5184Sek110237 (void) semop(semid, &sbuf[1], 1); 1344*5184Sek110237 #endif /* HAVE_SEMTIMEDOP */ 1345*5184Sek110237 1346*5184Sek110237 if (*flowop->fo_blocking) 1347*5184Sek110237 (void) ipc_mutex_lock(&flowop->fo_lock); 1348*5184Sek110237 1349*5184Sek110237 flowop_endop(threadflow, flowop); 1350*5184Sek110237 1351*5184Sek110237 #else 1352*5184Sek110237 int value = *flowop->fo_value; 1353*5184Sek110237 int i; 1354*5184Sek110237 1355*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 1356*5184Sek110237 "flow %s-%d sem blocking on posix semaphore", 1357*5184Sek110237 flowop->fo_name, flowop->fo_instance); 1358*5184Sek110237 1359*5184Sek110237 /* Decrement sem by value */ 1360*5184Sek110237 for (i = 0; i < value; i++) { 1361*5184Sek110237 if (sem_wait(&flowop->fo_sem) == -1) { 1362*5184Sek110237 filebench_log(LOG_ERROR, "semop wait failed"); 1363*5184Sek110237 return (-1); 1364*5184Sek110237 } 1365*5184Sek110237 } 1366*5184Sek110237 1367*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking", 1368*5184Sek110237 flowop->fo_name, flowop->fo_instance); 1369*5184Sek110237 #endif /* HAVE_SYSV_SEM */ 1370*5184Sek110237 1371*5184Sek110237 return (0); 1372*5184Sek110237 } 1373*5184Sek110237 1374*5184Sek110237 /* 1375*5184Sek110237 * Calls ipc_seminit(), and does so whether System V semaphores 
1376*5184Sek110237 * are available or not. Hence it will cause ipc_seminit to log errors 1377*5184Sek110237 * if they are not. Always returns 0. 1378*5184Sek110237 */ 1379*5184Sek110237 /* ARGSUSED */ 1380*5184Sek110237 static int 1381*5184Sek110237 flowoplib_sempost_init(flowop_t *flowop) 1382*5184Sek110237 { 1383*5184Sek110237 #ifdef HAVE_SYSV_SEM 1384*5184Sek110237 ipc_seminit(); 1385*5184Sek110237 #endif /* HAVE_SYSV_SEM */ 1386*5184Sek110237 return (0); 1387*5184Sek110237 } 1388*5184Sek110237 1389*5184Sek110237 /* 1390*5184Sek110237 * Post to a System V or posix semaphore as appropriate. 1391*5184Sek110237 * On the first call for a given flowop instance, this routine 1392*5184Sek110237 * will use the fo_targetname attribute to locate all semblock 1393*5184Sek110237 * flowops that are expecting posts from this flowop. All 1394*5184Sek110237 * target flowops on this list will have a post operation done 1395*5184Sek110237 * to their semaphores on each call. 1396*5184Sek110237 */ 1397*5184Sek110237 static int 1398*5184Sek110237 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop) 1399*5184Sek110237 { 1400*5184Sek110237 flowop_t *target; 1401*5184Sek110237 1402*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 1403*5184Sek110237 "sempost flow %s-%d", 1404*5184Sek110237 flowop->fo_name, 1405*5184Sek110237 flowop->fo_instance); 1406*5184Sek110237 1407*5184Sek110237 /* if this is the first post, create the post list */ 1408*5184Sek110237 if (flowop->fo_targets == NULL) { 1409*5184Sek110237 flowop_t *result = flowop_find(flowop->fo_targetname); 1410*5184Sek110237 1411*5184Sek110237 flowop->fo_targets = result; 1412*5184Sek110237 1413*5184Sek110237 if (result == NULL) { 1414*5184Sek110237 filebench_log(LOG_ERROR, 1415*5184Sek110237 "sempost: could not find op %s for thread %s", 1416*5184Sek110237 flowop->fo_targetname, 1417*5184Sek110237 threadflow->tf_name); 1418*5184Sek110237 filebench_shutdown(1); 1419*5184Sek110237 } 1420*5184Sek110237 1421*5184Sek110237 while 
(result) { 1422*5184Sek110237 result->fo_targetnext = 1423*5184Sek110237 result->fo_resultnext; 1424*5184Sek110237 result = result->fo_resultnext; 1425*5184Sek110237 } 1426*5184Sek110237 } 1427*5184Sek110237 1428*5184Sek110237 target = flowop->fo_targets; 1429*5184Sek110237 1430*5184Sek110237 flowop_beginop(threadflow, flowop); 1431*5184Sek110237 /* post to the targets */ 1432*5184Sek110237 while (target) { 1433*5184Sek110237 #ifdef HAVE_SYSV_SEM 1434*5184Sek110237 struct sembuf sbuf[2]; 1435*5184Sek110237 int semid; 1436*5184Sek110237 int blocking; 1437*5184Sek110237 #else 1438*5184Sek110237 int i; 1439*5184Sek110237 #endif /* HAVE_SYSV_SEM */ 1440*5184Sek110237 int value = *flowop->fo_value; 1441*5184Sek110237 struct timespec timeout; 1442*5184Sek110237 1443*5184Sek110237 if (target->fo_instance == FLOW_MASTER) { 1444*5184Sek110237 target = target->fo_targetnext; 1445*5184Sek110237 continue; 1446*5184Sek110237 } 1447*5184Sek110237 1448*5184Sek110237 #ifdef HAVE_SYSV_SEM 1449*5184Sek110237 1450*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 1451*5184Sek110237 "sempost flow %s-%d num %x", 1452*5184Sek110237 target->fo_name, 1453*5184Sek110237 target->fo_instance, 1454*5184Sek110237 target->fo_semid_lw); 1455*5184Sek110237 /* ipc_mutex_lock(&target->fo_lock); */ 1456*5184Sek110237 1457*5184Sek110237 if ((semid = semget(filebench_shm->semkey, 1458*5184Sek110237 FILEBENCH_NSEMS, 0)) == -1) { 1459*5184Sek110237 filebench_log(LOG_ERROR, 1460*5184Sek110237 "lookup semop %x failed: %s", 1461*5184Sek110237 filebench_shm->semkey, 1462*5184Sek110237 strerror(errno)); 1463*5184Sek110237 /* ipc_mutex_unlock(&target->fo_lock); */ 1464*5184Sek110237 return (-1); 1465*5184Sek110237 } 1466*5184Sek110237 1467*5184Sek110237 sbuf[0].sem_num = target->fo_semid_lw; 1468*5184Sek110237 sbuf[0].sem_op = value; 1469*5184Sek110237 sbuf[0].sem_flg = 0; 1470*5184Sek110237 sbuf[1].sem_num = target->fo_semid_hw; 1471*5184Sek110237 sbuf[1].sem_op = value * -1; 1472*5184Sek110237 sbuf[1].sem_flg = 
0; 1473*5184Sek110237 timeout.tv_sec = 600; 1474*5184Sek110237 timeout.tv_nsec = 0; 1475*5184Sek110237 1476*5184Sek110237 if (*flowop->fo_blocking) 1477*5184Sek110237 blocking = 1; 1478*5184Sek110237 else 1479*5184Sek110237 blocking = 0; 1480*5184Sek110237 1481*5184Sek110237 #ifdef HAVE_SEMTIMEDOP 1482*5184Sek110237 if ((semtimedop(semid, &sbuf[0], blocking + 1, 1483*5184Sek110237 &timeout) == -1) && (errno && (errno != EAGAIN))) { 1484*5184Sek110237 #else 1485*5184Sek110237 if ((semop(semid, &sbuf[0], blocking + 1) == -1) && 1486*5184Sek110237 (errno && (errno != EAGAIN))) { 1487*5184Sek110237 #endif /* HAVE_SEMTIMEDOP */ 1488*5184Sek110237 filebench_log(LOG_ERROR, "semop post failed: %s", 1489*5184Sek110237 strerror(errno)); 1490*5184Sek110237 /* ipc_mutex_unlock(&target->fo_lock); */ 1491*5184Sek110237 return (-1); 1492*5184Sek110237 } 1493*5184Sek110237 1494*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 1495*5184Sek110237 "flow %s-%d finished posting", 1496*5184Sek110237 target->fo_name, target->fo_instance); 1497*5184Sek110237 #else 1498*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, 1499*5184Sek110237 "sempost flow %s-%d to posix semaphore", 1500*5184Sek110237 target->fo_name, 1501*5184Sek110237 target->fo_instance); 1502*5184Sek110237 1503*5184Sek110237 /* Increment sem by value */ 1504*5184Sek110237 for (i = 0; i < value; i++) { 1505*5184Sek110237 if (sem_post(&target->fo_sem) == -1) { 1506*5184Sek110237 filebench_log(LOG_ERROR, "semop post failed"); 1507*5184Sek110237 /* ipc_mutex_unlock(&target->fo_lock); */ 1508*5184Sek110237 return (-1); 1509*5184Sek110237 } 1510*5184Sek110237 } 1511*5184Sek110237 1512*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking", 1513*5184Sek110237 target->fo_name, target->fo_instance); 1514*5184Sek110237 #endif /* HAVE_SYSV_SEM */ 1515*5184Sek110237 1516*5184Sek110237 target = target->fo_targetnext; 1517*5184Sek110237 } 1518*5184Sek110237 flowop_endop(threadflow, flowop); 1519*5184Sek110237 1520*5184Sek110237 return 
(0); 1521*5184Sek110237 } 1522*5184Sek110237 1523*5184Sek110237 1524*5184Sek110237 /* 1525*5184Sek110237 * Section for exercising create / open / close / delete operations 1526*5184Sek110237 * on files within a fileset. For proper operation, the flowop attribute 1527*5184Sek110237 * "fd", which sets the fo_fdnumber field in the flowop, must be used 1528*5184Sek110237 * so that the same file is opened and later closed. "fd" is an index 1529*5184Sek110237 * into a pair of arrays maintained by threadflows, one of which 1530*5184Sek110237 * contains the operating system assigned file descriptors and the other 1531*5184Sek110237 * a pointer to the filesetentry whose file the file descriptor 1532*5184Sek110237 * references. An openfile flowop defined without fd being set will use 1533*5184Sek110237 * the default (0) fd or, if specified, rotate through fd indices, but 1534*5184Sek110237 * createfile and closefile must use the default or a specified fd. 1535*5184Sek110237 * Meanwhile deletefile picks and arbitrary file to delete, regardless 1536*5184Sek110237 * of fd attribute. 1537*5184Sek110237 */ 1538*5184Sek110237 1539*5184Sek110237 /* 1540*5184Sek110237 * XXX Making file selection more consistent among the flowops might good 1541*5184Sek110237 */ 1542*5184Sek110237 1543*5184Sek110237 1544*5184Sek110237 /* 1545*5184Sek110237 * Emulates (and actually does) file open. Obtains a file descriptor 1546*5184Sek110237 * index, then calls flowoplib_openfile_common() to open. Returns -1 1547*5184Sek110237 * if not file descriptor is found or flowoplib_openfile_common 1548*5184Sek110237 * encounters an error, otherwise 0. 
1549*5184Sek110237 */ 1550*5184Sek110237 static int 1551*5184Sek110237 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop) 1552*5184Sek110237 { 1553*5184Sek110237 int fd = flowoplib_fdnum(threadflow, flowop); 1554*5184Sek110237 1555*5184Sek110237 if (fd == -1) 1556*5184Sek110237 return (-1); 1557*5184Sek110237 1558*5184Sek110237 return (flowoplib_openfile_common(threadflow, flowop, fd)); 1559*5184Sek110237 } 1560*5184Sek110237 1561*5184Sek110237 /* 1562*5184Sek110237 * Common file opening code for filesets. Uses the supplied 1563*5184Sek110237 * file descriptor index to determine the tf_fd entry to use. 1564*5184Sek110237 * If the entry is empty (0) and the fileset exists, fileset 1565*5184Sek110237 * pick is called to select a fileset entry to use. The file 1566*5184Sek110237 * specified in the filesetentry is opened, and the returned 1567*5184Sek110237 * operating system file descriptor and a pointer to the 1568*5184Sek110237 * filesetentry are stored in tf_fd[fd] and tf_fse[fd], 1569*5184Sek110237 * respectively. Returns -1 on error, 0 on success. 
1570*5184Sek110237 */ 1571*5184Sek110237 static int 1572*5184Sek110237 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd) 1573*5184Sek110237 { 1574*5184Sek110237 filesetentry_t *file; 1575*5184Sek110237 int tid = 0; 1576*5184Sek110237 1577*5184Sek110237 /* 1578*5184Sek110237 * If the flowop doesn't default to persistent fd 1579*5184Sek110237 * then get unique thread ID for use by fileset_pick 1580*5184Sek110237 */ 1581*5184Sek110237 if (integer_isset(flowop->fo_rotatefd)) 1582*5184Sek110237 tid = threadflow->tf_utid; 1583*5184Sek110237 1584*5184Sek110237 if (threadflow->tf_fd[fd] != 0) { 1585*5184Sek110237 filebench_log(LOG_ERROR, 1586*5184Sek110237 "flowop %s attempted to open without closing on fd %d", 1587*5184Sek110237 flowop->fo_name, fd); 1588*5184Sek110237 return (-1); 1589*5184Sek110237 } 1590*5184Sek110237 1591*5184Sek110237 if (flowop->fo_fileset == NULL) { 1592*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 1593*5184Sek110237 return (-1); 1594*5184Sek110237 } 1595*5184Sek110237 1596*5184Sek110237 if ((file = fileset_pick(flowop->fo_fileset, 1597*5184Sek110237 FILESET_PICKEXISTS, tid)) == NULL) { 1598*5184Sek110237 filebench_log(LOG_ERROR, 1599*5184Sek110237 "flowop %s failed to pick file from %s on fd %d", 1600*5184Sek110237 flowop->fo_name, 1601*5184Sek110237 flowop->fo_fileset->fs_name, fd); 1602*5184Sek110237 return (-1); 1603*5184Sek110237 } 1604*5184Sek110237 1605*5184Sek110237 threadflow->tf_fse[fd] = file; 1606*5184Sek110237 1607*5184Sek110237 flowop_beginop(threadflow, flowop); 1608*5184Sek110237 threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset, 1609*5184Sek110237 file, O_RDWR, 0666, flowoplib_fileattrs(flowop)); 1610*5184Sek110237 flowop_endop(threadflow, flowop); 1611*5184Sek110237 1612*5184Sek110237 if (threadflow->tf_fd[fd] < 0) { 1613*5184Sek110237 filebench_log(LOG_ERROR, "failed to open file %s", 1614*5184Sek110237 flowop->fo_name); 1615*5184Sek110237 return (-1); 1616*5184Sek110237 } 
1617*5184Sek110237 1618*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, 1619*5184Sek110237 "flowop %s: opened %s fd[%d] = %d", 1620*5184Sek110237 flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]); 1621*5184Sek110237 1622*5184Sek110237 return (0); 1623*5184Sek110237 } 1624*5184Sek110237 1625*5184Sek110237 /* 1626*5184Sek110237 * Emulate create of a file. Uses the flowop's fdnumber to select 1627*5184Sek110237 * tf_fd and tf_fse array locations to put the created file's file 1628*5184Sek110237 * descriptor and filesetentry respectively. Uses fileset_pick() 1629*5184Sek110237 * to select a specific filesetentry whose file does not currently 1630*5184Sek110237 * exist for the file create operation. Then calls 1631*5184Sek110237 * fileset_openfile() with the O_CREATE flag set to create the 1632*5184Sek110237 * file. Returns -1 if the array index specified by fdnumber is 1633*5184Sek110237 * already in use, the flowop has no associated fileset, or 1634*5184Sek110237 * the create call fails. Returns 1 if a filesetentry with a 1635*5184Sek110237 * nonexistent file cannot be found. Returns 0 on success. 
1636*5184Sek110237 */ 1637*5184Sek110237 static int 1638*5184Sek110237 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop) 1639*5184Sek110237 { 1640*5184Sek110237 filesetentry_t *file; 1641*5184Sek110237 int fd = flowop->fo_fdnumber; 1642*5184Sek110237 1643*5184Sek110237 if (threadflow->tf_fd[fd] != 0) { 1644*5184Sek110237 filebench_log(LOG_ERROR, 1645*5184Sek110237 "flowop %s attempted to create without closing on fd %d", 1646*5184Sek110237 flowop->fo_name, fd); 1647*5184Sek110237 return (-1); 1648*5184Sek110237 } 1649*5184Sek110237 1650*5184Sek110237 if (flowop->fo_fileset == NULL) { 1651*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 1652*5184Sek110237 return (-1); 1653*5184Sek110237 } 1654*5184Sek110237 1655*5184Sek110237 if ((file = fileset_pick(flowop->fo_fileset, 1656*5184Sek110237 FILESET_PICKNOEXIST, 0)) == NULL) { 1657*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file", 1658*5184Sek110237 flowop->fo_name); 1659*5184Sek110237 return (1); 1660*5184Sek110237 } 1661*5184Sek110237 1662*5184Sek110237 threadflow->tf_fse[fd] = file; 1663*5184Sek110237 1664*5184Sek110237 flowop_beginop(threadflow, flowop); 1665*5184Sek110237 threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset, 1666*5184Sek110237 file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop)); 1667*5184Sek110237 flowop_endop(threadflow, flowop); 1668*5184Sek110237 1669*5184Sek110237 if (threadflow->tf_fd[fd] < 0) { 1670*5184Sek110237 filebench_log(LOG_ERROR, "failed to create file %s", 1671*5184Sek110237 flowop->fo_name); 1672*5184Sek110237 return (-1); 1673*5184Sek110237 } 1674*5184Sek110237 1675*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, 1676*5184Sek110237 "flowop %s: created %s fd[%d] = %d", 1677*5184Sek110237 flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]); 1678*5184Sek110237 1679*5184Sek110237 return (0); 1680*5184Sek110237 } 1681*5184Sek110237 1682*5184Sek110237 /* 1683*5184Sek110237 * Emulates delete of a file. 
Picks an arbitrary filesetentry 1684*5184Sek110237 * whose file exists and uses unlink() to delete it. Clears 1685*5184Sek110237 * the FSE_EXISTS flag for the filesetentry. Returns -1 if the 1686*5184Sek110237 * flowop has no associated fileset. Returns 1 if an appropriate 1687*5184Sek110237 * filesetentry cannot be found, and 0 on success. 1688*5184Sek110237 */ 1689*5184Sek110237 static int 1690*5184Sek110237 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop) 1691*5184Sek110237 { 1692*5184Sek110237 filesetentry_t *file; 1693*5184Sek110237 fileset_t *fileset; 1694*5184Sek110237 char path[MAXPATHLEN]; 1695*5184Sek110237 char *pathtmp; 1696*5184Sek110237 1697*5184Sek110237 if (flowop->fo_fileset == NULL) { 1698*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 1699*5184Sek110237 return (-1); 1700*5184Sek110237 } 1701*5184Sek110237 1702*5184Sek110237 fileset = flowop->fo_fileset; 1703*5184Sek110237 1704*5184Sek110237 if ((file = fileset_pick(flowop->fo_fileset, 1705*5184Sek110237 FILESET_PICKEXISTS, 0)) == NULL) { 1706*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file", 1707*5184Sek110237 flowop->fo_name); 1708*5184Sek110237 return (1); 1709*5184Sek110237 } 1710*5184Sek110237 1711*5184Sek110237 *path = 0; 1712*5184Sek110237 (void) strcpy(path, *fileset->fs_path); 1713*5184Sek110237 (void) strcat(path, "/"); 1714*5184Sek110237 (void) strcat(path, fileset->fs_name); 1715*5184Sek110237 pathtmp = fileset_resolvepath(file); 1716*5184Sek110237 (void) strcat(path, pathtmp); 1717*5184Sek110237 free(pathtmp); 1718*5184Sek110237 1719*5184Sek110237 flowop_beginop(threadflow, flowop); 1720*5184Sek110237 (void) unlink(path); 1721*5184Sek110237 flowop_endop(threadflow, flowop); 1722*5184Sek110237 file->fse_flags &= ~FSE_EXISTS; 1723*5184Sek110237 (void) ipc_mutex_unlock(&file->fse_lock); 1724*5184Sek110237 1725*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path); 1726*5184Sek110237 1727*5184Sek110237 
return (0); 1728*5184Sek110237 } 1729*5184Sek110237 1730*5184Sek110237 /* 1731*5184Sek110237 * Emulates fsync of a file. Obtains the file descriptor index 1732*5184Sek110237 * from the flowop, obtains the actual file descriptor from 1733*5184Sek110237 * the threadflow's table, checks to be sure it is still an 1734*5184Sek110237 * open file, then does an fsync operation on it. Returns -1 1735*5184Sek110237 * if the file no longer is open, 0 otherwise. 1736*5184Sek110237 */ 1737*5184Sek110237 static int 1738*5184Sek110237 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop) 1739*5184Sek110237 { 1740*5184Sek110237 filesetentry_t *file; 1741*5184Sek110237 int fd = flowop->fo_fdnumber; 1742*5184Sek110237 1743*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 1744*5184Sek110237 filebench_log(LOG_ERROR, 1745*5184Sek110237 "flowop %s attempted to fsync a closed fd %d", 1746*5184Sek110237 flowop->fo_name, fd); 1747*5184Sek110237 return (-1); 1748*5184Sek110237 } 1749*5184Sek110237 1750*5184Sek110237 /* Measure time to fsync */ 1751*5184Sek110237 flowop_beginop(threadflow, flowop); 1752*5184Sek110237 (void) fsync(threadflow->tf_fd[fd]); 1753*5184Sek110237 flowop_endop(threadflow, flowop); 1754*5184Sek110237 1755*5184Sek110237 file = threadflow->tf_fse[fd]; 1756*5184Sek110237 1757*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path); 1758*5184Sek110237 1759*5184Sek110237 return (0); 1760*5184Sek110237 } 1761*5184Sek110237 1762*5184Sek110237 /* 1763*5184Sek110237 * Emulate fsync of an entire fileset. Search through the 1764*5184Sek110237 * threadflow's file descriptor array, doing fsync() on each 1765*5184Sek110237 * open file that belongs to the flowop's fileset. Always 1766*5184Sek110237 * returns 0. 
1767*5184Sek110237 */ 1768*5184Sek110237 static int 1769*5184Sek110237 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop) 1770*5184Sek110237 { 1771*5184Sek110237 int fd; 1772*5184Sek110237 1773*5184Sek110237 for (fd = 0; fd < THREADFLOW_MAXFD; fd++) { 1774*5184Sek110237 filesetentry_t *file; 1775*5184Sek110237 1776*5184Sek110237 /* Match the file set to fsync */ 1777*5184Sek110237 if ((threadflow->tf_fse[fd] == NULL) || 1778*5184Sek110237 (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset)) 1779*5184Sek110237 continue; 1780*5184Sek110237 1781*5184Sek110237 /* Measure time to fsync */ 1782*5184Sek110237 flowop_beginop(threadflow, flowop); 1783*5184Sek110237 (void) fsync(threadflow->tf_fd[fd]); 1784*5184Sek110237 flowop_endop(threadflow, flowop); 1785*5184Sek110237 1786*5184Sek110237 file = threadflow->tf_fse[fd]; 1787*5184Sek110237 1788*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", 1789*5184Sek110237 file->fse_path); 1790*5184Sek110237 } 1791*5184Sek110237 1792*5184Sek110237 return (0); 1793*5184Sek110237 } 1794*5184Sek110237 1795*5184Sek110237 /* 1796*5184Sek110237 * Emulate close of a file. Obtains the file descriptor index 1797*5184Sek110237 * from the flowop, obtains the actual file descriptor from the 1798*5184Sek110237 * threadflow's table, checks to be sure it is still an open 1799*5184Sek110237 * file, then does a close operation on it. Then sets the 1800*5184Sek110237 * threadflow file descriptor table entry to 0, and the file set 1801*5184Sek110237 * entry pointer to NULL. Returns -1 if the file was not open, 1802*5184Sek110237 * 0 otherwise. 
1803*5184Sek110237 */ 1804*5184Sek110237 static int 1805*5184Sek110237 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop) 1806*5184Sek110237 { 1807*5184Sek110237 filesetentry_t *file; 1808*5184Sek110237 int fd = flowop->fo_fdnumber; 1809*5184Sek110237 1810*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 1811*5184Sek110237 filebench_log(LOG_ERROR, 1812*5184Sek110237 "flowop %s attempted to close an already closed fd %d", 1813*5184Sek110237 flowop->fo_name, fd); 1814*5184Sek110237 return (-1); 1815*5184Sek110237 } 1816*5184Sek110237 1817*5184Sek110237 /* Measure time to close */ 1818*5184Sek110237 flowop_beginop(threadflow, flowop); 1819*5184Sek110237 (void) close(threadflow->tf_fd[fd]); 1820*5184Sek110237 flowop_endop(threadflow, flowop); 1821*5184Sek110237 1822*5184Sek110237 file = threadflow->tf_fse[fd]; 1823*5184Sek110237 1824*5184Sek110237 threadflow->tf_fd[fd] = 0; 1825*5184Sek110237 threadflow->tf_fse[fd] = NULL; 1826*5184Sek110237 1827*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path); 1828*5184Sek110237 1829*5184Sek110237 return (0); 1830*5184Sek110237 } 1831*5184Sek110237 1832*5184Sek110237 /* 1833*5184Sek110237 * Emulate stat of a file. Picks an arbitrary filesetentry with 1834*5184Sek110237 * an existing file from the flowop's fileset, then performs a 1835*5184Sek110237 * stat() operation on it. Returns -1 if the flowop has no 1836*5184Sek110237 * associated fileset. Returns 1 if an appropriate filesetentry 1837*5184Sek110237 * cannot be found, and 0 on success. 
1838*5184Sek110237 */ 1839*5184Sek110237 static int 1840*5184Sek110237 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop) 1841*5184Sek110237 { 1842*5184Sek110237 filesetentry_t *file; 1843*5184Sek110237 fileset_t *fileset; 1844*5184Sek110237 char path[MAXPATHLEN]; 1845*5184Sek110237 char *pathtmp; 1846*5184Sek110237 1847*5184Sek110237 if (flowop->fo_fileset == NULL) { 1848*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 1849*5184Sek110237 return (-1); 1850*5184Sek110237 } 1851*5184Sek110237 1852*5184Sek110237 fileset = flowop->fo_fileset; 1853*5184Sek110237 1854*5184Sek110237 if ((file = fileset_pick(flowop->fo_fileset, 1855*5184Sek110237 FILESET_PICKEXISTS, 0)) == NULL) { 1856*5184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file", 1857*5184Sek110237 flowop->fo_name); 1858*5184Sek110237 return (1); 1859*5184Sek110237 } 1860*5184Sek110237 1861*5184Sek110237 *path = 0; 1862*5184Sek110237 (void) strcpy(path, *fileset->fs_path); 1863*5184Sek110237 (void) strcat(path, "/"); 1864*5184Sek110237 (void) strcat(path, fileset->fs_name); 1865*5184Sek110237 pathtmp = fileset_resolvepath(file); 1866*5184Sek110237 (void) strcat(path, pathtmp); 1867*5184Sek110237 free(pathtmp); 1868*5184Sek110237 1869*5184Sek110237 flowop_beginop(threadflow, flowop); 1870*5184Sek110237 flowop_endop(threadflow, flowop); 1871*5184Sek110237 1872*5184Sek110237 (void) ipc_mutex_unlock(&file->fse_lock); 1873*5184Sek110237 1874*5184Sek110237 return (0); 1875*5184Sek110237 } 1876*5184Sek110237 1877*5184Sek110237 1878*5184Sek110237 /* 1879*5184Sek110237 * Additional reads and writes. Read and write whole files, write 1880*5184Sek110237 * and append to files. Some of these work with both fileobjs and 1881*5184Sek110237 * filesets, others only with filesets. The flowoplib_write routine 1882*5184Sek110237 * writes from thread memory, while the others read or write using 1883*5184Sek110237 * fo_buf memory. 
Note that both flowoplib_read() and 1884*5184Sek110237 * flowoplib_aiowrite() use thread memory as well. 1885*5184Sek110237 */ 1886*5184Sek110237 1887*5184Sek110237 1888*5184Sek110237 /* 1889*5184Sek110237 * Emulate a read of a whole file. The file must be open 1890*5184Sek110237 * with file descriptor and filesetentry stored at the 1891*5184Sek110237 * locations indexed by the flowop's fdnumber. It then seeks 1892*5184Sek110237 * to the beginning of the associated file, and reads 1893*5184Sek110237 * FILE_ALLOC_BLOCK bytes at a time until the end of the 1894*5184Sek110237 * file. Returns -1 on error, 0 on success. 1895*5184Sek110237 */ 1896*5184Sek110237 static int 1897*5184Sek110237 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop) 1898*5184Sek110237 { 1899*5184Sek110237 size_t memoffset; 1900*5184Sek110237 long memsize, round; 1901*5184Sek110237 off64_t bytes = 0; 1902*5184Sek110237 int fd = flowop->fo_fdnumber; 1903*5184Sek110237 int ret; 1904*5184Sek110237 1905*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 1906*5184Sek110237 filebench_log(LOG_ERROR, 1907*5184Sek110237 "flowop %s attempted to read a closed fd %d", 1908*5184Sek110237 flowop->fo_name, fd); 1909*5184Sek110237 return (-1); 1910*5184Sek110237 } 1911*5184Sek110237 1912*5184Sek110237 if ((flowop->fo_buf == NULL) && 1913*5184Sek110237 ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) 1914*5184Sek110237 return (-1); 1915*5184Sek110237 1916*5184Sek110237 if (threadflow->tf_fse[fd] == NULL) { 1917*5184Sek110237 filebench_log(LOG_ERROR, "flowop %s: NULL file", 1918*5184Sek110237 flowop->fo_name); 1919*5184Sek110237 return (-1); 1920*5184Sek110237 } 1921*5184Sek110237 1922*5184Sek110237 memsize = *threadflow->tf_memsize; 1923*5184Sek110237 round = *flowop->fo_iosize; 1924*5184Sek110237 if (filebench_randomno(&memoffset, memsize, round) == -1) { 1925*5184Sek110237 filebench_log(LOG_ERROR, 1926*5184Sek110237 "tf_memsize smaller than IO size for thread %s", 1927*5184Sek110237 
flowop->fo_name); 1928*5184Sek110237 return (-1); 1929*5184Sek110237 } 1930*5184Sek110237 1931*5184Sek110237 /* Measure time to read bytes */ 1932*5184Sek110237 flowop_beginop(threadflow, flowop); 1933*5184Sek110237 (void) lseek64(threadflow->tf_fd[fd], 0, SEEK_SET); 1934*5184Sek110237 while ((ret = read(threadflow->tf_fd[fd], flowop->fo_buf, 1935*5184Sek110237 FILE_ALLOC_BLOCK)) > 0) 1936*5184Sek110237 bytes += ret; 1937*5184Sek110237 1938*5184Sek110237 flowop_endop(threadflow, flowop); 1939*5184Sek110237 1940*5184Sek110237 if (ret < 0) { 1941*5184Sek110237 filebench_log(LOG_ERROR, 1942*5184Sek110237 "Failed to read fd %d: %s", 1943*5184Sek110237 fd, strerror(errno)); 1944*5184Sek110237 return (-1); 1945*5184Sek110237 } 1946*5184Sek110237 1947*5184Sek110237 if (flowop->fo_iosize == NULL) 1948*5184Sek110237 flowop->fo_iosize = integer_alloc(bytes); 1949*5184Sek110237 *(flowop->fo_iosize) = bytes; 1950*5184Sek110237 1951*5184Sek110237 return (0); 1952*5184Sek110237 } 1953*5184Sek110237 1954*5184Sek110237 /* 1955*5184Sek110237 * Emulate a write to a file of size fo_iosize. Will write 1956*5184Sek110237 * to a file from a fileset if the flowop's fo_fileset field 1957*5184Sek110237 * specifies one or its fdnumber is non zero. Otherwise it 1958*5184Sek110237 * will write to a fileobj file, if one exists. If the file 1959*5184Sek110237 * is not currently open, the routine will attempt to open 1960*5184Sek110237 * it. The flowop's fo_wss parameter will be used to set the 1961*5184Sek110237 * maximum file size if it is non-zero, otherwise the 1962*5184Sek110237 * filesetentry's fse_size will be used. A random memory 1963*5184Sek110237 * buffer offset is calculated, and, if fo_random is TRUE, 1964*5184Sek110237 * a random file offset is used for the write. Otherwise the 1965*5184Sek110237 * write is to the next sequential location. Returns 1 on 1966*5184Sek110237 * errors, 0 on success. 
1967*5184Sek110237 */ 1968*5184Sek110237 static int 1969*5184Sek110237 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop) 1970*5184Sek110237 { 1971*5184Sek110237 size_t memoffset; 1972*5184Sek110237 vinteger_t wss; 1973*5184Sek110237 long memsize, round; 1974*5184Sek110237 int filedesc; 1975*5184Sek110237 1976*5184Sek110237 if (flowop->fo_fileset || flowop->fo_fdnumber) { 1977*5184Sek110237 int fd = flowoplib_fdnum(threadflow, flowop); 1978*5184Sek110237 1979*5184Sek110237 if (fd == -1) 1980*5184Sek110237 return (-1); 1981*5184Sek110237 1982*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 1983*5184Sek110237 (void) flowoplib_openfile_common(threadflow, 1984*5184Sek110237 flowop, fd); 1985*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "read opened file %s", 1986*5184Sek110237 threadflow->tf_fse[fd]->fse_path); 1987*5184Sek110237 } 1988*5184Sek110237 filedesc = threadflow->tf_fd[fd]; 1989*5184Sek110237 if (*flowop->fo_wss == 0) 1990*5184Sek110237 wss = threadflow->tf_fse[fd]->fse_size; 1991*5184Sek110237 else 1992*5184Sek110237 wss = *flowop->fo_wss; 1993*5184Sek110237 } else { 1994*5184Sek110237 if (flowop->fo_file == NULL) { 1995*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 1996*5184Sek110237 return (-1); 1997*5184Sek110237 } 1998*5184Sek110237 if (flowop->fo_fd < 0) 1999*5184Sek110237 flowop->fo_fd = fileobj_open(flowop->fo_file, 2000*5184Sek110237 flowoplib_fileattrs(flowop)); 2001*5184Sek110237 2002*5184Sek110237 if (flowop->fo_fd < 0) { 2003*5184Sek110237 filebench_log(LOG_ERROR, "failed to open file %s", 2004*5184Sek110237 flowop->fo_file->fo_name); 2005*5184Sek110237 return (-1); 2006*5184Sek110237 } 2007*5184Sek110237 filedesc = flowop->fo_fd; 2008*5184Sek110237 if (*flowop->fo_wss == 0) 2009*5184Sek110237 wss = *flowop->fo_file->fo_size; 2010*5184Sek110237 else 2011*5184Sek110237 wss = *flowop->fo_wss; 2012*5184Sek110237 } 2013*5184Sek110237 2014*5184Sek110237 if ((flowop->fo_buf == NULL) && 2015*5184Sek110237 ((flowop->fo_buf = (char 
*)malloc(FILE_ALLOC_BLOCK)) == NULL)) { 2016*5184Sek110237 return (-1); 2017*5184Sek110237 } 2018*5184Sek110237 2019*5184Sek110237 if (*flowop->fo_iosize == 0) { 2020*5184Sek110237 filebench_log(LOG_ERROR, "zero iosize for thread %s", 2021*5184Sek110237 flowop->fo_name); 2022*5184Sek110237 return (-1); 2023*5184Sek110237 } 2024*5184Sek110237 2025*5184Sek110237 memsize = *threadflow->tf_memsize; 2026*5184Sek110237 round = *flowop->fo_iosize; 2027*5184Sek110237 2028*5184Sek110237 /* Select memory offset for IO */ 2029*5184Sek110237 if (filebench_randomno(&memoffset, memsize, round) == -1) { 2030*5184Sek110237 filebench_log(LOG_ERROR, 2031*5184Sek110237 "tf_memsize smaller than IO size for thread %s", 2032*5184Sek110237 flowop->fo_name); 2033*5184Sek110237 return (-1); 2034*5184Sek110237 } 2035*5184Sek110237 2036*5184Sek110237 if (*flowop->fo_random) { 2037*5184Sek110237 uint64_t fileoffset; 2038*5184Sek110237 2039*5184Sek110237 if (filebench_randomno64(&fileoffset, 2040*5184Sek110237 wss, *flowop->fo_iosize) == -1) { 2041*5184Sek110237 filebench_log(LOG_ERROR, 2042*5184Sek110237 "file size smaller than IO size for thread %s", 2043*5184Sek110237 flowop->fo_name); 2044*5184Sek110237 return (-1); 2045*5184Sek110237 } 2046*5184Sek110237 flowop_beginop(threadflow, flowop); 2047*5184Sek110237 if (pwrite64(filedesc, threadflow->tf_mem + memoffset, 2048*5184Sek110237 *flowop->fo_iosize, (off64_t)fileoffset) == -1) { 2049*5184Sek110237 filebench_log(LOG_ERROR, "write failed, " 2050*5184Sek110237 "offset %lld memoffset %zd: %s", 2051*5184Sek110237 fileoffset, memoffset, strerror(errno)); 2052*5184Sek110237 flowop_endop(threadflow, flowop); 2053*5184Sek110237 return (-1); 2054*5184Sek110237 } 2055*5184Sek110237 flowop_endop(threadflow, flowop); 2056*5184Sek110237 } else { 2057*5184Sek110237 flowop_beginop(threadflow, flowop); 2058*5184Sek110237 if (write(filedesc, threadflow->tf_mem + memoffset, 2059*5184Sek110237 *flowop->fo_iosize) == -1) { 2060*5184Sek110237 
filebench_log(LOG_ERROR, 2061*5184Sek110237 "write failed, memoffset %zd: %s", 2062*5184Sek110237 memoffset, strerror(errno)); 2063*5184Sek110237 flowop_endop(threadflow, flowop); 2064*5184Sek110237 return (-1); 2065*5184Sek110237 } 2066*5184Sek110237 flowop_endop(threadflow, flowop); 2067*5184Sek110237 } 2068*5184Sek110237 2069*5184Sek110237 return (0); 2070*5184Sek110237 } 2071*5184Sek110237 2072*5184Sek110237 /* 2073*5184Sek110237 * Emulate a write of a whole file. The size of the file 2074*5184Sek110237 * is taken from a filesetentry identified by fo_srcfdnumber, 2075*5184Sek110237 * while the file descriptor used is identified by 2076*5184Sek110237 * fo_fdnumber. Does multiple writes of FILE_ALLOC_BLOCK 2077*5184Sek110237 * length until full file has been written. Returns -1 on 2078*5184Sek110237 * error, 0 on success and sets flowop->fo_iosize to the 2079*5184Sek110237 * number of bytes actually written. 2080*5184Sek110237 */ 2081*5184Sek110237 static int 2082*5184Sek110237 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop) 2083*5184Sek110237 { 2084*5184Sek110237 size_t memoffset; 2085*5184Sek110237 filesetentry_t *file; 2086*5184Sek110237 int wsize; 2087*5184Sek110237 off64_t seek; 2088*5184Sek110237 off64_t bytes = 0; 2089*5184Sek110237 long memsize, round; 2090*5184Sek110237 int fd = flowop->fo_fdnumber; 2091*5184Sek110237 int srcfd = flowop->fo_srcfdnumber; 2092*5184Sek110237 int ret; 2093*5184Sek110237 2094*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 2095*5184Sek110237 filebench_log(LOG_ERROR, 2096*5184Sek110237 "flowop %s attempted to write a closed fd %d", 2097*5184Sek110237 flowop->fo_name, fd); 2098*5184Sek110237 return (-1); 2099*5184Sek110237 } 2100*5184Sek110237 2101*5184Sek110237 if ((flowop->fo_buf == NULL) && 2102*5184Sek110237 ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) { 2103*5184Sek110237 return (-1); 2104*5184Sek110237 } 2105*5184Sek110237 2106*5184Sek110237 memsize = *threadflow->tf_memsize; 
2107*5184Sek110237 round = *flowop->fo_iosize; 2108*5184Sek110237 if (filebench_randomno(&memoffset, memsize, round) == -1) { 2109*5184Sek110237 filebench_log(LOG_ERROR, 2110*5184Sek110237 "tf_memsize smaller than IO size for thread %s", 2111*5184Sek110237 flowop->fo_name); 2112*5184Sek110237 } 2113*5184Sek110237 2114*5184Sek110237 file = threadflow->tf_fse[srcfd]; 2115*5184Sek110237 if (((srcfd != 0) && (file == NULL)) || 2116*5184Sek110237 ((file = threadflow->tf_fse[fd]) == NULL)) { 2117*5184Sek110237 filebench_log(LOG_ERROR, "flowop %s: NULL file", 2118*5184Sek110237 flowop->fo_name); 2119*5184Sek110237 return (-1); 2120*5184Sek110237 } 2121*5184Sek110237 2122*5184Sek110237 wsize = MIN(file->fse_size, FILE_ALLOC_BLOCK); 2123*5184Sek110237 2124*5184Sek110237 /* Measure time to write bytes */ 2125*5184Sek110237 flowop_beginop(threadflow, flowop); 2126*5184Sek110237 for (seek = 0; seek < file->fse_size; seek += FILE_ALLOC_BLOCK) { 2127*5184Sek110237 ret = write(threadflow->tf_fd[fd], flowop->fo_buf, wsize); 2128*5184Sek110237 if (ret != wsize) { 2129*5184Sek110237 filebench_log(LOG_ERROR, 2130*5184Sek110237 "Failed to write %d bytes on fd %d: %s", 2131*5184Sek110237 threadflow->tf_fd[fd], fd, strerror(errno)); 2132*5184Sek110237 flowop_endop(threadflow, flowop); 2133*5184Sek110237 return (-1); 2134*5184Sek110237 } 2135*5184Sek110237 bytes += ret; 2136*5184Sek110237 } 2137*5184Sek110237 flowop_endop(threadflow, flowop); 2138*5184Sek110237 2139*5184Sek110237 if (flowop->fo_iosize == NULL) 2140*5184Sek110237 flowop->fo_iosize = integer_alloc(bytes); 2141*5184Sek110237 *(flowop->fo_iosize) = bytes; 2142*5184Sek110237 2143*5184Sek110237 return (0); 2144*5184Sek110237 } 2145*5184Sek110237 2146*5184Sek110237 2147*5184Sek110237 /* 2148*5184Sek110237 * Emulate a fixed size append to a file. Will append data to 2149*5184Sek110237 * a file chosen from a fileset if the flowop's fo_fileset 2150*5184Sek110237 * field specifies one or if its fdnumber is non zero. 
2151*5184Sek110237 * Otherwise it will write to a fileobj file, if one exists. 2152*5184Sek110237 * The flowop's fo_wss parameter will be used to set the 2153*5184Sek110237 * maximum file size if it is non-zero, otherwise the 2154*5184Sek110237 * filesetentry's fse_size will be used. A random memory 2155*5184Sek110237 * buffer offset is calculated, then a logical seek to the 2156*5184Sek110237 * end of file is done followed by a write of fo_iosize 2157*5184Sek110237 * bytes. Writes are actually done from fo_buf, rather than 2158*5184Sek110237 * tf_mem as is done with flowoplib_write(), and no check 2159*5184Sek110237 * is made to see if fo_iosize exceeds the size of fo_buf. 2160*5184Sek110237 * Returns -1 on error, 0 on success. 2161*5184Sek110237 */ 2162*5184Sek110237 static int 2163*5184Sek110237 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop) 2164*5184Sek110237 { 2165*5184Sek110237 size_t memoffset; 2166*5184Sek110237 off64_t wsize; 2167*5184Sek110237 long memsize; 2168*5184Sek110237 int fd, filedesc; 2169*5184Sek110237 /* LINTED E_FUNC_SET_NOT_USED */ 2170*5184Sek110237 vinteger_t wss; 2171*5184Sek110237 int ret; 2172*5184Sek110237 2173*5184Sek110237 if (flowop->fo_fileset || flowop->fo_fdnumber) { 2174*5184Sek110237 fd = flowoplib_fdnum(threadflow, flowop); 2175*5184Sek110237 2176*5184Sek110237 if (fd == -1) 2177*5184Sek110237 return (-1); 2178*5184Sek110237 2179*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 2180*5184Sek110237 (void) flowoplib_openfile_common(threadflow, 2181*5184Sek110237 flowop, fd); 2182*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "read opened file %s", 2183*5184Sek110237 threadflow->tf_fse[fd]->fse_path); 2184*5184Sek110237 } 2185*5184Sek110237 filedesc = threadflow->tf_fd[fd]; 2186*5184Sek110237 if (*flowop->fo_wss == 0) 2187*5184Sek110237 wss = threadflow->tf_fse[fd]->fse_size; 2188*5184Sek110237 else 2189*5184Sek110237 wss = *flowop->fo_wss; 2190*5184Sek110237 } else { 2191*5184Sek110237 if (flowop->fo_file == NULL) { 
2192*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 2193*5184Sek110237 return (-1); 2194*5184Sek110237 } 2195*5184Sek110237 if (flowop->fo_fd < 0) 2196*5184Sek110237 flowop->fo_fd = fileobj_open(flowop->fo_file, 2197*5184Sek110237 flowoplib_fileattrs(flowop)); 2198*5184Sek110237 2199*5184Sek110237 if (flowop->fo_fd < 0) { 2200*5184Sek110237 filebench_log(LOG_ERROR, "failed to open file %s", 2201*5184Sek110237 flowop->fo_file->fo_name); 2202*5184Sek110237 return (-1); 2203*5184Sek110237 } 2204*5184Sek110237 filedesc = flowop->fo_fd; 2205*5184Sek110237 if (*flowop->fo_wss == 0) 2206*5184Sek110237 wss = *flowop->fo_file->fo_size; 2207*5184Sek110237 else 2208*5184Sek110237 wss = *flowop->fo_wss; 2209*5184Sek110237 } 2210*5184Sek110237 /* XXX wss is not being used */ 2211*5184Sek110237 2212*5184Sek110237 if ((flowop->fo_buf == NULL) && 2213*5184Sek110237 ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) { 2214*5184Sek110237 return (-1); 2215*5184Sek110237 } 2216*5184Sek110237 2217*5184Sek110237 memsize = *threadflow->tf_memsize; 2218*5184Sek110237 wsize = *flowop->fo_iosize; 2219*5184Sek110237 if (filebench_randomno(&memoffset, memsize, wsize) == -1) { 2220*5184Sek110237 filebench_log(LOG_ERROR, 2221*5184Sek110237 "tf_memsize smaller than IO size for thread %s", 2222*5184Sek110237 flowop->fo_name); 2223*5184Sek110237 return (-1); 2224*5184Sek110237 } 2225*5184Sek110237 2226*5184Sek110237 /* Measure time to write bytes */ 2227*5184Sek110237 flowop_beginop(threadflow, flowop); 2228*5184Sek110237 (void) lseek64(filedesc, 0, SEEK_END); 2229*5184Sek110237 ret = write(filedesc, flowop->fo_buf, wsize); 2230*5184Sek110237 if (ret != wsize) { 2231*5184Sek110237 filebench_log(LOG_ERROR, 2232*5184Sek110237 "Failed to write %d bytes on fd %d: %s", 2233*5184Sek110237 wsize, fd, strerror(errno)); 2234*5184Sek110237 flowop_endop(threadflow, flowop); 2235*5184Sek110237 return (-1); 2236*5184Sek110237 } 2237*5184Sek110237 flowop_endop(threadflow, flowop); 
2238*5184Sek110237 2239*5184Sek110237 return (0); 2240*5184Sek110237 } 2241*5184Sek110237 2242*5184Sek110237 /* 2243*5184Sek110237 * Emulate a random size append to a file. Will append data 2244*5184Sek110237 * to a file chosen from a fileset if the flowop's fo_fileset 2245*5184Sek110237 * field specifies one or if its fdnumber is non zero. Otherwise 2246*5184Sek110237 * it will write to a fileobj file, if one exists. The flowop's 2247*5184Sek110237 * fo_wss parameter will be used to set the maximum file size 2248*5184Sek110237 * if it is non-zero, otherwise the filesetentry's fse_size 2249*5184Sek110237 * will be used. A random transfer size (but at most fo_iosize 2250*5184Sek110237 * bytes) and a random memory offset are calculated. A logical 2251*5184Sek110237 * seek to the end of file is done, then writes of up to 2252*5184Sek110237 * FILE_ALLOC_BLOCK in size are done until the full transfer 2253*5184Sek110237 * size has been written. Writes are actually done from fo_buf, 2254*5184Sek110237 * rather than tf_mem as is done with flowoplib_write(). 2255*5184Sek110237 * Returns -1 on error, 0 on success. 
2256*5184Sek110237 */ 2257*5184Sek110237 static int 2258*5184Sek110237 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop) 2259*5184Sek110237 { 2260*5184Sek110237 size_t memoffset; 2261*5184Sek110237 uint64_t appendsize; 2262*5184Sek110237 off64_t seek; 2263*5184Sek110237 long memsize, round; 2264*5184Sek110237 int fd, filedesc; 2265*5184Sek110237 /* LINTED E_FUNC_SET_NOT_USED */ 2266*5184Sek110237 vinteger_t wss; 2267*5184Sek110237 2268*5184Sek110237 if (flowop->fo_fileset || flowop->fo_fdnumber) { 2269*5184Sek110237 fd = flowoplib_fdnum(threadflow, flowop); 2270*5184Sek110237 2271*5184Sek110237 if (fd == -1) 2272*5184Sek110237 return (-1); 2273*5184Sek110237 2274*5184Sek110237 if (threadflow->tf_fd[fd] == 0) { 2275*5184Sek110237 (void) flowoplib_openfile_common(threadflow, 2276*5184Sek110237 flowop, fd); 2277*5184Sek110237 filebench_log(LOG_DEBUG_IMPL, "append opened file %s", 2278*5184Sek110237 threadflow->tf_fse[fd]->fse_path); 2279*5184Sek110237 } 2280*5184Sek110237 filedesc = threadflow->tf_fd[fd]; 2281*5184Sek110237 if (*flowop->fo_wss == 0) 2282*5184Sek110237 wss = threadflow->tf_fse[fd]->fse_size; 2283*5184Sek110237 else 2284*5184Sek110237 wss = *flowop->fo_wss; 2285*5184Sek110237 } else { 2286*5184Sek110237 if (flowop->fo_file == NULL) { 2287*5184Sek110237 filebench_log(LOG_ERROR, "flowop NULL file"); 2288*5184Sek110237 return (-1); 2289*5184Sek110237 } 2290*5184Sek110237 if (flowop->fo_fd < 0) 2291*5184Sek110237 flowop->fo_fd = fileobj_open(flowop->fo_file, 2292*5184Sek110237 flowoplib_fileattrs(flowop)); 2293*5184Sek110237 2294*5184Sek110237 if (flowop->fo_fd < 0) { 2295*5184Sek110237 filebench_log(LOG_ERROR, "failed to open file %s", 2296*5184Sek110237 flowop->fo_file->fo_name); 2297*5184Sek110237 return (-1); 2298*5184Sek110237 } 2299*5184Sek110237 filedesc = flowop->fo_fd; 2300*5184Sek110237 if (*flowop->fo_wss == 0) 2301*5184Sek110237 wss = *flowop->fo_file->fo_size; 2302*5184Sek110237 else 2303*5184Sek110237 wss = *flowop->fo_wss; 
2304*5184Sek110237 } 2305*5184Sek110237 /* XXX wss is not being used */ 2306*5184Sek110237 2307*5184Sek110237 if ((flowop->fo_buf == NULL) && 2308*5184Sek110237 ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) { 2309*5184Sek110237 return (-1); 2310*5184Sek110237 } 2311*5184Sek110237 2312*5184Sek110237 memsize = *threadflow->tf_memsize; 2313*5184Sek110237 round = *flowop->fo_iosize; 2314*5184Sek110237 if (filebench_randomno(&memoffset, memsize, round) == -1) { 2315*5184Sek110237 filebench_log(LOG_ERROR, "tf_memsize smaller than IO size" 2316*5184Sek110237 "for thread %s", flowop->fo_name); 2317*5184Sek110237 return (-1); 2318*5184Sek110237 } 2319*5184Sek110237 if (filebench_randomno64(&appendsize, *flowop->fo_iosize, 1LL) == -1) { 2320*5184Sek110237 filebench_log(LOG_ERROR, "tf_memsize smaller than IO size" 2321*5184Sek110237 "for thread %s", flowop->fo_name); 2322*5184Sek110237 return (-1); 2323*5184Sek110237 } 2324*5184Sek110237 2325*5184Sek110237 /* Measure time to write bytes */ 2326*5184Sek110237 flowop_beginop(threadflow, flowop); 2327*5184Sek110237 for (seek = 0; seek < appendsize; seek += FILE_ALLOC_BLOCK) { 2328*5184Sek110237 off64_t wsize; 2329*5184Sek110237 int ret = 0; 2330*5184Sek110237 2331*5184Sek110237 (void) lseek64(filedesc, 0, SEEK_END); 2332*5184Sek110237 wsize = ((appendsize - seek) > FILE_ALLOC_BLOCK) ? 
2333*5184Sek110237 FILE_ALLOC_BLOCK : (appendsize - seek); 2334*5184Sek110237 ret = write(filedesc, flowop->fo_buf, wsize); 2335*5184Sek110237 if (ret != wsize) { 2336*5184Sek110237 filebench_log(LOG_ERROR, 2337*5184Sek110237 "Failed to write %d bytes on fd %d: %s", 2338*5184Sek110237 wsize, fd, strerror(errno)); 2339*5184Sek110237 flowop_endop(threadflow, flowop); 2340*5184Sek110237 return (-1); 2341*5184Sek110237 } 2342*5184Sek110237 } 2343*5184Sek110237 flowop_endop(threadflow, flowop); 2344*5184Sek110237 2345*5184Sek110237 return (0); 2346*5184Sek110237 } 2347*5184Sek110237 2348*5184Sek110237 2349*5184Sek110237 /* 2350*5184Sek110237 * Prints usage information for flowop operations. 2351*5184Sek110237 */ 2352*5184Sek110237 void 2353*5184Sek110237 flowoplib_usage() 2354*5184Sek110237 { 2355*5184Sek110237 (void) fprintf(stderr, 2356*5184Sek110237 "flowop [openfile|createfile] name=<name>,fileset=<fname>\n"); 2357*5184Sek110237 (void) fprintf(stderr, 2358*5184Sek110237 " [,fd=<file desc num>]\n"); 2359*5184Sek110237 (void) fprintf(stderr, "\n"); 2360*5184Sek110237 (void) fprintf(stderr, 2361*5184Sek110237 "flowop closefile name=<name>,fd=<file desc num>]\n"); 2362*5184Sek110237 (void) fprintf(stderr, "\n"); 2363*5184Sek110237 (void) fprintf(stderr, "flowop deletefile name=<name>\n"); 2364*5184Sek110237 (void) fprintf(stderr, " [,fileset=<fname>]\n"); 2365*5184Sek110237 (void) fprintf(stderr, 2366*5184Sek110237 " [,fd=<file desc num>]\n"); 2367*5184Sek110237 (void) fprintf(stderr, "\n"); 2368*5184Sek110237 (void) fprintf(stderr, "flowop statfile name=<name>\n"); 2369*5184Sek110237 (void) fprintf(stderr, " [,fileset=<fname>]\n"); 2370*5184Sek110237 (void) fprintf(stderr, 2371*5184Sek110237 " [,fd=<file desc num>]\n"); 2372*5184Sek110237 (void) fprintf(stderr, "\n"); 2373*5184Sek110237 (void) fprintf(stderr, 2374*5184Sek110237 "flowop fsync name=<name>,fd=<file desc num>]\n"); 2375*5184Sek110237 (void) fprintf(stderr, "\n"); 2376*5184Sek110237 (void) fprintf(stderr, 
2377*5184Sek110237 "flowop fsyncset name=<name>,fileset=<fname>]\n"); 2378*5184Sek110237 (void) fprintf(stderr, "\n"); 2379*5184Sek110237 (void) fprintf(stderr, "flowop [write|read|aiowrite] name=<name>, \n"); 2380*5184Sek110237 (void) fprintf(stderr, 2381*5184Sek110237 " filename|fileset=<fname>,\n"); 2382*5184Sek110237 (void) fprintf(stderr, " iosize=<size>\n"); 2383*5184Sek110237 (void) fprintf(stderr, " [,directio]\n"); 2384*5184Sek110237 (void) fprintf(stderr, " [,dsync]\n"); 2385*5184Sek110237 (void) fprintf(stderr, " [,iters=<count>]\n"); 2386*5184Sek110237 (void) fprintf(stderr, " [,random]\n"); 2387*5184Sek110237 (void) fprintf(stderr, " [,opennext]\n"); 2388*5184Sek110237 (void) fprintf(stderr, " [,workingset=<size>]\n"); 2389*5184Sek110237 (void) fprintf(stderr, 2390*5184Sek110237 "flowop [appendfile|appendfilerand] name=<name>, \n"); 2391*5184Sek110237 (void) fprintf(stderr, 2392*5184Sek110237 " filename|fileset=<fname>,\n"); 2393*5184Sek110237 (void) fprintf(stderr, " iosize=<size>\n"); 2394*5184Sek110237 (void) fprintf(stderr, " [,dsync]\n"); 2395*5184Sek110237 (void) fprintf(stderr, " [,iters=<count>]\n"); 2396*5184Sek110237 (void) fprintf(stderr, " [,workingset=<size>]\n"); 2397*5184Sek110237 (void) fprintf(stderr, 2398*5184Sek110237 "flowop [readwholefile|writewholefile] name=<name>, \n"); 2399*5184Sek110237 (void) fprintf(stderr, 2400*5184Sek110237 " filename|fileset=<fname>,\n"); 2401*5184Sek110237 (void) fprintf(stderr, " iosize=<size>\n"); 2402*5184Sek110237 (void) fprintf(stderr, " [,dsync]\n"); 2403*5184Sek110237 (void) fprintf(stderr, " [,iters=<count>]\n"); 2404*5184Sek110237 (void) fprintf(stderr, "\n"); 2405*5184Sek110237 (void) fprintf(stderr, "flowop aiowait name=<name>,target=" 2406*5184Sek110237 "<aiowrite-flowop>\n"); 2407*5184Sek110237 (void) fprintf(stderr, "\n"); 2408*5184Sek110237 (void) fprintf(stderr, "flowop sempost name=<name>," 2409*5184Sek110237 "target=<semblock-flowop>,\n"); 2410*5184Sek110237 (void) fprintf(stderr, 
2411*5184Sek110237 " value=<increment-to-post>\n"); 2412*5184Sek110237 (void) fprintf(stderr, "\n"); 2413*5184Sek110237 (void) fprintf(stderr, "flowop semblock name=<name>,value=" 2414*5184Sek110237 "<decrement-to-receive>,\n"); 2415*5184Sek110237 (void) fprintf(stderr, " highwater=" 2416*5184Sek110237 "<inbound-queue-max>\n"); 2417*5184Sek110237 (void) fprintf(stderr, "\n"); 2418*5184Sek110237 (void) fprintf(stderr, "flowop block name=<name>\n"); 2419*5184Sek110237 (void) fprintf(stderr, "\n"); 2420*5184Sek110237 (void) fprintf(stderr, 2421*5184Sek110237 "flowop wakeup name=<name>,target=<block-flowop>,\n"); 2422*5184Sek110237 (void) fprintf(stderr, "\n"); 2423*5184Sek110237 (void) fprintf(stderr, 2424*5184Sek110237 "flowop hog name=<name>,value=<number-of-mem-ops>\n"); 2425*5184Sek110237 (void) fprintf(stderr, 2426*5184Sek110237 "flowop delay name=<name>,value=<number-of-seconds>\n"); 2427*5184Sek110237 (void) fprintf(stderr, "\n"); 2428*5184Sek110237 (void) fprintf(stderr, "flowop eventlimit name=<name>\n"); 2429*5184Sek110237 (void) fprintf(stderr, "flowop bwlimit name=<name>,value=<mb/s>\n"); 2430*5184Sek110237 (void) fprintf(stderr, "flowop iopslimit name=<name>,value=<iop/s>\n"); 2431*5184Sek110237 (void) fprintf(stderr, 2432*5184Sek110237 "flowop finishoncount name=<name>,value=<ops/s>\n"); 2433*5184Sek110237 (void) fprintf(stderr, 2434*5184Sek110237 "flowop finishonbytes name=<name>,value=<bytes>\n"); 2435*5184Sek110237 (void) fprintf(stderr, "\n"); 2436*5184Sek110237 (void) fprintf(stderr, "\n"); 2437*5184Sek110237 } 2438