xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision b462f2737eb08b07b84da4204fbd1c9b9ba00b2d)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
#include <ctype.h>
#include <limits.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/queue.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/select.h>
20 #ifdef USE_JANSSON
21 #include <jansson.h>
22 #else
23 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
24 #endif
25 #include <rte_string_fns.h>
26 #include <rte_log.h>
27 #include <rte_memory.h>
28 #include <rte_malloc.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #ifdef RTE_NET_I40E
32 #include <rte_pmd_i40e.h>
33 #endif
34 #include <rte_power_cpufreq.h>
35 #include <rte_power_guest_channel.h>
36 
37 #include <libvirt/libvirt.h>
38 #include "channel_monitor.h"
39 #include "channel_manager.h"
40 #include "power_manager.h"
41 #include "oob_monitor.h"
42 
/* Log type used by every RTE_LOG(..., CHANNEL_MONITOR, ...) call in this file. */
#define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1

/*
 * Maximum number of events fetched by one epoll_wait() call; also the
 * number of entries allocated for global_events_list.
 */
#define MAX_EVENTS 256

/*
 * Traffic-policy sampling state, indexed by the VF id stored in
 * policy.pfid[]: the packet count and TSC value of the previous sample
 * (see get_pkt_diff()).
 * NOTE(review): the index is not bounds-checked against 384 here.
 */
uint64_t vsi_pkt_count_prev[384];
uint64_t rdtsc_prev[384];
/* Buffer that accumulates one JSON object read byte-by-byte from a channel. */
#define MAX_JSON_STRING_LEN 1024
char json_data[MAX_JSON_STRING_LEN];

/*
 * Delay between iterations of run_channel_monitor(), in milliseconds.
 * Not static — presumably set from elsewhere in the application; verify.
 */
double time_period_ms = 1;
/* Loop flag of run_channel_monitor(); cleared by channel_monitor_exit(). */
static volatile unsigned run_loop = 1;
/* epoll instance that watches every monitored channel fd. */
static int global_event_fd;
/* Set once a policy request has been processed; gates apply_policy() calls. */
static unsigned int policy_is_set;
/* Event buffer (MAX_EVENTS entries) filled by epoll_wait(). */
static struct epoll_event *global_events_list;
/* Policy table; a slot is applied each iteration when its 'enabled' is 1. */
static struct policy policies[RTE_MAX_LCORE];
58 
59 #ifdef USE_JANSSON
60 
/*
 * Overlay of a MAC address and a 64-bit integer. A MAC is parsed into
 * 'addr' and the whole value is then read back through 'pfid' so it can be
 * stored in the packet's uint64_t vfid[] slots.
 * NOTE(review): the MAC parser only writes the RTE_ETHER_ADDR_LEN address
 * bytes, so any remaining bytes of 'pfid' are whatever the union held before.
 */
union PFID {
	struct rte_ether_addr addr;
	uint64_t pfid;
};
65 
66 static int
67 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
68 {
69 	int i;
70 	char *end;
71 	unsigned long o[RTE_ETHER_ADDR_LEN];
72 
73 	i = 0;
74 	do {
75 		errno = 0;
76 		o[i] = strtoul(a, &end, 16);
77 		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
78 			return -1;
79 		a = end + 1;
80 	} while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
81 
82 	/* Junk at the end of line */
83 	if (end[0] != 0)
84 		return -1;
85 
86 	/* Support the format XX:XX:XX:XX:XX:XX */
87 	if (i == RTE_ETHER_ADDR_LEN) {
88 		while (i-- != 0) {
89 			if (o[i] > UINT8_MAX)
90 				return -1;
91 			ether_addr->addr_bytes[i] = (uint8_t)o[i];
92 		}
93 	/* Support the format XXXX:XXXX:XXXX */
94 	} else if (i == RTE_ETHER_ADDR_LEN / 2) {
95 		while (i-- != 0) {
96 			if (o[i] > UINT16_MAX)
97 				return -1;
98 			ether_addr->addr_bytes[i * 2] =
99 					(uint8_t)(o[i] >> 8);
100 			ether_addr->addr_bytes[i * 2 + 1] =
101 					(uint8_t)(o[i] & 0xff);
102 		}
103 	/* unknown format */
104 	} else
105 		return -1;
106 
107 	return 0;
108 }
109 
110 static int
111 set_policy_mac(struct rte_power_channel_packet *pkt, int idx, char *mac)
112 {
113 	union PFID pfid;
114 	int ret;
115 
116 	/* Use port MAC address as the vfid */
117 	ret = str_to_ether_addr(mac, &pfid.addr);
118 
119 	if (ret != 0) {
120 		RTE_LOG(ERR, CHANNEL_MONITOR,
121 			"Invalid mac address received in JSON\n");
122 		pkt->vfid[idx] = 0;
123 		return -1;
124 	}
125 
126 	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
127 			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
128 			RTE_ETHER_ADDR_BYTES(&pfid.addr));
129 
130 	pkt->vfid[idx] = pfid.pfid;
131 	return 0;
132 }
133 
134 static char*
135 get_resource_name_from_chn_path(const char *channel_path)
136 {
137 	char *substr = NULL;
138 
139 	substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
140 
141 	return substr;
142 }
143 
/*
 * Extract the numeric resource id embedded in a VM / resource name, e.g.
 * "fifo3" -> 3. The id is the run of decimal digits starting at the first
 * digit in the string; anything after that run is ignored.
 *
 * Returns the id (>= 0), or -1 if vm_name is NULL, contains no digit, or
 * the number does not fit in an int.
 */
