xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision 8809f78c7dd9f33a44a4f89c58fc91ded34296ed)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
#include <inttypes.h>
#include <ctype.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #ifdef RTE_NET_I40E
32 #include <rte_pmd_i40e.h>
33 #endif
34 #include <rte_power.h>
35 
36 #include <libvirt/libvirt.h>
37 #include "channel_monitor.h"
38 #include "channel_commands.h"
39 #include "channel_manager.h"
40 #include "power_manager.h"
41 #include "oob_monitor.h"
42 
43 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
44 
45 #define MAX_EVENTS 256
46 
47 uint64_t vsi_pkt_count_prev[384];
48 uint64_t rdtsc_prev[384];
49 #define MAX_JSON_STRING_LEN 1024
50 char json_data[MAX_JSON_STRING_LEN];
51 
52 double time_period_ms = 1;
53 static volatile unsigned run_loop = 1;
54 static int global_event_fd;
55 static unsigned int policy_is_set;
56 static struct epoll_event *global_events_list;
57 static struct policy policies[RTE_MAX_LCORE];
58 
59 #ifdef USE_JANSSON
60 
61 union PFID {
62 	struct rte_ether_addr addr;
63 	uint64_t pfid;
64 };
65 
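/*
 * Parse a textual MAC address into an rte_ether_addr. Both of the formats
 * handled below are accepted, e.g. (illustrative values):
 *
 *     "11:22:33:44:55:66"   - six 8-bit groups
 *     "1122:3344:5566"      - three 16-bit groups
 *
 * Returns 0 on success, -1 if the string is malformed or has trailing junk.
 */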
66 static int
67 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
68 {
69 	int i;
70 	char *end;
71 	unsigned long o[RTE_ETHER_ADDR_LEN];
72 
73 	i = 0;
74 	do {
75 		errno = 0;
76 		o[i] = strtoul(a, &end, 16);
77 		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
78 			return -1;
79 		a = end + 1;
80 	} while (++i != RTE_DIM(o) && end[0] != 0);
81 
82 	/* Junk at the end of line */
83 	if (end[0] != 0)
84 		return -1;
85 
86 	/* Support the format XX:XX:XX:XX:XX:XX */
87 	if (i == RTE_ETHER_ADDR_LEN) {
88 		while (i-- != 0) {
89 			if (o[i] > UINT8_MAX)
90 				return -1;
91 			ether_addr->addr_bytes[i] = (uint8_t)o[i];
92 		}
93 	/* Support the format XXXX:XXXX:XXXX */
94 	} else if (i == RTE_ETHER_ADDR_LEN / 2) {
95 		while (i-- != 0) {
96 			if (o[i] > UINT16_MAX)
97 				return -1;
98 			ether_addr->addr_bytes[i * 2] =
99 					(uint8_t)(o[i] >> 8);
100 			ether_addr->addr_bytes[i * 2 + 1] =
101 					(uint8_t)(o[i] & 0xff);
102 		}
103 	/* unknown format */
104 	} else
105 		return -1;
106 
107 	return 0;
108 }
109 
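/*
 * Store the MAC for entry 'idx' of the policy. The parsed address is packed
 * into a 64-bit value via the PFID union so it can travel in pkt->vfid[] and
 * later be matched against a VF by get_pfid().
 */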
110 static int
111 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
112 {
113 	union PFID pfid;
114 	int ret;
115 
116 	/* Use port MAC address as the vfid */
117 	ret = str_to_ether_addr(mac, &pfid.addr);
118 
119 	if (ret != 0) {
120 		RTE_LOG(ERR, CHANNEL_MONITOR,
121 			"Invalid mac address received in JSON\n");
122 		pkt->vfid[idx] = 0;
123 		return -1;
124 	}
125 
126 	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
127 			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
128 			pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
129 			pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
130 			pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
131 
132 	pkt->vfid[idx] = pfid.pfid;
133 	return 0;
134 }
135 
136 static char*
137 get_resource_name_from_chn_path(const char *channel_path)
138 {
139 	char *substr = NULL;
140 
141 	substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
142 
143 	return substr;
144 }
145 
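/*
 * Extract the first decimal number embedded in the VM/FIFO name and use it
 * as the resource (core) id, e.g. a name such as "dpdk_fifo7" (example name
 * only) would yield 7. Returns -1 if no number is found.
 */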
146 static int
147 get_resource_id_from_vmname(const char *vm_name)
148 {
149 	int result = -1;
150 	int off = 0;
151 
152 	if (vm_name == NULL)
153 		return -1;
154 
155 	while (vm_name[off] != '\0') {
156 		if (isdigit(vm_name[off]))
157 			break;
158 		off++;
159 	}
160 	result = atoi(&vm_name[off]);
161 	if ((result == 0) && (vm_name[off] != '0'))
162 		return -1;
163 
164 	return result;
165 }
166 
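/*
 * Convert a parsed JSON request into a channel_packet. The parser accepts
 * either a "policy" or an "instruction" object; an illustrative policy
 * request (field values are examples only) could look like:
 *
 *   {"policy": {
 *       "command": "create",
 *       "policy_type": "TRAFFIC",
 *       "workload": "MEDIUM",
 *       "mac_list": ["CA:FE:CA:FE:CA:FE"],
 *       "avg_packet_thresh": 100000,
 *       "max_packet_thresh": 500000,
 *       "busy_hours": [3, 4, 5],
 *       "quiet_hours": [11, 12, 13]
 *   }}
 *
 * and an instruction request:
 *
 *   {"instruction": {"command": "power", "unit": "SCALE_MAX"}}
 */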
167 static int
168 parse_json_to_pkt(json_t *element, struct channel_packet *pkt,
169 					const char *vm_name)
170 {
171 	const char *key;
172 	json_t *value;
173 	int ret;
174 	int resource_id;
175 
176 	memset(pkt, 0, sizeof(struct channel_packet));
177 
178 	pkt->nb_mac_to_monitor = 0;
179 	pkt->t_boost_status.tbEnabled = false;
180 	pkt->workload = LOW;
181 	pkt->policy_to_use = TIME;
182 	pkt->command = PKT_POLICY;
183 	pkt->core_type = CORE_TYPE_PHYSICAL;
184 
185 	if (vm_name == NULL) {
186 		RTE_LOG(ERR, CHANNEL_MONITOR,
187 			"vm_name is NULL, request rejected !\n");
188 		return -1;
189 	}
190 
191 	json_object_foreach(element, key, value) {
192 		if (!strcmp(key, "policy")) {
193 			/* Recurse into the contents of the "policy" object */
194 			ret = parse_json_to_pkt(value, pkt, vm_name);
195 			if (ret)
196 				return ret;
197 		} else if (!strcmp(key, "instruction")) {
198 			/* Recurse into the contents of the "instruction" object */
199 			ret = parse_json_to_pkt(value, pkt, vm_name);
200 			if (ret)
201 				return ret;
202 		} else if (!strcmp(key, "command")) {
203 			char command[32];
204 			strlcpy(command, json_string_value(value), 32);
205 			if (!strcmp(command, "power")) {
206 				pkt->command = CPU_POWER;
207 			} else if (!strcmp(command, "create")) {
208 				pkt->command = PKT_POLICY;
209 			} else if (!strcmp(command, "destroy")) {
210 				pkt->command = PKT_POLICY_REMOVE;
211 			} else {
212 				RTE_LOG(ERR, CHANNEL_MONITOR,
213 					"Invalid command received in JSON\n");
214 				return -1;
215 			}
216 		} else if (!strcmp(key, "policy_type")) {
217 			char command[32];
218 			strlcpy(command, json_string_value(value), 32);
219 			if (!strcmp(command, "TIME")) {
220 				pkt->policy_to_use = TIME;
221 			} else if (!strcmp(command, "TRAFFIC")) {
222 				pkt->policy_to_use = TRAFFIC;
223 			} else if (!strcmp(command, "WORKLOAD")) {
224 				pkt->policy_to_use = WORKLOAD;
225 			} else if (!strcmp(command, "BRANCH_RATIO")) {
226 				pkt->policy_to_use = BRANCH_RATIO;
227 			} else {
228 				RTE_LOG(ERR, CHANNEL_MONITOR,
229 					"Wrong policy_type received in JSON\n");
230 				return -1;
231 			}
232 		} else if (!strcmp(key, "workload")) {
233 			char command[32];
234 			strlcpy(command, json_string_value(value), 32);
235 			if (!strcmp(command, "HIGH")) {
236 				pkt->workload = HIGH;
237 			} else if (!strcmp(command, "MEDIUM")) {
238 				pkt->workload = MEDIUM;
239 			} else if (!strcmp(command, "LOW")) {
240 				pkt->workload = LOW;
241 			} else {
242 				RTE_LOG(ERR, CHANNEL_MONITOR,
243 					"Wrong workload received in JSON\n");
244 				return -1;
245 			}
246 		} else if (!strcmp(key, "busy_hours")) {
247 			unsigned int i;
248 			size_t size = json_array_size(value);
249 
250 			for (i = 0; i < size; i++) {
251 				int hour = (int)json_integer_value(
252 						json_array_get(value, i));
253 				pkt->timer_policy.busy_hours[i] = hour;
254 			}
255 		} else if (!strcmp(key, "quiet_hours")) {
256 			unsigned int i;
257 			size_t size = json_array_size(value);
258 
259 			for (i = 0; i < size; i++) {
260 				int hour = (int)json_integer_value(
261 						json_array_get(value, i));
262 				pkt->timer_policy.quiet_hours[i] = hour;
263 			}
264 		} else if (!strcmp(key, "mac_list")) {
265 			unsigned int i;
266 			size_t size = json_array_size(value);
267 
268 			for (i = 0; i < size; i++) {
269 				char mac[32];
270 				strlcpy(mac,
271 					json_string_value(json_array_get(value, i)),
272 					32);
273 				set_policy_mac(pkt, i, mac);
274 			}
275 			pkt->nb_mac_to_monitor = size;
276 		} else if (!strcmp(key, "avg_packet_thresh")) {
277 			pkt->traffic_policy.avg_max_packet_thresh =
278 					(uint32_t)json_integer_value(value);
279 		} else if (!strcmp(key, "max_packet_thresh")) {
280 			pkt->traffic_policy.max_max_packet_thresh =
281 					(uint32_t)json_integer_value(value);
282 		} else if (!strcmp(key, "unit")) {
283 			char unit[32];
284 			strlcpy(unit, json_string_value(value), 32);
285 			if (!strcmp(unit, "SCALE_UP")) {
286 				pkt->unit = CPU_POWER_SCALE_UP;
287 			} else if (!strcmp(unit, "SCALE_DOWN")) {
288 				pkt->unit = CPU_POWER_SCALE_DOWN;
289 			} else if (!strcmp(unit, "SCALE_MAX")) {
290 				pkt->unit = CPU_POWER_SCALE_MAX;
291 			} else if (!strcmp(unit, "SCALE_MIN")) {
292 				pkt->unit = CPU_POWER_SCALE_MIN;
293 			} else if (!strcmp(unit, "ENABLE_TURBO")) {
294 				pkt->unit = CPU_POWER_ENABLE_TURBO;
295 			} else if (!strcmp(unit, "DISABLE_TURBO")) {
296 				pkt->unit = CPU_POWER_DISABLE_TURBO;
297 			} else {
298 				RTE_LOG(ERR, CHANNEL_MONITOR,
299 					"Invalid command received in JSON\n");
300 				return -1;
301 			}
302 		} else {
303 			RTE_LOG(ERR, CHANNEL_MONITOR,
304 				"Unknown key received in JSON string: %s\n",
305 				key);
306 		}
307 
308 		resource_id = get_resource_id_from_vmname(vm_name);
309 		if (resource_id < 0) {
310 			RTE_LOG(ERR, CHANNEL_MONITOR,
311 				"Could not get resource_id from vm_name:%s\n",
312 				vm_name);
313 			return -1;
314 		}
315 		strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ);
316 		pkt->resource_id = resource_id;
317 	}
318 	return 0;
319 }
320 #endif
321 
322 void channel_monitor_exit(void)
323 {
324 	run_loop = 0;
325 	rte_free(global_events_list);
326 }
327 
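/*
 * core_share()/core_share_status() detect physical cores that are shared
 * between the VM owning this policy and any other VM on the host. A shared
 * pcpu is flagged (status = 1) and pinned to maximum frequency, and the
 * profile handlers below skip it when scaling up or down.
 */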
328 static void
329 core_share(int pNo, int z, int x, int t)
330 {
331 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
332 		if (strcmp(policies[pNo].pkt.vm_name,
333 				lvm_info[x].vm_name) != 0) {
334 			policies[pNo].core_share[z].status = 1;
335 			power_manager_scale_core_max(
336 					policies[pNo].core_share[z].pcpu);
337 		}
338 	}
339 }
340 
341 static void
342 core_share_status(int pNo)
343 {
344 
345 	int noVms = 0, noVcpus = 0, z, x, t;
346 
347 	get_all_vm(&noVms, &noVcpus);
348 
349 	/* Reset Core Share Status. */
350 	for (z = 0; z < noVcpus; z++)
351 		policies[pNo].core_share[z].status = 0;
352 
353 	/* Foreach vcpu in a policy. */
354 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
355 		/* Foreach VM on the platform. */
356 		for (x = 0; x < noVms; x++) {
357 			/* Foreach vcpu of VMs on platform. */
358 			for (t = 0; t < lvm_info[x].num_cpus; t++)
359 				core_share(pNo, z, x, t);
360 		}
361 	}
362 }
363 
364 
365 static int
366 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
367 {
368 	int ret = 0;
369 
370 	if (pol->pkt.policy_to_use == BRANCH_RATIO) {
371 		ci->cd[pcpu].oob_enabled = 1;
372 		ret = add_core_to_monitor(pcpu);
373 		if (ret == 0)
374 			RTE_LOG(INFO, CHANNEL_MONITOR,
375 					"Monitoring pcpu %d OOB for %s\n",
376 					pcpu, pol->pkt.vm_name);
377 		else
378 			RTE_LOG(ERR, CHANNEL_MONITOR,
379 					"Error monitoring pcpu %d OOB for %s\n",
380 					pcpu, pol->pkt.vm_name);
381 
382 	} else {
383 		pol->core_share[count].pcpu = pcpu;
384 		RTE_LOG(INFO, CHANNEL_MONITOR,
385 				"Monitoring pcpu %d for %s\n",
386 				pcpu, pol->pkt.vm_name);
387 	}
388 	return ret;
389 }
390 
391 static void
392 get_pcpu_to_control(struct policy *pol)
393 {
394 
395 	/* Convert vcpu to pcpu. */
396 	struct vm_info info;
397 	int pcpu, count;
398 	struct core_info *ci;
399 
400 	ci = get_core_info();
401 
402 	RTE_LOG(DEBUG, CHANNEL_MONITOR,
403 			"Looking for pcpu for %s\n", pol->pkt.vm_name);
404 
405 	/*
406 	 * So now that we're handling virtual and physical cores, we need to
407 	 * differentiate between them when adding them to the branch monitor.
408 	 * Virtual cores need to be converted to physical cores.
409 	 */
410 	if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
411 		/*
412 		 * If the cores in the policy are virtual, we need to map them
413 		 * to physical core. We look up the vm info and use that for
414 		 * the mapping.
415 		 */
416 		get_info_vm(pol->pkt.vm_name, &info);
417 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
418 			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
419 			pcpu_monitor(pol, ci, pcpu, count);
420 		}
421 	} else {
422 		/*
423 		 * If the cores in the policy are physical, we just use
424 		 * those core id's directly.
425 		 */
426 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
427 			pcpu = pol->pkt.vcpu_to_control[count];
428 			pcpu_monitor(pol, ci, pcpu, count);
429 		}
430 	}
431 }
432 
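/*
 * For a TRAFFIC policy, resolve each monitored MAC to an i40e VF id by
 * querying every attached port. The port and VF id are cached in the policy
 * so get_pkt_diff() can read the matching VF statistics later. Requires the
 * i40e PMD; without RTE_NET_I40E the lookup fails and the policy is disabled.
 */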
433 static int
434 get_pfid(struct policy *pol)
435 {
436 
437 	int i, x, ret = 0;
438 
439 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
440 
441 		RTE_ETH_FOREACH_DEV(x) {
442 #ifdef RTE_NET_I40E
443 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
444 				(struct rte_ether_addr *)&(pol->pkt.vfid[i]));
445 #else
446 			ret = -ENOTSUP;
447 #endif
448 			if (ret != -EINVAL) {
449 				pol->port[i] = x;
450 				break;
451 			}
452 		}
453 		if (ret == -EINVAL || ret == -ENOTSUP || ret == -ENODEV) {
454 			RTE_LOG(INFO, CHANNEL_MONITOR,
455 				"Error with Policy. MAC not found on "
456 				"attached ports\n");
457 			pol->enabled = 0;
458 			return ret;
459 		}
460 		pol->pfid[i] = ret;
461 	}
462 	return 1;
463 }
464 
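/*
 * Install or refresh a policy. If a policy already exists for the VM it is
 * overwritten in place; otherwise the first disabled slot in policies[] is
 * used. For TRAFFIC policies the MAC-to-VF lookup must succeed before the
 * policy is enabled.
 */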
465 static int
466 update_policy(struct channel_packet *pkt)
467 {
468 
469 	unsigned int updated = 0;
470 	unsigned int i;
471 
472 
473 	RTE_LOG(INFO, CHANNEL_MONITOR,
474 			"Applying policy for %s\n", pkt->vm_name);
475 
476 	for (i = 0; i < RTE_DIM(policies); i++) {
477 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
478 			/* Copy the contents of *pkt into the policy.pkt */
479 			policies[i].pkt = *pkt;
480 			get_pcpu_to_control(&policies[i]);
481 			/* Check Eth dev only for Traffic policy */
482 			if (policies[i].pkt.policy_to_use == TRAFFIC) {
483 				if (get_pfid(&policies[i]) < 0) {
484 					updated = 1;
485 					break;
486 				}
487 			}
488 			core_share_status(i);
489 			policies[i].enabled = 1;
490 			updated = 1;
491 		}
492 	}
493 	if (!updated) {
494 		for (i = 0; i < RTE_DIM(policies); i++) {
495 			if (policies[i].enabled == 0) {
496 				policies[i].pkt = *pkt;
497 				get_pcpu_to_control(&policies[i]);
498 				/* Check Eth dev only for Traffic policy */
499 				if (policies[i].pkt.policy_to_use == TRAFFIC) {
500 					if (get_pfid(&policies[i]) < 0) {
501 						updated = 1;
502 						break;
503 					}
504 				}
505 				core_share_status(i);
506 				policies[i].enabled = 1;
507 				break;
508 			}
509 		}
510 	}
511 	return 0;
512 }
513 
514 static int
515 remove_policy(struct channel_packet *pkt __rte_unused)
516 {
517 	unsigned int i;
518 
519 	/*
520 	 * Disabling the policy is simply a case of setting
521 	 * enabled to 0
522 	 */
523 	for (i = 0; i < RTE_DIM(policies); i++) {
524 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
525 			policies[i].enabled = 0;
526 			return 0;
527 		}
528 	}
529 	return -1;
530 }
531 
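/*
 * Estimate the aggregate RX rate (packets per second) across the monitored
 * VFs. The raw packet delta since the previous call is scaled by the TSC:
 *
 *     rate = (pkts_now - pkts_prev) * tsc_hz / tsc_delta
 *
 * e.g. (illustrative numbers) a delta of 50,000 packets observed over
 * 0.5 s worth of TSC cycles gives 100,000 pkts/s, which is then compared
 * against the avg/max thresholds in apply_traffic_profile().
 */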
532 static uint64_t
533 get_pkt_diff(struct policy *pol)
534 {
535 
536 	uint64_t vsi_pkt_count,
537 		vsi_pkt_total = 0,
538 		vsi_pkt_count_prev_total = 0;
539 	double rdtsc_curr, rdtsc_diff, diff;
540 	int x;
541 #ifdef RTE_NET_I40E
542 	struct rte_eth_stats vf_stats;
543 #endif
544 
545 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
546 
547 #ifdef RTE_NET_I40E
548 		/* Read VSI stats */
549 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
550 			vsi_pkt_count = vf_stats.ipackets;
551 		else
552 			vsi_pkt_count = -1;
553 #else
554 		vsi_pkt_count = -1;
555 #endif
556 
557 		vsi_pkt_total += vsi_pkt_count;
558 
559 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
560 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
561 	}
562 
563 	rdtsc_curr = rte_rdtsc_precise();
564 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
565 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
566 
567 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
568 			((double)rte_get_tsc_hz() / rdtsc_diff);
569 
570 	return diff;
571 }
572 
573 static void
574 apply_traffic_profile(struct policy *pol)
575 {
576 
577 	int count;
578 	uint64_t diff = 0;
579 
580 	diff = get_pkt_diff(pol);
581 
582 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
583 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
584 			if (pol->core_share[count].status != 1)
585 				power_manager_scale_core_max(
586 						pol->core_share[count].pcpu);
587 		}
588 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
589 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
590 			if (pol->core_share[count].status != 1)
591 				power_manager_scale_core_med(
592 						pol->core_share[count].pcpu);
593 		}
594 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
595 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
596 			if (pol->core_share[count].status != 1)
597 				power_manager_scale_core_min(
598 						pol->core_share[count].pcpu);
599 		}
600 	}
601 }
602 
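/*
 * TIME policy handler: compare the current hour (local time) against the
 * policy's busy_hours, quiet_hours and hours_to_use_traffic_profile lists,
 * scaling the policy's cores to max, to min, or deferring to the TRAFFIC
 * profile respectively. Cores flagged as shared are left alone.
 */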
603 static void
604 apply_time_profile(struct policy *pol)
605 {
606 
607 	int count, x;
608 	struct timeval tv;
609 	struct tm *ptm;
610 	char time_string[40];
611 
612 	/* Obtain the time of day, and convert it to a tm struct. */
613 	gettimeofday(&tv, NULL);
614 	ptm = localtime(&tv.tv_sec);
615 	/* Format the date and time, down to a single second. */
616 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
617 
618 	for (x = 0; x < HOURS; x++) {
619 
620 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
621 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
622 				if (pol->core_share[count].status != 1) {
623 					power_manager_scale_core_max(
624 						pol->core_share[count].pcpu);
625 				}
626 			}
627 			break;
628 		} else if (ptm->tm_hour ==
629 				pol->pkt.timer_policy.quiet_hours[x]) {
630 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
631 				if (pol->core_share[count].status != 1) {
632 					power_manager_scale_core_min(
633 						pol->core_share[count].pcpu);
634 			}
635 		}
636 			break;
637 		} else if (ptm->tm_hour ==
638 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
639 			apply_traffic_profile(pol);
640 			break;
641 		}
642 	}
643 }
644 
645 static void
646 apply_workload_profile(struct policy *pol)
647 {
648 
649 	int count;
650 
651 	if (pol->pkt.workload == HIGH) {
652 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
653 			if (pol->core_share[count].status != 1)
654 				power_manager_scale_core_max(
655 						pol->core_share[count].pcpu);
656 		}
657 	} else if (pol->pkt.workload == MEDIUM) {
658 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
659 			if (pol->core_share[count].status != 1)
660 				power_manager_scale_core_med(
661 						pol->core_share[count].pcpu);
662 		}
663 	} else if (pol->pkt.workload == LOW) {
664 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
665 			if (pol->core_share[count].status != 1)
666 				power_manager_scale_core_min(
667 						pol->core_share[count].pcpu);
668 		}
669 	}
670 }
671 
672 static void
673 apply_policy(struct policy *pol)
674 {
675 
676 	struct channel_packet *pkt = &pol->pkt;
677 
678 	/*Check policy to use*/
679 	if (pkt->policy_to_use == TRAFFIC)
680 		apply_traffic_profile(pol);
681 	else if (pkt->policy_to_use == TIME)
682 		apply_time_profile(pol);
683 	else if (pkt->policy_to_use == WORKLOAD)
684 		apply_workload_profile(pol);
685 }
686 
687 static int
688 write_binary_packet(void *buffer,
689 		size_t buffer_len,
690 		struct channel_info *chan_info)
691 {
692 	int ret;
693 
694 	if (buffer_len == 0 || buffer == NULL)
695 		return -1;
696 
697 	if (chan_info->fd < 0) {
698 		RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n");
699 		return -1;
700 	}
701 
702 	while (buffer_len > 0) {
703 		ret = write(chan_info->fd, buffer, buffer_len);
704 		if (ret == -1) {
705 			if (errno == EINTR)
706 				continue;
707 			RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n",
708 					strerror(errno));
709 			return -1;
710 		}
711 		buffer = (char *)buffer + ret;
712 		buffer_len -= ret;
713 	}
714 	return 0;
715 }
716 
717 static int
718 send_freq(struct channel_packet *pkt,
719 		struct channel_info *chan_info,
720 		bool freq_list)
721 {
722 	unsigned int vcore_id = pkt->resource_id;
723 	struct channel_packet_freq_list channel_pkt_freq_list;
724 	struct vm_info info;
725 
726 	if (get_info_vm(pkt->vm_name, &info) != 0)
727 		return -1;
728 
729 	if (!freq_list && vcore_id >= MAX_VCPU_PER_VM)
730 		return -1;
731 
732 	if (!info.allow_query)
733 		return -1;
734 
735 	channel_pkt_freq_list.command = CPU_POWER_FREQ_LIST;
736 	channel_pkt_freq_list.num_vcpu = info.num_vcpus;
737 
738 	if (freq_list) {
739 		unsigned int i;
740 		for (i = 0; i < info.num_vcpus; i++)
741 			channel_pkt_freq_list.freq_list[i] =
742 			  power_manager_get_current_frequency(info.pcpu_map[i]);
743 	} else {
744 		channel_pkt_freq_list.freq_list[vcore_id] =
745 		  power_manager_get_current_frequency(info.pcpu_map[vcore_id]);
746 	}
747 
748 	return write_binary_packet(&channel_pkt_freq_list,
749 			sizeof(channel_pkt_freq_list),
750 			chan_info);
751 }
752 
753 static int
754 send_capabilities(struct channel_packet *pkt,
755 		struct channel_info *chan_info,
756 		bool list_requested)
757 {
758 	unsigned int vcore_id = pkt->resource_id;
759 	struct channel_packet_caps_list channel_pkt_caps_list;
760 	struct vm_info info;
761 	struct rte_power_core_capabilities caps;
762 	int ret;
763 
764 	if (get_info_vm(pkt->vm_name, &info) != 0)
765 		return -1;
766 
767 	if (!list_requested && vcore_id >= MAX_VCPU_PER_VM)
768 		return -1;
769 
770 	if (!info.allow_query)
771 		return -1;
772 
773 	channel_pkt_caps_list.command = CPU_POWER_CAPS_LIST;
774 	channel_pkt_caps_list.num_vcpu = info.num_vcpus;
775 
776 	if (list_requested) {
777 		unsigned int i;
778 		for (i = 0; i < info.num_vcpus; i++) {
779 			ret = rte_power_get_capabilities(info.pcpu_map[i],
780 					&caps);
781 			if (ret == 0) {
782 				channel_pkt_caps_list.turbo[i] =
783 						caps.turbo;
784 				channel_pkt_caps_list.priority[i] =
785 						caps.priority;
786 			} else
787 				return -1;
788 
789 		}
790 	} else {
791 		ret = rte_power_get_capabilities(info.pcpu_map[vcore_id],
792 				&caps);
793 		if (ret == 0) {
794 			channel_pkt_caps_list.turbo[vcore_id] =
795 					caps.turbo;
796 			channel_pkt_caps_list.priority[vcore_id] =
797 					caps.priority;
798 		} else
799 			return -1;
800 	}
801 
802 	return write_binary_packet(&channel_pkt_caps_list,
803 			sizeof(channel_pkt_caps_list),
804 			chan_info);
805 }
806 
807 static int
808 send_ack_for_received_cmd(struct channel_packet *pkt,
809 		struct channel_info *chan_info,
810 		uint32_t command)
811 {
812 	pkt->command = command;
813 	return write_binary_packet(pkt,
814 			sizeof(struct channel_packet),
815 			chan_info);
816 }
817 
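/*
 * Dispatch one request. The channel is atomically moved from CONNECTED to
 * PROCESSING so the management thread cannot tear it down mid-request, the
 * command is handled (frequency scaling, policy create/remove, frequency or
 * capability queries), and the channel is returned to CONNECTED afterwards.
 */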
818 static int
819 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
820 {
821 	int ret;
822 
823 	if (chan_info == NULL)
824 		return -1;
825 
826 	if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
827 			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
828 		return -1;
829 
830 	if (pkt->command == CPU_POWER) {
831 		unsigned int core_num;
832 
833 		if (pkt->core_type == CORE_TYPE_VIRTUAL)
834 			core_num = get_pcpu(chan_info, pkt->resource_id);
835 		else
836 			core_num = pkt->resource_id;
837 
838 		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
839 			core_num);
840 
841 		int scale_res;
842 		bool valid_unit = true;
843 
844 		switch (pkt->unit) {
845 		case(CPU_POWER_SCALE_MIN):
846 			scale_res = power_manager_scale_core_min(core_num);
847 			break;
848 		case(CPU_POWER_SCALE_MAX):
849 			scale_res = power_manager_scale_core_max(core_num);
850 			break;
851 		case(CPU_POWER_SCALE_DOWN):
852 			scale_res = power_manager_scale_core_down(core_num);
853 			break;
854 		case(CPU_POWER_SCALE_UP):
855 			scale_res = power_manager_scale_core_up(core_num);
856 			break;
857 		case(CPU_POWER_ENABLE_TURBO):
858 			scale_res = power_manager_enable_turbo_core(core_num);
859 			break;
860 		case(CPU_POWER_DISABLE_TURBO):
861 			scale_res = power_manager_disable_turbo_core(core_num);
862 			break;
863 		default:
864 			valid_unit = false;
865 			break;
866 		}
867 
868 		if (valid_unit) {
869 			ret = send_ack_for_received_cmd(pkt,
870 					chan_info,
871 					scale_res >= 0 ?
872 						CPU_POWER_CMD_ACK :
873 						CPU_POWER_CMD_NACK);
874 			if (ret < 0)
875 				RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
876 		} else
877 			RTE_LOG(ERR, CHANNEL_MONITOR, "Unexpected unit type.\n");
878 
879 	}
880 
881 	if (pkt->command == PKT_POLICY) {
882 		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
883 				pkt->vm_name);
884 		int ret = send_ack_for_received_cmd(pkt,
885 				chan_info,
886 				CPU_POWER_CMD_ACK);
887 		if (ret < 0)
888 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
889 		update_policy(pkt);
890 		policy_is_set = 1;
891 	}
892 
893 	if (pkt->command == PKT_POLICY_REMOVE) {
894 		ret = remove_policy(pkt);
895 		if (ret == 0)
896 			RTE_LOG(INFO, CHANNEL_MONITOR,
897 				 "Removed policy %s\n", pkt->vm_name);
898 		else
899 			RTE_LOG(INFO, CHANNEL_MONITOR,
900 				 "Policy %s does not exist\n", pkt->vm_name);
901 	}
902 
903 	if (pkt->command == CPU_POWER_QUERY_FREQ_LIST ||
904 		pkt->command == CPU_POWER_QUERY_FREQ) {
905 
906 		RTE_LOG(INFO, CHANNEL_MONITOR,
907 			"Frequency for %s requested.\n", pkt->vm_name);
908 		int ret = send_freq(pkt,
909 				chan_info,
910 				pkt->command == CPU_POWER_QUERY_FREQ_LIST);
911 		if (ret < 0)
912 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during frequency sending.\n");
913 	}
914 
915 	if (pkt->command == CPU_POWER_QUERY_CAPS_LIST ||
916 		pkt->command == CPU_POWER_QUERY_CAPS) {
917 
918 		RTE_LOG(INFO, CHANNEL_MONITOR,
919 			"Capabilities for %s requested.\n", pkt->vm_name);
920 		int ret = send_capabilities(pkt,
921 				chan_info,
922 				pkt->command == CPU_POWER_QUERY_CAPS_LIST);
923 		if (ret < 0)
924 			RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending capabilities.\n");
925 	}
926 
927 	/*
928 	 * Return is not checked as channel status may have been set to DISABLED
929 	 * from management thread
930 	 */
931 	rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
932 			CHANNEL_MGR_CHANNEL_CONNECTED);
933 	return 0;
934 
935 }
936 
937 int
938 add_channel_to_monitor(struct channel_info **chan_info)
939 {
940 	struct channel_info *info = *chan_info;
941 	struct epoll_event event;
942 
943 	event.events = EPOLLIN;
944 	event.data.ptr = info;
945 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
946 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
947 				"to epoll\n", info->channel_path);
948 		return -1;
949 	}
950 	RTE_LOG(INFO, CHANNEL_MONITOR, "Added channel '%s' "
951 			"to monitor\n", info->channel_path);
952 	return 0;
953 }
954 
955 int
956 remove_channel_from_monitor(struct channel_info *chan_info)
957 {
958 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
959 			chan_info->fd, NULL) < 0) {
960 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
961 				"from epoll\n", chan_info->channel_path);
962 		return -1;
963 	}
964 	return 0;
965 }
966 
967 int
968 channel_monitor_init(void)
969 {
970 	global_event_fd = epoll_create1(0);
971 	if (global_event_fd < 0) {
972 		RTE_LOG(ERR, CHANNEL_MONITOR,
973 				"Error creating epoll context with error %s\n",
974 				strerror(errno));
975 		return -1;
976 	}
977 	global_events_list = rte_malloc("epoll_events",
978 			sizeof(*global_events_list)
979 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
980 	if (global_events_list == NULL) {
981 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
982 				"epoll events\n");
983 		return -1;
984 	}
985 	return 0;
986 }
987 
988 static void
989 read_binary_packet(struct channel_info *chan_info)
990 {
991 	struct channel_packet pkt;
992 	void *buffer = &pkt;
993 	int buffer_len = sizeof(pkt);
994 	int n_bytes, err = 0;
995 
996 	while (buffer_len > 0) {
997 		n_bytes = read(chan_info->fd,
998 				buffer, buffer_len);
999 		if (n_bytes == buffer_len)
1000 			break;
		if (n_bytes == 0) {
			/* EOF: peer closed; do not process a truncated packet */
			err = EPIPE;
			break;
		}
1001 		if (n_bytes < 0) {
1002 			err = errno;
1003 			RTE_LOG(DEBUG, CHANNEL_MONITOR,
1004 				"Received error on "
1005 				"channel '%s' read: %s\n",
1006 				chan_info->channel_path,
1007 				strerror(err));
1008 			remove_channel(&chan_info);
1009 			break;
1010 		}
1011 		buffer = (char *)buffer + n_bytes;
1012 		buffer_len -= n_bytes;
1013 	}
1014 	if (!err)
1015 		process_request(&pkt, chan_info);
1016 }
1017 
1018 #ifdef USE_JANSSON
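/*
 * Read one JSON object from the channel. There is no length framing on the
 * FIFO, so the reader counts braces one byte at a time and stops when the
 * outermost '{' is balanced, e.g. (illustrative input)
 *
 *     {"instruction": {"command": "power", "unit": "SCALE_MIN"}}
 *
 * is consumed up to and including the final '}'. The resulting string is
 * parsed with Jansson and handed to parse_json_to_pkt()/process_request().
 */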
1019 static void
1020 read_json_packet(struct channel_info *chan_info)
1021 {
1022 	struct channel_packet pkt;
1023 	int n_bytes, ret;
1024 	json_t *root;
1025 	json_error_t error;
1026 	const char *resource_name;
1027 	char *start, *end;
1028 	uint32_t n;
1029 
1030 
1031 	/* read opening brace to closing brace */
1032 	do {
1033 		int idx = 0;
1034 		int indent = 0;
1035 		do {
1036 			n_bytes = read(chan_info->fd, &json_data[idx], 1);
1037 			if (n_bytes <= 0)
1038 				break;
1039 			if (json_data[idx] == '{')
1040 				indent++;
1041 			if (json_data[idx] == '}')
1042 				indent--;
1043 			if ((indent > 0) || (idx > 0))
1044 				idx++;
1045 			if (indent <= 0)
1046 				json_data[idx] = 0;
1047 			if (idx >= MAX_JSON_STRING_LEN-1)
1048 				break;
1049 		} while (indent > 0);
1050 
1051 		json_data[idx] = '\0';
1052 
1053 		if (strlen(json_data) == 0)
1054 			continue;
1055 
1056 		printf("got [%s]\n", json_data);
1057 
1058 		root = json_loads(json_data, 0, &error);
1059 
1060 		if (root) {
1061 			resource_name = get_resource_name_from_chn_path(
1062 				chan_info->channel_path);
1063 			/*
1064 			 * Because our data is now in the json
1065 			 * object, we can overwrite the pkt
1066 			 * with a channel_packet struct, using
1067 			 * parse_json_to_pkt()
1068 			 */
1069 			ret = parse_json_to_pkt(root, &pkt, resource_name);
1070 			json_decref(root);
1071 			if (ret) {
1072 				RTE_LOG(ERR, CHANNEL_MONITOR,
1073 					"Error validating JSON profile data\n");
1074 				break;
1075 			}
1076 			start = strstr(pkt.vm_name,
1077 					CHANNEL_MGR_FIFO_PATTERN_NAME);
1078 			if (start != NULL) {
1079 				/* move past pattern to start of fifo id */
1080 				start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);
1081 
1082 				end = start;
1083 				n = (uint32_t)strtoul(start, &end, 10);
1084 
1085 				if (end[0] == '\0') {
1086 					/* Add core id to core list */
1087 					pkt.num_vcpu = 1;
1088 					pkt.vcpu_to_control[0] = n;
1089 					process_request(&pkt, chan_info);
1090 				} else {
1091 					RTE_LOG(ERR, CHANNEL_MONITOR,
1092 						"Cannot extract core id from fifo name\n");
1093 				}
1094 			} else {
1095 				process_request(&pkt, chan_info);
1096 			}
1097 		} else {
1098 			RTE_LOG(ERR, CHANNEL_MONITOR,
1099 					"JSON error on line %d: %s\n",
1100 					error.line, error.text);
1101 		}
1102 	} while (n_bytes > 0);
1103 }
1104 #endif
1105 
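/*
 * Main monitor loop: wait (up to 1 ms) for epoll events on the registered
 * channels, service any binary or JSON requests, then sleep for
 * time_period_ms and re-apply every enabled policy once a policy has been
 * installed.
 */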
1106 void
1107 run_channel_monitor(void)
1108 {
1109 	while (run_loop) {
1110 		int n_events, i;
1111 
1112 		n_events = epoll_wait(global_event_fd, global_events_list,
1113 				MAX_EVENTS, 1);
1114 		if (!run_loop)
1115 			break;
1116 		for (i = 0; i < n_events; i++) {
1117 			struct channel_info *chan_info = (struct channel_info *)
1118 					global_events_list[i].data.ptr;
1119 			if ((global_events_list[i].events & EPOLLERR) ||
1120 				(global_events_list[i].events & EPOLLHUP)) {
1121 				RTE_LOG(INFO, CHANNEL_MONITOR,
1122 						"Remote closed connection for "
1123 						"channel '%s'\n",
1124 						chan_info->channel_path);
1125 				remove_channel(&chan_info);
1126 				continue;
1127 			}
1128 			if (global_events_list[i].events & EPOLLIN) {
1129 
1130 				switch (chan_info->type) {
1131 				case CHANNEL_TYPE_BINARY:
1132 					read_binary_packet(chan_info);
1133 					break;
1134 #ifdef USE_JANSSON
1135 				case CHANNEL_TYPE_JSON:
1136 					read_json_packet(chan_info);
1137 					break;
1138 #endif
1139 				default:
1140 					break;
1141 				}
1142 			}
1143 		}
1144 		rte_delay_us(time_period_ms*1000);
1145 		if (policy_is_set) {
1146 			unsigned int j;
1147 
1148 			for (j = 0; j < RTE_DIM(policies); j++) {
1149 				if (policies[j].enabled == 1)
1150 					apply_policy(&policies[j]);
1151 			}
1152 		}
1153 	}
1154 }
1155