xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision daa02b5cddbb8e11b31d41e2bf7bb1ae64dcae2f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <ctype.h>
#include <limits.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/queue.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_cycles.h>
29 #include <rte_ethdev.h>
30 #ifdef RTE_NET_I40E
31 #include <rte_pmd_i40e.h>
32 #endif
33 #include <rte_power.h>
34 
35 #include <libvirt/libvirt.h>
36 #include "channel_monitor.h"
37 #include "channel_manager.h"
38 #include "power_manager.h"
39 #include "oob_monitor.h"
40 
41 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
42 
43 #define MAX_EVENTS 256
44 
45 uint64_t vsi_pkt_count_prev[384];
46 uint64_t rdtsc_prev[384];
47 #define MAX_JSON_STRING_LEN 1024
48 char json_data[MAX_JSON_STRING_LEN];
49 
50 double time_period_ms = 1;
51 static volatile unsigned run_loop = 1;
52 static int global_event_fd;
53 static unsigned int policy_is_set;
54 static struct epoll_event *global_events_list;
55 static struct policy policies[RTE_MAX_LCORE];
56 
57 #ifdef USE_JANSSON
58 
59 union PFID {
60 	struct rte_ether_addr addr;
61 	uint64_t pfid;
62 };
63 
64 static int
65 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
66 {
67 	int i;
68 	char *end;
69 	unsigned long o[RTE_ETHER_ADDR_LEN];
70 
71 	i = 0;
72 	do {
73 		errno = 0;
74 		o[i] = strtoul(a, &end, 16);
75 		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
76 			return -1;
77 		a = end + 1;
78 	} while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
79 
80 	/* Junk at the end of line */
81 	if (end[0] != 0)
82 		return -1;
83 
84 	/* Support the format XX:XX:XX:XX:XX:XX */
85 	if (i == RTE_ETHER_ADDR_LEN) {
86 		while (i-- != 0) {
87 			if (o[i] > UINT8_MAX)
88 				return -1;
89 			ether_addr->addr_bytes[i] = (uint8_t)o[i];
90 		}
91 	/* Support the format XXXX:XXXX:XXXX */
92 	} else if (i == RTE_ETHER_ADDR_LEN / 2) {
93 		while (i-- != 0) {
94 			if (o[i] > UINT16_MAX)
95 				return -1;
96 			ether_addr->addr_bytes[i * 2] =
97 					(uint8_t)(o[i] >> 8);
98 			ether_addr->addr_bytes[i * 2 + 1] =
99 					(uint8_t)(o[i] & 0xff);
100 		}
101 	/* unknown format */
102 	} else
103 		return -1;
104 
105 	return 0;
106 }
107 
108 static int
109 set_policy_mac(struct rte_power_channel_packet *pkt, int idx, char *mac)
110 {
111 	union PFID pfid;
112 	int ret;
113 
114 	/* Use port MAC address as the vfid */
115 	ret = str_to_ether_addr(mac, &pfid.addr);
116 
117 	if (ret != 0) {
118 		RTE_LOG(ERR, CHANNEL_MONITOR,
119 			"Invalid mac address received in JSON\n");
120 		pkt->vfid[idx] = 0;
121 		return -1;
122 	}
123 
124 	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
125 			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
126 			RTE_ETHER_ADDR_BYTES(&pfid.addr));
127 
128 	pkt->vfid[idx] = pfid.pfid;
129 	return 0;
130 }
131 
132 static char*
133 get_resource_name_from_chn_path(const char *channel_path)
134 {
135 	char *substr = NULL;
136 
137 	substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
138 
139 	return substr;
140 }
141 
/*
 * Extract the first decimal number embedded in vm_name.
 *
 * Leading non-digit characters are skipped. Returns the number, or -1 when
 * vm_name is NULL, contains no digit, or the number does not fit in an int.
 */
