xref: /dpdk/examples/ntb/ntb_fwd.c (revision 68a03efeed657e6e05f281479b33b51102797e15)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2019 Intel Corporation
3  */
4 #include <stdint.h>
5 #include <stdio.h>
6 #include <inttypes.h>
7 #include <unistd.h>
8 #include <signal.h>
9 #include <string.h>
10 #include <getopt.h>
11 
12 #include <cmdline_parse_string.h>
13 #include <cmdline_socket.h>
14 #include <cmdline.h>
15 #include <rte_common.h>
16 #include <rte_rawdev.h>
17 #include <rte_ethdev.h>
18 #include <rte_malloc.h>
19 #include <rte_lcore.h>
20 #include <rte_cycles.h>
21 #include <rte_pmd_ntb.h>
22 #include <rte_mbuf_pool_ops.h>
23 
/*
 * Software packet counters maintained by the forwarding loops, one
 * cache-aligned instance per port.
 */
struct ntb_port_statistics {
	uint64_t tx; /* packets accepted by the port's transmit call */
	uint64_t rx; /* packets obtained from the port's receive call */
} __rte_cache_aligned;

/* Slot 0 counts the NTB device, slot 1 the ethdev (used in iofwd mode). */
struct ntb_port_statistics ntb_port_stats[2];
31 
/* One unidirectional forwarding stream, tied to a single queue pair. */
struct ntb_fwd_stream {
	uint16_t tx_port; /* id of the port packets are sent to */
	uint16_t rx_port; /* id of the port packets are received from */
	uint16_t qp_id;   /* queue pair index used by the stream */
	uint8_t tx_ntb;   /* non-zero when the NTB device is the TX port */
};

/* Slice of the fwd_streams array served by one lcore, plus its stop flag. */
struct ntb_fwd_lcore_conf {
	uint16_t stream_id; /* index of the first stream served */
	uint16_t nb_stream; /* number of consecutive streams served */
	uint8_t stopped;    /* set to 1 to make the lcore leave its loop */
};
44 
/* Forwarding modes; each value is also the index of its name in fwd_mode_s. */
enum ntb_fwd_mode {
	FILE_TRANS = 0,
	RXONLY = 1,
	TXONLY = 2,
	IOFWD = 3,
	MAX_FWD_MODE = 4,
};

/* Command-line spelling of every mode, closed by a NULL entry. */
static const char *const fwd_mode_s[] = {
	[FILE_TRANS] = "file-trans",
	[RXONLY] = "rxonly",
	[TXONLY] = "txonly",
	[IOFWD] = "iofwd",
	[MAX_FWD_MODE] = NULL,
};

