xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision 5ecb687a5698d2d8ec1f3b3b5a7a16bceca3e29c)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32 
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39 
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41 
42 #define MAX_EVENTS 256
43 
/* Last ipackets count sampled per VF; indexed by pfid in get_pkt_diff(). */
uint64_t vsi_pkt_count_prev[384];
/* TSC value stored at the previous traffic sample; indexed by pfid. */
uint64_t rdtsc_prev[384];
/* Size of the buffer in which one JSON message is assembled. */
#define MAX_JSON_STRING_LEN 1024
/* Buffer used by read_json_packet() to assemble a JSON message. */
char json_data[MAX_JSON_STRING_LEN];

/* Multiplied by 1000 and passed to rte_delay_us() once per monitor loop. */
double time_period_ms = 1;
/* Cleared by channel_monitor_exit() to stop run_channel_monitor(). */
static volatile unsigned run_loop = 1;
/* epoll instance created in channel_monitor_init(). */
static int global_event_fd;
/* Set to 1 once a PKT_POLICY request has been processed. */
static unsigned int policy_is_set;
/* Event array handed to epoll_wait(); allocated in channel_monitor_init(). */
static struct epoll_event *global_events_list;
/* Policy table; entries with enabled == 1 are applied on every loop pass. */
static struct policy policies[MAX_CLIENTS];
55 
56 #ifdef USE_JANSSON
57 
/*
 * Overlays a MAC address on a 64-bit integer so that set_policy_mac() can
 * store the parsed address in the vfid[] field of a channel_packet.
 */