static int
get_resource_id_from_vmname(const char *vm_name)
{
	const char *digits;
	char *end;
	long id;

	if (vm_name == NULL)
		return -1;

	/* isdigit() requires a value representable as unsigned char. */
	digits = vm_name;
	while (*digits != '\0' && !isdigit((unsigned char)*digits))
		digits++;
	if (*digits == '\0')
		return -1;

	/* strtol() reports overflow, unlike atoi(). */
	errno = 0;
	id = strtol(digits, &end, 10);
	if (errno != 0 || end == digits || id > INT_MAX)
		return -1;

	return (int)id;
}
162 
163 static int
164 parse_json_to_pkt(json_t *element, struct rte_power_channel_packet *pkt,
165 					const char *vm_name)
166 {
167 	const char *key;
168 	json_t *value;
169 	int ret;
170 	int resource_id;
171 
172 	memset(pkt, 0, sizeof(*pkt));
173 
174 	pkt->nb_mac_to_monitor = 0;
175 	pkt->t_boost_status.tbEnabled = false;
176 	pkt->workload = RTE_POWER_WL_LOW;
177 	pkt->policy_to_use = RTE_POWER_POLICY_TIME;
178 	pkt->command = RTE_POWER_PKT_POLICY;
179 	pkt->core_type = RTE_POWER_CORE_TYPE_PHYSICAL;
180 
181 	if (vm_name == NULL) {
182 		RTE_LOG(ERR, CHANNEL_MONITOR,
183 			"vm_name is NULL, request rejected !\n");
184 		return -1;
185 	}
186 
187 	json_object_foreach(element, key, value) {
188 		if (!strcmp(key, "policy")) {
189 			/* Recurse in to get the contents of profile */
190 			ret = parse_json_to_pkt(value, pkt, vm_name);
191 			if (ret)
192 				return ret;
193 		} else if (!strcmp(key, "instruction")) {
194 			/* Recurse in to get the contents of instruction */
195 			ret = parse_json_to_pkt(value, pkt, vm_name);
196 			if (ret)
197 				return ret;
198 		} else if (!strcmp(key, "command")) {
199 			char command[32];
200 			strlcpy(command, json_string_value(value), 32);
201 			if (!strcmp(command, "power")) {
202 				pkt->command = RTE_POWER_CPU_POWER;
203 			} else if (!strcmp(command, "create")) {
204 				pkt->command = RTE_POWER_PKT_POLICY;
205 			} else if (!strcmp(command, "destroy")) {
206 				pkt->command = RTE_POWER_PKT_POLICY_REMOVE;
207 			} else {
208 				RTE_LOG(ERR, CHANNEL_MONITOR,
209 					"Invalid command received in JSON\n");
210 				return -1;
211 			}
212 		} else if (!strcmp(key, "policy_type")) {
213 			char command[32];
214 			strlcpy(command, json_string_value(value), 32);
215 			if (!strcmp(command, "TIME")) {
216 				pkt->policy_to_use =
217 						RTE_POWER_POLICY_TIME;
218 			} else if (!strcmp(command, "TRAFFIC")) {
219 				pkt->policy_to_use =
220 						RTE_POWER_POLICY_TRAFFIC;
221 			} else if (!strcmp(command, "WORKLOAD")) {
222 				pkt->policy_to_use =
223 						RTE_POWER_POLICY_WORKLOAD;
224 			} else if (!strcmp(command, "BRANCH_RATIO")) {
225 				pkt->policy_to_use =
226 						RTE_POWER_POLICY_BRANCH_RATIO;
227 			} else {
228 				RTE_LOG(ERR, CHANNEL_MONITOR,
229 					"Wrong policy_type received in JSON\n");
230 				return -1;
231 			}
232 		} else if (!strcmp(key, "workload")) {
233 			char command[32];
234 			strlcpy(command, json_string_value(value), 32);
235 			if (!strcmp(command, "HIGH")) {
236 				pkt->workload = RTE_POWER_WL_HIGH;
237 			} else if (!strcmp(command, "MEDIUM")) {
238 				pkt->workload = RTE_POWER_WL_MEDIUM;
239 			} else if (!strcmp(command, "LOW")) {
240 				pkt->workload = RTE_POWER_WL_LOW;
241 			} else {
242 				RTE_LOG(ERR, CHANNEL_MONITOR,
243 					"Wrong workload received in JSON\n");
244 				return -1;
245 			}
246 		} else if (!strcmp(key, "busy_hours")) {
247 			unsigned int i;
248 			size_t size = json_array_size(value);
249 
250 			for (i = 0; i < size; i++) {
251 				int hour = (int)json_integer_value(
252 						json_array_get(value, i));
253 				pkt->timer_policy.busy_hours[i] = hour;
254 			}
255 		} else if (!strcmp(key, "quiet_hours")) {
256 			unsigned int i;
257 			size_t size = json_array_size(value);
258 
259 			for (i = 0; i < size; i++) {
260 				int hour = (int)json_integer_value(
261 						json_array_get(value, i));
262 				pkt->timer_policy.quiet_hours[i] = hour;
263 			}
264 		} else if (!strcmp(key, "mac_list")) {
265 			unsigned int i;
266 			size_t size = json_array_size(value);
267 
268 			for (i = 0; i < size; i++) {
269 				char mac[32];
270 				strlcpy(mac,
271 					json_string_value(json_array_get(value, i)),
272 					32);
273 				set_policy_mac(pkt, i, mac);
274 			}
275 			pkt->nb_mac_to_monitor = size;
276 		} else if (!strcmp(key, "avg_packet_thresh")) {
277 			pkt->traffic_policy.avg_max_packet_thresh =
278 					(uint32_t)json_integer_value(value);
279 		} else if (!strcmp(key, "max_packet_thresh")) {
280 			pkt->traffic_policy.max_max_packet_thresh =
281 					(uint32_t)json_integer_value(value);
282 		} else if (!strcmp(key, "unit")) {
283 			char unit[32];
284 			strlcpy(unit, json_string_value(value), 32);
285 			if (!strcmp(unit, "SCALE_UP")) {
286 				pkt->unit = RTE_POWER_SCALE_UP;
287 			} else if (!strcmp(unit, "SCALE_DOWN")) {
288 				pkt->unit = RTE_POWER_SCALE_DOWN;
289 			} else if (!strcmp(unit, "SCALE_MAX")) {
290 				pkt->unit = RTE_POWER_SCALE_MAX;
291 			} else if (!strcmp(unit, "SCALE_MIN")) {
292 				pkt->unit = RTE_POWER_SCALE_MIN;
293 			} else if (!strcmp(unit, "ENABLE_TURBO")) {
294 				pkt->unit = RTE_POWER_ENABLE_TURBO;
295 			} else if (!strcmp(unit, "DISABLE_TURBO")) {
296 				pkt->unit = RTE_POWER_DISABLE_TURBO;
297 			} else {
298 				RTE_LOG(ERR, CHANNEL_MONITOR,
299 					"Invalid command received in JSON\n");
300 				return -1;
301 			}
302 		} else {
303 			RTE_LOG(ERR, CHANNEL_MONITOR,
304 				"Unknown key received in JSON string: %s\n",
305 				key);
306 		}
307 
308 		resource_id = get_resource_id_from_vmname(vm_name);
309 		if (resource_id < 0) {
310 			RTE_LOG(ERR, CHANNEL_MONITOR,
311 				"Could not get resource_id from vm_name:%s\n",
312 				vm_name);
313 			return -1;
314 		}
315 		strlcpy(pkt->vm_name, vm_name, RTE_POWER_VM_MAX_NAME_SZ);
316 		pkt->resource_id = resource_id;
317 	}
318 	return 0;
319 }
320 #endif
321 
322 void channel_monitor_exit(void)
323 {
324 	run_loop = 0;
325 	rte_free(global_events_list);
326 }
327 
328 static void
329 core_share(int pNo, int z, int x, int t)
330 {
331 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
332 		if (strcmp(policies[pNo].pkt.vm_name,
333 				lvm_info[x].vm_name) != 0) {
334 			policies[pNo].core_share[z].status = 1;
335 			power_manager_scale_core_max(
336 					policies[pNo].core_share[z].pcpu);
337 		}
338 	}
339 }
340 
341 static void
342 core_share_status(int pNo)
343 {
344 
345 	int noVms = 0, noVcpus = 0, z, x, t;
346 
347 	get_all_vm(&noVms, &noVcpus);
348 
349 	/* Reset Core Share Status. */
350 	for (z = 0; z < noVcpus; z++)
351 		policies[pNo].core_share[z].status = 0;
352 
353 	/* Foreach vcpu in a policy. */
354 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
355 		/* Foreach VM on the platform. */
356 		for (x = 0; x < noVms; x++) {
357 			/* Foreach vcpu of VMs on platform. */
358 			for (t = 0; t < lvm_info[x].num_cpus; t++)
359 				core_share(pNo, z, x, t);
360 		}
361 	}
362 }
363 
364 
365 static int
366 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
367 {
368 	int ret = 0;
369 
370 	if (pol->pkt.policy_to_use == RTE_POWER_POLICY_BRANCH_RATIO) {
371 		ci->cd[pcpu].oob_enabled = 1;
372 		ret = add_core_to_monitor(pcpu);
373 		if (ret == 0)
374 			RTE_LOG(INFO, CHANNEL_MONITOR,
375 					"Monitoring pcpu %d OOB for %s\n",
376 					pcpu, pol->pkt.vm_name);
377 		else
378 			RTE_LOG(ERR, CHANNEL_MONITOR,
379 					"Error monitoring pcpu %d OOB for %s\n",
380 					pcpu, pol->pkt.vm_name);
381 
382 	} else {
383 		pol->core_share[count].pcpu = pcpu;
384 		RTE_LOG(INFO, CHANNEL_MONITOR,
385 				"Monitoring pcpu %d for %s\n",
386 				pcpu, pol->pkt.vm_name);
387 	}
388 	return ret;
389 }
390 
391 static void
392 get_pcpu_to_control(struct policy *pol)
393 {
394 
395 	/* Convert vcpu to pcpu. */
396 	struct vm_info info;
397 	int pcpu, count;
398 	struct core_info *ci;
399 
400 	ci = get_core_info();
401 
402 	RTE_LOG(DEBUG, CHANNEL_MONITOR,
403 			"Looking for pcpu for %s\n", pol->pkt.vm_name);
404 
405 	/*
406 	 * So now that we're handling virtual and physical cores, we need to
407 	 * differenciate between them when adding them to the branch monitor.
408 	 * Virtual cores need to be converted to physical cores.
409 	 */
410 	if (pol->pkt.core_type == RTE_POWER_CORE_TYPE_VIRTUAL) {
411 		/*
412 		 * If the cores in the policy are virtual, we need to map them
413 		 * to physical core. We look up the vm info and use that for
414 		 * the mapping.
415 		 */
416 		get_info_vm(pol->pkt.vm_name, &info);
417 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
418 			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
419 			pcpu_monitor(pol, ci, pcpu, count);
420 		}
421 	} else {
422 		/*
423 		 * If the cores in the policy are physical, we just use
424 		 * those core id's directly.
425 		 */
426 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
427 			pcpu = pol->pkt.vcpu_to_control[count];
428 			pcpu_monitor(pol, ci, pcpu, count);
429 		}
430 	}
431 }
432 
433 static int
434 get_pfid(struct policy *pol)
435 {
436 
437 	int i, x, ret = 0;
438 
439 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
440 
441 		RTE_ETH_FOREACH_DEV(x) {
442 #ifdef RTE_NET_I40E
443 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
444 				(struct rte_ether_addr *)&(pol->pkt.vfid[i]));
445 #else
446 			ret = -ENOTSUP;
447 #endif
448 			if (ret != -EINVAL) {
449 				pol->port[i] = x;
450 				break;
451 			}
452 		}
453 		if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
454 			RTE_LOG(INFO, CHANNEL_MONITOR,
455 				"Error with Policy. MAC not found on "
456 				"attached ports ");
457 			pol->enabled = 0;
458 			return ret;
459 		}
460 		pol->pfid[i] = ret;
461 	}
462 	return 1;
463 }
464 
465 static int
466 update_policy(struct rte_power_channel_packet *pkt)
467 {
468 
469 	unsigned int updated = 0;
470 	unsigned int i;
471 
472 
473 	RTE_LOG(INFO, CHANNEL_MONITOR,
474 			"Applying policy for %s\n", pkt->vm_name);
475 
476 	for (i = 0; i < RTE_DIM(policies); i++) {
477 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
478 			/* Copy the contents of *pkt into the policy.pkt */
479 			policies[i].pkt = *pkt;
480 			get_pcpu_to_control(&policies[i]);
481 			/* Check Eth dev only for Traffic policy */
482 			if (policies[i].pkt.policy_to_use ==
483 					RTE_POWER_POLICY_TRAFFIC) {
484 				if (get_pfid(&policies[i]) < 0) {
485 					updated = 1;
486 					break;
487 				}
488 			}
489 			core_share_status(i);
490 			policies[i].enabled = 1;
491 			updated = 1;
492 		}
493 	}
494 	if (!updated) {
495 		for (i = 0; i < RTE_DIM(policies); i++) {
496 			if (policies[i].enabled == 0) {
497 				policies[i].pkt = *pkt;
498 				get_pcpu_to_control(&policies[i]);
499 				/* Check Eth dev only for Traffic policy */
500 				if (policies[i].pkt.policy_to_use ==
501 						RTE_POWER_POLICY_TRAFFIC) {
502 					if (get_pfid(&policies[i]) < 0) {
503 						updated = 1;
504 						break;
505 					}
506 				}
507 				core_share_status(i);
508 				policies[i].enabled = 1;
509 				break;
510 			}
511 		}
512 	}
513 	return 0;
514 }
515 
516 static int
517 remove_policy(struct rte_power_channel_packet *pkt __rte_unused)
518 {
519 	unsigned int i;
520 
521 	/*
522 	 * Disabling the policy is simply a case of setting
523 	 * enabled to 0
524 	 */
525 	for (i = 0; i < RTE_DIM(policies); i++) {
526 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
527 			policies[i].enabled = 0;
528 			return 0;
529 		}
530 	}
531 	return -1;
532 }
533 
534 static uint64_t
535 get_pkt_diff(struct policy *pol)
536 {
537 
538 	uint64_t vsi_pkt_count,
539 		vsi_pkt_total = 0,
540 		vsi_pkt_count_prev_total = 0;
541 	double rdtsc_curr, rdtsc_diff, diff;
542 	int x;
543 #ifdef RTE_NET_I40E
544 	struct rte_eth_stats vf_stats;
545 #endif
546 
547 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
548 
549 #ifdef RTE_NET_I40E
550 		/*Read vsi stats*/
551 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
552 			vsi_pkt_count = vf_stats.ipackets;
553 		else
554 			vsi_pkt_count = -1;
555 #else
556 		vsi_pkt_count = -1;
557 #endif
558 
559 		vsi_pkt_total += vsi_pkt_count;
560 
561 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
562 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
563 	}
564 
565 	rdtsc_curr = rte_rdtsc_precise();
566 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
567 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
568 
569 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
570 			((double)rte_get_tsc_hz() / rdtsc_diff);
571 
572 	return diff;
573 }
574 
575 static void
576 apply_traffic_profile(struct policy *pol)
577 {
578 
579 	int count;
580 	uint64_t diff = 0;
581 
582 	diff = get_pkt_diff(pol);
583 
584 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
585 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
586 			if (pol->core_share[count].status != 1)
587 				power_manager_scale_core_max(
588 						pol->core_share[count].pcpu);
589 		}
590 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
591 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
592 			if (pol->core_share[count].status != 1)
593 				power_manager_scale_core_med(
594 						pol->core_share[count].pcpu);
595 		}
596 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
597 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
598 			if (pol->core_share[count].status != 1)
599 				power_manager_scale_core_min(
600 						pol->core_share[count].pcpu);
601 		}
602 	}
603 }
604 
605 static void
606 apply_time_profile(struct policy *pol)
607 {
608 
609 	int count, x;
610 	struct timeval tv;
611 	struct tm *ptm;
612 	char time_string[40];
613 
614 	/* Obtain the time of day, and convert it to a tm struct. */
615 	gettimeofday(&tv, NULL);
616 	ptm = localtime(&tv.tv_sec);
617 	/* Format the date and time, down to a single second. */
618 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
619 
620 	for (x = 0; x < RTE_POWER_HOURS_PER_DAY; x++) {
621 
622 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
623 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
624 				if (pol->core_share[count].status != 1) {
625 					power_manager_scale_core_max(
626 						pol->core_share[count].pcpu);
627 				}
628 			}
629 			break;
630 		} else if (ptm->tm_hour ==
631 				pol->pkt.timer_policy.quiet_hours[x]) {
632 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
633 				if (pol->core_share[count].status != 1) {
634 					power_manager_scale_core_min(
635 						pol->core_share[count].pcpu);
636 			}
637 		}
638 			break;
639 		} else if (ptm->tm_hour ==
640 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
641 			apply_traffic_profile(pol);
642 			break;
643 		}
644 	}
645 }
646 
647 static void
648 apply_workload_profile(struct policy *pol)
649 {
650 
651 	int count;
652 
653 	if (pol->pkt.workload == RTE_POWER_WL_HIGH) {
654 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
655 			if (pol->core_share[count].status != 1)
656 				power_manager_scale_core_max(
657 						pol->core_share[count].pcpu);
658 		}
659 	} else if (pol->pkt.workload == RTE_POWER_WL_MEDIUM) {
660 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
661 			if (pol->core_share[count].status != 1)
662 				power_manager_scale_core_med(
663 						pol->core_share[count].pcpu);
664 		}
665 	} else if (pol->pkt.workload == RTE_POWER_WL_LOW) {
666 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
667 			if (pol->core_share[count].status != 1)
668 				power_manager_scale_core_min(
669 						pol->core_share[count].pcpu);
670 		}
671 	}
672 }
673 
674 static void
675 apply_policy(struct policy *pol)
676 {
677 
678 	struct rte_power_channel_packet *pkt = &pol->pkt;
679 
680 	/*Check policy to use*/
681 	if (pkt->policy_to_use == RTE_POWER_POLICY_TRAFFIC)
682 		apply_traffic_profile(pol);
683 	else if (pkt->policy_to_use == RTE_POWER_POLICY_TIME)
684 		apply_time_profile(pol);
685 	else if (pkt->policy_to_use == RTE_POWER_POLICY_WORKLOAD)
686 		apply_workload_profile(pol);
687 }
688 
689 static int
690 write_binary_packet(void *buffer,
691 		size_t buffer_len,
692 		struct channel_info *chan_info)
693 {
694 	int ret;
695 
696 	if (buffer_len == 0 || buffer == NULL)
697 		return -1;
698 
699 	if (chan_info->fd < 0) {
700 		RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n");
701 		return -1;
702 	}
703 
704 	while (buffer_len > 0) {
705 		ret = write(chan_info->fd, buffer, buffer_len);
706 		if (ret == -1) {
707 			if (errno == EINTR)
708 				continue;
709 			RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n",
710 					strerror(errno));
711 			return -1;
712 		}
713 		buffer = (char *)buffer + ret;
714 		buffer_len -= ret;
715 	}
716 	return 0;
717 }
718 
719 static int
720 send_freq(struct rte_power_channel_packet *pkt,
721 		struct channel_info *chan_info,
722 		bool freq_list)
723 {
724 	unsigned int vcore_id = pkt->resource_id;
725 	struct rte_power_channel_packet_freq_list channel_pkt_freq_list;
726 	struct vm_info info;
727 
728 	if (get_info_vm(pkt->vm_name, &info) != 0)
729 		return -1;
730 
731 	if (!freq_list && vcore_id >= RTE_POWER_MAX_VCPU_PER_VM)
732 		return -1;
733 
734 	if (!info.allow_query)
735 		return -1;
736 
737 	channel_pkt_freq_list.command = RTE_POWER_FREQ_LIST;
738 	channel_pkt_freq_list.num_vcpu = info.num_vcpus;
739 
740 	if (freq_list) {
741 		unsigned int i;
742 		for (i = 0; i < info.num_vcpus; i++)
743 			channel_pkt_freq_list.freq_list[i] =
744 			  power_manager_get_current_frequency(info.pcpu_map[i]);
745 	} else {
746 		channel_pkt_freq_list.freq_list[vcore_id] =
747 		  power_manager_get_current_frequency(info.pcpu_map[vcore_id]);
748 	}
749 
750 	return write_binary_packet(&channel_pkt_freq_list,
751 			sizeof(channel_pkt_freq_list),
752 			chan_info);
753 }
754 
755 static int
756 send_capabilities(struct rte_power_channel_packet *pkt,
757 		struct channel_info *chan_info,
758 		bool list_requested)
759 {
760 	unsigned int vcore_id = pkt->resource_id;
761 	struct rte_power_channel_packet_caps_list channel_pkt_caps_list;
762 	struct vm_info info;
763 	struct rte_power_core_capabilities caps;
764 	int ret;
765 
766 	if (get_info_vm(pkt->vm_name, &info) != 0)
767 		return -1;
768 
769 	if (!list_requested && vcore_id >= RTE_POWER_MAX_VCPU_PER_VM)
770 		return -1;
771 
772 	if (!info.allow_query)
773 		return -1;
774 
775 	channel_pkt_caps_list.command = RTE_POWER_CAPS_LIST;
776 	channel_pkt_caps_list.num_vcpu = info.num_vcpus;
777 
778 	if (list_requested) {
779 		unsigned int i;
780 		for (i = 0; i < info.num_vcpus; i++) {
781 			ret = rte_power_get_capabilities(info.pcpu_map[i],
782 					&caps);
783 			if (ret == 0) {
784 				channel_pkt_caps_list.turbo[i] =
785 						caps.turbo;
786 				channel_pkt_caps_list.priority[i] =
787 						caps.priority;
788 			} else
789 				return -1;
790 
791 		}
792 	} else {
793 		ret = rte_power_get_capabilities(info.pcpu_map[vcore_id],
794 				&caps);
795 		if (ret == 0) {
796 			channel_pkt_caps_list.turbo[vcore_id] =
797 					caps.turbo;
798 			channel_pkt_caps_list.priority[vcore_id] =
799 					caps.priority;
800 		} else
801 			return -1;
802 	}
803 
804 	return write_binary_packet(&channel_pkt_caps_list,
805 			sizeof(channel_pkt_caps_list),
806 			chan_info);
807 }
808 
809 static int
810 send_ack_for_received_cmd(struct rte_power_channel_packet *pkt,
811 		struct channel_info *chan_info,
812 		uint32_t command)
813 {
814 	pkt->command = command;
815 	return write_binary_packet(pkt,
816 			sizeof(*pkt),
817 			chan_info);
818 }
819 
820 static int
821 process_request(struct rte_power_channel_packet *pkt,
822 		struct channel_info *chan_info)
823 {
824 	int ret;
825 
826 	if (chan_info == NULL)
827 		return -1;
828 
829 	uint32_t channel_connected = CHANNEL_MGR_CHANNEL_CONNECTED;
830 	if (__atomic_compare_exchange_n(&(chan_info->status), &channel_connected,
831 		CHANNEL_MGR_CHANNEL_PROCESSING, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED) == 0)
832 		return -1;
833 
834 	if (pkt->command == RTE_POWER_CPU_POWER) {
835 		unsigned int core_num;
836 
837 		if (pkt->core_type == RTE_POWER_CORE_TYPE_VIRTUAL)
838 			core_num = get_pcpu(chan_info, pkt->resource_id);
839 		else
840 			core_num = pkt->resource_id;
841 
842 		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
843 			core_num);
844 
845 		int scale_res;
846 		bool valid_unit = true;
847 
848 		switch (pkt->unit) {
849 		case(RTE_POWER_SCALE_MIN):
850 			scale_res = power_manager_scale_core_min(core_num);
851 			break;
852 		case(RTE_POWER_SCALE_MAX):
853 			scale_res = power_manager_scale_core_max(core_num);
854 			break;
855 		case(RTE_POWER_SCALE_DOWN):
856 			scale_res = power_manager_scale_core_down(core_num);
857 			break;
858 		case(RTE_POWER_SCALE_UP):
859 			scale_res = power_manager_scale_core_up(core_num);
860 			break;
861 		case(RTE_POWER_ENABLE_TURBO):
862 			scale_res = power_manager_enable_turbo_core(core_num);
863 			break;
864 		case(RTE_POWER_DISABLE_TURBO):
865 			scale_res = power_manager_disable_turbo_core(core_num);
866 			break;
867 		default:
868 			valid_unit = false;
869 			break;
870 		}
871 
872 		if (valid_unit) {
873 			ret = send_ack_for_received_cmd(pkt,
874 					chan_info,
875 					scale_res >= 0 ?
876 						RTE_POWER_CMD_ACK :
877 						RTE_POWER_CMD_NACK);
878 			if (ret < 0)
879 				RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
880 		} else
881 			RTE_LOG(ERR, CHANNEL_MONITOR, "Unexpected unit type.\n");
882 
883 	}
884 
885 	if (pkt->command == RTE_POWER_PKT_POLICY) {
886 		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
887 				pkt->vm_name);
888 		int ret = send_ack_for_received_cmd(pkt,
889 				chan_info,
890 				RTE_POWER_CMD_ACK);
891 		if (ret < 0)
892 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
893 		update_policy(pkt);
894 		policy_is_set = 1;
895 	}
896 
897 	if (pkt->command == RTE_POWER_PKT_POLICY_REMOVE) {
898 		ret = remove_policy(pkt);
899 		if (ret == 0)
900 			RTE_LOG(INFO, CHANNEL_MONITOR,
901 				 "Removed policy %s\n", pkt->vm_name);
902 		else
903 			RTE_LOG(INFO, CHANNEL_MONITOR,
904 				 "Policy %s does not exist\n", pkt->vm_name);
905 	}
906 
907 	if (pkt->command == RTE_POWER_QUERY_FREQ_LIST ||
908 		pkt->command == RTE_POWER_QUERY_FREQ) {
909 
910 		RTE_LOG(INFO, CHANNEL_MONITOR,
911 			"Frequency for %s requested.\n", pkt->vm_name);
912 		int ret = send_freq(pkt,
913 				chan_info,
914 				pkt->command == RTE_POWER_QUERY_FREQ_LIST);
915 		if (ret < 0)
916 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during frequency sending.\n");
917 	}
918 
919 	if (pkt->command == RTE_POWER_QUERY_CAPS_LIST ||
920 		pkt->command == RTE_POWER_QUERY_CAPS) {
921 
922 		RTE_LOG(INFO, CHANNEL_MONITOR,
923 			"Capabilities for %s requested.\n", pkt->vm_name);
924 		int ret = send_capabilities(pkt,
925 				chan_info,
926 				pkt->command == RTE_POWER_QUERY_CAPS_LIST);
927 		if (ret < 0)
928 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending capabilities.\n");
929 	}
930 
931 	/*
932 	 * Return is not checked as channel status may have been set to DISABLED
933 	 * from management thread
934 	 */
935 	uint32_t channel_processing = CHANNEL_MGR_CHANNEL_PROCESSING;
936 	__atomic_compare_exchange_n(&(chan_info->status), &channel_processing,
937 		CHANNEL_MGR_CHANNEL_CONNECTED, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
938 	return 0;
939 
940 }
941 
942 int
943 add_channel_to_monitor(struct channel_info **chan_info)
944 {
945 	struct channel_info *info = *chan_info;
946 	struct epoll_event event;
947 
948 	event.events = EPOLLIN;
949 	event.data.ptr = info;
950 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
951 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
952 				"to epoll\n", info->channel_path);
953 		return -1;
954 	}
955 	RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
956 			"to monitor\n", info->channel_path);
957 	return 0;
958 }
959 
960 int
961 remove_channel_from_monitor(struct channel_info *chan_info)
962 {
963 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
964 			chan_info->fd, NULL) < 0) {
965 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
966 				"from epoll\n", chan_info->channel_path);
967 		return -1;
968 	}
969 	return 0;
970 }
971 
972 int
973 channel_monitor_init(void)
974 {
975 	global_event_fd = epoll_create1(0);
976 	if (global_event_fd == 0) {
977 		RTE_LOG(ERR, CHANNEL_MONITOR,
978 				"Error creating epoll context with error %s\n",
979 				strerror(errno));
980 		return -1;
981 	}
982 	global_events_list = rte_malloc("epoll_events",
983 			sizeof(*global_events_list)
984 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
985 	if (global_events_list == NULL) {
986 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
987 				"epoll events\n");
988 		return -1;
989 	}
990 	return 0;
991 }
992 
993 static void
994 read_binary_packet(struct channel_info *chan_info)
995 {
996 	struct rte_power_channel_packet pkt;
997 	void *buffer = &pkt;
998 	int buffer_len = sizeof(pkt);
999 	int n_bytes, err = 0;
1000 
1001 	while (buffer_len > 0) {
1002 		n_bytes = read(chan_info->fd,
1003 				buffer, buffer_len);
1004 		if (n_bytes == buffer_len)
1005 			break;
1006 		if (n_bytes < 0) {
1007 			err = errno;
1008 			RTE_LOG(DEBUG, CHANNEL_MONITOR,
1009 				"Received error on "
1010 				"channel '%s' read: %s\n",
1011 				chan_info->channel_path,
1012 				strerror(err));
1013 			remove_channel(&chan_info);
1014 			break;
1015 		}
1016 		buffer = (char *)buffer + n_bytes;
1017 		buffer_len -= n_bytes;
1018 	}
1019 	if (!err)
1020 		process_request(&pkt, chan_info);
1021 }
1022 
1023 #ifdef USE_JANSSON
1024 static void
1025 read_json_packet(struct channel_info *chan_info)
1026 {
1027 	struct rte_power_channel_packet pkt;
1028 	int n_bytes, ret;
1029 	json_t *root;
1030 	json_error_t error;
1031 	const char *resource_name;
1032 	char *start, *end;
1033 	uint32_t n;
1034 
1035 
1036 	/* read opening brace to closing brace */
1037 	do {
1038 		int idx = 0;
1039 		int indent = 0;
1040 		do {
1041 			n_bytes = read(chan_info->fd, &json_data[idx], 1);
1042 			if (n_bytes == 0)
1043 				break;
1044 			if (json_data[idx] == '{')
1045 				indent++;
1046 			if (json_data[idx] == '}')
1047 				indent--;
1048 			if ((indent > 0) || (idx > 0))
1049 				idx++;
1050 			if (indent <= 0)
1051 				json_data[idx] = 0;
1052 			if (idx >= MAX_JSON_STRING_LEN-1)
1053 				break;
1054 		} while (indent > 0);
1055 
1056 		json_data[idx] = '\0';
1057 
1058 		if (strlen(json_data) == 0)
1059 			continue;
1060 
1061 		printf("got [%s]\n", json_data);
1062 
1063 		root = json_loads(json_data, 0, &error);
1064 
1065 		if (root) {
1066 			resource_name = get_resource_name_from_chn_path(
1067 				chan_info->channel_path);
1068 			/*
1069 			 * Because our data is now in the json
1070 			 * object, we can overwrite the pkt
1071 			 * with a rte_power_channel_packet struct, using
1072 			 * parse_json_to_pkt()
1073 			 */
1074 			ret = parse_json_to_pkt(root, &pkt, resource_name);
1075 			json_decref(root);
1076 			if (ret) {
1077 				RTE_LOG(ERR, CHANNEL_MONITOR,
1078 					"Error validating JSON profile data\n");
1079 				break;
1080 			}
1081 			start = strstr(pkt.vm_name,
1082 					CHANNEL_MGR_FIFO_PATTERN_NAME);
1083 			if (start != NULL) {
1084 				/* move past pattern to start of fifo id */
1085 				start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);
1086 
1087 				end = start;
1088 				n = (uint32_t)strtoul(start, &end, 10);
1089 
1090 				if (end[0] == '\0') {
1091 					/* Add core id to core list */
1092 					pkt.num_vcpu = 1;
1093 					pkt.vcpu_to_control[0] = n;
1094 					process_request(&pkt, chan_info);
1095 				} else {
1096 					RTE_LOG(ERR, CHANNEL_MONITOR,
1097 						"Cannot extract core id from fifo name\n");
1098 				}
1099 			} else {
1100 				process_request(&pkt, chan_info);
1101 			}
1102 		} else {
1103 			RTE_LOG(ERR, CHANNEL_MONITOR,
1104 					"JSON error on line %d: %s\n",
1105 					error.line, error.text);
1106 		}
1107 	} while (n_bytes > 0);
1108 }
1109 #endif
1110 
1111 void
1112 run_channel_monitor(void)
1113 {
1114 	while (run_loop) {
1115 		int n_events, i;
1116 
1117 		n_events = epoll_wait(global_event_fd, global_events_list,
1118 				MAX_EVENTS, 1);
1119 		if (!run_loop)
1120 			break;
1121 		for (i = 0; i < n_events; i++) {
1122 			struct channel_info *chan_info = (struct channel_info *)
1123 					global_events_list[i].data.ptr;
1124 			if ((global_events_list[i].events & EPOLLERR) ||
1125 				(global_events_list[i].events & EPOLLHUP)) {
1126 				RTE_LOG(INFO, CHANNEL_MONITOR,
1127 						"Remote closed connection for "
1128 						"channel '%s'\n",
1129 						chan_info->channel_path);
1130 				remove_channel(&chan_info);
1131 				continue;
1132 			}
1133 			if (global_events_list[i].events & EPOLLIN) {
1134 
1135 				switch (chan_info->type) {
1136 				case CHANNEL_TYPE_BINARY:
1137 					read_binary_packet(chan_info);
1138 					break;
1139 #ifdef USE_JANSSON
1140 				case CHANNEL_TYPE_JSON:
1141 					read_json_packet(chan_info);
1142 					break;
1143 #endif
1144 				default:
1145 					break;
1146 				}
1147 			}
1148 		}
1149 		rte_delay_us(time_period_ms*1000);
1150 		if (policy_is_set) {
1151 			unsigned int j;
1152 
1153 			for (j = 0; j < RTE_DIM(policies); j++) {
1154 				if (policies[j].enabled == 1)
1155 					apply_policy(&policies[j]);
1156 			}
1157 		}
1158 	}
1159 }
1160