xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision 4c2caea070fb1ceb16688870d44bd4016ce5d0e4)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32 
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39 
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41 
42 #define MAX_EVENTS 256
43 
/*
 * Number of slots in the two per-VF bookkeeping tables below. Both tables
 * are indexed with the VF id kept in policy.pfid[].
 */
#define VF_STATS_TABLE_SIZE 384

/* Packet count most recently read for each VF id. */
uint64_t vsi_pkt_count_prev[VF_STATS_TABLE_SIZE];
/* TSC value recorded the last time a VF id was used as sampling key. */
uint64_t rdtsc_prev[VF_STATS_TABLE_SIZE];

/* Size in bytes, terminating NUL included, of the JSON receive buffer. */
#define MAX_JSON_STRING_LEN 1024
/* Buffer in which an incoming JSON message is assembled before parsing. */
char json_data[MAX_JSON_STRING_LEN];

/* Delay, in milliseconds, applied after every pass of the monitor loop. */
double time_period_ms = 1;
50 static volatile unsigned run_loop = 1;
51 static int global_event_fd;
52 static unsigned int policy_is_set;
53 static struct epoll_event *global_events_list;
54 static struct policy policies[MAX_CLIENTS];
55 
56 #ifdef USE_JANSSON
57 
58 union PFID {
59 	struct ether_addr addr;
60 	uint64_t pfid;
61 };
62 
63 static int
64 str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
65 {
66 	int i;
67 	char *end;
68 	unsigned long o[ETHER_ADDR_LEN];
69 
70 	i = 0;
71 	do {
72 		errno = 0;
73 		o[i] = strtoul(a, &end, 16);
74 		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75 			return -1;
76 		a = end + 1;
77 	} while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78 
79 	/* Junk at the end of line */
80 	if (end[0] != 0)
81 		return -1;
82 
83 	/* Support the format XX:XX:XX:XX:XX:XX */
84 	if (i == ETHER_ADDR_LEN) {
85 		while (i-- != 0) {
86 			if (o[i] > UINT8_MAX)
87 				return -1;
88 			ether_addr->addr_bytes[i] = (uint8_t)o[i];
89 		}
90 	/* Support the format XXXX:XXXX:XXXX */
91 	} else if (i == ETHER_ADDR_LEN / 2) {
92 		while (i-- != 0) {
93 			if (o[i] > UINT16_MAX)
94 				return -1;
95 			ether_addr->addr_bytes[i * 2] =
96 					(uint8_t)(o[i] >> 8);
97 			ether_addr->addr_bytes[i * 2 + 1] =
98 					(uint8_t)(o[i] & 0xff);
99 		}
100 	/* unknown format */
101 	} else
102 		return -1;
103 
104 	return 0;
105 }
106 
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110 	union PFID pfid;
111 	int ret;
112 
113 	/* Use port MAC address as the vfid */
114 	ret = str_to_ether_addr(mac, &pfid.addr);
115 
116 	if (ret != 0) {
117 		RTE_LOG(ERR, CHANNEL_MONITOR,
118 			"Invalid mac address received in JSON\n");
119 		pkt->vfid[idx] = 0;
120 		return -1;
121 	}
122 
123 	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124 			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125 			pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126 			pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127 			pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128 
129 	pkt->vfid[idx] = pfid.pfid;
130 	return 0;
131 }
132 
133 
134 static int
135 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
136 {
137 	const char *key;
138 	json_t *value;
139 	int ret;
140 
141 	memset(pkt, 0, sizeof(struct channel_packet));
142 
143 	pkt->nb_mac_to_monitor = 0;
144 	pkt->t_boost_status.tbEnabled = false;
145 	pkt->workload = LOW;
146 	pkt->policy_to_use = TIME;
147 	pkt->command = PKT_POLICY;
148 	pkt->core_type = CORE_TYPE_PHYSICAL;
149 
150 	json_object_foreach(element, key, value) {
151 		if (!strcmp(key, "policy")) {
152 			/* Recurse in to get the contents of profile */
153 			ret = parse_json_to_pkt(value, pkt);
154 			if (ret)
155 				return ret;
156 		} else if (!strcmp(key, "instruction")) {
157 			/* Recurse in to get the contents of instruction */
158 			ret = parse_json_to_pkt(value, pkt);
159 			if (ret)
160 				return ret;
161 		} else if (!strcmp(key, "name")) {
162 			strcpy(pkt->vm_name, json_string_value(value));
163 		} else if (!strcmp(key, "command")) {
164 			char command[32];
165 			strlcpy(command, json_string_value(value), 32);
166 			if (!strcmp(command, "power")) {
167 				pkt->command = CPU_POWER;
168 			} else if (!strcmp(command, "create")) {
169 				pkt->command = PKT_POLICY;
170 			} else if (!strcmp(command, "destroy")) {
171 				pkt->command = PKT_POLICY_REMOVE;
172 			} else {
173 				RTE_LOG(ERR, CHANNEL_MONITOR,
174 					"Invalid command received in JSON\n");
175 				return -1;
176 			}
177 		} else if (!strcmp(key, "policy_type")) {
178 			char command[32];
179 			strlcpy(command, json_string_value(value), 32);
180 			if (!strcmp(command, "TIME")) {
181 				pkt->policy_to_use = TIME;
182 			} else if (!strcmp(command, "TRAFFIC")) {
183 				pkt->policy_to_use = TRAFFIC;
184 			} else if (!strcmp(command, "WORKLOAD")) {
185 				pkt->policy_to_use = WORKLOAD;
186 			} else if (!strcmp(command, "BRANCH_RATIO")) {
187 				pkt->policy_to_use = BRANCH_RATIO;
188 			} else {
189 				RTE_LOG(ERR, CHANNEL_MONITOR,
190 					"Wrong policy_type received in JSON\n");
191 				return -1;
192 			}
193 		} else if (!strcmp(key, "workload")) {
194 			char command[32];
195 			strlcpy(command, json_string_value(value), 32);
196 			if (!strcmp(command, "HIGH")) {
197 				pkt->workload = HIGH;
198 			} else if (!strcmp(command, "MEDIUM")) {
199 				pkt->workload = MEDIUM;
200 			} else if (!strcmp(command, "LOW")) {
201 				pkt->workload = LOW;
202 			} else {
203 				RTE_LOG(ERR, CHANNEL_MONITOR,
204 					"Wrong workload received in JSON\n");
205 				return -1;
206 			}
207 		} else if (!strcmp(key, "busy_hours")) {
208 			unsigned int i;
209 			size_t size = json_array_size(value);
210 
211 			for (i = 0; i < size; i++) {
212 				int hour = (int)json_integer_value(
213 						json_array_get(value, i));
214 				pkt->timer_policy.busy_hours[i] = hour;
215 			}
216 		} else if (!strcmp(key, "quiet_hours")) {
217 			unsigned int i;
218 			size_t size = json_array_size(value);
219 
220 			for (i = 0; i < size; i++) {
221 				int hour = (int)json_integer_value(
222 						json_array_get(value, i));
223 				pkt->timer_policy.quiet_hours[i] = hour;
224 			}
225 		} else if (!strcmp(key, "core_list")) {
226 			unsigned int i;
227 			size_t size = json_array_size(value);
228 
229 			for (i = 0; i < size; i++) {
230 				int core = (int)json_integer_value(
231 						json_array_get(value, i));
232 				pkt->vcpu_to_control[i] = core;
233 			}
234 			pkt->num_vcpu = size;
235 		} else if (!strcmp(key, "mac_list")) {
236 			unsigned int i;
237 			size_t size = json_array_size(value);
238 
239 			for (i = 0; i < size; i++) {
240 				char mac[32];
241 				strlcpy(mac,
242 					json_string_value(json_array_get(value, i)),
243 					32);
244 				set_policy_mac(pkt, i, mac);
245 			}
246 			pkt->nb_mac_to_monitor = size;
247 		} else if (!strcmp(key, "avg_packet_thresh")) {
248 			pkt->traffic_policy.avg_max_packet_thresh =
249 					(uint32_t)json_integer_value(value);
250 		} else if (!strcmp(key, "max_packet_thresh")) {
251 			pkt->traffic_policy.max_max_packet_thresh =
252 					(uint32_t)json_integer_value(value);
253 		} else if (!strcmp(key, "unit")) {
254 			char unit[32];
255 			strlcpy(unit, json_string_value(value), 32);
256 			if (!strcmp(unit, "SCALE_UP")) {
257 				pkt->unit = CPU_POWER_SCALE_UP;
258 			} else if (!strcmp(unit, "SCALE_DOWN")) {
259 				pkt->unit = CPU_POWER_SCALE_DOWN;
260 			} else if (!strcmp(unit, "SCALE_MAX")) {
261 				pkt->unit = CPU_POWER_SCALE_MAX;
262 			} else if (!strcmp(unit, "SCALE_MIN")) {
263 				pkt->unit = CPU_POWER_SCALE_MIN;
264 			} else if (!strcmp(unit, "ENABLE_TURBO")) {
265 				pkt->unit = CPU_POWER_ENABLE_TURBO;
266 			} else if (!strcmp(unit, "DISABLE_TURBO")) {
267 				pkt->unit = CPU_POWER_DISABLE_TURBO;
268 			} else {
269 				RTE_LOG(ERR, CHANNEL_MONITOR,
270 					"Invalid command received in JSON\n");
271 				return -1;
272 			}
273 		} else if (!strcmp(key, "resource_id")) {
274 			pkt->resource_id = (uint32_t)json_integer_value(value);
275 		} else {
276 			RTE_LOG(ERR, CHANNEL_MONITOR,
277 				"Unknown key received in JSON string: %s\n",
278 				key);
279 		}
280 	}
281 	return 0;
282 }
283 #endif
284 
285 void channel_monitor_exit(void)
286 {
287 	run_loop = 0;
288 	rte_free(global_events_list);
289 }
290 
291 static void
292 core_share(int pNo, int z, int x, int t)
293 {
294 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
295 		if (strcmp(policies[pNo].pkt.vm_name,
296 				lvm_info[x].vm_name) != 0) {
297 			policies[pNo].core_share[z].status = 1;
298 			power_manager_scale_core_max(
299 					policies[pNo].core_share[z].pcpu);
300 		}
301 	}
302 }
303 
304 static void
305 core_share_status(int pNo)
306 {
307 
308 	int noVms = 0, noVcpus = 0, z, x, t;
309 
310 	get_all_vm(&noVms, &noVcpus);
311 
312 	/* Reset Core Share Status. */
313 	for (z = 0; z < noVcpus; z++)
314 		policies[pNo].core_share[z].status = 0;
315 
316 	/* Foreach vcpu in a policy. */
317 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
318 		/* Foreach VM on the platform. */
319 		for (x = 0; x < noVms; x++) {
320 			/* Foreach vcpu of VMs on platform. */
321 			for (t = 0; t < lvm_info[x].num_cpus; t++)
322 				core_share(pNo, z, x, t);
323 		}
324 	}
325 }
326 
327 
328 static int
329 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
330 {
331 	int ret = 0;
332 
333 	if (pol->pkt.policy_to_use == BRANCH_RATIO) {
334 		ci->cd[pcpu].oob_enabled = 1;
335 		ret = add_core_to_monitor(pcpu);
336 		if (ret == 0)
337 			RTE_LOG(INFO, CHANNEL_MONITOR,
338 					"Monitoring pcpu %d OOB for %s\n",
339 					pcpu, pol->pkt.vm_name);
340 		else
341 			RTE_LOG(ERR, CHANNEL_MONITOR,
342 					"Error monitoring pcpu %d OOB for %s\n",
343 					pcpu, pol->pkt.vm_name);
344 
345 	} else {
346 		pol->core_share[count].pcpu = pcpu;
347 		RTE_LOG(INFO, CHANNEL_MONITOR,
348 				"Monitoring pcpu %d for %s\n",
349 				pcpu, pol->pkt.vm_name);
350 	}
351 	return ret;
352 }
353 
354 static void
355 get_pcpu_to_control(struct policy *pol)
356 {
357 
358 	/* Convert vcpu to pcpu. */
359 	struct vm_info info;
360 	int pcpu, count;
361 	struct core_info *ci;
362 
363 	ci = get_core_info();
364 
365 	RTE_LOG(DEBUG, CHANNEL_MONITOR,
366 			"Looking for pcpu for %s\n", pol->pkt.vm_name);
367 
368 	/*
369 	 * So now that we're handling virtual and physical cores, we need to
370 	 * differenciate between them when adding them to the branch monitor.
371 	 * Virtual cores need to be converted to physical cores.
372 	 */
373 	if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
374 		/*
375 		 * If the cores in the policy are virtual, we need to map them
376 		 * to physical core. We look up the vm info and use that for
377 		 * the mapping.
378 		 */
379 		get_info_vm(pol->pkt.vm_name, &info);
380 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
381 			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
382 			pcpu_monitor(pol, ci, pcpu, count);
383 		}
384 	} else {
385 		/*
386 		 * If the cores in the policy are physical, we just use
387 		 * those core id's directly.
388 		 */
389 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
390 			pcpu = pol->pkt.vcpu_to_control[count];
391 			pcpu_monitor(pol, ci, pcpu, count);
392 		}
393 	}
394 }
395 
396 static int
397 get_pfid(struct policy *pol)
398 {
399 
400 	int i, x, ret = 0;
401 
402 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
403 
404 		RTE_ETH_FOREACH_DEV(x) {
405 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
406 				(struct ether_addr *)&(pol->pkt.vfid[i]));
407 			if (ret != -EINVAL) {
408 				pol->port[i] = x;
409 				break;
410 			}
411 		}
412 		if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
413 			RTE_LOG(INFO, CHANNEL_MONITOR,
414 				"Error with Policy. MAC not found on "
415 				"attached ports ");
416 			pol->enabled = 0;
417 			return ret;
418 		}
419 		pol->pfid[i] = ret;
420 	}
421 	return 1;
422 }
423 
424 static int
425 update_policy(struct channel_packet *pkt)
426 {
427 
428 	unsigned int updated = 0;
429 	int i;
430 
431 
432 	RTE_LOG(INFO, CHANNEL_MONITOR,
433 			"Applying policy for %s\n", pkt->vm_name);
434 
435 	for (i = 0; i < MAX_CLIENTS; i++) {
436 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
437 			/* Copy the contents of *pkt into the policy.pkt */
438 			policies[i].pkt = *pkt;
439 			get_pcpu_to_control(&policies[i]);
440 			/* Check Eth dev only for Traffic policy */
441 			if (policies[i].pkt.policy_to_use == TRAFFIC) {
442 				if (get_pfid(&policies[i]) < 0) {
443 					updated = 1;
444 					break;
445 				}
446 			}
447 			core_share_status(i);
448 			policies[i].enabled = 1;
449 			updated = 1;
450 		}
451 	}
452 	if (!updated) {
453 		for (i = 0; i < MAX_CLIENTS; i++) {
454 			if (policies[i].enabled == 0) {
455 				policies[i].pkt = *pkt;
456 				get_pcpu_to_control(&policies[i]);
457 				/* Check Eth dev only for Traffic policy */
458 				if (policies[i].pkt.policy_to_use == TRAFFIC) {
459 					if (get_pfid(&policies[i]) < 0) {
460 						updated = 1;
461 						break;
462 					}
463 				}
464 				core_share_status(i);
465 				policies[i].enabled = 1;
466 				break;
467 			}
468 		}
469 	}
470 	return 0;
471 }
472 
473 static int
474 remove_policy(struct channel_packet *pkt __rte_unused)
475 {
476 	int i;
477 
478 	/*
479 	 * Disabling the policy is simply a case of setting
480 	 * enabled to 0
481 	 */
482 	for (i = 0; i < MAX_CLIENTS; i++) {
483 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
484 			policies[i].enabled = 0;
485 			return 0;
486 		}
487 	}
488 	return -1;
489 }
490 
491 static uint64_t
492 get_pkt_diff(struct policy *pol)
493 {
494 
495 	uint64_t vsi_pkt_count,
496 		vsi_pkt_total = 0,
497 		vsi_pkt_count_prev_total = 0;
498 	double rdtsc_curr, rdtsc_diff, diff;
499 	int x;
500 	struct rte_eth_stats vf_stats;
501 
502 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
503 
504 		/*Read vsi stats*/
505 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
506 			vsi_pkt_count = vf_stats.ipackets;
507 		else
508 			vsi_pkt_count = -1;
509 
510 		vsi_pkt_total += vsi_pkt_count;
511 
512 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
513 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
514 	}
515 
516 	rdtsc_curr = rte_rdtsc_precise();
517 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
518 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
519 
520 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
521 			((double)rte_get_tsc_hz() / rdtsc_diff);
522 
523 	return diff;
524 }
525 
526 static void
527 apply_traffic_profile(struct policy *pol)
528 {
529 
530 	int count;
531 	uint64_t diff = 0;
532 
533 	diff = get_pkt_diff(pol);
534 
535 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
536 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
537 			if (pol->core_share[count].status != 1)
538 				power_manager_scale_core_max(
539 						pol->core_share[count].pcpu);
540 		}
541 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
542 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
543 			if (pol->core_share[count].status != 1)
544 				power_manager_scale_core_med(
545 						pol->core_share[count].pcpu);
546 		}
547 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
548 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
549 			if (pol->core_share[count].status != 1)
550 				power_manager_scale_core_min(
551 						pol->core_share[count].pcpu);
552 		}
553 	}
554 }
555 
556 static void
557 apply_time_profile(struct policy *pol)
558 {
559 
560 	int count, x;
561 	struct timeval tv;
562 	struct tm *ptm;
563 	char time_string[40];
564 
565 	/* Obtain the time of day, and convert it to a tm struct. */
566 	gettimeofday(&tv, NULL);
567 	ptm = localtime(&tv.tv_sec);
568 	/* Format the date and time, down to a single second. */
569 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
570 
571 	for (x = 0; x < HOURS; x++) {
572 
573 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
574 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
575 				if (pol->core_share[count].status != 1) {
576 					power_manager_scale_core_max(
577 						pol->core_share[count].pcpu);
578 				}
579 			}
580 			break;
581 		} else if (ptm->tm_hour ==
582 				pol->pkt.timer_policy.quiet_hours[x]) {
583 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
584 				if (pol->core_share[count].status != 1) {
585 					power_manager_scale_core_min(
586 						pol->core_share[count].pcpu);
587 			}
588 		}
589 			break;
590 		} else if (ptm->tm_hour ==
591 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
592 			apply_traffic_profile(pol);
593 			break;
594 		}
595 	}
596 }
597 
598 static void
599 apply_workload_profile(struct policy *pol)
600 {
601 
602 	int count;
603 
604 	if (pol->pkt.workload == HIGH) {
605 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
606 			if (pol->core_share[count].status != 1)
607 				power_manager_scale_core_max(
608 						pol->core_share[count].pcpu);
609 		}
610 	} else if (pol->pkt.workload == MEDIUM) {
611 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
612 			if (pol->core_share[count].status != 1)
613 				power_manager_scale_core_med(
614 						pol->core_share[count].pcpu);
615 		}
616 	} else if (pol->pkt.workload == LOW) {
617 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
618 			if (pol->core_share[count].status != 1)
619 				power_manager_scale_core_min(
620 						pol->core_share[count].pcpu);
621 		}
622 	}
623 }
624 
625 static void
626 apply_policy(struct policy *pol)
627 {
628 
629 	struct channel_packet *pkt = &pol->pkt;
630 
631 	/*Check policy to use*/
632 	if (pkt->policy_to_use == TRAFFIC)
633 		apply_traffic_profile(pol);
634 	else if (pkt->policy_to_use == TIME)
635 		apply_time_profile(pol);
636 	else if (pkt->policy_to_use == WORKLOAD)
637 		apply_workload_profile(pol);
638 }
639 
640 static int
641 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
642 {
643 	int ret;
644 
645 	if (chan_info == NULL)
646 		return -1;
647 
648 	if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
649 			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
650 		return -1;
651 
652 	if (pkt->command == CPU_POWER) {
653 		unsigned int core_num;
654 
655 		if (pkt->core_type == CORE_TYPE_VIRTUAL)
656 			core_num = get_pcpu(chan_info, pkt->resource_id);
657 		else
658 			core_num = pkt->resource_id;
659 
660 		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
661 			core_num);
662 
663 		switch (pkt->unit) {
664 		case(CPU_POWER_SCALE_MIN):
665 			power_manager_scale_core_min(core_num);
666 			break;
667 		case(CPU_POWER_SCALE_MAX):
668 			power_manager_scale_core_max(core_num);
669 			break;
670 		case(CPU_POWER_SCALE_DOWN):
671 			power_manager_scale_core_down(core_num);
672 			break;
673 		case(CPU_POWER_SCALE_UP):
674 			power_manager_scale_core_up(core_num);
675 			break;
676 		case(CPU_POWER_ENABLE_TURBO):
677 			power_manager_enable_turbo_core(core_num);
678 			break;
679 		case(CPU_POWER_DISABLE_TURBO):
680 			power_manager_disable_turbo_core(core_num);
681 			break;
682 		default:
683 			break;
684 		}
685 	}
686 
687 	if (pkt->command == PKT_POLICY) {
688 		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
689 				pkt->vm_name);
690 		update_policy(pkt);
691 		policy_is_set = 1;
692 	}
693 
694 	if (pkt->command == PKT_POLICY_REMOVE) {
695 		ret = remove_policy(pkt);
696 		if (ret == 0)
697 			RTE_LOG(INFO, CHANNEL_MONITOR,
698 				 "Removed policy %s\n", pkt->vm_name);
699 		else
700 			RTE_LOG(INFO, CHANNEL_MONITOR,
701 				 "Policy %s does not exist\n", pkt->vm_name);
702 	}
703 
704 	/*
705 	 * Return is not checked as channel status may have been set to DISABLED
706 	 * from management thread
707 	 */
708 	rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
709 			CHANNEL_MGR_CHANNEL_CONNECTED);
710 	return 0;
711 
712 }
713 
714 int
715 add_channel_to_monitor(struct channel_info **chan_info)
716 {
717 	struct channel_info *info = *chan_info;
718 	struct epoll_event event;
719 
720 	event.events = EPOLLIN;
721 	event.data.ptr = info;
722 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
723 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
724 				"to epoll\n", info->channel_path);
725 		return -1;
726 	}
727 	RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
728 			"to monitor\n", info->channel_path);
729 	return 0;
730 }
731 
732 int
733 remove_channel_from_monitor(struct channel_info *chan_info)
734 {
735 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
736 			chan_info->fd, NULL) < 0) {
737 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
738 				"from epoll\n", chan_info->channel_path);
739 		return -1;
740 	}
741 	return 0;
742 }
743 
744 int
745 channel_monitor_init(void)
746 {
747 	global_event_fd = epoll_create1(0);
748 	if (global_event_fd == 0) {
749 		RTE_LOG(ERR, CHANNEL_MONITOR,
750 				"Error creating epoll context with error %s\n",
751 				strerror(errno));
752 		return -1;
753 	}
754 	global_events_list = rte_malloc("epoll_events",
755 			sizeof(*global_events_list)
756 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
757 	if (global_events_list == NULL) {
758 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
759 				"epoll events\n");
760 		return -1;
761 	}
762 	return 0;
763 }
764 
765 static void
766 read_binary_packet(struct channel_info *chan_info)
767 {
768 	struct channel_packet pkt;
769 	void *buffer = &pkt;
770 	int buffer_len = sizeof(pkt);
771 	int n_bytes, err = 0;
772 
773 	while (buffer_len > 0) {
774 		n_bytes = read(chan_info->fd,
775 				buffer, buffer_len);
776 		if (n_bytes == buffer_len)
777 			break;
778 		if (n_bytes < 0) {
779 			err = errno;
780 			RTE_LOG(DEBUG, CHANNEL_MONITOR,
781 				"Received error on "
782 				"channel '%s' read: %s\n",
783 				chan_info->channel_path,
784 				strerror(err));
785 			remove_channel(&chan_info);
786 			break;
787 		}
788 		buffer = (char *)buffer + n_bytes;
789 		buffer_len -= n_bytes;
790 	}
791 	if (!err)
792 		process_request(&pkt, chan_info);
793 }
794 
795 #ifdef USE_JANSSON
796 static void
797 read_json_packet(struct channel_info *chan_info)
798 {
799 	struct channel_packet pkt;
800 	int n_bytes, ret;
801 	json_t *root;
802 	json_error_t error;
803 
804 	/* read opening brace to closing brace */
805 	do {
806 		int idx = 0;
807 		int indent = 0;
808 		do {
809 			n_bytes = read(chan_info->fd, &json_data[idx], 1);
810 			if (n_bytes == 0)
811 				break;
812 			if (json_data[idx] == '{')
813 				indent++;
814 			if (json_data[idx] == '}')
815 				indent--;
816 			if ((indent > 0) || (idx > 0))
817 				idx++;
818 			if (indent <= 0)
819 				json_data[idx] = 0;
820 			if (idx >= MAX_JSON_STRING_LEN-1)
821 				break;
822 		} while (indent > 0);
823 
824 		if (indent > 0)
825 			/*
826 			 * We've broken out of the read loop without getting
827 			 * a closing brace, so throw away the data
828 			 */
829 			json_data[idx] = 0;
830 
831 		if (strlen(json_data) == 0)
832 			continue;
833 
834 		printf("got [%s]\n", json_data);
835 
836 		root = json_loads(json_data, 0, &error);
837 
838 		if (root) {
839 			/*
840 			 * Because our data is now in the json
841 			 * object, we can overwrite the pkt
842 			 * with a channel_packet struct, using
843 			 * parse_json_to_pkt()
844 			 */
845 			ret = parse_json_to_pkt(root, &pkt);
846 			json_decref(root);
847 			if (ret) {
848 				RTE_LOG(ERR, CHANNEL_MONITOR,
849 					"Error validating JSON profile data\n");
850 				break;
851 			}
852 			process_request(&pkt, chan_info);
853 		} else {
854 			RTE_LOG(ERR, CHANNEL_MONITOR,
855 					"JSON error on line %d: %s\n",
856 					error.line, error.text);
857 		}
858 	} while (n_bytes > 0);
859 }
860 #endif
861 
862 void
863 run_channel_monitor(void)
864 {
865 	while (run_loop) {
866 		int n_events, i;
867 
868 		n_events = epoll_wait(global_event_fd, global_events_list,
869 				MAX_EVENTS, 1);
870 		if (!run_loop)
871 			break;
872 		for (i = 0; i < n_events; i++) {
873 			struct channel_info *chan_info = (struct channel_info *)
874 					global_events_list[i].data.ptr;
875 			if ((global_events_list[i].events & EPOLLERR) ||
876 				(global_events_list[i].events & EPOLLHUP)) {
877 				RTE_LOG(INFO, CHANNEL_MONITOR,
878 						"Remote closed connection for "
879 						"channel '%s'\n",
880 						chan_info->channel_path);
881 				remove_channel(&chan_info);
882 				continue;
883 			}
884 			if (global_events_list[i].events & EPOLLIN) {
885 
886 				switch (chan_info->type) {
887 				case CHANNEL_TYPE_BINARY:
888 					read_binary_packet(chan_info);
889 					break;
890 #ifdef USE_JANSSON
891 				case CHANNEL_TYPE_JSON:
892 					read_json_packet(chan_info);
893 					break;
894 #endif
895 				default:
896 					break;
897 				}
898 			}
899 		}
900 		rte_delay_us(time_period_ms*1000);
901 		if (policy_is_set) {
902 			int j;
903 
904 			for (j = 0; j < MAX_CLIENTS; j++) {
905 				if (policies[j].enabled == 1)
906 					apply_policy(&policies[j]);
907 			}
908 		}
909 	}
910 }
911