xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision 089e5ed727a15da2729cfee9b63533dd120bd04c)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32 
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39 
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41 
42 #define MAX_EVENTS 256
43 
44 uint64_t vsi_pkt_count_prev[384];
45 uint64_t rdtsc_prev[384];
46 #define MAX_JSON_STRING_LEN 1024
47 char json_data[MAX_JSON_STRING_LEN];
48 
49 double time_period_ms = 1;
50 static volatile unsigned run_loop = 1;
51 static int global_event_fd;
52 static unsigned int policy_is_set;
53 static struct epoll_event *global_events_list;
54 static struct policy policies[RTE_MAX_LCORE];
55 
56 #ifdef USE_JANSSON
57 
58 union PFID {
59 	struct rte_ether_addr addr;
60 	uint64_t pfid;
61 };
62 
63 static int
64 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
65 {
66 	int i;
67 	char *end;
68 	unsigned long o[RTE_ETHER_ADDR_LEN];
69 
70 	i = 0;
71 	do {
72 		errno = 0;
73 		o[i] = strtoul(a, &end, 16);
74 		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75 			return -1;
76 		a = end + 1;
77 	} while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78 
79 	/* Junk at the end of line */
80 	if (end[0] != 0)
81 		return -1;
82 
83 	/* Support the format XX:XX:XX:XX:XX:XX */
84 	if (i == RTE_ETHER_ADDR_LEN) {
85 		while (i-- != 0) {
86 			if (o[i] > UINT8_MAX)
87 				return -1;
88 			ether_addr->addr_bytes[i] = (uint8_t)o[i];
89 		}
90 	/* Support the format XXXX:XXXX:XXXX */
91 	} else if (i == RTE_ETHER_ADDR_LEN / 2) {
92 		while (i-- != 0) {
93 			if (o[i] > UINT16_MAX)
94 				return -1;
95 			ether_addr->addr_bytes[i * 2] =
96 					(uint8_t)(o[i] >> 8);
97 			ether_addr->addr_bytes[i * 2 + 1] =
98 					(uint8_t)(o[i] & 0xff);
99 		}
100 	/* unknown format */
101 	} else
102 		return -1;
103 
104 	return 0;
105 }
106 
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110 	union PFID pfid;
111 	int ret;
112 
113 	/* Use port MAC address as the vfid */
114 	ret = str_to_ether_addr(mac, &pfid.addr);
115 
116 	if (ret != 0) {
117 		RTE_LOG(ERR, CHANNEL_MONITOR,
118 			"Invalid mac address received in JSON\n");
119 		pkt->vfid[idx] = 0;
120 		return -1;
121 	}
122 
123 	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124 			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125 			pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126 			pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127 			pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128 
129 	pkt->vfid[idx] = pfid.pfid;
130 	return 0;
131 }
132 
133 static char*
134 get_resource_name_from_chn_path(const char *channel_path)
135 {
136 	char *substr = NULL;
137 
138 	substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
139 
140 	return substr;
141 }
142 
/*
 * Extract the first decimal number embedded in vm_name.
 *
 * Returns the number, or -1 when vm_name is NULL, contains no digit, or the
 * number does not fit in the (non-negative) return range.
 */
