xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision 8728ccf37615904cf23fb8763895b05c9a3c6b0c)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <sys/types.h>
13 #include <sys/epoll.h>
14 #include <sys/queue.h>
15 #include <sys/time.h>
16 
17 #include <rte_log.h>
18 #include <rte_memory.h>
19 #include <rte_malloc.h>
20 #include <rte_atomic.h>
21 #include <rte_cycles.h>
22 #include <rte_ethdev.h>
23 #include <rte_pmd_i40e.h>
24 
25 #include <libvirt/libvirt.h>
26 #include "channel_monitor.h"
27 #include "channel_commands.h"
28 #include "channel_manager.h"
29 #include "power_manager.h"
30 
31 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
32 
33 #define MAX_EVENTS 256
34 
35 uint64_t vsi_pkt_count_prev[384];
36 uint64_t rdtsc_prev[384];
37 
38 double time_period_ms = 1;
39 static volatile unsigned run_loop = 1;
40 static int global_event_fd;
41 static unsigned int policy_is_set;
42 static struct epoll_event *global_events_list;
43 static struct policy policies[MAX_VMS];
44 
45 void channel_monitor_exit(void)
46 {
47 	run_loop = 0;
48 	rte_free(global_events_list);
49 }
50 
51 static void
52 core_share(int pNo, int z, int x, int t)
53 {
54 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
55 		if (strcmp(policies[pNo].pkt.vm_name,
56 				lvm_info[x].vm_name) != 0) {
57 			policies[pNo].core_share[z].status = 1;
58 			power_manager_scale_core_max(
59 					policies[pNo].core_share[z].pcpu);
60 		}
61 	}
62 }
63 
64 static void
65 core_share_status(int pNo)
66 {
67 
68 	int noVms, noVcpus, z, x, t;
69 
70 	get_all_vm(&noVms, &noVcpus);
71 
72 	/* Reset Core Share Status. */
73 	for (z = 0; z < noVcpus; z++)
74 		policies[pNo].core_share[z].status = 0;
75 
76 	/* Foreach vcpu in a policy. */
77 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
78 		/* Foreach VM on the platform. */
79 		for (x = 0; x < noVms; x++) {
80 			/* Foreach vcpu of VMs on platform. */
81 			for (t = 0; t < lvm_info[x].num_cpus; t++)
82 				core_share(pNo, z, x, t);
83 		}
84 	}
85 }
86 
87 static void
88 get_pcpu_to_control(struct policy *pol)
89 {
90 
91 	/* Convert vcpu to pcpu. */
92 	struct vm_info info;
93 	int pcpu, count;
94 	uint64_t mask_u64b;
95 
96 	RTE_LOG(INFO, CHANNEL_MONITOR, "Looking for pcpu for %s\n",
97 			pol->pkt.vm_name);
98 	get_info_vm(pol->pkt.vm_name, &info);
99 
100 	for (count = 0; count < pol->pkt.num_vcpu; count++) {
101 		mask_u64b = info.pcpu_mask[pol->pkt.vcpu_to_control[count]];
102 		for (pcpu = 0; mask_u64b; mask_u64b &= ~(1ULL << pcpu++)) {
103 			if ((mask_u64b >> pcpu) & 1)
104 				pol->core_share[count].pcpu = pcpu;
105 		}
106 	}
107 }
108 
109 static int
110 get_pfid(struct policy *pol)
111 {
112 
113 	int i, x, ret = 0;
114 
115 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
116 
117 		RTE_ETH_FOREACH_DEV(x) {
118 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
119 				(struct ether_addr *)&(pol->pkt.vfid[i]));
120 			if (ret != -EINVAL) {
121 				pol->port[i] = x;
122 				break;
123 			}
124 		}
125 		if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
126 			RTE_LOG(INFO, CHANNEL_MONITOR,
127 				"Error with Policy. MAC not found on "
128 				"attached ports ");
129 			pol->enabled = 0;
130 			return ret;
131 		}
132 		pol->pfid[i] = ret;
133 	}
134 	return 1;
135 }
136 
137 static int
138 update_policy(struct channel_packet *pkt)
139 {
140 
141 	unsigned int updated = 0;
142 	int i;
143 
144 	for (i = 0; i < MAX_VMS; i++) {
145 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
146 			policies[i].pkt = *pkt;
147 			get_pcpu_to_control(&policies[i]);
148 			if (get_pfid(&policies[i]) == -1) {
149 				updated = 1;
150 				break;
151 			}
152 			core_share_status(i);
153 			policies[i].enabled = 1;
154 			updated = 1;
155 		}
156 	}
157 	if (!updated) {
158 		for (i = 0; i < MAX_VMS; i++) {
159 			if (policies[i].enabled == 0) {
160 				policies[i].pkt = *pkt;
161 				get_pcpu_to_control(&policies[i]);
162 				if (get_pfid(&policies[i]) == -1)
163 					break;
164 				core_share_status(i);
165 				policies[i].enabled = 1;
166 				break;
167 			}
168 		}
169 	}
170 	return 0;
171 }
172 
173 static uint64_t
174 get_pkt_diff(struct policy *pol)
175 {
176 
177 	uint64_t vsi_pkt_count,
178 		vsi_pkt_total = 0,
179 		vsi_pkt_count_prev_total = 0;
180 	double rdtsc_curr, rdtsc_diff, diff;
181 	int x;
182 	struct rte_eth_stats vf_stats;
183 
184 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
185 
186 		/*Read vsi stats*/
187 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
188 			vsi_pkt_count = vf_stats.ipackets;
189 		else
190 			vsi_pkt_count = -1;
191 
192 		vsi_pkt_total += vsi_pkt_count;
193 
194 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
195 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
196 	}
197 
198 	rdtsc_curr = rte_rdtsc_precise();
199 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
200 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
201 
202 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
203 			((double)rte_get_tsc_hz() / rdtsc_diff);
204 
205 	return diff;
206 }
207 
208 static void
209 apply_traffic_profile(struct policy *pol)
210 {
211 
212 	int count;
213 	uint64_t diff = 0;
214 
215 	diff = get_pkt_diff(pol);
216 
217 	RTE_LOG(INFO, CHANNEL_MONITOR, "Applying traffic profile\n");
218 
219 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
220 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
221 			if (pol->core_share[count].status != 1)
222 				power_manager_scale_core_max(
223 						pol->core_share[count].pcpu);
224 		}
225 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
226 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
227 			if (pol->core_share[count].status != 1)
228 				power_manager_scale_core_med(
229 						pol->core_share[count].pcpu);
230 		}
231 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
232 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
233 			if (pol->core_share[count].status != 1)
234 				power_manager_scale_core_min(
235 						pol->core_share[count].pcpu);
236 		}
237 	}
238 }
239 
240 static void
241 apply_time_profile(struct policy *pol)
242 {
243 
244 	int count, x;
245 	struct timeval tv;
246 	struct tm *ptm;
247 	char time_string[40];
248 
249 	/* Obtain the time of day, and convert it to a tm struct. */
250 	gettimeofday(&tv, NULL);
251 	ptm = localtime(&tv.tv_sec);
252 	/* Format the date and time, down to a single second. */
253 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
254 
255 	for (x = 0; x < HOURS; x++) {
256 
257 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
258 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
259 				if (pol->core_share[count].status != 1) {
260 					power_manager_scale_core_max(
261 						pol->core_share[count].pcpu);
262 				RTE_LOG(INFO, CHANNEL_MONITOR,
263 					"Scaling up core %d to max\n",
264 					pol->core_share[count].pcpu);
265 				}
266 			}
267 			break;
268 		} else if (ptm->tm_hour ==
269 				pol->pkt.timer_policy.quiet_hours[x]) {
270 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
271 				if (pol->core_share[count].status != 1) {
272 					power_manager_scale_core_min(
273 						pol->core_share[count].pcpu);
274 				RTE_LOG(INFO, CHANNEL_MONITOR,
275 					"Scaling down core %d to min\n",
276 					pol->core_share[count].pcpu);
277 			}
278 		}
279 			break;
280 		} else if (ptm->tm_hour ==
281 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
282 			apply_traffic_profile(pol);
283 			break;
284 		}
285 	}
286 }
287 
288 static void
289 apply_workload_profile(struct policy *pol)
290 {
291 
292 	int count;
293 
294 	if (pol->pkt.workload == HIGH) {
295 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
296 			if (pol->core_share[count].status != 1)
297 				power_manager_scale_core_max(
298 						pol->core_share[count].pcpu);
299 		}
300 	} else if (pol->pkt.workload == MEDIUM) {
301 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
302 			if (pol->core_share[count].status != 1)
303 				power_manager_scale_core_med(
304 						pol->core_share[count].pcpu);
305 		}
306 	} else if (pol->pkt.workload == LOW) {
307 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
308 			if (pol->core_share[count].status != 1)
309 				power_manager_scale_core_min(
310 						pol->core_share[count].pcpu);
311 		}
312 	}
313 }
314 
315 static void
316 apply_policy(struct policy *pol)
317 {
318 
319 	struct channel_packet *pkt = &pol->pkt;
320 
321 	/*Check policy to use*/
322 	if (pkt->policy_to_use == TRAFFIC)
323 		apply_traffic_profile(pol);
324 	else if (pkt->policy_to_use == TIME)
325 		apply_time_profile(pol);
326 	else if (pkt->policy_to_use == WORKLOAD)
327 		apply_workload_profile(pol);
328 }
329 
330 
331 static int
332 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
333 {
334 	uint64_t core_mask;
335 
336 	if (chan_info == NULL)
337 		return -1;
338 
339 	if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
340 			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
341 		return -1;
342 
343 	if (pkt->command == CPU_POWER) {
344 		core_mask = get_pcpus_mask(chan_info, pkt->resource_id);
345 		if (core_mask == 0) {
346 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error get physical CPU mask for "
347 				"channel '%s' using vCPU(%u)\n", chan_info->channel_path,
348 				(unsigned)pkt->unit);
349 			return -1;
350 		}
351 		if (__builtin_popcountll(core_mask) == 1) {
352 
353 			unsigned core_num = __builtin_ffsll(core_mask) - 1;
354 
355 			switch (pkt->unit) {
356 			case(CPU_POWER_SCALE_MIN):
357 					power_manager_scale_core_min(core_num);
358 			break;
359 			case(CPU_POWER_SCALE_MAX):
360 					power_manager_scale_core_max(core_num);
361 			break;
362 			case(CPU_POWER_SCALE_DOWN):
363 					power_manager_scale_core_down(core_num);
364 			break;
365 			case(CPU_POWER_SCALE_UP):
366 					power_manager_scale_core_up(core_num);
367 			break;
368 			case(CPU_POWER_ENABLE_TURBO):
369 				power_manager_enable_turbo_core(core_num);
370 			break;
371 			case(CPU_POWER_DISABLE_TURBO):
372 				power_manager_disable_turbo_core(core_num);
373 			break;
374 			default:
375 				break;
376 			}
377 		} else {
378 			switch (pkt->unit) {
379 			case(CPU_POWER_SCALE_MIN):
380 					power_manager_scale_mask_min(core_mask);
381 			break;
382 			case(CPU_POWER_SCALE_MAX):
383 					power_manager_scale_mask_max(core_mask);
384 			break;
385 			case(CPU_POWER_SCALE_DOWN):
386 					power_manager_scale_mask_down(core_mask);
387 			break;
388 			case(CPU_POWER_SCALE_UP):
389 					power_manager_scale_mask_up(core_mask);
390 			break;
391 			case(CPU_POWER_ENABLE_TURBO):
392 				power_manager_enable_turbo_mask(core_mask);
393 			break;
394 			case(CPU_POWER_DISABLE_TURBO):
395 				power_manager_disable_turbo_mask(core_mask);
396 			break;
397 			default:
398 				break;
399 			}
400 
401 		}
402 	}
403 
404 	if (pkt->command == PKT_POLICY) {
405 		RTE_LOG(INFO, CHANNEL_MONITOR, "\nProcessing Policy request from Guest\n");
406 		update_policy(pkt);
407 		policy_is_set = 1;
408 	}
409 
410 	/* Return is not checked as channel status may have been set to DISABLED
411 	 * from management thread
412 	 */
413 	rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
414 			CHANNEL_MGR_CHANNEL_CONNECTED);
415 	return 0;
416 
417 }
418 
419 int
420 add_channel_to_monitor(struct channel_info **chan_info)
421 {
422 	struct channel_info *info = *chan_info;
423 	struct epoll_event event;
424 
425 	event.events = EPOLLIN;
426 	event.data.ptr = info;
427 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
428 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
429 				"to epoll\n", info->channel_path);
430 		return -1;
431 	}
432 	return 0;
433 }
434 
435 int
436 remove_channel_from_monitor(struct channel_info *chan_info)
437 {
438 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL, chan_info->fd, NULL) < 0) {
439 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
440 				"from epoll\n", chan_info->channel_path);
441 		return -1;
442 	}
443 	return 0;
444 }
445 
446 int
447 channel_monitor_init(void)
448 {
449 	global_event_fd = epoll_create1(0);
450 	if (global_event_fd == 0) {
451 		RTE_LOG(ERR, CHANNEL_MONITOR, "Error creating epoll context with "
452 				"error %s\n", strerror(errno));
453 		return -1;
454 	}
455 	global_events_list = rte_malloc("epoll_events", sizeof(*global_events_list)
456 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
457 	if (global_events_list == NULL) {
458 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
459 				"epoll events\n");
460 		return -1;
461 	}
462 	return 0;
463 }
464 
465 void
466 run_channel_monitor(void)
467 {
468 	while (run_loop) {
469 		int n_events, i;
470 
471 		n_events = epoll_wait(global_event_fd, global_events_list,
472 				MAX_EVENTS, 1);
473 		if (!run_loop)
474 			break;
475 		for (i = 0; i < n_events; i++) {
476 			struct channel_info *chan_info = (struct channel_info *)
477 					global_events_list[i].data.ptr;
478 			if ((global_events_list[i].events & EPOLLERR) ||
479 				(global_events_list[i].events & EPOLLHUP)) {
480 				RTE_LOG(DEBUG, CHANNEL_MONITOR, "Remote closed connection for "
481 						"channel '%s'\n",
482 						chan_info->channel_path);
483 				remove_channel(&chan_info);
484 				continue;
485 			}
486 			if (global_events_list[i].events & EPOLLIN) {
487 
488 				int n_bytes, err = 0;
489 				struct channel_packet pkt;
490 				void *buffer = &pkt;
491 				int buffer_len = sizeof(pkt);
492 
493 				while (buffer_len > 0) {
494 					n_bytes = read(chan_info->fd,
495 							buffer, buffer_len);
496 					if (n_bytes == buffer_len)
497 						break;
498 					if (n_bytes == -1) {
499 						err = errno;
500 						RTE_LOG(DEBUG, CHANNEL_MONITOR,
501 							"Received error on "
502 							"channel '%s' read: %s\n",
503 							chan_info->channel_path,
504 							strerror(err));
505 						remove_channel(&chan_info);
506 						break;
507 					}
508 					buffer = (char *)buffer + n_bytes;
509 					buffer_len -= n_bytes;
510 				}
511 				if (!err)
512 					process_request(&pkt, chan_info);
513 			}
514 		}
515 		rte_delay_us(time_period_ms*1000);
516 		if (policy_is_set) {
517 			int j;
518 
519 			for (j = 0; j < MAX_VMS; j++) {
520 				if (policies[j].enabled == 1)
521 					apply_policy(&policies[j]);
522 			}
523 		}
524 	}
525 }
526