/* Selected mode; starts out as MAX_FWD_MODE, i.e. none of the real modes. */
static enum ntb_fwd_mode fwd_mode = MAX_FWD_MODE;
60 
61 static struct ntb_fwd_lcore_conf fwd_lcore_conf[RTE_MAX_LCORE];
62 static struct ntb_fwd_stream *fwd_streams;
63 
64 static struct rte_mempool *mbuf_pool;
65 
66 #define NTB_DRV_NAME_LEN 7
67 #define MEMPOOL_CACHE_SIZE 256
68 
69 static uint8_t in_test;
70 static uint8_t interactive = 1;
71 static uint16_t eth_port_id = RTE_MAX_ETHPORTS;
72 static uint16_t dev_id;
73 
74 /* Number of queues, default set as 1 */
75 static uint16_t num_queues = 1;
76 static uint16_t ntb_buf_size = RTE_MBUF_DEFAULT_BUF_SIZE;
77 
78 /* Configurable number of descriptors */
79 #define NTB_DEFAULT_NUM_DESCS 1024
80 static uint16_t nb_desc = NTB_DEFAULT_NUM_DESCS;
81 
82 static uint16_t tx_free_thresh;
83 
84 #define NTB_MAX_PKT_BURST 32
85 #define NTB_DFLT_PKT_BURST 32
86 static uint16_t pkt_burst = NTB_DFLT_PKT_BURST;
87 
88 #define BURST_TX_RETRIES 64
89 
90 static struct rte_eth_conf eth_port_conf = {
91 	.rxmode = {
92 		.mq_mode = ETH_MQ_RX_RSS,
93 		.split_hdr_size = 0,
94 	},
95 	.rx_adv_conf = {
96 		.rss_conf = {
97 			.rss_key = NULL,
98 			.rss_hf = ETH_RSS_IP,
99 		},
100 	},
101 	.txmode = {
102 		.mq_mode = ETH_MQ_TX_NONE,
103 	},
104 };
105 
106 /* *** Help command with introduction. *** */
107 struct cmd_help_result {
108 	cmdline_fixed_string_t help;
109 };
110 
111 static void
112 cmd_help_parsed(__rte_unused void *parsed_result,
113 		struct cmdline *cl,
114 		__rte_unused void *data)
115 {
116 	cmdline_printf(
117 		cl,
118 		"\n"
119 		"The following commands are currently available:\n\n"
120 		"Control:\n"
121 		"    quit                                      :"
122 		" Quit the application.\n"
123 		"\nTransmission:\n"
124 		"    send [path]                               :"
125 		" Send [path] file. Only take effect in file-trans mode\n"
126 		"    start                                     :"
127 		" Start transmissions.\n"
128 		"    stop                                      :"
129 		" Stop transmissions.\n"
130 		"    clear/show port stats                     :"
131 		" Clear/show port stats.\n"
132 		"    set fwd file-trans/rxonly/txonly/iofwd    :"
133 		" Set packet forwarding mode.\n"
134 	);
135 
136 }
137 
138 cmdline_parse_token_string_t cmd_help_help =
139 	TOKEN_STRING_INITIALIZER(struct cmd_help_result, help, "help");
140 
141 cmdline_parse_inst_t cmd_help = {
142 	.f = cmd_help_parsed,
143 	.data = NULL,
144 	.help_str = "show help",
145 	.tokens = {
146 		(void *)&cmd_help_help,
147 		NULL,
148 	},
149 };
150 
151 /* *** QUIT *** */
152 struct cmd_quit_result {
153 	cmdline_fixed_string_t quit;
154 };
155 
156 static void
157 cmd_quit_parsed(__rte_unused void *parsed_result,
158 		struct cmdline *cl,
159 		__rte_unused void *data)
160 {
161 	struct ntb_fwd_lcore_conf *conf;
162 	uint32_t lcore_id;
163 
164 	/* Stop transmission first. */
165 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
166 		conf = &fwd_lcore_conf[lcore_id];
167 
168 		if (!conf->nb_stream)
169 			continue;
170 
171 		if (conf->stopped)
172 			continue;
173 
174 		conf->stopped = 1;
175 	}
176 	printf("\nWaiting for lcores to finish...\n");
177 	rte_eal_mp_wait_lcore();
178 	in_test = 0;
179 
180 	/* Stop traffic and Close port. */
181 	rte_rawdev_stop(dev_id);
182 	rte_rawdev_close(dev_id);
183 	if (eth_port_id < RTE_MAX_ETHPORTS && fwd_mode == IOFWD) {
184 		rte_eth_dev_stop(eth_port_id);
185 		rte_eth_dev_close(eth_port_id);
186 	}
187 
188 	cmdline_quit(cl);
189 }
190 
191 cmdline_parse_token_string_t cmd_quit_quit =
192 		TOKEN_STRING_INITIALIZER(struct cmd_quit_result, quit, "quit");
193 
194 cmdline_parse_inst_t cmd_quit = {
195 	.f = cmd_quit_parsed,
196 	.data = NULL,
197 	.help_str = "exit application",
198 	.tokens = {
199 		(void *)&cmd_quit_quit,
200 		NULL,
201 	},
202 };
203 
204 /* *** SEND FILE PARAMETERS *** */
205 struct cmd_sendfile_result {
206 	cmdline_fixed_string_t send_string;
207 	char filepath[];
208 };
209 
210 static void
211 cmd_sendfile_parsed(void *parsed_result,
212 		    __rte_unused struct cmdline *cl,
213 		    __rte_unused void *data)
214 {
215 	struct cmd_sendfile_result *res = parsed_result;
216 	struct rte_rawdev_buf *pkts_send[NTB_MAX_PKT_BURST];
217 	struct rte_mbuf *mbuf_send[NTB_MAX_PKT_BURST];
218 	uint64_t size, count, i, j, nb_burst;
219 	uint16_t nb_tx, buf_size;
220 	unsigned int nb_pkt;
221 	size_t queue_id = 0;
222 	uint16_t retry = 0;
223 	uint32_t val;
224 	FILE *file;
225 	int ret;
226 
227 	if (num_queues != 1) {
228 		printf("File transmission only supports 1 queue.\n");
229 		num_queues = 1;
230 	}
231 
232 	file = fopen(res->filepath, "r");
233 	if (file == NULL) {
234 		printf("Fail to open the file.\n");
235 		return;
236 	}
237 
238 	if (fseek(file, 0, SEEK_END) < 0) {
239 		printf("Fail to get file size.\n");
240 		fclose(file);
241 		return;
242 	}
243 	size = ftell(file);
244 	if (fseek(file, 0, SEEK_SET) < 0) {
245 		printf("Fail to get file size.\n");
246 		fclose(file);
247 		return;
248 	}
249 
250 	/* Tell remote about the file size. */
251 	val = size >> 32;
252 	rte_rawdev_set_attr(dev_id, "spad_user_0", val);
253 	val = size;
254 	rte_rawdev_set_attr(dev_id, "spad_user_1", val);
255 	printf("Sending file, size is %"PRIu64"\n", size);
256 
257 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
258 		pkts_send[i] = (struct rte_rawdev_buf *)
259 				malloc(sizeof(struct rte_rawdev_buf));
260 
261 	buf_size = ntb_buf_size - RTE_PKTMBUF_HEADROOM;
262 	count = (size + buf_size - 1) / buf_size;
263 	nb_burst = (count + pkt_burst - 1) / pkt_burst;
264 
265 	for (i = 0; i < nb_burst; i++) {
266 		val = RTE_MIN(count, pkt_burst);
267 		if (rte_mempool_get_bulk(mbuf_pool, (void **)mbuf_send,
268 					val) == 0) {
269 			for (nb_pkt = 0; nb_pkt < val; nb_pkt++) {
270 				mbuf_send[nb_pkt]->port = dev_id;
271 				mbuf_send[nb_pkt]->data_len =
272 				fread(rte_pktmbuf_mtod(mbuf_send[nb_pkt],
273 					void *), 1, buf_size, file);
274 				mbuf_send[nb_pkt]->pkt_len =
275 					mbuf_send[nb_pkt]->data_len;
276 				pkts_send[nb_pkt]->buf_addr = mbuf_send[nb_pkt];
277 			}
278 		} else {
279 			for (nb_pkt = 0; nb_pkt < val; nb_pkt++) {
280 				mbuf_send[nb_pkt] =
281 					rte_mbuf_raw_alloc(mbuf_pool);
282 				if (mbuf_send[nb_pkt] == NULL)
283 					break;
284 				mbuf_send[nb_pkt]->port = dev_id;
285 				mbuf_send[nb_pkt]->data_len =
286 				fread(rte_pktmbuf_mtod(mbuf_send[nb_pkt],
287 					void *), 1, buf_size, file);
288 				mbuf_send[nb_pkt]->pkt_len =
289 					mbuf_send[nb_pkt]->data_len;
290 				pkts_send[nb_pkt]->buf_addr = mbuf_send[nb_pkt];
291 			}
292 		}
293 
294 		ret = rte_rawdev_enqueue_buffers(dev_id, pkts_send, nb_pkt,
295 						(void *)queue_id);
296 		if (ret < 0) {
297 			printf("Enqueue failed with err %d\n", ret);
298 			for (j = 0; j < nb_pkt; j++)
299 				rte_pktmbuf_free(mbuf_send[j]);
300 			goto clean;
301 		}
302 		nb_tx = ret;
303 		while (nb_tx != nb_pkt && retry < BURST_TX_RETRIES) {
304 			rte_delay_us(1);
305 			ret = rte_rawdev_enqueue_buffers(dev_id,
306 					&pkts_send[nb_tx], nb_pkt - nb_tx,
307 					(void *)queue_id);
308 			if (ret < 0) {
309 				printf("Enqueue failed with err %d\n", ret);
310 				for (j = nb_tx; j < nb_pkt; j++)
311 					rte_pktmbuf_free(mbuf_send[j]);
312 				goto clean;
313 			}
314 			nb_tx += ret;
315 		}
316 		count -= nb_pkt;
317 	}
318 
319 	/* Clear register after file sending done. */
320 	rte_rawdev_set_attr(dev_id, "spad_user_0", 0);
321 	rte_rawdev_set_attr(dev_id, "spad_user_1", 0);
322 	printf("Done sending file.\n");
323 
324 clean:
325 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
326 		free(pkts_send[i]);
327 	fclose(file);
328 }
329 
330 cmdline_parse_token_string_t cmd_send_file_send =
331 	TOKEN_STRING_INITIALIZER(struct cmd_sendfile_result, send_string,
332 				 "send");
333 cmdline_parse_token_string_t cmd_send_file_filepath =
334 	TOKEN_STRING_INITIALIZER(struct cmd_sendfile_result, filepath, NULL);
335 
336 
337 cmdline_parse_inst_t cmd_send_file = {
338 	.f = cmd_sendfile_parsed,
339 	.data = NULL,
340 	.help_str = "send <file_path>",
341 	.tokens = {
342 		(void *)&cmd_send_file_send,
343 		(void *)&cmd_send_file_filepath,
344 		NULL,
345 	},
346 };
347 
348 #define RECV_FILE_LEN 30
349 static int
350 start_polling_recv_file(void *param)
351 {
352 	struct rte_rawdev_buf *pkts_recv[NTB_MAX_PKT_BURST];
353 	struct ntb_fwd_lcore_conf *conf = param;
354 	struct rte_mbuf *mbuf;
355 	char filepath[RECV_FILE_LEN];
356 	uint64_t val, size, file_len;
357 	uint16_t nb_rx, i, file_no;
358 	size_t queue_id = 0;
359 	FILE *file;
360 	int ret;
361 
362 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
363 		pkts_recv[i] = (struct rte_rawdev_buf *)
364 				malloc(sizeof(struct rte_rawdev_buf));
365 
366 	file_no = 0;
367 	while (!conf->stopped) {
368 		snprintf(filepath, RECV_FILE_LEN, "ntb_recv_file%d", file_no);
369 		file = fopen(filepath, "w");
370 		if (file == NULL) {
371 			printf("Fail to open the file.\n");
372 			return -EINVAL;
373 		}
374 
375 		rte_rawdev_get_attr(dev_id, "spad_user_0", &val);
376 		size = val << 32;
377 		rte_rawdev_get_attr(dev_id, "spad_user_1", &val);
378 		size |= val;
379 
380 		if (!size) {
381 			fclose(file);
382 			continue;
383 		}
384 
385 		file_len = 0;
386 		nb_rx = NTB_MAX_PKT_BURST;
387 		while (file_len < size && !conf->stopped) {
388 			ret = rte_rawdev_dequeue_buffers(dev_id, pkts_recv,
389 					pkt_burst, (void *)queue_id);
390 			if (ret < 0) {
391 				printf("Dequeue failed with err %d\n", ret);
392 				fclose(file);
393 				goto clean;
394 			}
395 			nb_rx = ret;
396 			ntb_port_stats[0].rx += nb_rx;
397 			for (i = 0; i < nb_rx; i++) {
398 				mbuf = pkts_recv[i]->buf_addr;
399 				fwrite(rte_pktmbuf_mtod(mbuf, void *), 1,
400 					mbuf->data_len, file);
401 				file_len += mbuf->data_len;
402 				rte_pktmbuf_free(mbuf);
403 				pkts_recv[i]->buf_addr = NULL;
404 			}
405 		}
406 
407 		printf("Received file (size: %" PRIu64 ") from peer to %s.\n",
408 			size, filepath);
409 		fclose(file);
410 		file_no++;
411 	}
412 
413 clean:
414 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
415 		free(pkts_recv[i]);
416 	return 0;
417 }
418 
419 static int
420 start_iofwd_per_lcore(void *param)
421 {
422 	struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
423 	struct rte_mbuf *pkts_burst[NTB_MAX_PKT_BURST];
424 	struct ntb_fwd_lcore_conf *conf = param;
425 	struct ntb_fwd_stream fs;
426 	uint16_t nb_rx, nb_tx;
427 	int i, j, ret;
428 
429 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
430 		ntb_buf[i] = (struct rte_rawdev_buf *)
431 			     malloc(sizeof(struct rte_rawdev_buf));
432 
433 	while (!conf->stopped) {
434 		for (i = 0; i < conf->nb_stream; i++) {
435 			fs = fwd_streams[conf->stream_id + i];
436 			if (fs.tx_ntb) {
437 				nb_rx = rte_eth_rx_burst(fs.rx_port,
438 						fs.qp_id, pkts_burst,
439 						pkt_burst);
440 				if (unlikely(nb_rx == 0))
441 					continue;
442 				for (j = 0; j < nb_rx; j++)
443 					ntb_buf[j]->buf_addr = pkts_burst[j];
444 				ret = rte_rawdev_enqueue_buffers(fs.tx_port,
445 						ntb_buf, nb_rx,
446 						(void *)(size_t)fs.qp_id);
447 				if (ret < 0) {
448 					printf("Enqueue failed with err %d\n",
449 						ret);
450 					for (j = 0; j < nb_rx; j++)
451 						rte_pktmbuf_free(pkts_burst[j]);
452 					goto clean;
453 				}
454 				nb_tx = ret;
455 				ntb_port_stats[0].tx += nb_tx;
456 				ntb_port_stats[1].rx += nb_rx;
457 			} else {
458 				ret = rte_rawdev_dequeue_buffers(fs.rx_port,
459 						ntb_buf, pkt_burst,
460 						(void *)(size_t)fs.qp_id);
461 				if (ret < 0) {
462 					printf("Dequeue failed with err %d\n",
463 						ret);
464 					goto clean;
465 				}
466 				nb_rx = ret;
467 				if (unlikely(nb_rx == 0))
468 					continue;
469 				for (j = 0; j < nb_rx; j++)
470 					pkts_burst[j] = ntb_buf[j]->buf_addr;
471 				nb_tx = rte_eth_tx_burst(fs.tx_port,
472 					fs.qp_id, pkts_burst, nb_rx);
473 				ntb_port_stats[1].tx += nb_tx;
474 				ntb_port_stats[0].rx += nb_rx;
475 			}
476 			if (unlikely(nb_tx < nb_rx)) {
477 				do {
478 					rte_pktmbuf_free(pkts_burst[nb_tx]);
479 				} while (++nb_tx < nb_rx);
480 			}
481 		}
482 	}
483 
484 clean:
485 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
486 		free(ntb_buf[i]);
487 
488 	return 0;
489 }
490 
491 static int
492 start_rxonly_per_lcore(void *param)
493 {
494 	struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
495 	struct ntb_fwd_lcore_conf *conf = param;
496 	struct ntb_fwd_stream fs;
497 	uint16_t nb_rx;
498 	int i, j, ret;
499 
500 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
501 		ntb_buf[i] = (struct rte_rawdev_buf *)
502 			     malloc(sizeof(struct rte_rawdev_buf));
503 
504 	while (!conf->stopped) {
505 		for (i = 0; i < conf->nb_stream; i++) {
506 			fs = fwd_streams[conf->stream_id + i];
507 			ret = rte_rawdev_dequeue_buffers(fs.rx_port,
508 			      ntb_buf, pkt_burst, (void *)(size_t)fs.qp_id);
509 			if (ret < 0) {
510 				printf("Dequeue failed with err %d\n", ret);
511 				goto clean;
512 			}
513 			nb_rx = ret;
514 			if (unlikely(nb_rx == 0))
515 				continue;
516 			ntb_port_stats[0].rx += nb_rx;
517 
518 			for (j = 0; j < nb_rx; j++)
519 				rte_pktmbuf_free(ntb_buf[j]->buf_addr);
520 		}
521 	}
522 
523 clean:
524 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
525 		free(ntb_buf[i]);
526 
527 	return 0;
528 }
529 
530 
531 static int
532 start_txonly_per_lcore(void *param)
533 {
534 	struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
535 	struct rte_mbuf *pkts_burst[NTB_MAX_PKT_BURST];
536 	struct ntb_fwd_lcore_conf *conf = param;
537 	struct ntb_fwd_stream fs;
538 	uint16_t nb_pkt, nb_tx;
539 	int i, j, ret;
540 
541 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
542 		ntb_buf[i] = (struct rte_rawdev_buf *)
543 			     malloc(sizeof(struct rte_rawdev_buf));
544 
545 	while (!conf->stopped) {
546 		for (i = 0; i < conf->nb_stream; i++) {
547 			fs = fwd_streams[conf->stream_id + i];
548 			if (rte_mempool_get_bulk(mbuf_pool, (void **)pkts_burst,
549 				  pkt_burst) == 0) {
550 				for (nb_pkt = 0; nb_pkt < pkt_burst; nb_pkt++) {
551 					pkts_burst[nb_pkt]->port = dev_id;
552 					pkts_burst[nb_pkt]->data_len =
553 						pkts_burst[nb_pkt]->buf_len -
554 						RTE_PKTMBUF_HEADROOM;
555 					pkts_burst[nb_pkt]->pkt_len =
556 						pkts_burst[nb_pkt]->data_len;
557 					ntb_buf[nb_pkt]->buf_addr =
558 						pkts_burst[nb_pkt];
559 				}
560 			} else {
561 				for (nb_pkt = 0; nb_pkt < pkt_burst; nb_pkt++) {
562 					pkts_burst[nb_pkt] =
563 						rte_pktmbuf_alloc(mbuf_pool);
564 					if (pkts_burst[nb_pkt] == NULL)
565 						break;
566 					pkts_burst[nb_pkt]->port = dev_id;
567 					pkts_burst[nb_pkt]->data_len =
568 						pkts_burst[nb_pkt]->buf_len -
569 						RTE_PKTMBUF_HEADROOM;
570 					pkts_burst[nb_pkt]->pkt_len =
571 						pkts_burst[nb_pkt]->data_len;
572 					ntb_buf[nb_pkt]->buf_addr =
573 						pkts_burst[nb_pkt];
574 				}
575 			}
576 			ret = rte_rawdev_enqueue_buffers(fs.tx_port, ntb_buf,
577 					nb_pkt, (void *)(size_t)fs.qp_id);
578 			if (ret < 0) {
579 				printf("Enqueue failed with err %d\n", ret);
580 				for (j = 0; j < nb_pkt; j++)
581 					rte_pktmbuf_free(pkts_burst[j]);
582 				goto clean;
583 			}
584 			nb_tx = ret;
585 			ntb_port_stats[0].tx += nb_tx;
586 			if (unlikely(nb_tx < nb_pkt)) {
587 				do {
588 					rte_pktmbuf_free(pkts_burst[nb_tx]);
589 				} while (++nb_tx < nb_pkt);
590 			}
591 		}
592 	}
593 
594 clean:
595 	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
596 		free(ntb_buf[i]);
597 
598 	return 0;
599 }
600 
601 static int
602 ntb_fwd_config_setup(void)
603 {
604 	uint16_t i;
605 
606 	/* Make sure iofwd has valid ethdev. */
607 	if (fwd_mode == IOFWD && eth_port_id >= RTE_MAX_ETHPORTS) {
608 		printf("No ethdev, cannot be in iofwd mode.");
609 		return -EINVAL;
610 	}
611 
612 	if (fwd_mode == IOFWD) {
613 		fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
614 			sizeof(struct ntb_fwd_stream) * num_queues * 2,
615 			RTE_CACHE_LINE_SIZE);
616 		for (i = 0; i < num_queues; i++) {
617 			fwd_streams[i * 2].qp_id = i;
618 			fwd_streams[i * 2].tx_port = dev_id;
619 			fwd_streams[i * 2].rx_port = eth_port_id;
620 			fwd_streams[i * 2].tx_ntb = 1;
621 
622 			fwd_streams[i * 2 + 1].qp_id = i;
623 			fwd_streams[i * 2 + 1].tx_port = eth_port_id;
624 			fwd_streams[i * 2 + 1].rx_port = dev_id;
625 			fwd_streams[i * 2 + 1].tx_ntb = 0;
626 		}
627 		return 0;
628 	}
629 
630 	if (fwd_mode == RXONLY || fwd_mode == FILE_TRANS) {
631 		/* Only support 1 queue in file-trans for in order. */
632 		if (fwd_mode == FILE_TRANS)
633 			num_queues = 1;
634 
635 		fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
636 				sizeof(struct ntb_fwd_stream) * num_queues,
637 				RTE_CACHE_LINE_SIZE);
638 		for (i = 0; i < num_queues; i++) {
639 			fwd_streams[i].qp_id = i;
640 			fwd_streams[i].tx_port = RTE_MAX_ETHPORTS;
641 			fwd_streams[i].rx_port = dev_id;
642 			fwd_streams[i].tx_ntb = 0;
643 		}
644 		return 0;
645 	}
646 
647 	if (fwd_mode == TXONLY) {
648 		fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
649 				sizeof(struct ntb_fwd_stream) * num_queues,
650 				RTE_CACHE_LINE_SIZE);
651 		for (i = 0; i < num_queues; i++) {
652 			fwd_streams[i].qp_id = i;
653 			fwd_streams[i].tx_port = dev_id;
654 			fwd_streams[i].rx_port = RTE_MAX_ETHPORTS;
655 			fwd_streams[i].tx_ntb = 1;
656 		}
657 	}
658 	return 0;
659 }
660 
661 static void
662 assign_stream_to_lcores(void)
663 {
664 	struct ntb_fwd_lcore_conf *conf;
665 	struct ntb_fwd_stream *fs;
666 	uint16_t nb_streams, sm_per_lcore, sm_id, i;
667 	uint32_t lcore_id;
668 	uint8_t lcore_num, nb_extra;
669 
670 	lcore_num = rte_lcore_count();
671 	/* Exclude main core */
672 	lcore_num--;
673 
674 	nb_streams = (fwd_mode == IOFWD) ? num_queues * 2 : num_queues;
675 
676 	sm_per_lcore = nb_streams / lcore_num;
677 	nb_extra = nb_streams % lcore_num;
678 	sm_id = 0;
679 	i = 0;
680 
681 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
682 		conf = &fwd_lcore_conf[lcore_id];
683 
684 		if (i < nb_extra) {
685 			conf->nb_stream = sm_per_lcore + 1;
686 			conf->stream_id = sm_id;
687 			sm_id = sm_id + sm_per_lcore + 1;
688 		} else {
689 			conf->nb_stream = sm_per_lcore;
690 			conf->stream_id = sm_id;
691 			sm_id = sm_id + sm_per_lcore;
692 		}
693 
694 		i++;
695 		if (sm_id >= nb_streams)
696 			break;
697 	}
698 
699 	/* Print packet forwading config. */
700 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
701 		conf = &fwd_lcore_conf[lcore_id];
702 
703 		if (!conf->nb_stream)
704 			continue;
705 
706 		printf("Streams on Lcore %u :\n", lcore_id);
707 		for (i = 0; i < conf->nb_stream; i++) {
708 			fs = &fwd_streams[conf->stream_id + i];
709 			if (fwd_mode == IOFWD)
710 				printf(" + Stream %u : %s%u RX -> %s%u TX,"
711 					" Q=%u\n", conf->stream_id + i,
712 					fs->tx_ntb ? "Eth" : "NTB", fs->rx_port,
713 					fs->tx_ntb ? "NTB" : "Eth", fs->tx_port,
714 					fs->qp_id);
715 			if (fwd_mode == FILE_TRANS || fwd_mode == RXONLY)
716 				printf(" + Stream %u : %s%u RX only\n",
717 					conf->stream_id, "NTB", fs->rx_port);
718 			if (fwd_mode == TXONLY)
719 				printf(" + Stream %u : %s%u TX only\n",
720 					conf->stream_id, "NTB", fs->tx_port);
721 		}
722 	}
723 }
724 
725 static void
726 start_pkt_fwd(void)
727 {
728 	struct ntb_fwd_lcore_conf *conf;
729 	struct rte_eth_link eth_link;
730 	uint32_t lcore_id;
731 	int ret, i;
732 	char link_status_text[RTE_ETH_LINK_MAX_STR_LEN];
733 
734 	ret = ntb_fwd_config_setup();
735 	if (ret < 0) {
736 		printf("Cannot start traffic. Please reset fwd mode.\n");
737 		return;
738 	}
739 
740 	/* If using iofwd, checking ethdev link status first. */
741 	if (fwd_mode == IOFWD) {
742 		printf("Checking eth link status...\n");
743 		/* Wait for eth link up at most 100 times. */
744 		for (i = 0; i < 100; i++) {
745 			ret = rte_eth_link_get(eth_port_id, &eth_link);
746 			if (ret < 0) {
747 				printf("Link get failed with err %d\n", ret);
748 				return;
749 			}
750 			if (eth_link.link_status) {
751 				rte_eth_link_to_str(link_status_text,
752 					sizeof(link_status_text),
753 					&eth_link);
754 				printf("Eth%u %s\n", eth_port_id,
755 				       link_status_text);
756 				break;
757 			}
758 		}
759 		if (!eth_link.link_status) {
760 			printf("Eth%u link down. Cannot start traffic.\n",
761 				eth_port_id);
762 			return;
763 		}
764 	}
765 
766 	assign_stream_to_lcores();
767 	in_test = 1;
768 
769 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
770 		conf = &fwd_lcore_conf[lcore_id];
771 
772 		if (!conf->nb_stream)
773 			continue;
774 
775 		conf->stopped = 0;
776 		if (fwd_mode == FILE_TRANS)
777 			rte_eal_remote_launch(start_polling_recv_file,
778 					      conf, lcore_id);
779 		else if (fwd_mode == IOFWD)
780 			rte_eal_remote_launch(start_iofwd_per_lcore,
781 					      conf, lcore_id);
782 		else if (fwd_mode == RXONLY)
783 			rte_eal_remote_launch(start_rxonly_per_lcore,
784 					      conf, lcore_id);
785 		else if (fwd_mode == TXONLY)
786 			rte_eal_remote_launch(start_txonly_per_lcore,
787 					      conf, lcore_id);
788 	}
789 }
790 
791 /* *** START FWD PARAMETERS *** */
792 struct cmd_start_result {
793 	cmdline_fixed_string_t start;
794 };
795 
796 static void
797 cmd_start_parsed(__rte_unused void *parsed_result,
798 			    __rte_unused struct cmdline *cl,
799 			    __rte_unused void *data)
800 {
801 	start_pkt_fwd();
802 }
803 
804 cmdline_parse_token_string_t cmd_start_start =
805 		TOKEN_STRING_INITIALIZER(struct cmd_start_result, start, "start");
806 
807 cmdline_parse_inst_t cmd_start = {
808 	.f = cmd_start_parsed,
809 	.data = NULL,
810 	.help_str = "start pkt fwd between ntb and ethdev",
811 	.tokens = {
812 		(void *)&cmd_start_start,
813 		NULL,
814 	},
815 };
816 
817 /* *** STOP *** */
818 struct cmd_stop_result {
819 	cmdline_fixed_string_t stop;
820 };
821 
822 static void
823 cmd_stop_parsed(__rte_unused void *parsed_result,
824 		__rte_unused struct cmdline *cl,
825 		__rte_unused void *data)
826 {
827 	struct ntb_fwd_lcore_conf *conf;
828 	uint32_t lcore_id;
829 
830 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
831 		conf = &fwd_lcore_conf[lcore_id];
832 
833 		if (!conf->nb_stream)
834 			continue;
835 
836 		if (conf->stopped)
837 			continue;
838 
839 		conf->stopped = 1;
840 	}
841 	printf("\nWaiting for lcores to finish...\n");
842 	rte_eal_mp_wait_lcore();
843 	in_test = 0;
844 	printf("\nDone.\n");
845 }
846 
847 cmdline_parse_token_string_t cmd_stop_stop =
848 		TOKEN_STRING_INITIALIZER(struct cmd_stop_result, stop, "stop");
849 
850 cmdline_parse_inst_t cmd_stop = {
851 	.f = cmd_stop_parsed,
852 	.data = NULL,
853 	.help_str = "stop: Stop packet forwarding",
854 	.tokens = {
855 		(void *)&cmd_stop_stop,
856 		NULL,
857 	},
858 };
859 
860 static void
861 ntb_stats_clear(void)
862 {
863 	int nb_ids, i;
864 	uint32_t *ids;
865 
866 	/* Clear NTB dev stats */
867 	nb_ids = rte_rawdev_xstats_names_get(dev_id, NULL, 0);
868 	if (nb_ids  < 0) {
869 		printf("Error: Cannot get count of xstats\n");
870 		return;
871 	}
872 	ids = malloc(sizeof(uint32_t) * nb_ids);
873 	for (i = 0; i < nb_ids; i++)
874 		ids[i] = i;
875 	rte_rawdev_xstats_reset(dev_id, ids, nb_ids);
876 	printf("\n  statistics for NTB port %d cleared\n", dev_id);
877 
878 	/* Clear Ethdev stats if have any */
879 	if (fwd_mode == IOFWD && eth_port_id != RTE_MAX_ETHPORTS) {
880 		rte_eth_stats_reset(eth_port_id);
881 		printf("\n  statistics for ETH port %d cleared\n", eth_port_id);
882 	}
883 }
884 
885 static inline void
886 ntb_calculate_throughput(uint16_t port) {
887 	uint64_t diff_pkts_rx, diff_pkts_tx, diff_cycles;
888 	uint64_t mpps_rx, mpps_tx;
889 	static uint64_t prev_pkts_rx[2];
890 	static uint64_t prev_pkts_tx[2];
891 	static uint64_t prev_cycles[2];
892 
893 	diff_cycles = prev_cycles[port];
894 	prev_cycles[port] = rte_rdtsc();
895 	if (diff_cycles > 0)
896 		diff_cycles = prev_cycles[port] - diff_cycles;
897 	diff_pkts_rx = (ntb_port_stats[port].rx > prev_pkts_rx[port]) ?
898 		(ntb_port_stats[port].rx - prev_pkts_rx[port]) : 0;
899 	diff_pkts_tx = (ntb_port_stats[port].tx > prev_pkts_tx[port]) ?
900 		(ntb_port_stats[port].tx - prev_pkts_tx[port]) : 0;
901 	prev_pkts_rx[port] = ntb_port_stats[port].rx;
902 	prev_pkts_tx[port] = ntb_port_stats[port].tx;
903 	mpps_rx = diff_cycles > 0 ?
904 		diff_pkts_rx * rte_get_tsc_hz() / diff_cycles : 0;
905 	mpps_tx = diff_cycles > 0 ?
906 		diff_pkts_tx * rte_get_tsc_hz() / diff_cycles : 0;
907 	printf("  Throughput (since last show)\n");
908 	printf("  Rx-pps: %12"PRIu64"\n  Tx-pps: %12"PRIu64"\n",
909 			mpps_rx, mpps_tx);
910 
911 }
912 
913 static void
914 ntb_stats_display(void)
915 {
916 	struct rte_rawdev_xstats_name *xstats_names;
917 	struct rte_eth_stats stats;
918 	uint64_t *values;
919 	uint32_t *ids;
920 	int nb_ids, i;
921 
922 	printf("###### statistics for NTB port %d #######\n", dev_id);
923 
924 	/* Get NTB dev stats and stats names */
925 	nb_ids = rte_rawdev_xstats_names_get(dev_id, NULL, 0);
926 	if (nb_ids  < 0) {
927 		printf("Error: Cannot get count of xstats\n");
928 		return;
929 	}
930 	xstats_names = malloc(sizeof(struct rte_rawdev_xstats_name) * nb_ids);
931 	if (xstats_names == NULL) {
932 		printf("Cannot allocate memory for xstats lookup\n");
933 		return;
934 	}
935 	if (nb_ids != rte_rawdev_xstats_names_get(
936 			dev_id, xstats_names, nb_ids)) {
937 		printf("Error: Cannot get xstats lookup\n");
938 		free(xstats_names);
939 		return;
940 	}
941 	ids = malloc(sizeof(uint32_t) * nb_ids);
942 	for (i = 0; i < nb_ids; i++)
943 		ids[i] = i;
944 	values = malloc(sizeof(uint64_t) * nb_ids);
945 	if (nb_ids != rte_rawdev_xstats_get(dev_id, ids, values, nb_ids)) {
946 		printf("Error: Unable to get xstats\n");
947 		free(xstats_names);
948 		free(values);
949 		free(ids);
950 		return;
951 	}
952 
953 	/* Display NTB dev stats */
954 	for (i = 0; i < nb_ids; i++)
955 		printf("  %s: %"PRIu64"\n", xstats_names[i].name, values[i]);
956 	ntb_calculate_throughput(0);
957 
958 	/* Get Ethdev stats if have any */
959 	if (fwd_mode == IOFWD && eth_port_id != RTE_MAX_ETHPORTS) {
960 		printf("###### statistics for ETH port %d ######\n",
961 			eth_port_id);
962 		rte_eth_stats_get(eth_port_id, &stats);
963 		printf("  RX-packets: %"PRIu64"\n", stats.ipackets);
964 		printf("  RX-bytes: %"PRIu64"\n", stats.ibytes);
965 		printf("  RX-errors: %"PRIu64"\n", stats.ierrors);
966 		printf("  RX-missed: %"PRIu64"\n", stats.imissed);
967 		printf("  TX-packets: %"PRIu64"\n", stats.opackets);
968 		printf("  TX-bytes: %"PRIu64"\n", stats.obytes);
969 		printf("  TX-errors: %"PRIu64"\n", stats.oerrors);
970 		ntb_calculate_throughput(1);
971 	}
972 
973 	free(xstats_names);
974 	free(values);
975 	free(ids);
976 }
977 
978 /* *** SHOW/CLEAR PORT STATS *** */
979 struct cmd_stats_result {
980 	cmdline_fixed_string_t show;
981 	cmdline_fixed_string_t port;
982 	cmdline_fixed_string_t stats;
983 };
984 
985 static void
986 cmd_stats_parsed(void *parsed_result,
987 		 __rte_unused struct cmdline *cl,
988 		 __rte_unused void *data)
989 {
990 	struct cmd_stats_result *res = parsed_result;
991 	if (!strcmp(res->show, "clear"))
992 		ntb_stats_clear();
993 	else
994 		ntb_stats_display();
995 }
996 
997 cmdline_parse_token_string_t cmd_stats_show =
998 	TOKEN_STRING_INITIALIZER(struct cmd_stats_result, show, "show#clear");
999 cmdline_parse_token_string_t cmd_stats_port =
1000 	TOKEN_STRING_INITIALIZER(struct cmd_stats_result, port, "port");
1001 cmdline_parse_token_string_t cmd_stats_stats =
1002 	TOKEN_STRING_INITIALIZER(struct cmd_stats_result, stats, "stats");
1003 
1004 
1005 cmdline_parse_inst_t cmd_stats = {
1006 	.f = cmd_stats_parsed,
1007 	.data = NULL,
1008 	.help_str = "show|clear port stats",
1009 	.tokens = {
1010 		(void *)&cmd_stats_show,
1011 		(void *)&cmd_stats_port,
1012 		(void *)&cmd_stats_stats,
1013 		NULL,
1014 	},
1015 };
1016 
1017 /* *** SET FORWARDING MODE *** */
1018 struct cmd_set_fwd_mode_result {
1019 	cmdline_fixed_string_t set;
1020 	cmdline_fixed_string_t fwd;
1021 	cmdline_fixed_string_t mode;
1022 };
1023 
1024 static void
1025 cmd_set_fwd_mode_parsed(__rte_unused void *parsed_result,
1026 			__rte_unused struct cmdline *cl,
1027 			__rte_unused void *data)
1028 {
1029 	struct cmd_set_fwd_mode_result *res = parsed_result;
1030 	int i;
1031 
1032 	if (in_test) {
1033 		printf("Please stop traffic first.\n");
1034 		return;
1035 	}
1036 
1037 	for (i = 0; i < MAX_FWD_MODE; i++) {
1038 		if (!strcmp(res->mode, fwd_mode_s[i])) {
1039 			fwd_mode = i;
1040 			return;
1041 		}
1042 	}
1043 	printf("Invalid %s packet forwarding mode.\n", res->mode);
1044 }
1045 
1046 cmdline_parse_token_string_t cmd_setfwd_set =
1047 	TOKEN_STRING_INITIALIZER(struct cmd_set_fwd_mode_result, set, "set");
1048 cmdline_parse_token_string_t cmd_setfwd_fwd =
1049 	TOKEN_STRING_INITIALIZER(struct cmd_set_fwd_mode_result, fwd, "fwd");
1050 cmdline_parse_token_string_t cmd_setfwd_mode =
1051 	TOKEN_STRING_INITIALIZER(struct cmd_set_fwd_mode_result, mode,
1052 				"file-trans#iofwd#txonly#rxonly");
1053 
1054 cmdline_parse_inst_t cmd_set_fwd_mode = {
1055 	.f = cmd_set_fwd_mode_parsed,
1056 	.data = NULL,
1057 	.help_str = "set forwarding mode as file-trans|rxonly|txonly|iofwd",
1058 	.tokens = {
1059 		(void *)&cmd_setfwd_set,
1060 		(void *)&cmd_setfwd_fwd,
1061 		(void *)&cmd_setfwd_mode,
1062 		NULL,
1063 	},
1064 };
1065 
1066 /* list of instructions */
1067 cmdline_parse_ctx_t main_ctx[] = {
1068 	(cmdline_parse_inst_t *)&cmd_help,
1069 	(cmdline_parse_inst_t *)&cmd_send_file,
1070 	(cmdline_parse_inst_t *)&cmd_start,
1071 	(cmdline_parse_inst_t *)&cmd_stop,
1072 	(cmdline_parse_inst_t *)&cmd_stats,
1073 	(cmdline_parse_inst_t *)&cmd_set_fwd_mode,
1074 	(cmdline_parse_inst_t *)&cmd_quit,
1075 	NULL,
1076 };
1077 
1078 /* prompt function, called from main on MAIN lcore */
1079 static void
1080 prompt(void)
1081 {
1082 	struct cmdline *cl;
1083 
1084 	cl = cmdline_stdin_new(main_ctx, "ntb> ");
1085 	if (cl == NULL)
1086 		return;
1087 
1088 	cmdline_interact(cl);
1089 	cmdline_stdin_exit(cl);
1090 }
1091 
/*
 * Handler for SIGINT and SIGTERM: announces the signal on standard output,
 * restores the default disposition and re-sends the signal to the process
 * so that the default action takes place.  Other signals are ignored.
 *
 * Only async-signal-safe calls are made here; the message is therefore
 * assembled by hand and emitted with write() rather than printf().
 */