static int
get_resource_id_from_vmname(const char *vm_name)
{
	const char *p;
	long value;

	if (vm_name == NULL)
		return -1;

	/*
	 * Skip to the first decimal digit. An explicit range test avoids
	 * passing a possibly negative plain char to a <ctype.h> function.
	 */
	for (p = vm_name; *p != '\0'; p++) {
		if (*p >= '0' && *p <= '9')
			break;
	}
	if (*p == '\0')
		return -1;

	/* strtol reports overflow, which atoi cannot. */
	errno = 0;
	value = strtol(p, NULL, 10);
	if (errno == ERANGE || value > INT32_MAX)
		return -1;

	return (int)value;
}
163 
164 static int
165 parse_json_to_pkt(json_t *element, struct channel_packet *pkt,
166 					const char *vm_name)
167 {
168 	const char *key;
169 	json_t *value;
170 	int ret;
171 	int resource_id;
172 
173 	memset(pkt, 0, sizeof(struct channel_packet));
174 
175 	pkt->nb_mac_to_monitor = 0;
176 	pkt->t_boost_status.tbEnabled = false;
177 	pkt->workload = LOW;
178 	pkt->policy_to_use = TIME;
179 	pkt->command = PKT_POLICY;
180 	pkt->core_type = CORE_TYPE_PHYSICAL;
181 
182 	if (vm_name == NULL) {
183 		RTE_LOG(ERR, CHANNEL_MONITOR,
184 			"vm_name is NULL, request rejected !\n");
185 		return -1;
186 	}
187 
188 	json_object_foreach(element, key, value) {
189 		if (!strcmp(key, "policy")) {
190 			/* Recurse in to get the contents of profile */
191 			ret = parse_json_to_pkt(value, pkt, vm_name);
192 			if (ret)
193 				return ret;
194 		} else if (!strcmp(key, "instruction")) {
195 			/* Recurse in to get the contents of instruction */
196 			ret = parse_json_to_pkt(value, pkt, vm_name);
197 			if (ret)
198 				return ret;
199 		} else if (!strcmp(key, "command")) {
200 			char command[32];
201 			strlcpy(command, json_string_value(value), 32);
202 			if (!strcmp(command, "power")) {
203 				pkt->command = CPU_POWER;
204 			} else if (!strcmp(command, "create")) {
205 				pkt->command = PKT_POLICY;
206 			} else if (!strcmp(command, "destroy")) {
207 				pkt->command = PKT_POLICY_REMOVE;
208 			} else {
209 				RTE_LOG(ERR, CHANNEL_MONITOR,
210 					"Invalid command received in JSON\n");
211 				return -1;
212 			}
213 		} else if (!strcmp(key, "policy_type")) {
214 			char command[32];
215 			strlcpy(command, json_string_value(value), 32);
216 			if (!strcmp(command, "TIME")) {
217 				pkt->policy_to_use = TIME;
218 			} else if (!strcmp(command, "TRAFFIC")) {
219 				pkt->policy_to_use = TRAFFIC;
220 			} else if (!strcmp(command, "WORKLOAD")) {
221 				pkt->policy_to_use = WORKLOAD;
222 			} else if (!strcmp(command, "BRANCH_RATIO")) {
223 				pkt->policy_to_use = BRANCH_RATIO;
224 			} else {
225 				RTE_LOG(ERR, CHANNEL_MONITOR,
226 					"Wrong policy_type received in JSON\n");
227 				return -1;
228 			}
229 		} else if (!strcmp(key, "workload")) {
230 			char command[32];
231 			strlcpy(command, json_string_value(value), 32);
232 			if (!strcmp(command, "HIGH")) {
233 				pkt->workload = HIGH;
234 			} else if (!strcmp(command, "MEDIUM")) {
235 				pkt->workload = MEDIUM;
236 			} else if (!strcmp(command, "LOW")) {
237 				pkt->workload = LOW;
238 			} else {
239 				RTE_LOG(ERR, CHANNEL_MONITOR,
240 					"Wrong workload received in JSON\n");
241 				return -1;
242 			}
243 		} else if (!strcmp(key, "busy_hours")) {
244 			unsigned int i;
245 			size_t size = json_array_size(value);
246 
247 			for (i = 0; i < size; i++) {
248 				int hour = (int)json_integer_value(
249 						json_array_get(value, i));
250 				pkt->timer_policy.busy_hours[i] = hour;
251 			}
252 		} else if (!strcmp(key, "quiet_hours")) {
253 			unsigned int i;
254 			size_t size = json_array_size(value);
255 
256 			for (i = 0; i < size; i++) {
257 				int hour = (int)json_integer_value(
258 						json_array_get(value, i));
259 				pkt->timer_policy.quiet_hours[i] = hour;
260 			}
261 		} else if (!strcmp(key, "mac_list")) {
262 			unsigned int i;
263 			size_t size = json_array_size(value);
264 
265 			for (i = 0; i < size; i++) {
266 				char mac[32];
267 				strlcpy(mac,
268 					json_string_value(json_array_get(value, i)),
269 					32);
270 				set_policy_mac(pkt, i, mac);
271 			}
272 			pkt->nb_mac_to_monitor = size;
273 		} else if (!strcmp(key, "avg_packet_thresh")) {
274 			pkt->traffic_policy.avg_max_packet_thresh =
275 					(uint32_t)json_integer_value(value);
276 		} else if (!strcmp(key, "max_packet_thresh")) {
277 			pkt->traffic_policy.max_max_packet_thresh =
278 					(uint32_t)json_integer_value(value);
279 		} else if (!strcmp(key, "unit")) {
280 			char unit[32];
281 			strlcpy(unit, json_string_value(value), 32);
282 			if (!strcmp(unit, "SCALE_UP")) {
283 				pkt->unit = CPU_POWER_SCALE_UP;
284 			} else if (!strcmp(unit, "SCALE_DOWN")) {
285 				pkt->unit = CPU_POWER_SCALE_DOWN;
286 			} else if (!strcmp(unit, "SCALE_MAX")) {
287 				pkt->unit = CPU_POWER_SCALE_MAX;
288 			} else if (!strcmp(unit, "SCALE_MIN")) {
289 				pkt->unit = CPU_POWER_SCALE_MIN;
290 			} else if (!strcmp(unit, "ENABLE_TURBO")) {
291 				pkt->unit = CPU_POWER_ENABLE_TURBO;
292 			} else if (!strcmp(unit, "DISABLE_TURBO")) {
293 				pkt->unit = CPU_POWER_DISABLE_TURBO;
294 			} else {
295 				RTE_LOG(ERR, CHANNEL_MONITOR,
296 					"Invalid command received in JSON\n");
297 				return -1;
298 			}
299 		} else {
300 			RTE_LOG(ERR, CHANNEL_MONITOR,
301 				"Unknown key received in JSON string: %s\n",
302 				key);
303 		}
304 
305 		resource_id = get_resource_id_from_vmname(vm_name);
306 		if (resource_id < 0) {
307 			RTE_LOG(ERR, CHANNEL_MONITOR,
308 				"Could not get resource_id from vm_name:%s\n",
309 				vm_name);
310 			return -1;
311 		}
312 		strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ);
313 		pkt->resource_id = resource_id;
314 	}
315 	return 0;
316 }
317 #endif
318 
319 void channel_monitor_exit(void)
320 {
321 	run_loop = 0;
322 	rte_free(global_events_list);
323 }
324 
325 static void
326 core_share(int pNo, int z, int x, int t)
327 {
328 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
329 		if (strcmp(policies[pNo].pkt.vm_name,
330 				lvm_info[x].vm_name) != 0) {
331 			policies[pNo].core_share[z].status = 1;
332 			power_manager_scale_core_max(
333 					policies[pNo].core_share[z].pcpu);
334 		}
335 	}
336 }
337 
338 static void
339 core_share_status(int pNo)
340 {
341 
342 	int noVms = 0, noVcpus = 0, z, x, t;
343 
344 	get_all_vm(&noVms, &noVcpus);
345 
346 	/* Reset Core Share Status. */
347 	for (z = 0; z < noVcpus; z++)
348 		policies[pNo].core_share[z].status = 0;
349 
350 	/* Foreach vcpu in a policy. */
351 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
352 		/* Foreach VM on the platform. */
353 		for (x = 0; x < noVms; x++) {
354 			/* Foreach vcpu of VMs on platform. */
355 			for (t = 0; t < lvm_info[x].num_cpus; t++)
356 				core_share(pNo, z, x, t);
357 		}
358 	}
359 }
360 
361 
362 static int
363 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
364 {
365 	int ret = 0;
366 
367 	if (pol->pkt.policy_to_use == BRANCH_RATIO) {
368 		ci->cd[pcpu].oob_enabled = 1;
369 		ret = add_core_to_monitor(pcpu);
370 		if (ret == 0)
371 			RTE_LOG(INFO, CHANNEL_MONITOR,
372 					"Monitoring pcpu %d OOB for %s\n",
373 					pcpu, pol->pkt.vm_name);
374 		else
375 			RTE_LOG(ERR, CHANNEL_MONITOR,
376 					"Error monitoring pcpu %d OOB for %s\n",
377 					pcpu, pol->pkt.vm_name);
378 
379 	} else {
380 		pol->core_share[count].pcpu = pcpu;
381 		RTE_LOG(INFO, CHANNEL_MONITOR,
382 				"Monitoring pcpu %d for %s\n",
383 				pcpu, pol->pkt.vm_name);
384 	}
385 	return ret;
386 }
387 
388 static void
389 get_pcpu_to_control(struct policy *pol)
390 {
391 
392 	/* Convert vcpu to pcpu. */
393 	struct vm_info info;
394 	int pcpu, count;
395 	struct core_info *ci;
396 
397 	ci = get_core_info();
398 
399 	RTE_LOG(DEBUG, CHANNEL_MONITOR,
400 			"Looking for pcpu for %s\n", pol->pkt.vm_name);
401 
402 	/*
403 	 * So now that we're handling virtual and physical cores, we need to
404 	 * differenciate between them when adding them to the branch monitor.
405 	 * Virtual cores need to be converted to physical cores.
406 	 */
407 	if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
408 		/*
409 		 * If the cores in the policy are virtual, we need to map them
410 		 * to physical core. We look up the vm info and use that for
411 		 * the mapping.
412 		 */
413 		get_info_vm(pol->pkt.vm_name, &info);
414 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
415 			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
416 			pcpu_monitor(pol, ci, pcpu, count);
417 		}
418 	} else {
419 		/*
420 		 * If the cores in the policy are physical, we just use
421 		 * those core id's directly.
422 		 */
423 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
424 			pcpu = pol->pkt.vcpu_to_control[count];
425 			pcpu_monitor(pol, ci, pcpu, count);
426 		}
427 	}
428 }
429 
430 static int
431 get_pfid(struct policy *pol)
432 {
433 
434 	int i, x, ret = 0;
435 
436 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
437 
438 		RTE_ETH_FOREACH_DEV(x) {
439 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
440 				(struct rte_ether_addr *)&(pol->pkt.vfid[i]));
441 			if (ret != -EINVAL) {
442 				pol->port[i] = x;
443 				break;
444 			}
445 		}
446 		if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
447 			RTE_LOG(INFO, CHANNEL_MONITOR,
448 				"Error with Policy. MAC not found on "
449 				"attached ports ");
450 			pol->enabled = 0;
451 			return ret;
452 		}
453 		pol->pfid[i] = ret;
454 	}
455 	return 1;
456 }
457 
458 static int
459 update_policy(struct channel_packet *pkt)
460 {
461 
462 	unsigned int updated = 0;
463 	unsigned int i;
464 
465 
466 	RTE_LOG(INFO, CHANNEL_MONITOR,
467 			"Applying policy for %s\n", pkt->vm_name);
468 
469 	for (i = 0; i < RTE_DIM(policies); i++) {
470 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
471 			/* Copy the contents of *pkt into the policy.pkt */
472 			policies[i].pkt = *pkt;
473 			get_pcpu_to_control(&policies[i]);
474 			/* Check Eth dev only for Traffic policy */
475 			if (policies[i].pkt.policy_to_use == TRAFFIC) {
476 				if (get_pfid(&policies[i]) < 0) {
477 					updated = 1;
478 					break;
479 				}
480 			}
481 			core_share_status(i);
482 			policies[i].enabled = 1;
483 			updated = 1;
484 		}
485 	}
486 	if (!updated) {
487 		for (i = 0; i < RTE_DIM(policies); i++) {
488 			if (policies[i].enabled == 0) {
489 				policies[i].pkt = *pkt;
490 				get_pcpu_to_control(&policies[i]);
491 				/* Check Eth dev only for Traffic policy */
492 				if (policies[i].pkt.policy_to_use == TRAFFIC) {
493 					if (get_pfid(&policies[i]) < 0) {
494 						updated = 1;
495 						break;
496 					}
497 				}
498 				core_share_status(i);
499 				policies[i].enabled = 1;
500 				break;
501 			}
502 		}
503 	}
504 	return 0;
505 }
506 
507 static int
508 remove_policy(struct channel_packet *pkt __rte_unused)
509 {
510 	unsigned int i;
511 
512 	/*
513 	 * Disabling the policy is simply a case of setting
514 	 * enabled to 0
515 	 */
516 	for (i = 0; i < RTE_DIM(policies); i++) {
517 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
518 			policies[i].enabled = 0;
519 			return 0;
520 		}
521 	}
522 	return -1;
523 }
524 
525 static uint64_t
526 get_pkt_diff(struct policy *pol)
527 {
528 
529 	uint64_t vsi_pkt_count,
530 		vsi_pkt_total = 0,
531 		vsi_pkt_count_prev_total = 0;
532 	double rdtsc_curr, rdtsc_diff, diff;
533 	int x;
534 	struct rte_eth_stats vf_stats;
535 
536 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
537 
538 		/*Read vsi stats*/
539 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
540 			vsi_pkt_count = vf_stats.ipackets;
541 		else
542 			vsi_pkt_count = -1;
543 
544 		vsi_pkt_total += vsi_pkt_count;
545 
546 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
547 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
548 	}
549 
550 	rdtsc_curr = rte_rdtsc_precise();
551 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
552 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
553 
554 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
555 			((double)rte_get_tsc_hz() / rdtsc_diff);
556 
557 	return diff;
558 }
559 
560 static void
561 apply_traffic_profile(struct policy *pol)
562 {
563 
564 	int count;
565 	uint64_t diff = 0;
566 
567 	diff = get_pkt_diff(pol);
568 
569 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
570 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
571 			if (pol->core_share[count].status != 1)
572 				power_manager_scale_core_max(
573 						pol->core_share[count].pcpu);
574 		}
575 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
576 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
577 			if (pol->core_share[count].status != 1)
578 				power_manager_scale_core_med(
579 						pol->core_share[count].pcpu);
580 		}
581 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
582 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
583 			if (pol->core_share[count].status != 1)
584 				power_manager_scale_core_min(
585 						pol->core_share[count].pcpu);
586 		}
587 	}
588 }
589 
590 static void
591 apply_time_profile(struct policy *pol)
592 {
593 
594 	int count, x;
595 	struct timeval tv;
596 	struct tm *ptm;
597 	char time_string[40];
598 
599 	/* Obtain the time of day, and convert it to a tm struct. */
600 	gettimeofday(&tv, NULL);
601 	ptm = localtime(&tv.tv_sec);
602 	/* Format the date and time, down to a single second. */
603 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
604 
605 	for (x = 0; x < HOURS; x++) {
606 
607 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
608 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
609 				if (pol->core_share[count].status != 1) {
610 					power_manager_scale_core_max(
611 						pol->core_share[count].pcpu);
612 				}
613 			}
614 			break;
615 		} else if (ptm->tm_hour ==
616 				pol->pkt.timer_policy.quiet_hours[x]) {
617 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
618 				if (pol->core_share[count].status != 1) {
619 					power_manager_scale_core_min(
620 						pol->core_share[count].pcpu);
621 			}
622 		}
623 			break;
624 		} else if (ptm->tm_hour ==
625 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
626 			apply_traffic_profile(pol);
627 			break;
628 		}
629 	}
630 }
631 
632 static void
633 apply_workload_profile(struct policy *pol)
634 {
635 
636 	int count;
637 
638 	if (pol->pkt.workload == HIGH) {
639 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
640 			if (pol->core_share[count].status != 1)
641 				power_manager_scale_core_max(
642 						pol->core_share[count].pcpu);
643 		}
644 	} else if (pol->pkt.workload == MEDIUM) {
645 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
646 			if (pol->core_share[count].status != 1)
647 				power_manager_scale_core_med(
648 						pol->core_share[count].pcpu);
649 		}
650 	} else if (pol->pkt.workload == LOW) {
651 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
652 			if (pol->core_share[count].status != 1)
653 				power_manager_scale_core_min(
654 						pol->core_share[count].pcpu);
655 		}
656 	}
657 }
658 
659 static void
660 apply_policy(struct policy *pol)
661 {
662 
663 	struct channel_packet *pkt = &pol->pkt;
664 
665 	/*Check policy to use*/
666 	if (pkt->policy_to_use == TRAFFIC)
667 		apply_traffic_profile(pol);
668 	else if (pkt->policy_to_use == TIME)
669 		apply_time_profile(pol);
670 	else if (pkt->policy_to_use == WORKLOAD)
671 		apply_workload_profile(pol);
672 }
673 
674 static int
675 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
676 {
677 	int ret;
678 
679 	if (chan_info == NULL)
680 		return -1;
681 
682 	if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
683 			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
684 		return -1;
685 
686 	if (pkt->command == CPU_POWER) {
687 		unsigned int core_num;
688 
689 		if (pkt->core_type == CORE_TYPE_VIRTUAL)
690 			core_num = get_pcpu(chan_info, pkt->resource_id);
691 		else
692 			core_num = pkt->resource_id;
693 
694 		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
695 			core_num);
696 
697 		switch (pkt->unit) {
698 		case(CPU_POWER_SCALE_MIN):
699 			power_manager_scale_core_min(core_num);
700 			break;
701 		case(CPU_POWER_SCALE_MAX):
702 			power_manager_scale_core_max(core_num);
703 			break;
704 		case(CPU_POWER_SCALE_DOWN):
705 			power_manager_scale_core_down(core_num);
706 			break;
707 		case(CPU_POWER_SCALE_UP):
708 			power_manager_scale_core_up(core_num);
709 			break;
710 		case(CPU_POWER_ENABLE_TURBO):
711 			power_manager_enable_turbo_core(core_num);
712 			break;
713 		case(CPU_POWER_DISABLE_TURBO):
714 			power_manager_disable_turbo_core(core_num);
715 			break;
716 		default:
717 			break;
718 		}
719 	}
720 
721 	if (pkt->command == PKT_POLICY) {
722 		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
723 				pkt->vm_name);
724 		update_policy(pkt);
725 		policy_is_set = 1;
726 	}
727 
728 	if (pkt->command == PKT_POLICY_REMOVE) {
729 		ret = remove_policy(pkt);
730 		if (ret == 0)
731 			RTE_LOG(INFO, CHANNEL_MONITOR,
732 				 "Removed policy %s\n", pkt->vm_name);
733 		else
734 			RTE_LOG(INFO, CHANNEL_MONITOR,
735 				 "Policy %s does not exist\n", pkt->vm_name);
736 	}
737 
738 	/*
739 	 * Return is not checked as channel status may have been set to DISABLED
740 	 * from management thread
741 	 */
742 	rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
743 			CHANNEL_MGR_CHANNEL_CONNECTED);
744 	return 0;
745 
746 }
747 
748 int
749 add_channel_to_monitor(struct channel_info **chan_info)
750 {
751 	struct channel_info *info = *chan_info;
752 	struct epoll_event event;
753 
754 	event.events = EPOLLIN;
755 	event.data.ptr = info;
756 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
757 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
758 				"to epoll\n", info->channel_path);
759 		return -1;
760 	}
761 	RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
762 			"to monitor\n", info->channel_path);
763 	return 0;
764 }
765 
766 int
767 remove_channel_from_monitor(struct channel_info *chan_info)
768 {
769 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
770 			chan_info->fd, NULL) < 0) {
771 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
772 				"from epoll\n", chan_info->channel_path);
773 		return -1;
774 	}
775 	return 0;
776 }
777 
778 int
779 channel_monitor_init(void)
780 {
781 	global_event_fd = epoll_create1(0);
782 	if (global_event_fd == 0) {
783 		RTE_LOG(ERR, CHANNEL_MONITOR,
784 				"Error creating epoll context with error %s\n",
785 				strerror(errno));
786 		return -1;
787 	}
788 	global_events_list = rte_malloc("epoll_events",
789 			sizeof(*global_events_list)
790 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
791 	if (global_events_list == NULL) {
792 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
793 				"epoll events\n");
794 		return -1;
795 	}
796 	return 0;
797 }
798 
799 static void
800 read_binary_packet(struct channel_info *chan_info)
801 {
802 	struct channel_packet pkt;
803 	void *buffer = &pkt;
804 	int buffer_len = sizeof(pkt);
805 	int n_bytes, err = 0;
806 
807 	while (buffer_len > 0) {
808 		n_bytes = read(chan_info->fd,
809 				buffer, buffer_len);
810 		if (n_bytes == buffer_len)
811 			break;
812 		if (n_bytes < 0) {
813 			err = errno;
814 			RTE_LOG(DEBUG, CHANNEL_MONITOR,
815 				"Received error on "
816 				"channel '%s' read: %s\n",
817 				chan_info->channel_path,
818 				strerror(err));
819 			remove_channel(&chan_info);
820 			break;
821 		}
822 		buffer = (char *)buffer + n_bytes;
823 		buffer_len -= n_bytes;
824 	}
825 	if (!err)
826 		process_request(&pkt, chan_info);
827 }
828 
829 #ifdef USE_JANSSON
830 static void
831 read_json_packet(struct channel_info *chan_info)
832 {
833 	struct channel_packet pkt;
834 	int n_bytes, ret;
835 	json_t *root;
836 	json_error_t error;
837 	const char *resource_name;
838 	char *start, *end;
839 	uint32_t n;
840 
841 
842 	/* read opening brace to closing brace */
843 	do {
844 		int idx = 0;
845 		int indent = 0;
846 		do {
847 			n_bytes = read(chan_info->fd, &json_data[idx], 1);
848 			if (n_bytes == 0)
849 				break;
850 			if (json_data[idx] == '{')
851 				indent++;
852 			if (json_data[idx] == '}')
853 				indent--;
854 			if ((indent > 0) || (idx > 0))
855 				idx++;
856 			if (indent <= 0)
857 				json_data[idx] = 0;
858 			if (idx >= MAX_JSON_STRING_LEN-1)
859 				break;
860 		} while (indent > 0);
861 
862 		json_data[idx] = '\0';
863 
864 		if (strlen(json_data) == 0)
865 			continue;
866 
867 		printf("got [%s]\n", json_data);
868 
869 		root = json_loads(json_data, 0, &error);
870 
871 		if (root) {
872 			resource_name = get_resource_name_from_chn_path(
873 				chan_info->channel_path);
874 			/*
875 			 * Because our data is now in the json
876 			 * object, we can overwrite the pkt
877 			 * with a channel_packet struct, using
878 			 * parse_json_to_pkt()
879 			 */
880 			ret = parse_json_to_pkt(root, &pkt, resource_name);
881 			json_decref(root);
882 			if (ret) {
883 				RTE_LOG(ERR, CHANNEL_MONITOR,
884 					"Error validating JSON profile data\n");
885 				break;
886 			}
887 			start = strstr(pkt.vm_name,
888 					CHANNEL_MGR_FIFO_PATTERN_NAME);
889 			if (start != NULL) {
890 				/* move past pattern to start of fifo id */
891 				start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);
892 
893 				end = start;
894 				n = (uint32_t)strtoul(start, &end, 10);
895 
896 				if (end[0] == '\0') {
897 					/* Add core id to core list */
898 					pkt.num_vcpu = 1;
899 					pkt.vcpu_to_control[0] = n;
900 					process_request(&pkt, chan_info);
901 				} else {
902 					RTE_LOG(ERR, CHANNEL_MONITOR,
903 						"Cannot extract core id from fifo name\n");
904 				}
905 			} else {
906 				process_request(&pkt, chan_info);
907 			}
908 		} else {
909 			RTE_LOG(ERR, CHANNEL_MONITOR,
910 					"JSON error on line %d: %s\n",
911 					error.line, error.text);
912 		}
913 	} while (n_bytes > 0);
914 }
915 #endif
916 
917 void
918 run_channel_monitor(void)
919 {
920 	while (run_loop) {
921 		int n_events, i;
922 
923 		n_events = epoll_wait(global_event_fd, global_events_list,
924 				MAX_EVENTS, 1);
925 		if (!run_loop)
926 			break;
927 		for (i = 0; i < n_events; i++) {
928 			struct channel_info *chan_info = (struct channel_info *)
929 					global_events_list[i].data.ptr;
930 			if ((global_events_list[i].events & EPOLLERR) ||
931 				(global_events_list[i].events & EPOLLHUP)) {
932 				RTE_LOG(INFO, CHANNEL_MONITOR,
933 						"Remote closed connection for "
934 						"channel '%s'\n",
935 						chan_info->channel_path);
936 				remove_channel(&chan_info);
937 				continue;
938 			}
939 			if (global_events_list[i].events & EPOLLIN) {
940 
941 				switch (chan_info->type) {
942 				case CHANNEL_TYPE_BINARY:
943 					read_binary_packet(chan_info);
944 					break;
945 #ifdef USE_JANSSON
946 				case CHANNEL_TYPE_JSON:
947 					read_json_packet(chan_info);
948 					break;
949 #endif
950 				default:
951 					break;
952 				}
953 			}
954 		}
955 		rte_delay_us(time_period_ms*1000);
956 		if (policy_is_set) {
957 			unsigned int j;
958 
959 			for (j = 0; j < RTE_DIM(policies); j++) {
960 				if (policies[j].enabled == 1)
961 					apply_policy(&policies[j]);
962 			}
963 		}
964 	}
965 }
966