1 /* 2 * BSD LICENSE 3 * 4 * Copyright (C) Cavium, Inc 2017. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 10 * * Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in 14 * the documentation and/or other materials provided with the 15 * distribution. 16 * * Neither the name of Cavium, Inc nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <stdio.h> 34 #include <unistd.h> 35 36 #include "test_order_common.h" 37 38 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */ 39 40 static inline __attribute__((always_inline)) void 41 order_queue_process_stage_0(struct rte_event *const ev) 42 { 43 ev->queue_id = 1; /* q1 atomic queue */ 44 ev->op = RTE_EVENT_OP_FORWARD; 45 ev->sched_type = RTE_SCHED_TYPE_ATOMIC; 46 ev->event_type = RTE_EVENT_TYPE_CPU; 47 } 48 49 static int 50 order_queue_worker(void *arg) 51 { 52 ORDER_WORKER_INIT; 53 struct rte_event ev; 54 55 while (t->err == false) { 56 uint16_t event = rte_event_dequeue_burst(dev_id, port, 57 &ev, 1, 0); 58 if (!event) { 59 if (rte_atomic64_read(outstand_pkts) <= 0) 60 break; 61 rte_pause(); 62 continue; 63 } 64 65 if (ev.queue_id == 0) { /* from ordered queue */ 66 order_queue_process_stage_0(&ev); 67 while (rte_event_enqueue_burst(dev_id, port, &ev, 1) 68 != 1) 69 rte_pause(); 70 } else if (ev.queue_id == 1) { /* from atomic queue */ 71 order_process_stage_1(t, &ev, nb_flows, 72 expected_flow_seq, outstand_pkts); 73 } else { 74 order_process_stage_invalid(t, &ev); 75 } 76 } 77 return 0; 78 } 79 80 static int 81 order_queue_worker_burst(void *arg) 82 { 83 ORDER_WORKER_INIT; 84 struct rte_event ev[BURST_SIZE]; 85 uint16_t i; 86 87 while (t->err == false) { 88 uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev, 89 BURST_SIZE, 0); 90 91 if (nb_rx == 0) { 92 if (rte_atomic64_read(outstand_pkts) <= 0) 93 break; 94 rte_pause(); 95 continue; 96 } 97 98 for (i = 0; i < nb_rx; i++) { 99 if (ev[i].queue_id == 0) { /* from ordered queue */ 100 order_queue_process_stage_0(&ev[i]); 101 } else if (ev[i].queue_id == 1) {/* from atomic queue */ 102 order_process_stage_1(t, &ev[i], nb_flows, 103 expected_flow_seq, outstand_pkts); 104 ev[i].op = RTE_EVENT_OP_RELEASE; 105 } else { 106 order_process_stage_invalid(t, &ev[i]); 107 } 108 } 109 110 uint16_t enq; 111 112 enq = rte_event_enqueue_burst(dev_id, port, ev, nb_rx); 113 while (enq < nb_rx) { 114 enq += rte_event_enqueue_burst(dev_id, port, 115 ev + enq, nb_rx - enq); 116 } 117 } 118 return 0; 119 } 120 121 static int 122 worker_wrapper(void *arg) 123 { 124 struct worker_data *w = arg; 125 const bool burst = evt_has_burst_mode(w->dev_id); 126 127 if (burst) 128 return order_queue_worker_burst(arg); 129 else 130 return order_queue_worker(arg); 131 } 132 133 static int 134 order_queue_launch_lcores(struct evt_test *test, struct evt_options *opt) 135 { 136 return order_launch_lcores(test, opt, worker_wrapper); 137 } 138 139 #define NB_QUEUES 2 140 static int 141 order_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt) 142 { 143 int ret; 144 145 const uint8_t nb_workers = evt_nr_active_lcores(opt->wlcores); 146 /* number of active worker cores + 1 producer */ 147 const uint8_t nb_ports = nb_workers + 1; 148 149 const struct rte_event_dev_config config = { 150 .nb_event_queues = NB_QUEUES,/* q0 ordered, q1 atomic */ 151 .nb_event_ports = nb_ports, 152 .nb_events_limit = 4096, 153 .nb_event_queue_flows = opt->nb_flows, 154 .nb_event_port_dequeue_depth = 128, 155 .nb_event_port_enqueue_depth = 128, 156 }; 157 158 ret = rte_event_dev_configure(opt->dev_id, &config); 159 if (ret) { 160 evt_err("failed to configure eventdev %d", opt->dev_id); 161 return ret; 162 } 163 164 /* q0 (ordered queue) configuration */ 165 struct rte_event_queue_conf q0_ordered_conf = { 166 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 167 .schedule_type = RTE_SCHED_TYPE_ORDERED, 168 .nb_atomic_flows = opt->nb_flows, 169 .nb_atomic_order_sequences = opt->nb_flows, 170 }; 171 ret = rte_event_queue_setup(opt->dev_id, 0, &q0_ordered_conf); 172 if (ret) { 173 evt_err("failed to setup queue0 eventdev %d", opt->dev_id); 174 return ret; 175 } 176 177 /* q1 (atomic queue) configuration */ 178 struct rte_event_queue_conf q1_atomic_conf = { 179 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 180 .schedule_type = RTE_SCHED_TYPE_ATOMIC, 181 .nb_atomic_flows = opt->nb_flows, 182 .nb_atomic_order_sequences = opt->nb_flows, 183 }; 184 ret = rte_event_queue_setup(opt->dev_id, 1, &q1_atomic_conf); 185 if (ret) { 186 evt_err("failed to setup queue1 eventdev %d", opt->dev_id); 187 return ret; 188 } 189 190 /* setup one port per worker, linking to all queues */ 191 ret = order_event_dev_port_setup(test, opt, nb_workers, NB_QUEUES); 192 if (ret) 193 return ret; 194 195 ret = evt_service_setup(opt->dev_id); 196 if (ret) { 197 evt_err("No service lcore found to run event dev."); 198 return ret; 199 } 200 201 ret = rte_event_dev_start(opt->dev_id); 202 if (ret) { 203 evt_err("failed to start eventdev %d", opt->dev_id); 204 return ret; 205 } 206 207 return 0; 208 } 209 210 static void 211 order_queue_opt_dump(struct evt_options *opt) 212 { 213 order_opt_dump(opt); 214 evt_dump("nb_evdev_queues", "%d", NB_QUEUES); 215 } 216 217 static bool 218 order_queue_capability_check(struct evt_options *opt) 219 { 220 struct rte_event_dev_info dev_info; 221 222 rte_event_dev_info_get(opt->dev_id, &dev_info); 223 if (dev_info.max_event_queues < NB_QUEUES || dev_info.max_event_ports < 224 order_nb_event_ports(opt)) { 225 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", 226 NB_QUEUES, dev_info.max_event_queues, 227 order_nb_event_ports(opt), dev_info.max_event_ports); 228 return false; 229 } 230 231 return true; 232 } 233 234 static const struct evt_test_ops order_queue = { 235 .cap_check = order_queue_capability_check, 236 .opt_check = order_opt_check, 237 .opt_dump = order_queue_opt_dump, 238 .test_setup = order_test_setup, 239 .mempool_setup = order_mempool_setup, 240 .eventdev_setup = order_queue_eventdev_setup, 241 .launch_lcores = order_queue_launch_lcores, 242 .eventdev_destroy = order_eventdev_destroy, 243 .mempool_destroy = order_mempool_destroy, 244 .test_result = order_test_result, 245 .test_destroy = order_test_destroy, 246 }; 247 248 EVT_TEST_REGISTER(order_queue); 249