static void
signal_handler(int signum)
{
	static const char head[] = "\nSignal ";
	static const char tail[] = " received, preparing to exit...\n";
	char msg[sizeof(head) + sizeof(tail) + 16];
	char digits[12];
	unsigned int value;
	size_t len = 0, nb_digits = 0, k;
	ssize_t rc;

	if (signum != SIGINT && signum != SIGTERM)
		return;

	for (k = 0; k < sizeof(head) - 1; k++)
		msg[len++] = head[k];

	/* Decimal digits, least significant first. */
	value = (unsigned int)signum;
	do {
		digits[nb_digits++] = (char)('0' + value % 10);
		value /= 10;
	} while (value != 0 && nb_digits < sizeof(digits));
	while (nb_digits > 0)
		msg[len++] = digits[--nb_digits];

	for (k = 0; k < sizeof(tail) - 1; k++)
		msg[len++] = tail[k];

	/* Best effort: there is nothing useful to do if the write fails. */
	rc = write(STDOUT_FILENO, msg, len);
	(void)rc;

	signal(signum, SIG_DFL);
	kill(getpid(), signum);
}
1101 
/* Names of the long options. */
#define OPT_BUF_SIZE         "buf-size"
#define OPT_FWD_MODE         "fwd-mode"
#define OPT_NB_DESC          "nb-desc"
#define OPT_TXFREET          "txfreet"
#define OPT_BURST            "burst"
#define OPT_QP               "qp"