union PFID {
	struct ether_addr addr;
	uint64_t pfid;
};
62 
63 static int
64 str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
65 {
66 	int i;
67 	char *end;
68 	unsigned long o[ETHER_ADDR_LEN];
69 
70 	i = 0;
71 	do {
72 		errno = 0;
73 		o[i] = strtoul(a, &end, 16);
74 		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75 			return -1;
76 		a = end + 1;
77 	} while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78 
79 	/* Junk at the end of line */
80 	if (end[0] != 0)
81 		return -1;
82 
83 	/* Support the format XX:XX:XX:XX:XX:XX */
84 	if (i == ETHER_ADDR_LEN) {
85 		while (i-- != 0) {
86 			if (o[i] > UINT8_MAX)
87 				return -1;
88 			ether_addr->addr_bytes[i] = (uint8_t)o[i];
89 		}
90 	/* Support the format XXXX:XXXX:XXXX */
91 	} else if (i == ETHER_ADDR_LEN / 2) {
92 		while (i-- != 0) {
93 			if (o[i] > UINT16_MAX)
94 				return -1;
95 			ether_addr->addr_bytes[i * 2] =
96 					(uint8_t)(o[i] >> 8);
97 			ether_addr->addr_bytes[i * 2 + 1] =
98 					(uint8_t)(o[i] & 0xff);
99 		}
100 	/* unknown format */
101 	} else
102 		return -1;
103 
104 	return 0;
105 }
106 
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110 	union PFID pfid;
111 	int ret;
112 
113 	/* Use port MAC address as the vfid */
114 	ret = str_to_ether_addr(mac, &pfid.addr);
115 
116 	if (ret != 0) {
117 		RTE_LOG(ERR, CHANNEL_MONITOR,
118 			"Invalid mac address received in JSON\n");
119 		pkt->vfid[idx] = 0;
120 		return -1;
121 	}
122 
123 	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124 			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125 			pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126 			pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127 			pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128 
129 	pkt->vfid[idx] = pfid.pfid;
130 	return 0;
131 }
132 
133 
134 static int
135 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
136 {
137 	const char *key;
138 	json_t *value;
139 	int ret;
140 
141 	memset(pkt, 0, sizeof(struct channel_packet));
142 
143 	pkt->nb_mac_to_monitor = 0;
144 	pkt->t_boost_status.tbEnabled = false;
145 	pkt->workload = LOW;
146 	pkt->policy_to_use = TIME;
147 	pkt->command = PKT_POLICY;
148 	pkt->core_type = CORE_TYPE_PHYSICAL;
149 
150 	json_object_foreach(element, key, value) {
151 		if (!strcmp(key, "policy")) {
152 			/* Recurse in to get the contents of profile */
153 			ret = parse_json_to_pkt(value, pkt);
154 			if (ret)
155 				return ret;
156 		} else if (!strcmp(key, "instruction")) {
157 			/* Recurse in to get the contents of instruction */
158 			ret = parse_json_to_pkt(value, pkt);
159 			if (ret)
160 				return ret;
161 		} else if (!strcmp(key, "name")) {
162 			strcpy(pkt->vm_name, json_string_value(value));
163 		} else if (!strcmp(key, "command")) {
164 			char command[32];
165 			strlcpy(command, json_string_value(value), 32);
166 			if (!strcmp(command, "power")) {
167 				pkt->command = CPU_POWER;
168 			} else if (!strcmp(command, "create")) {
169 				pkt->command = PKT_POLICY;
170 			} else if (!strcmp(command, "destroy")) {
171 				pkt->command = PKT_POLICY_REMOVE;
172 			} else {
173 				RTE_LOG(ERR, CHANNEL_MONITOR,
174 					"Invalid command received in JSON\n");
175 				return -1;
176 			}
177 		} else if (!strcmp(key, "policy_type")) {
178 			char command[32];
179 			strlcpy(command, json_string_value(value), 32);
180 			if (!strcmp(command, "TIME")) {
181 				pkt->policy_to_use = TIME;
182 			} else if (!strcmp(command, "TRAFFIC")) {
183 				pkt->policy_to_use = TRAFFIC;
184 			} else if (!strcmp(command, "WORKLOAD")) {
185 				pkt->policy_to_use = WORKLOAD;
186 			} else if (!strcmp(command, "BRANCH_RATIO")) {
187 				pkt->policy_to_use = BRANCH_RATIO;
188 			} else {
189 				RTE_LOG(ERR, CHANNEL_MONITOR,
190 					"Wrong policy_type received in JSON\n");
191 				return -1;
192 			}
193 		} else if (!strcmp(key, "workload")) {
194 			char command[32];
195 			strlcpy(command, json_string_value(value), 32);
196 			if (!strcmp(command, "HIGH")) {
197 				pkt->workload = HIGH;
198 			} else if (!strcmp(command, "MEDIUM")) {
199 				pkt->workload = MEDIUM;
200 			} else if (!strcmp(command, "LOW")) {
201 				pkt->workload = LOW;
202 			} else {
203 				RTE_LOG(ERR, CHANNEL_MONITOR,
204 					"Wrong workload received in JSON\n");
205 				return -1;
206 			}
207 		} else if (!strcmp(key, "busy_hours")) {
208 			unsigned int i;
209 			size_t size = json_array_size(value);
210 
211 			for (i = 0; i < size; i++) {
212 				int hour = (int)json_integer_value(
213 						json_array_get(value, i));
214 				pkt->timer_policy.busy_hours[i] = hour;
215 			}
216 		} else if (!strcmp(key, "quiet_hours")) {
217 			unsigned int i;
218 			size_t size = json_array_size(value);
219 
220 			for (i = 0; i < size; i++) {
221 				int hour = (int)json_integer_value(
222 						json_array_get(value, i));
223 				pkt->timer_policy.quiet_hours[i] = hour;
224 			}
225 		} else if (!strcmp(key, "core_list")) {
226 			unsigned int i;
227 			size_t size = json_array_size(value);
228 
229 			for (i = 0; i < size; i++) {
230 				int core = (int)json_integer_value(
231 						json_array_get(value, i));
232 				pkt->vcpu_to_control[i] = core;
233 			}
234 			pkt->num_vcpu = size;
235 		} else if (!strcmp(key, "mac_list")) {
236 			unsigned int i;
237 			size_t size = json_array_size(value);
238 
239 			for (i = 0; i < size; i++) {
240 				char mac[32];
241 				strlcpy(mac,
242 					json_string_value(json_array_get(value, i)),
243 					32);
244 				set_policy_mac(pkt, i, mac);
245 			}
246 			pkt->nb_mac_to_monitor = size;
247 		} else if (!strcmp(key, "avg_packet_thresh")) {
248 			pkt->traffic_policy.avg_max_packet_thresh =
249 					(uint32_t)json_integer_value(value);
250 		} else if (!strcmp(key, "max_packet_thresh")) {
251 			pkt->traffic_policy.max_max_packet_thresh =
252 					(uint32_t)json_integer_value(value);
253 		} else if (!strcmp(key, "unit")) {
254 			char unit[32];
255 			strlcpy(unit, json_string_value(value), 32);
256 			if (!strcmp(unit, "SCALE_UP")) {
257 				pkt->unit = CPU_POWER_SCALE_UP;
258 			} else if (!strcmp(unit, "SCALE_DOWN")) {
259 				pkt->unit = CPU_POWER_SCALE_DOWN;
260 			} else if (!strcmp(unit, "SCALE_MAX")) {
261 				pkt->unit = CPU_POWER_SCALE_MAX;
262 			} else if (!strcmp(unit, "SCALE_MIN")) {
263 				pkt->unit = CPU_POWER_SCALE_MIN;
264 			} else if (!strcmp(unit, "ENABLE_TURBO")) {
265 				pkt->unit = CPU_POWER_ENABLE_TURBO;
266 			} else if (!strcmp(unit, "DISABLE_TURBO")) {
267 				pkt->unit = CPU_POWER_DISABLE_TURBO;
268 			} else {
269 				RTE_LOG(ERR, CHANNEL_MONITOR,
270 					"Invalid command received in JSON\n");
271 				return -1;
272 			}
273 		} else if (!strcmp(key, "resource_id")) {
274 			pkt->resource_id = (uint32_t)json_integer_value(value);
275 		} else {
276 			RTE_LOG(ERR, CHANNEL_MONITOR,
277 				"Unknown key received in JSON string: %s\n",
278 				key);
279 		}
280 	}
281 	return 0;
282 }
283 #endif
284 
285 void channel_monitor_exit(void)
286 {
287 	run_loop = 0;
288 	rte_free(global_events_list);
289 }
290 
291 static void
292 core_share(int pNo, int z, int x, int t)
293 {
294 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
295 		if (strcmp(policies[pNo].pkt.vm_name,
296 				lvm_info[x].vm_name) != 0) {
297 			policies[pNo].core_share[z].status = 1;
298 			power_manager_scale_core_max(
299 					policies[pNo].core_share[z].pcpu);
300 		}
301 	}
302 }
303 
304 static void
305 core_share_status(int pNo)
306 {
307 
308 	int noVms = 0, noVcpus = 0, z, x, t;
309 
310 	get_all_vm(&noVms, &noVcpus);
311 
312 	/* Reset Core Share Status. */
313 	for (z = 0; z < noVcpus; z++)
314 		policies[pNo].core_share[z].status = 0;
315 
316 	/* Foreach vcpu in a policy. */
317 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
318 		/* Foreach VM on the platform. */
319 		for (x = 0; x < noVms; x++) {
320 			/* Foreach vcpu of VMs on platform. */
321 			for (t = 0; t < lvm_info[x].num_cpus; t++)
322 				core_share(pNo, z, x, t);
323 		}
324 	}
325 }
326 
327 
328 static int
329 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
330 {
331 	int ret = 0;
332 
333 	if (pol->pkt.policy_to_use == BRANCH_RATIO) {
334 		ci->cd[pcpu].oob_enabled = 1;
335 		ret = add_core_to_monitor(pcpu);
336 		if (ret == 0)
337 			RTE_LOG(INFO, CHANNEL_MONITOR,
338 					"Monitoring pcpu %d OOB for %s\n",
339 					pcpu, pol->pkt.vm_name);
340 		else
341 			RTE_LOG(ERR, CHANNEL_MONITOR,
342 					"Error monitoring pcpu %d OOB for %s\n",
343 					pcpu, pol->pkt.vm_name);
344 
345 	} else {
346 		pol->core_share[count].pcpu = pcpu;
347 		RTE_LOG(INFO, CHANNEL_MONITOR,
348 				"Monitoring pcpu %d for %s\n",
349 				pcpu, pol->pkt.vm_name);
350 	}
351 	return ret;
352 }
353 
354 static void
355 get_pcpu_to_control(struct policy *pol)
356 {
357 
358 	/* Convert vcpu to pcpu. */
359 	struct vm_info info;
360 	int pcpu, count;
361 	struct core_info *ci;
362 
363 	ci = get_core_info();
364 
365 	RTE_LOG(DEBUG, CHANNEL_MONITOR,
366 			"Looking for pcpu for %s\n", pol->pkt.vm_name);
367 
368 	/*
369 	 * So now that we're handling virtual and physical cores, we need to
370 	 * differenciate between them when adding them to the branch monitor.
371 	 * Virtual cores need to be converted to physical cores.
372 	 */
373 	if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
374 		/*
375 		 * If the cores in the policy are virtual, we need to map them
376 		 * to physical core. We look up the vm info and use that for
377 		 * the mapping.
378 		 */
379 		get_info_vm(pol->pkt.vm_name, &info);
380 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
381 			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
382 			pcpu_monitor(pol, ci, pcpu, count);
383 		}
384 	} else {
385 		/*
386 		 * If the cores in the policy are physical, we just use
387 		 * those core id's directly.
388 		 */
389 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
390 			pcpu = pol->pkt.vcpu_to_control[count];
391 			pcpu_monitor(pol, ci, pcpu, count);
392 		}
393 	}
394 }
395 
396 static int
397 get_pfid(struct policy *pol)
398 {
399 
400 	int i, x, ret = 0;
401 
402 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
403 
404 		RTE_ETH_FOREACH_DEV(x) {
405 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
406 				(struct ether_addr *)&(pol->pkt.vfid[i]));
407 			if (ret != -EINVAL) {
408 				pol->port[i] = x;
409 				break;
410 			}
411 		}
412 		if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
413 			RTE_LOG(INFO, CHANNEL_MONITOR,
414 				"Error with Policy. MAC not found on "
415 				"attached ports ");
416 			pol->enabled = 0;
417 			return ret;
418 		}
419 		pol->pfid[i] = ret;
420 	}
421 	return 1;
422 }
423 
424 static int
425 update_policy(struct channel_packet *pkt)
426 {
427 
428 	unsigned int updated = 0;
429 	int i;
430 
431 
432 	RTE_LOG(INFO, CHANNEL_MONITOR,
433 			"Applying policy for %s\n", pkt->vm_name);
434 
435 	for (i = 0; i < MAX_CLIENTS; i++) {
436 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
437 			/* Copy the contents of *pkt into the policy.pkt */
438 			policies[i].pkt = *pkt;
439 			get_pcpu_to_control(&policies[i]);
440 			if (get_pfid(&policies[i]) < 0) {
441 				updated = 1;
442 				break;
443 			}
444 			core_share_status(i);
445 			policies[i].enabled = 1;
446 			updated = 1;
447 		}
448 	}
449 	if (!updated) {
450 		for (i = 0; i < MAX_CLIENTS; i++) {
451 			if (policies[i].enabled == 0) {
452 				policies[i].pkt = *pkt;
453 				get_pcpu_to_control(&policies[i]);
454 				if (get_pfid(&policies[i]) < 0)
455 					break;
456 				core_share_status(i);
457 				policies[i].enabled = 1;
458 				break;
459 			}
460 		}
461 	}
462 	return 0;
463 }
464 
465 static int
466 remove_policy(struct channel_packet *pkt __rte_unused)
467 {
468 	int i;
469 
470 	/*
471 	 * Disabling the policy is simply a case of setting
472 	 * enabled to 0
473 	 */
474 	for (i = 0; i < MAX_CLIENTS; i++) {
475 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
476 			policies[i].enabled = 0;
477 			return 0;
478 		}
479 	}
480 	return -1;
481 }
482 
483 static uint64_t
484 get_pkt_diff(struct policy *pol)
485 {
486 
487 	uint64_t vsi_pkt_count,
488 		vsi_pkt_total = 0,
489 		vsi_pkt_count_prev_total = 0;
490 	double rdtsc_curr, rdtsc_diff, diff;
491 	int x;
492 	struct rte_eth_stats vf_stats;
493 
494 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
495 
496 		/*Read vsi stats*/
497 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
498 			vsi_pkt_count = vf_stats.ipackets;
499 		else
500 			vsi_pkt_count = -1;
501 
502 		vsi_pkt_total += vsi_pkt_count;
503 
504 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
505 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
506 	}
507 
508 	rdtsc_curr = rte_rdtsc_precise();
509 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
510 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
511 
512 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
513 			((double)rte_get_tsc_hz() / rdtsc_diff);
514 
515 	return diff;
516 }
517 
518 static void
519 apply_traffic_profile(struct policy *pol)
520 {
521 
522 	int count;
523 	uint64_t diff = 0;
524 
525 	diff = get_pkt_diff(pol);
526 
527 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
528 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
529 			if (pol->core_share[count].status != 1)
530 				power_manager_scale_core_max(
531 						pol->core_share[count].pcpu);
532 		}
533 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
534 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
535 			if (pol->core_share[count].status != 1)
536 				power_manager_scale_core_med(
537 						pol->core_share[count].pcpu);
538 		}
539 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
540 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
541 			if (pol->core_share[count].status != 1)
542 				power_manager_scale_core_min(
543 						pol->core_share[count].pcpu);
544 		}
545 	}
546 }
547 
548 static void
549 apply_time_profile(struct policy *pol)
550 {
551 
552 	int count, x;
553 	struct timeval tv;
554 	struct tm *ptm;
555 	char time_string[40];
556 
557 	/* Obtain the time of day, and convert it to a tm struct. */
558 	gettimeofday(&tv, NULL);
559 	ptm = localtime(&tv.tv_sec);
560 	/* Format the date and time, down to a single second. */
561 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
562 
563 	for (x = 0; x < HOURS; x++) {
564 
565 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
566 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
567 				if (pol->core_share[count].status != 1) {
568 					power_manager_scale_core_max(
569 						pol->core_share[count].pcpu);
570 				}
571 			}
572 			break;
573 		} else if (ptm->tm_hour ==
574 				pol->pkt.timer_policy.quiet_hours[x]) {
575 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
576 				if (pol->core_share[count].status != 1) {
577 					power_manager_scale_core_min(
578 						pol->core_share[count].pcpu);
579 			}
580 		}
581 			break;
582 		} else if (ptm->tm_hour ==
583 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
584 			apply_traffic_profile(pol);
585 			break;
586 		}
587 	}
588 }
589 
590 static void
591 apply_workload_profile(struct policy *pol)
592 {
593 
594 	int count;
595 
596 	if (pol->pkt.workload == HIGH) {
597 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
598 			if (pol->core_share[count].status != 1)
599 				power_manager_scale_core_max(
600 						pol->core_share[count].pcpu);
601 		}
602 	} else if (pol->pkt.workload == MEDIUM) {
603 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
604 			if (pol->core_share[count].status != 1)
605 				power_manager_scale_core_med(
606 						pol->core_share[count].pcpu);
607 		}
608 	} else if (pol->pkt.workload == LOW) {
609 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
610 			if (pol->core_share[count].status != 1)
611 				power_manager_scale_core_min(
612 						pol->core_share[count].pcpu);
613 		}
614 	}
615 }
616 
617 static void
618 apply_policy(struct policy *pol)
619 {
620 
621 	struct channel_packet *pkt = &pol->pkt;
622 
623 	/*Check policy to use*/
624 	if (pkt->policy_to_use == TRAFFIC)
625 		apply_traffic_profile(pol);
626 	else if (pkt->policy_to_use == TIME)
627 		apply_time_profile(pol);
628 	else if (pkt->policy_to_use == WORKLOAD)
629 		apply_workload_profile(pol);
630 }
631 
632 static int
633 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
634 {
635 	int ret;
636 
637 	if (chan_info == NULL)
638 		return -1;
639 
640 	if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
641 			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
642 		return -1;
643 
644 	if (pkt->command == CPU_POWER) {
645 		unsigned int core_num;
646 
647 		if (pkt->core_type == CORE_TYPE_VIRTUAL)
648 			core_num = get_pcpu(chan_info, pkt->resource_id);
649 		else
650 			core_num = pkt->resource_id;
651 
652 		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
653 			core_num);
654 
655 		switch (pkt->unit) {
656 		case(CPU_POWER_SCALE_MIN):
657 			power_manager_scale_core_min(core_num);
658 			break;
659 		case(CPU_POWER_SCALE_MAX):
660 			power_manager_scale_core_max(core_num);
661 			break;
662 		case(CPU_POWER_SCALE_DOWN):
663 			power_manager_scale_core_down(core_num);
664 			break;
665 		case(CPU_POWER_SCALE_UP):
666 			power_manager_scale_core_up(core_num);
667 			break;
668 		case(CPU_POWER_ENABLE_TURBO):
669 			power_manager_enable_turbo_core(core_num);
670 			break;
671 		case(CPU_POWER_DISABLE_TURBO):
672 			power_manager_disable_turbo_core(core_num);
673 			break;
674 		default:
675 			break;
676 		}
677 	}
678 
679 	if (pkt->command == PKT_POLICY) {
680 		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
681 				pkt->vm_name);
682 		update_policy(pkt);
683 		policy_is_set = 1;
684 	}
685 
686 	if (pkt->command == PKT_POLICY_REMOVE) {
687 		ret = remove_policy(pkt);
688 		if (ret == 0)
689 			RTE_LOG(INFO, CHANNEL_MONITOR,
690 				 "Removed policy %s\n", pkt->vm_name);
691 		else
692 			RTE_LOG(INFO, CHANNEL_MONITOR,
693 				 "Policy %s does not exist\n", pkt->vm_name);
694 	}
695 
696 	/*
697 	 * Return is not checked as channel status may have been set to DISABLED
698 	 * from management thread
699 	 */
700 	rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
701 			CHANNEL_MGR_CHANNEL_CONNECTED);
702 	return 0;
703 
704 }
705 
706 int
707 add_channel_to_monitor(struct channel_info **chan_info)
708 {
709 	struct channel_info *info = *chan_info;
710 	struct epoll_event event;
711 
712 	event.events = EPOLLIN;
713 	event.data.ptr = info;
714 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
715 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
716 				"to epoll\n", info->channel_path);
717 		return -1;
718 	}
719 	RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
720 			"to monitor\n", info->channel_path);
721 	return 0;
722 }
723 
724 int
725 remove_channel_from_monitor(struct channel_info *chan_info)
726 {
727 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
728 			chan_info->fd, NULL) < 0) {
729 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
730 				"from epoll\n", chan_info->channel_path);
731 		return -1;
732 	}
733 	return 0;
734 }
735 
736 int
737 channel_monitor_init(void)
738 {
739 	global_event_fd = epoll_create1(0);
740 	if (global_event_fd == 0) {
741 		RTE_LOG(ERR, CHANNEL_MONITOR,
742 				"Error creating epoll context with error %s\n",
743 				strerror(errno));
744 		return -1;
745 	}
746 	global_events_list = rte_malloc("epoll_events",
747 			sizeof(*global_events_list)
748 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
749 	if (global_events_list == NULL) {
750 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
751 				"epoll events\n");
752 		return -1;
753 	}
754 	return 0;
755 }
756 
757 static void
758 read_binary_packet(struct channel_info *chan_info)
759 {
760 	struct channel_packet pkt;
761 	void *buffer = &pkt;
762 	int buffer_len = sizeof(pkt);
763 	int n_bytes, err = 0;
764 
765 	while (buffer_len > 0) {
766 		n_bytes = read(chan_info->fd,
767 				buffer, buffer_len);
768 		if (n_bytes == buffer_len)
769 			break;
770 		if (n_bytes < 0) {
771 			err = errno;
772 			RTE_LOG(DEBUG, CHANNEL_MONITOR,
773 				"Received error on "
774 				"channel '%s' read: %s\n",
775 				chan_info->channel_path,
776 				strerror(err));
777 			remove_channel(&chan_info);
778 			break;
779 		}
780 		buffer = (char *)buffer + n_bytes;
781 		buffer_len -= n_bytes;
782 	}
783 	if (!err)
784 		process_request(&pkt, chan_info);
785 }
786 
787 #ifdef USE_JANSSON
788 static void
789 read_json_packet(struct channel_info *chan_info)
790 {
791 	struct channel_packet pkt;
792 	int n_bytes, ret;
793 	json_t *root;
794 	json_error_t error;
795 
796 	/* read opening brace to closing brace */
797 	do {
798 		int idx = 0;
799 		int indent = 0;
800 		do {
801 			n_bytes = read(chan_info->fd, &json_data[idx], 1);
802 			if (n_bytes == 0)
803 				break;
804 			if (json_data[idx] == '{')
805 				indent++;
806 			if (json_data[idx] == '}')
807 				indent--;
808 			if ((indent > 0) || (idx > 0))
809 				idx++;
810 			if (indent == 0)
811 				json_data[idx] = 0;
812 			if (idx >= MAX_JSON_STRING_LEN-1)
813 				break;
814 		} while (indent > 0);
815 
816 		if (indent > 0)
817 			/*
818 			 * We've broken out of the read loop without getting
819 			 * a closing brace, so throw away the data
820 			 */
821 			json_data[idx] = 0;
822 
823 		if (strlen(json_data) == 0)
824 			continue;
825 
826 		printf("got [%s]\n", json_data);
827 
828 		root = json_loads(json_data, 0, &error);
829 
830 		if (root) {
831 			/*
832 			 * Because our data is now in the json
833 			 * object, we can overwrite the pkt
834 			 * with a channel_packet struct, using
835 			 * parse_json_to_pkt()
836 			 */
837 			ret = parse_json_to_pkt(root, &pkt);
838 			json_decref(root);
839 			if (ret) {
840 				RTE_LOG(ERR, CHANNEL_MONITOR,
841 					"Error validating JSON profile data\n");
842 				break;
843 			}
844 			process_request(&pkt, chan_info);
845 		} else {
846 			RTE_LOG(ERR, CHANNEL_MONITOR,
847 					"JSON error on line %d: %s\n",
848 					error.line, error.text);
849 		}
850 	} while (n_bytes > 0);
851 }
852 #endif
853 
854 void
855 run_channel_monitor(void)
856 {
857 	while (run_loop) {
858 		int n_events, i;
859 
860 		n_events = epoll_wait(global_event_fd, global_events_list,
861 				MAX_EVENTS, 1);
862 		if (!run_loop)
863 			break;
864 		for (i = 0; i < n_events; i++) {
865 			struct channel_info *chan_info = (struct channel_info *)
866 					global_events_list[i].data.ptr;
867 			if ((global_events_list[i].events & EPOLLERR) ||
868 				(global_events_list[i].events & EPOLLHUP)) {
869 				RTE_LOG(INFO, CHANNEL_MONITOR,
870 						"Remote closed connection for "
871 						"channel '%s'\n",
872 						chan_info->channel_path);
873 				remove_channel(&chan_info);
874 				continue;
875 			}
876 			if (global_events_list[i].events & EPOLLIN) {
877 
878 				switch (chan_info->type) {
879 				case CHANNEL_TYPE_BINARY:
880 					read_binary_packet(chan_info);
881 					break;
882 #ifdef USE_JANSSON
883 				case CHANNEL_TYPE_JSON:
884 					read_json_packet(chan_info);
885 					break;
886 #endif
887 				default:
888 					break;
889 				}
890 			}
891 		}
892 		rte_delay_us(time_period_ms*1000);
893 		if (policy_is_set) {
894 			int j;
895 
896 			for (j = 0; j < MAX_CLIENTS; j++) {
897 				if (policies[j].enabled == 1)
898 					apply_policy(&policies[j]);
899 			}
900 		}
901 	}
902 }
903