static int
get_resource_id_from_vmname(const char *vm_name)
{
	const char *digits;
	char *end;
	long result;

	if (vm_name == NULL)
		return -1;

	/* isdigit() needs an unsigned char value; plain char may be negative. */
	for (digits = vm_name; *digits != '\0'; digits++) {
		if (isdigit((unsigned char)*digits))
			break;
	}
	if (*digits == '\0')
		return -1;

	/* strtol() reports overflow, unlike atoi() whose overflow is UB. */
	errno = 0;
	result = strtol(digits, &end, 10);
	if (errno != 0 || result > INT_MAX)
		return -1;

	return (int)result;
}
164 
165 static int
166 parse_json_to_pkt(json_t *element, struct rte_power_channel_packet *pkt,
167 					const char *vm_name)
168 {
169 	const char *key;
170 	json_t *value;
171 	int ret;
172 	int resource_id;
173 
174 	memset(pkt, 0, sizeof(*pkt));
175 
176 	pkt->nb_mac_to_monitor = 0;
177 	pkt->t_boost_status.tbEnabled = false;
178 	pkt->workload = RTE_POWER_WL_LOW;
179 	pkt->policy_to_use = RTE_POWER_POLICY_TIME;
180 	pkt->command = RTE_POWER_PKT_POLICY;
181 	pkt->core_type = RTE_POWER_CORE_TYPE_PHYSICAL;
182 
183 	if (vm_name == NULL) {
184 		RTE_LOG(ERR, CHANNEL_MONITOR,
185 			"vm_name is NULL, request rejected !\n");
186 		return -1;
187 	}
188 
189 	json_object_foreach(element, key, value) {
190 		if (!strcmp(key, "policy")) {
191 			/* Recurse in to get the contents of profile */
192 			ret = parse_json_to_pkt(value, pkt, vm_name);
193 			if (ret)
194 				return ret;
195 		} else if (!strcmp(key, "instruction")) {
196 			/* Recurse in to get the contents of instruction */
197 			ret = parse_json_to_pkt(value, pkt, vm_name);
198 			if (ret)
199 				return ret;
200 		} else if (!strcmp(key, "command")) {
201 			char command[32];
202 			strlcpy(command, json_string_value(value), 32);
203 			if (!strcmp(command, "power")) {
204 				pkt->command = RTE_POWER_CPU_POWER;
205 			} else if (!strcmp(command, "create")) {
206 				pkt->command = RTE_POWER_PKT_POLICY;
207 			} else if (!strcmp(command, "destroy")) {
208 				pkt->command = RTE_POWER_PKT_POLICY_REMOVE;
209 			} else {
210 				RTE_LOG(ERR, CHANNEL_MONITOR,
211 					"Invalid command received in JSON\n");
212 				return -1;
213 			}
214 		} else if (!strcmp(key, "policy_type")) {
215 			char command[32];
216 			strlcpy(command, json_string_value(value), 32);
217 			if (!strcmp(command, "TIME")) {
218 				pkt->policy_to_use =
219 						RTE_POWER_POLICY_TIME;
220 			} else if (!strcmp(command, "TRAFFIC")) {
221 				pkt->policy_to_use =
222 						RTE_POWER_POLICY_TRAFFIC;
223 			} else if (!strcmp(command, "WORKLOAD")) {
224 				pkt->policy_to_use =
225 						RTE_POWER_POLICY_WORKLOAD;
226 			} else if (!strcmp(command, "BRANCH_RATIO")) {
227 				pkt->policy_to_use =
228 						RTE_POWER_POLICY_BRANCH_RATIO;
229 			} else {
230 				RTE_LOG(ERR, CHANNEL_MONITOR,
231 					"Wrong policy_type received in JSON\n");
232 				return -1;
233 			}
234 		} else if (!strcmp(key, "workload")) {
235 			char command[32];
236 			strlcpy(command, json_string_value(value), 32);
237 			if (!strcmp(command, "HIGH")) {
238 				pkt->workload = RTE_POWER_WL_HIGH;
239 			} else if (!strcmp(command, "MEDIUM")) {
240 				pkt->workload = RTE_POWER_WL_MEDIUM;
241 			} else if (!strcmp(command, "LOW")) {
242 				pkt->workload = RTE_POWER_WL_LOW;
243 			} else {
244 				RTE_LOG(ERR, CHANNEL_MONITOR,
245 					"Wrong workload received in JSON\n");
246 				return -1;
247 			}
248 		} else if (!strcmp(key, "busy_hours")) {
249 			unsigned int i;
250 			size_t size = json_array_size(value);
251 
252 			for (i = 0; i < size; i++) {
253 				int hour = (int)json_integer_value(
254 						json_array_get(value, i));
255 				pkt->timer_policy.busy_hours[i] = hour;
256 			}
257 		} else if (!strcmp(key, "quiet_hours")) {
258 			unsigned int i;
259 			size_t size = json_array_size(value);
260 
261 			for (i = 0; i < size; i++) {
262 				int hour = (int)json_integer_value(
263 						json_array_get(value, i));
264 				pkt->timer_policy.quiet_hours[i] = hour;
265 			}
266 		} else if (!strcmp(key, "mac_list")) {
267 			unsigned int i;
268 			size_t size = json_array_size(value);
269 
270 			for (i = 0; i < size; i++) {
271 				char mac[32];
272 				strlcpy(mac,
273 					json_string_value(json_array_get(value, i)),
274 					32);
275 				set_policy_mac(pkt, i, mac);
276 			}
277 			pkt->nb_mac_to_monitor = size;
278 		} else if (!strcmp(key, "avg_packet_thresh")) {
279 			pkt->traffic_policy.avg_max_packet_thresh =
280 					(uint32_t)json_integer_value(value);
281 		} else if (!strcmp(key, "max_packet_thresh")) {
282 			pkt->traffic_policy.max_max_packet_thresh =
283 					(uint32_t)json_integer_value(value);
284 		} else if (!strcmp(key, "unit")) {
285 			char unit[32];
286 			strlcpy(unit, json_string_value(value), 32);
287 			if (!strcmp(unit, "SCALE_UP")) {
288 				pkt->unit = RTE_POWER_SCALE_UP;
289 			} else if (!strcmp(unit, "SCALE_DOWN")) {
290 				pkt->unit = RTE_POWER_SCALE_DOWN;
291 			} else if (!strcmp(unit, "SCALE_MAX")) {
292 				pkt->unit = RTE_POWER_SCALE_MAX;
293 			} else if (!strcmp(unit, "SCALE_MIN")) {
294 				pkt->unit = RTE_POWER_SCALE_MIN;
295 			} else if (!strcmp(unit, "ENABLE_TURBO")) {
296 				pkt->unit = RTE_POWER_ENABLE_TURBO;
297 			} else if (!strcmp(unit, "DISABLE_TURBO")) {
298 				pkt->unit = RTE_POWER_DISABLE_TURBO;
299 			} else {
300 				RTE_LOG(ERR, CHANNEL_MONITOR,
301 					"Invalid command received in JSON\n");
302 				return -1;
303 			}
304 		} else {
305 			RTE_LOG(ERR, CHANNEL_MONITOR,
306 				"Unknown key received in JSON string: %s\n",
307 				key);
308 		}
309 
310 		resource_id = get_resource_id_from_vmname(vm_name);
311 		if (resource_id < 0) {
312 			RTE_LOG(ERR, CHANNEL_MONITOR,
313 				"Could not get resource_id from vm_name:%s\n",
314 				vm_name);
315 			return -1;
316 		}
317 		strlcpy(pkt->vm_name, vm_name, RTE_POWER_VM_MAX_NAME_SZ);
318 		pkt->resource_id = resource_id;
319 	}
320 	return 0;
321 }
322 #endif
323 
324 void channel_monitor_exit(void)
325 {
326 	run_loop = 0;
327 	rte_free(global_events_list);
328 }
329 
330 static void
331 core_share(int pNo, int z, int x, int t)
332 {
333 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
334 		if (strcmp(policies[pNo].pkt.vm_name,
335 				lvm_info[x].vm_name) != 0) {
336 			policies[pNo].core_share[z].status = 1;
337 			power_manager_scale_core_max(
338 					policies[pNo].core_share[z].pcpu);
339 		}
340 	}
341 }
342 
343 static void
344 core_share_status(int pNo)
345 {
346 
347 	int noVms = 0, noVcpus = 0, z, x, t;
348 
349 	get_all_vm(&noVms, &noVcpus);
350 
351 	/* Reset Core Share Status. */
352 	for (z = 0; z < noVcpus; z++)
353 		policies[pNo].core_share[z].status = 0;
354 
355 	/* Foreach vcpu in a policy. */
356 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
357 		/* Foreach VM on the platform. */
358 		for (x = 0; x < noVms; x++) {
359 			/* Foreach vcpu of VMs on platform. */
360 			for (t = 0; t < lvm_info[x].num_cpus; t++)
361 				core_share(pNo, z, x, t);
362 		}
363 	}
364 }
365 
366 
367 static int
368 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
369 {
370 	int ret = 0;
371 
372 	if (pol->pkt.policy_to_use == RTE_POWER_POLICY_BRANCH_RATIO) {
373 		ci->cd[pcpu].oob_enabled = 1;
374 		ret = add_core_to_monitor(pcpu);
375 		if (ret == 0)
376 			RTE_LOG(INFO, CHANNEL_MONITOR,
377 					"Monitoring pcpu %d OOB for %s\n",
378 					pcpu, pol->pkt.vm_name);
379 		else
380 			RTE_LOG(ERR, CHANNEL_MONITOR,
381 					"Error monitoring pcpu %d OOB for %s\n",
382 					pcpu, pol->pkt.vm_name);
383 
384 	} else {
385 		pol->core_share[count].pcpu = pcpu;
386 		RTE_LOG(INFO, CHANNEL_MONITOR,
387 				"Monitoring pcpu %d for %s\n",
388 				pcpu, pol->pkt.vm_name);
389 	}
390 	return ret;
391 }
392 
393 static void
394 get_pcpu_to_control(struct policy *pol)
395 {
396 
397 	/* Convert vcpu to pcpu. */
398 	struct vm_info info;
399 	int pcpu, count;
400 	struct core_info *ci;
401 
402 	ci = get_core_info();
403 
404 	RTE_LOG(DEBUG, CHANNEL_MONITOR,
405 			"Looking for pcpu for %s\n", pol->pkt.vm_name);
406 
407 	/*
408 	 * So now that we're handling virtual and physical cores, we need to
409 	 * differentiate between them when adding them to the branch monitor.
410 	 * Virtual cores need to be converted to physical cores.
411 	 */
412 	if (pol->pkt.core_type == RTE_POWER_CORE_TYPE_VIRTUAL) {
413 		/*
414 		 * If the cores in the policy are virtual, we need to map them
415 		 * to physical core. We look up the vm info and use that for
416 		 * the mapping.
417 		 */
418 		get_info_vm(pol->pkt.vm_name, &info);
419 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
420 			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
421 			pcpu_monitor(pol, ci, pcpu, count);
422 		}
423 	} else {
424 		/*
425 		 * If the cores in the policy are physical, we just use
426 		 * those core id's directly.
427 		 */
428 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
429 			pcpu = pol->pkt.vcpu_to_control[count];
430 			pcpu_monitor(pol, ci, pcpu, count);
431 		}
432 	}
433 }
434 
435 static int
436 get_pfid(struct policy *pol)
437 {
438 
439 	int i, x, ret = 0;
440 
441 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
442 
443 		RTE_ETH_FOREACH_DEV(x) {
444 #ifdef RTE_NET_I40E
445 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
446 				(struct rte_ether_addr *)&(pol->pkt.vfid[i]));
447 #else
448 			ret = -ENOTSUP;
449 #endif
450 			if (ret != -EINVAL) {
451 				pol->port[i] = x;
452 				break;
453 			}
454 		}
455 		if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
456 			RTE_LOG(INFO, CHANNEL_MONITOR,
457 				"Error with Policy. MAC not found on "
458 				"attached ports ");
459 			pol->enabled = 0;
460 			return ret;
461 		}
462 		pol->pfid[i] = ret;
463 	}
464 	return 1;
465 }
466 
467 static int
468 update_policy(struct rte_power_channel_packet *pkt)
469 {
470 
471 	unsigned int updated = 0;
472 	unsigned int i;
473 
474 
475 	RTE_LOG(INFO, CHANNEL_MONITOR,
476 			"Applying policy for %s\n", pkt->vm_name);
477 
478 	for (i = 0; i < RTE_DIM(policies); i++) {
479 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
480 			/* Copy the contents of *pkt into the policy.pkt */
481 			policies[i].pkt = *pkt;
482 			get_pcpu_to_control(&policies[i]);
483 			/* Check Eth dev only for Traffic policy */
484 			if (policies[i].pkt.policy_to_use ==
485 					RTE_POWER_POLICY_TRAFFIC) {
486 				if (get_pfid(&policies[i]) < 0) {
487 					updated = 1;
488 					break;
489 				}
490 			}
491 			core_share_status(i);
492 			policies[i].enabled = 1;
493 			updated = 1;
494 		}
495 	}
496 	if (!updated) {
497 		for (i = 0; i < RTE_DIM(policies); i++) {
498 			if (policies[i].enabled == 0) {
499 				policies[i].pkt = *pkt;
500 				get_pcpu_to_control(&policies[i]);
501 				/* Check Eth dev only for Traffic policy */
502 				if (policies[i].pkt.policy_to_use ==
503 						RTE_POWER_POLICY_TRAFFIC) {
504 					if (get_pfid(&policies[i]) < 0) {
505 						updated = 1;
506 						break;
507 					}
508 				}
509 				core_share_status(i);
510 				policies[i].enabled = 1;
511 				break;
512 			}
513 		}
514 	}
515 	return 0;
516 }
517 
518 static int
519 remove_policy(struct rte_power_channel_packet *pkt __rte_unused)
520 {
521 	unsigned int i;
522 
523 	/*
524 	 * Disabling the policy is simply a case of setting
525 	 * enabled to 0
526 	 */
527 	for (i = 0; i < RTE_DIM(policies); i++) {
528 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
529 			policies[i].enabled = 0;
530 			return 0;
531 		}
532 	}
533 	return -1;
534 }
535 
536 static uint64_t
537 get_pkt_diff(struct policy *pol)
538 {
539 
540 	uint64_t vsi_pkt_count,
541 		vsi_pkt_total = 0,
542 		vsi_pkt_count_prev_total = 0;
543 	double rdtsc_curr, rdtsc_diff, diff;
544 	int x;
545 #ifdef RTE_NET_I40E
546 	struct rte_eth_stats vf_stats;
547 #endif
548 
549 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
550 
551 #ifdef RTE_NET_I40E
552 		/*Read vsi stats*/
553 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
554 			vsi_pkt_count = vf_stats.ipackets;
555 		else
556 			vsi_pkt_count = -1;
557 #else
558 		vsi_pkt_count = -1;
559 #endif
560 
561 		vsi_pkt_total += vsi_pkt_count;
562 
563 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
564 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
565 	}
566 
567 	rdtsc_curr = rte_rdtsc_precise();
568 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
569 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
570 
571 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
572 			((double)rte_get_tsc_hz() / rdtsc_diff);
573 
574 	return diff;
575 }
576 
577 static void
578 apply_traffic_profile(struct policy *pol)
579 {
580 
581 	int count;
582 	uint64_t diff = 0;
583 
584 	diff = get_pkt_diff(pol);
585 
586 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
587 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
588 			if (pol->core_share[count].status != 1)
589 				power_manager_scale_core_max(
590 						pol->core_share[count].pcpu);
591 		}
592 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
593 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
594 			if (pol->core_share[count].status != 1)
595 				power_manager_scale_core_med(
596 						pol->core_share[count].pcpu);
597 		}
598 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
599 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
600 			if (pol->core_share[count].status != 1)
601 				power_manager_scale_core_min(
602 						pol->core_share[count].pcpu);
603 		}
604 	}
605 }
606 
607 static void
608 apply_time_profile(struct policy *pol)
609 {
610 
611 	int count, x;
612 	struct timeval tv;
613 	struct tm *ptm;
614 	char time_string[40];
615 
616 	/* Obtain the time of day, and convert it to a tm struct. */
617 	gettimeofday(&tv, NULL);
618 	ptm = localtime(&tv.tv_sec);
619 	/* Format the date and time, down to a single second. */
620 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
621 
622 	for (x = 0; x < RTE_POWER_HOURS_PER_DAY; x++) {
623 
624 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
625 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
626 				if (pol->core_share[count].status != 1) {
627 					power_manager_scale_core_max(
628 						pol->core_share[count].pcpu);
629 				}
630 			}
631 			break;
632 		} else if (ptm->tm_hour ==
633 				pol->pkt.timer_policy.quiet_hours[x]) {
634 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
635 				if (pol->core_share[count].status != 1) {
636 					power_manager_scale_core_min(
637 						pol->core_share[count].pcpu);
638 			}
639 		}
640 			break;
641 		} else if (ptm->tm_hour ==
642 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
643 			apply_traffic_profile(pol);
644 			break;
645 		}
646 	}
647 }
648 
649 static void
650 apply_workload_profile(struct policy *pol)
651 {
652 
653 	int count;
654 
655 	if (pol->pkt.workload == RTE_POWER_WL_HIGH) {
656 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
657 			if (pol->core_share[count].status != 1)
658 				power_manager_scale_core_max(
659 						pol->core_share[count].pcpu);
660 		}
661 	} else if (pol->pkt.workload == RTE_POWER_WL_MEDIUM) {
662 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
663 			if (pol->core_share[count].status != 1)
664 				power_manager_scale_core_med(
665 						pol->core_share[count].pcpu);
666 		}
667 	} else if (pol->pkt.workload == RTE_POWER_WL_LOW) {
668 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
669 			if (pol->core_share[count].status != 1)
670 				power_manager_scale_core_min(
671 						pol->core_share[count].pcpu);
672 		}
673 	}
674 }
675 
676 static void
677 apply_policy(struct policy *pol)
678 {
679 
680 	struct rte_power_channel_packet *pkt = &pol->pkt;
681 
682 	/*Check policy to use*/
683 	if (pkt->policy_to_use == RTE_POWER_POLICY_TRAFFIC)
684 		apply_traffic_profile(pol);
685 	else if (pkt->policy_to_use == RTE_POWER_POLICY_TIME)
686 		apply_time_profile(pol);
687 	else if (pkt->policy_to_use == RTE_POWER_POLICY_WORKLOAD)
688 		apply_workload_profile(pol);
689 }
690 
691 static int
692 write_binary_packet(void *buffer,
693 		size_t buffer_len,
694 		struct channel_info *chan_info)
695 {
696 	int ret;
697 
698 	if (buffer_len == 0 || buffer == NULL)
699 		return -1;
700 
701 	if (chan_info->fd < 0) {
702 		RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n");
703 		return -1;
704 	}
705 
706 	while (buffer_len > 0) {
707 		ret = write(chan_info->fd, buffer, buffer_len);
708 		if (ret == -1) {
709 			if (errno == EINTR)
710 				continue;
711 			RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n",
712 					strerror(errno));
713 			return -1;
714 		}
715 		buffer = (char *)buffer + ret;
716 		buffer_len -= ret;
717 	}
718 	return 0;
719 }
720 
721 static int
722 send_freq(struct rte_power_channel_packet *pkt,
723 		struct channel_info *chan_info,
724 		bool freq_list)
725 {
726 	unsigned int vcore_id = pkt->resource_id;
727 	struct rte_power_channel_packet_freq_list channel_pkt_freq_list;
728 	struct vm_info info;
729 
730 	if (get_info_vm(pkt->vm_name, &info) != 0)
731 		return -1;
732 
733 	if (!freq_list && vcore_id >= RTE_POWER_MAX_VCPU_PER_VM)
734 		return -1;
735 
736 	if (!info.allow_query)
737 		return -1;
738 
739 	channel_pkt_freq_list.command = RTE_POWER_FREQ_LIST;
740 	channel_pkt_freq_list.num_vcpu = info.num_vcpus;
741 
742 	if (freq_list) {
743 		unsigned int i;
744 		for (i = 0; i < info.num_vcpus; i++)
745 			channel_pkt_freq_list.freq_list[i] =
746 			  power_manager_get_current_frequency(info.pcpu_map[i]);
747 	} else {
748 		channel_pkt_freq_list.freq_list[vcore_id] =
749 		  power_manager_get_current_frequency(info.pcpu_map[vcore_id]);
750 	}
751 
752 	return write_binary_packet(&channel_pkt_freq_list,
753 			sizeof(channel_pkt_freq_list),
754 			chan_info);
755 }
756 
757 static int
758 send_capabilities(struct rte_power_channel_packet *pkt,
759 		struct channel_info *chan_info,
760 		bool list_requested)
761 {
762 	unsigned int vcore_id = pkt->resource_id;
763 	struct rte_power_channel_packet_caps_list channel_pkt_caps_list;
764 	struct vm_info info;
765 	struct rte_power_core_capabilities caps;
766 	int ret;
767 
768 	if (get_info_vm(pkt->vm_name, &info) != 0)
769 		return -1;
770 
771 	if (!list_requested && vcore_id >= RTE_POWER_MAX_VCPU_PER_VM)
772 		return -1;
773 
774 	if (!info.allow_query)
775 		return -1;
776 
777 	channel_pkt_caps_list.command = RTE_POWER_CAPS_LIST;
778 	channel_pkt_caps_list.num_vcpu = info.num_vcpus;
779 
780 	if (list_requested) {
781 		unsigned int i;
782 		for (i = 0; i < info.num_vcpus; i++) {
783 			ret = rte_power_get_capabilities(info.pcpu_map[i],
784 					&caps);
785 			if (ret == 0) {
786 				channel_pkt_caps_list.turbo[i] =
787 						caps.turbo;
788 				channel_pkt_caps_list.priority[i] =
789 						caps.priority;
790 			} else
791 				return -1;
792 
793 		}
794 	} else {
795 		ret = rte_power_get_capabilities(info.pcpu_map[vcore_id],
796 				&caps);
797 		if (ret == 0) {
798 			channel_pkt_caps_list.turbo[vcore_id] =
799 					caps.turbo;
800 			channel_pkt_caps_list.priority[vcore_id] =
801 					caps.priority;
802 		} else
803 			return -1;
804 	}
805 
806 	return write_binary_packet(&channel_pkt_caps_list,
807 			sizeof(channel_pkt_caps_list),
808 			chan_info);
809 }
810 
811 static int
812 send_ack_for_received_cmd(struct rte_power_channel_packet *pkt,
813 		struct channel_info *chan_info,
814 		uint32_t command)
815 {
816 	pkt->command = command;
817 	return write_binary_packet(pkt,
818 			sizeof(*pkt),
819 			chan_info);
820 }
821 
822 static int
823 process_request(struct rte_power_channel_packet *pkt,
824 		struct channel_info *chan_info)
825 {
826 	int ret;
827 
828 	if (chan_info == NULL)
829 		return -1;
830 
831 	uint32_t channel_connected = CHANNEL_MGR_CHANNEL_CONNECTED;
832 	if (rte_atomic_compare_exchange_strong_explicit(&(chan_info->status), &channel_connected,
833 			CHANNEL_MGR_CHANNEL_PROCESSING, rte_memory_order_relaxed,
834 			rte_memory_order_relaxed) == 0)
835 		return -1;
836 
837 	if (pkt->command == RTE_POWER_CPU_POWER) {
838 		unsigned int core_num;
839 
840 		if (pkt->core_type == RTE_POWER_CORE_TYPE_VIRTUAL)
841 			core_num = get_pcpu(chan_info, pkt->resource_id);
842 		else
843 			core_num = pkt->resource_id;
844 
845 		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
846 			core_num);
847 
848 		int scale_res;
849 		bool valid_unit = true;
850 
851 		switch (pkt->unit) {
852 		case(RTE_POWER_SCALE_MIN):
853 			scale_res = power_manager_scale_core_min(core_num);
854 			break;
855 		case(RTE_POWER_SCALE_MAX):
856 			scale_res = power_manager_scale_core_max(core_num);
857 			break;
858 		case(RTE_POWER_SCALE_DOWN):
859 			scale_res = power_manager_scale_core_down(core_num);
860 			break;
861 		case(RTE_POWER_SCALE_UP):
862 			scale_res = power_manager_scale_core_up(core_num);
863 			break;
864 		case(RTE_POWER_ENABLE_TURBO):
865 			scale_res = power_manager_enable_turbo_core(core_num);
866 			break;
867 		case(RTE_POWER_DISABLE_TURBO):
868 			scale_res = power_manager_disable_turbo_core(core_num);
869 			break;
870 		default:
871 			valid_unit = false;
872 			break;
873 		}
874 
875 		if (valid_unit) {
876 			ret = send_ack_for_received_cmd(pkt,
877 					chan_info,
878 					scale_res >= 0 ?
879 						RTE_POWER_CMD_ACK :
880 						RTE_POWER_CMD_NACK);
881 			if (ret < 0)
882 				RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
883 		} else
884 			RTE_LOG(ERR, CHANNEL_MONITOR, "Unexpected unit type.\n");
885 
886 	}
887 
888 	if (pkt->command == RTE_POWER_PKT_POLICY) {
889 		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
890 				pkt->vm_name);
891 		int ret = send_ack_for_received_cmd(pkt,
892 				chan_info,
893 				RTE_POWER_CMD_ACK);
894 		if (ret < 0)
895 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
896 		update_policy(pkt);
897 		policy_is_set = 1;
898 	}
899 
900 	if (pkt->command == RTE_POWER_PKT_POLICY_REMOVE) {
901 		ret = remove_policy(pkt);
902 		if (ret == 0)
903 			RTE_LOG(INFO, CHANNEL_MONITOR,
904 				 "Removed policy %s\n", pkt->vm_name);
905 		else
906 			RTE_LOG(INFO, CHANNEL_MONITOR,
907 				 "Policy %s does not exist\n", pkt->vm_name);
908 	}
909 
910 	if (pkt->command == RTE_POWER_QUERY_FREQ_LIST ||
911 		pkt->command == RTE_POWER_QUERY_FREQ) {
912 
913 		RTE_LOG(INFO, CHANNEL_MONITOR,
914 			"Frequency for %s requested.\n", pkt->vm_name);
915 		int ret = send_freq(pkt,
916 				chan_info,
917 				pkt->command == RTE_POWER_QUERY_FREQ_LIST);
918 		if (ret < 0)
919 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during frequency sending.\n");
920 	}
921 
922 	if (pkt->command == RTE_POWER_QUERY_CAPS_LIST ||
923 		pkt->command == RTE_POWER_QUERY_CAPS) {
924 
925 		RTE_LOG(INFO, CHANNEL_MONITOR,
926 			"Capabilities for %s requested.\n", pkt->vm_name);
927 		int ret = send_capabilities(pkt,
928 				chan_info,
929 				pkt->command == RTE_POWER_QUERY_CAPS_LIST);
930 		if (ret < 0)
931 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending capabilities.\n");
932 	}
933 
934 	/*
935 	 * Return is not checked as channel status may have been set to DISABLED
936 	 * from management thread
937 	 */
938 	uint32_t channel_processing = CHANNEL_MGR_CHANNEL_PROCESSING;
939 	rte_atomic_compare_exchange_strong_explicit(&(chan_info->status), &channel_processing,
940 		CHANNEL_MGR_CHANNEL_CONNECTED, rte_memory_order_relaxed, rte_memory_order_relaxed);
941 	return 0;
942 
943 }
944 
945 int
946 add_channel_to_monitor(struct channel_info **chan_info)
947 {
948 	struct channel_info *info = *chan_info;
949 	struct epoll_event event;
950 
951 	event.events = EPOLLIN;
952 	event.data.ptr = info;
953 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
954 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
955 				"to epoll\n", info->channel_path);
956 		return -1;
957 	}
958 	RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
959 			"to monitor\n", info->channel_path);
960 	return 0;
961 }
962 
963 int
964 remove_channel_from_monitor(struct channel_info *chan_info)
965 {
966 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
967 			chan_info->fd, NULL) < 0) {
968 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
969 				"from epoll\n", chan_info->channel_path);
970 		return -1;
971 	}
972 	return 0;
973 }
974 
975 int
976 channel_monitor_init(void)
977 {
978 	global_event_fd = epoll_create1(0);
979 	if (global_event_fd == 0) {
980 		RTE_LOG(ERR, CHANNEL_MONITOR,
981 				"Error creating epoll context with error %s\n",
982 				strerror(errno));
983 		return -1;
984 	}
985 	global_events_list = rte_malloc("epoll_events",
986 			sizeof(*global_events_list)
987 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
988 	if (global_events_list == NULL) {
989 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
990 				"epoll events\n");
991 		return -1;
992 	}
993 	return 0;
994 }
995 
996 static void
997 read_binary_packet(struct channel_info *chan_info)
998 {
999 	struct rte_power_channel_packet pkt;
1000 	void *buffer = &pkt;
1001 	int buffer_len = sizeof(pkt);
1002 	int n_bytes, err = 0;
1003 
1004 	while (buffer_len > 0) {
1005 		n_bytes = read(chan_info->fd,
1006 				buffer, buffer_len);
1007 		if (n_bytes == buffer_len)
1008 			break;
1009 		if (n_bytes < 0) {
1010 			err = errno;
1011 			RTE_LOG(DEBUG, CHANNEL_MONITOR,
1012 				"Received error on "
1013 				"channel '%s' read: %s\n",
1014 				chan_info->channel_path,
1015 				strerror(err));
1016 			remove_channel(&chan_info);
1017 			break;
1018 		}
1019 		buffer = (char *)buffer + n_bytes;
1020 		buffer_len -= n_bytes;
1021 	}
1022 	if (!err)
1023 		process_request(&pkt, chan_info);
1024 }
1025 
1026 #ifdef USE_JANSSON
1027 static void
1028 read_json_packet(struct channel_info *chan_info)
1029 {
1030 	struct rte_power_channel_packet pkt;
1031 	int n_bytes, ret;
1032 	json_t *root;
1033 	json_error_t error;
1034 	const char *resource_name;
1035 	char *start, *end;
1036 	uint32_t n;
1037 
1038 
1039 	/* read opening brace to closing brace */
1040 	do {
1041 		int idx = 0;
1042 		int indent = 0;
1043 		do {
1044 			n_bytes = read(chan_info->fd, &json_data[idx], 1);
1045 			if (n_bytes == 0)
1046 				break;
1047 			if (json_data[idx] == '{')
1048 				indent++;
1049 			if (json_data[idx] == '}')
1050 				indent--;
1051 			if ((indent > 0) || (idx > 0))
1052 				idx++;
1053 			if (indent <= 0)
1054 				json_data[idx] = 0;
1055 			if (idx >= MAX_JSON_STRING_LEN-1)
1056 				break;
1057 		} while (indent > 0);
1058 
1059 		json_data[idx] = '\0';
1060 
1061 		if (strlen(json_data) == 0)
1062 			continue;
1063 
1064 		printf("got [%s]\n", json_data);
1065 
1066 		root = json_loads(json_data, 0, &error);
1067 
1068 		if (root) {
1069 			resource_name = get_resource_name_from_chn_path(
1070 				chan_info->channel_path);
1071 			/*
1072 			 * Because our data is now in the json
1073 			 * object, we can overwrite the pkt
1074 			 * with a rte_power_channel_packet struct, using
1075 			 * parse_json_to_pkt()
1076 			 */
1077 			ret = parse_json_to_pkt(root, &pkt, resource_name);
1078 			json_decref(root);
1079 			if (ret) {
1080 				RTE_LOG(ERR, CHANNEL_MONITOR,
1081 					"Error validating JSON profile data\n");
1082 				break;
1083 			}
1084 			start = strstr(pkt.vm_name,
1085 					CHANNEL_MGR_FIFO_PATTERN_NAME);
1086 			if (start != NULL) {
1087 				/* move past pattern to start of fifo id */
1088 				start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);
1089 
1090 				end = start;
1091 				n = (uint32_t)strtoul(start, &end, 10);
1092 
1093 				if (end[0] == '\0') {
1094 					/* Add core id to core list */
1095 					pkt.num_vcpu = 1;
1096 					pkt.vcpu_to_control[0] = n;
1097 					process_request(&pkt, chan_info);
1098 				} else {
1099 					RTE_LOG(ERR, CHANNEL_MONITOR,
1100 						"Cannot extract core id from fifo name\n");
1101 				}
1102 			} else {
1103 				process_request(&pkt, chan_info);
1104 			}
1105 		} else {
1106 			RTE_LOG(ERR, CHANNEL_MONITOR,
1107 					"JSON error on line %d: %s\n",
1108 					error.line, error.text);
1109 		}
1110 	} while (n_bytes > 0);
1111 }
1112 #endif
1113 
1114 void
1115 run_channel_monitor(void)
1116 {
1117 	while (run_loop) {
1118 		int n_events, i;
1119 
1120 		n_events = epoll_wait(global_event_fd, global_events_list,
1121 				MAX_EVENTS, 1);
1122 		if (!run_loop)
1123 			break;
1124 		for (i = 0; i < n_events; i++) {
1125 			struct channel_info *chan_info = (struct channel_info *)
1126 					global_events_list[i].data.ptr;
1127 			if ((global_events_list[i].events & EPOLLERR) ||
1128 				(global_events_list[i].events & EPOLLHUP)) {
1129 				RTE_LOG(INFO, CHANNEL_MONITOR,
1130 						"Remote closed connection for "
1131 						"channel '%s'\n",
1132 						chan_info->channel_path);
1133 				remove_channel(&chan_info);
1134 				continue;
1135 			}
1136 			if (global_events_list[i].events & EPOLLIN) {
1137 
1138 				switch (chan_info->type) {
1139 				case CHANNEL_TYPE_BINARY:
1140 					read_binary_packet(chan_info);
1141 					break;
1142 #ifdef USE_JANSSON
1143 				case CHANNEL_TYPE_JSON:
1144 					read_json_packet(chan_info);
1145 					break;
1146 #endif
1147 				default:
1148 					break;
1149 				}
1150 			}
1151 		}
1152 		rte_delay_us(time_period_ms*1000);
1153 		if (policy_is_set) {
1154 			unsigned int j;
1155 
1156 			for (j = 0; j < RTE_DIM(policies); j++) {
1157 				if (policies[j].enabled == 1)
1158 					apply_policy(&policies[j]);
1159 			}
1160 		}
1161 	}
1162 }
1163