/* Values getopt_long() returns for the long options. */
enum {
	OPT_NO_ZERO_COPY_NUM = 1,
	OPT_BUF_SIZE_NUM,
	OPT_FWD_MODE_NUM,
	OPT_NB_DESC_NUM,
	OPT_TXFREET_NUM,
	OPT_BURST_NUM,
	OPT_QP_NUM,
};

/* Short options: only "-i", interactive mode. */
static const char short_options[] = "i";

/* Long options; every one of them takes an argument. */
static const struct option lgopts[] = {
	{ OPT_BUF_SIZE, required_argument, NULL, OPT_BUF_SIZE_NUM },
	{ OPT_FWD_MODE, required_argument, NULL, OPT_FWD_MODE_NUM },
	{ OPT_NB_DESC,  required_argument, NULL, OPT_NB_DESC_NUM  },
	{ OPT_TXFREET,  required_argument, NULL, OPT_TXFREET_NUM  },
	{ OPT_BURST,    required_argument, NULL, OPT_BURST_NUM    },
	{ OPT_QP,       required_argument, NULL, OPT_QP_NUM       },
	{ 0,            0,                 NULL, 0                }
};
1133 
1134 static void
1135 ntb_usage(const char *prgname)
1136 {
1137 	printf("%s [EAL options] -- [options]\n"
1138 	       "-i: run in interactive mode.\n"
1139 	       "-qp=N: set number of queues as N (N > 0, default: 1).\n"
1140 	       "--fwd-mode=N: set fwd mode (N: file-trans | rxonly | "
1141 	       "txonly | iofwd, default: file-trans)\n"
1142 	       "--buf-size=N: set mbuf dataroom size as N (0 < N < 65535,"
1143 	       " default: 2048).\n"
1144 	       "--nb-desc=N: set number of descriptors as N (%u <= N <= %u,"
1145 	       " default: 1024).\n"
1146 	       "--txfreet=N: set tx free thresh for NTB driver as N. (N >= 0)\n"
1147 	       "--burst=N: set pkt burst as N (0 < N <= %u default: 32).\n",
1148 	       prgname, NTB_MIN_DESC_SIZE, NTB_MAX_DESC_SIZE,
1149 	       NTB_MAX_PKT_BURST);
1150 }
1151 
1152 static void
1153 ntb_parse_args(int argc, char **argv)
1154 {
1155 	char *prgname = argv[0], **argvopt = argv;
1156 	int opt, opt_idx, n, i;
1157 
1158 	while ((opt = getopt_long(argc, argvopt, short_options,
1159 				lgopts, &opt_idx)) != EOF) {
1160 		switch (opt) {
1161 		case 'i':
1162 			printf("Interactive-mode selected.\n");
1163 			interactive = 1;
1164 			break;
1165 		case OPT_QP_NUM:
1166 			n = atoi(optarg);
1167 			if (n > 0)
1168 				num_queues = n;
1169 			else
1170 				rte_exit(EXIT_FAILURE, "q must be > 0.\n");
1171 			break;
1172 		case OPT_BUF_SIZE_NUM:
1173 			n = atoi(optarg);
1174 			if (n > RTE_PKTMBUF_HEADROOM && n <= 0xFFFF)
1175 				ntb_buf_size = n;
1176 			else
1177 				rte_exit(EXIT_FAILURE, "buf-size must be > "
1178 					"%u and < 65536.\n",
1179 					RTE_PKTMBUF_HEADROOM);
1180 			break;
1181 		case OPT_FWD_MODE_NUM:
1182 			for (i = 0; i < MAX_FWD_MODE; i++) {
1183 				if (!strcmp(optarg, fwd_mode_s[i])) {
1184 					fwd_mode = i;
1185 					break;
1186 				}
1187 			}
1188 			if (i == MAX_FWD_MODE)
1189 				rte_exit(EXIT_FAILURE, "Unsupported mode. "
1190 				"(Should be: file-trans | rxonly | txonly "
1191 				"| iofwd)\n");
1192 			break;
1193 		case OPT_NB_DESC_NUM:
1194 			n = atoi(optarg);
1195 			if (n >= NTB_MIN_DESC_SIZE && n <= NTB_MAX_DESC_SIZE)
1196 				nb_desc = n;
1197 			else
1198 				rte_exit(EXIT_FAILURE, "nb-desc must be within"
1199 					" [%u, %u].\n", NTB_MIN_DESC_SIZE,
1200 					NTB_MAX_DESC_SIZE);
1201 			break;
1202 		case OPT_TXFREET_NUM:
1203 			n = atoi(optarg);
1204 			if (n >= 0)
1205 				tx_free_thresh = n;
1206 			else
1207 				rte_exit(EXIT_FAILURE, "txfreet must be"
1208 					" >= 0\n");
1209 			break;
1210 		case OPT_BURST_NUM:
1211 			n = atoi(optarg);
1212 			if (n > 0 && n <= NTB_MAX_PKT_BURST)
1213 				pkt_burst = n;
1214 			else
1215 				rte_exit(EXIT_FAILURE, "burst must be within "
1216 					"(0, %u].\n", NTB_MAX_PKT_BURST);
1217 			break;
1218 
1219 		default:
1220 			ntb_usage(prgname);
1221 			rte_exit(EXIT_FAILURE,
1222 				 "Command line is incomplete or incorrect.\n");
1223 			break;
1224 		}
1225 	}
1226 }
1227 
1228 static void
1229 ntb_mempool_mz_free(__rte_unused struct rte_mempool_memhdr *memhdr,
1230 		void *opaque)
1231 {
1232 	const struct rte_memzone *mz = opaque;
1233 	rte_memzone_free(mz);
1234 }
1235 
1236 static struct rte_mempool *
1237 ntb_mbuf_pool_create(uint16_t mbuf_seg_size, uint32_t nb_mbuf,
1238 		     struct ntb_dev_info ntb_info,
1239 		     struct ntb_dev_config *ntb_conf,
1240 		     unsigned int socket_id)
1241 {
1242 	size_t mz_len, total_elt_sz, max_mz_len, left_sz;
1243 	struct rte_pktmbuf_pool_private mbp_priv;
1244 	char pool_name[RTE_MEMPOOL_NAMESIZE];
1245 	char mz_name[RTE_MEMZONE_NAMESIZE];
1246 	const struct rte_memzone *mz;
1247 	struct rte_mempool *mp;
1248 	uint64_t align;
1249 	uint32_t mz_id;
1250 	int ret;
1251 
1252 	snprintf(pool_name, sizeof(pool_name), "ntb_mbuf_pool_%u", socket_id);
1253 	mp = rte_mempool_create_empty(pool_name, nb_mbuf,
1254 				      (mbuf_seg_size + sizeof(struct rte_mbuf)),
1255 				      MEMPOOL_CACHE_SIZE,
1256 				      sizeof(struct rte_pktmbuf_pool_private),
1257 				      socket_id, 0);
1258 	if (mp == NULL)
1259 		return NULL;
1260 
1261 	if (rte_mempool_set_ops_byname(mp, rte_mbuf_best_mempool_ops(), NULL)) {
1262 		printf("error setting mempool handler\n");
1263 		goto fail;
1264 	}
1265 
1266 	memset(&mbp_priv, 0, sizeof(mbp_priv));
1267 	mbp_priv.mbuf_data_room_size = mbuf_seg_size;
1268 	mbp_priv.mbuf_priv_size = 0;
1269 	rte_pktmbuf_pool_init(mp, &mbp_priv);
1270 
1271 	ntb_conf->mz_list = rte_zmalloc("ntb_memzone_list",
1272 				sizeof(struct rte_memzone *) *
1273 				ntb_info.mw_cnt, 0);
1274 	if (ntb_conf->mz_list == NULL)
1275 		goto fail;
1276 
1277 	/* Put ntb header on mw0. */
1278 	if (ntb_info.mw_size[0] < ntb_info.ntb_hdr_size) {
1279 		printf("mw0 (size: %" PRIu64 ") is not enough for ntb hdr"
1280 		       " (size: %u)\n", ntb_info.mw_size[0],
1281 		       ntb_info.ntb_hdr_size);
1282 		goto fail;
1283 	}
1284 
1285 	total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;
1286 	left_sz = total_elt_sz * nb_mbuf;
1287 	for (mz_id = 0; mz_id < ntb_info.mw_cnt; mz_id++) {
1288 		/* If populated mbuf is enough, no need to reserve extra mz. */
1289 		if (!left_sz)
1290 			break;
1291 		snprintf(mz_name, sizeof(mz_name), "ntb_mw_%d", mz_id);
1292 		align = ntb_info.mw_size_align ? ntb_info.mw_size[mz_id] :
1293 			RTE_CACHE_LINE_SIZE;
1294 		/* Reserve ntb header space on memzone 0. */
1295 		max_mz_len = mz_id ? ntb_info.mw_size[mz_id] :
1296 			     ntb_info.mw_size[mz_id] - ntb_info.ntb_hdr_size;
1297 		mz_len = left_sz <= max_mz_len ? left_sz :
1298 			(max_mz_len / total_elt_sz * total_elt_sz);
1299 		if (!mz_len)
1300 			continue;
1301 		mz = rte_memzone_reserve_aligned(mz_name, mz_len, socket_id,
1302 					RTE_MEMZONE_IOVA_CONTIG, align);
1303 		if (mz == NULL) {
1304 			printf("Cannot allocate %" PRIu64 " aligned memzone"
1305 				" %u\n", align, mz_id);
1306 			goto fail;
1307 		}
1308 		left_sz -= mz_len;
1309 
1310 		/* Reserve ntb header space on memzone 0. */
1311 		if (mz_id)
1312 			ret = rte_mempool_populate_iova(mp, mz->addr, mz->iova,
1313 					mz->len, ntb_mempool_mz_free,
1314 					(void *)(uintptr_t)mz);
1315 		else
1316 			ret = rte_mempool_populate_iova(mp,
1317 					(void *)((size_t)mz->addr +
1318 					ntb_info.ntb_hdr_size),
1319 					mz->iova + ntb_info.ntb_hdr_size,
1320 					mz->len - ntb_info.ntb_hdr_size,
1321 					ntb_mempool_mz_free,
1322 					(void *)(uintptr_t)mz);
1323 		if (ret <= 0) {
1324 			rte_memzone_free(mz);
1325 			rte_mempool_free(mp);
1326 			return NULL;
1327 		}
1328 
1329 		ntb_conf->mz_list[mz_id] = mz;
1330 	}
1331 	if (left_sz) {
1332 		printf("mw space is not enough for mempool.\n");
1333 		goto fail;
1334 	}
1335 
1336 	ntb_conf->mz_num = mz_id;
1337 	rte_mempool_obj_iter(mp, rte_pktmbuf_init, NULL);
1338 
1339 	return mp;
1340 fail:
1341 	rte_mempool_free(mp);
1342 	return NULL;
1343 }
1344 
1345 int
1346 main(int argc, char **argv)
1347 {
1348 	struct rte_eth_conf eth_pconf = eth_port_conf;
1349 	struct rte_rawdev_info ntb_rawdev_conf;
1350 	struct rte_rawdev_info ntb_rawdev_info;
1351 	struct rte_eth_dev_info ethdev_info;
1352 	struct rte_eth_rxconf eth_rx_conf;
1353 	struct rte_eth_txconf eth_tx_conf;
1354 	struct ntb_queue_conf ntb_q_conf;
1355 	struct ntb_dev_config ntb_conf;
1356 	struct ntb_dev_info ntb_info;
1357 	uint64_t ntb_link_status;
1358 	uint32_t nb_mbuf;
1359 	int ret, i;
1360 
1361 	signal(SIGINT, signal_handler);
1362 	signal(SIGTERM, signal_handler);
1363 
1364 	ret = rte_eal_init(argc, argv);
1365 	if (ret < 0)
1366 		rte_exit(EXIT_FAILURE, "Error with EAL initialization.\n");
1367 
1368 	if (rte_lcore_count() < 2)
1369 		rte_exit(EXIT_FAILURE, "Need at least 2 cores\n");
1370 
1371 	/* Find 1st ntb rawdev. */
1372 	for (i = 0; i < RTE_RAWDEV_MAX_DEVS; i++)
1373 		if (rte_rawdevs[i].driver_name &&
1374 		    (strncmp(rte_rawdevs[i].driver_name, "raw_ntb",
1375 		    NTB_DRV_NAME_LEN) == 0) && (rte_rawdevs[i].attached == 1))
1376 			break;
1377 
1378 	if (i == RTE_RAWDEV_MAX_DEVS)
1379 		rte_exit(EXIT_FAILURE, "Cannot find any ntb device.\n");
1380 
1381 	dev_id = i;
1382 
1383 	argc -= ret;
1384 	argv += ret;
1385 
1386 	ntb_parse_args(argc, argv);
1387 
1388 	rte_rawdev_set_attr(dev_id, NTB_QUEUE_SZ_NAME, nb_desc);
1389 	printf("Set queue size as %u.\n", nb_desc);
1390 	rte_rawdev_set_attr(dev_id, NTB_QUEUE_NUM_NAME, num_queues);
1391 	printf("Set queue number as %u.\n", num_queues);
1392 	ntb_rawdev_info.dev_private = (rte_rawdev_obj_t)(&ntb_info);
1393 	rte_rawdev_info_get(dev_id, &ntb_rawdev_info, sizeof(ntb_info));
1394 
1395 	nb_mbuf = nb_desc * num_queues * 2 * 2 + rte_lcore_count() *
1396 		  MEMPOOL_CACHE_SIZE;
1397 	mbuf_pool = ntb_mbuf_pool_create(ntb_buf_size, nb_mbuf, ntb_info,
1398 					 &ntb_conf, rte_socket_id());
1399 	if (mbuf_pool == NULL)
1400 		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool.\n");
1401 
1402 	ntb_conf.num_queues = num_queues;
1403 	ntb_conf.queue_size = nb_desc;
1404 	ntb_rawdev_conf.dev_private = (rte_rawdev_obj_t)(&ntb_conf);
1405 	ret = rte_rawdev_configure(dev_id, &ntb_rawdev_conf, sizeof(ntb_conf));
1406 	if (ret)
1407 		rte_exit(EXIT_FAILURE, "Can't config ntb dev: err=%d, "
1408 			"port=%u\n", ret, dev_id);
1409 
1410 	ntb_q_conf.tx_free_thresh = tx_free_thresh;
1411 	ntb_q_conf.nb_desc = nb_desc;
1412 	ntb_q_conf.rx_mp = mbuf_pool;
1413 	for (i = 0; i < num_queues; i++) {
1414 		/* Setup rawdev queue */
1415 		ret = rte_rawdev_queue_setup(dev_id, i, &ntb_q_conf,
1416 				sizeof(ntb_q_conf));
1417 		if (ret < 0)
1418 			rte_exit(EXIT_FAILURE,
1419 				"Failed to setup ntb queue %u.\n", i);
1420 	}
1421 
1422 	/* Waiting for peer dev up at most 100s.*/
1423 	printf("Checking ntb link status...\n");
1424 	for (i = 0; i < 1000; i++) {
1425 		rte_rawdev_get_attr(dev_id, NTB_LINK_STATUS_NAME,
1426 				    &ntb_link_status);
1427 		if (ntb_link_status) {
1428 			printf("Peer dev ready, ntb link up.\n");
1429 			break;
1430 		}
1431 		rte_delay_ms(100);
1432 	}
1433 	rte_rawdev_get_attr(dev_id, NTB_LINK_STATUS_NAME, &ntb_link_status);
1434 	if (ntb_link_status == 0)
1435 		printf("Expire 100s. Link is not up. Please restart app.\n");
1436 
1437 	ret = rte_rawdev_start(dev_id);
1438 	if (ret < 0)
1439 		rte_exit(EXIT_FAILURE, "rte_rawdev_start: err=%d, port=%u\n",
1440 			ret, dev_id);
1441 
1442 	/* Find 1st ethdev */
1443 	eth_port_id = rte_eth_find_next(0);
1444 
1445 	if (eth_port_id < RTE_MAX_ETHPORTS) {
1446 		rte_eth_dev_info_get(eth_port_id, &ethdev_info);
1447 		eth_pconf.rx_adv_conf.rss_conf.rss_hf &=
1448 				ethdev_info.flow_type_rss_offloads;
1449 		ret = rte_eth_dev_configure(eth_port_id, num_queues,
1450 					    num_queues, &eth_pconf);
1451 		if (ret)
1452 			rte_exit(EXIT_FAILURE, "Can't config ethdev: err=%d, "
1453 				"port=%u\n", ret, eth_port_id);
1454 		eth_rx_conf = ethdev_info.default_rxconf;
1455 		eth_rx_conf.offloads = eth_pconf.rxmode.offloads;
1456 		eth_tx_conf = ethdev_info.default_txconf;
1457 		eth_tx_conf.offloads = eth_pconf.txmode.offloads;
1458 
1459 		/* Setup ethdev queue if ethdev exists */
1460 		for (i = 0; i < num_queues; i++) {
1461 			ret = rte_eth_rx_queue_setup(eth_port_id, i, nb_desc,
1462 					rte_eth_dev_socket_id(eth_port_id),
1463 					&eth_rx_conf, mbuf_pool);
1464 			if (ret < 0)
1465 				rte_exit(EXIT_FAILURE,
1466 					"Failed to setup eth rxq %u.\n", i);
1467 			ret = rte_eth_tx_queue_setup(eth_port_id, i, nb_desc,
1468 					rte_eth_dev_socket_id(eth_port_id),
1469 					&eth_tx_conf);
1470 			if (ret < 0)
1471 				rte_exit(EXIT_FAILURE,
1472 					"Failed to setup eth txq %u.\n", i);
1473 		}
1474 
1475 		ret = rte_eth_dev_start(eth_port_id);
1476 		if (ret < 0)
1477 			rte_exit(EXIT_FAILURE, "rte_eth_dev_start: err=%d, "
1478 				"port=%u\n", ret, eth_port_id);
1479 	}
1480 
1481 	/* initialize port stats */
1482 	memset(&ntb_port_stats, 0, sizeof(ntb_port_stats));
1483 
1484 	/* Set default fwd mode if user doesn't set it. */
1485 	if (fwd_mode == MAX_FWD_MODE && eth_port_id < RTE_MAX_ETHPORTS) {
1486 		printf("Set default fwd mode as iofwd.\n");
1487 		fwd_mode = IOFWD;
1488 	}
1489 	if (fwd_mode == MAX_FWD_MODE) {
1490 		printf("Set default fwd mode as file-trans.\n");
1491 		fwd_mode = FILE_TRANS;
1492 	}
1493 
1494 	if (interactive) {
1495 		sleep(1);
1496 		prompt();
1497 	} else {
1498 		start_pkt_fwd();
1499 	}
1500 
1501 	return 0;
1502 }
1503