xref: /dpdk/examples/vm_power_manager/channel_monitor.c (revision 0de94bcac7fc9bd4264716ab4851c1376b5a2582)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32 
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39 
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41 
42 #define MAX_EVENTS 256
43 
44 uint64_t vsi_pkt_count_prev[384];
45 uint64_t rdtsc_prev[384];
46 #define MAX_JSON_STRING_LEN 1024
47 char json_data[MAX_JSON_STRING_LEN];
48 
49 double time_period_ms = 1;
50 static volatile unsigned run_loop = 1;
51 static int global_event_fd;
52 static unsigned int policy_is_set;
53 static struct epoll_event *global_events_list;
54 static struct policy policies[RTE_MAX_LCORE];
55 
56 #ifdef USE_JANSSON
57 
58 union PFID {
59 	struct rte_ether_addr addr;
60 	uint64_t pfid;
61 };
62 
63 static int
64 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
65 {
66 	int i;
67 	char *end;
68 	unsigned long o[RTE_ETHER_ADDR_LEN];
69 
70 	i = 0;
71 	do {
72 		errno = 0;
73 		o[i] = strtoul(a, &end, 16);
74 		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75 			return -1;
76 		a = end + 1;
77 	} while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78 
79 	/* Junk at the end of line */
80 	if (end[0] != 0)
81 		return -1;
82 
83 	/* Support the format XX:XX:XX:XX:XX:XX */
84 	if (i == RTE_ETHER_ADDR_LEN) {
85 		while (i-- != 0) {
86 			if (o[i] > UINT8_MAX)
87 				return -1;
88 			ether_addr->addr_bytes[i] = (uint8_t)o[i];
89 		}
90 	/* Support the format XXXX:XXXX:XXXX */
91 	} else if (i == RTE_ETHER_ADDR_LEN / 2) {
92 		while (i-- != 0) {
93 			if (o[i] > UINT16_MAX)
94 				return -1;
95 			ether_addr->addr_bytes[i * 2] =
96 					(uint8_t)(o[i] >> 8);
97 			ether_addr->addr_bytes[i * 2 + 1] =
98 					(uint8_t)(o[i] & 0xff);
99 		}
100 	/* unknown format */
101 	} else
102 		return -1;
103 
104 	return 0;
105 }
106 
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110 	union PFID pfid;
111 	int ret;
112 
113 	/* Use port MAC address as the vfid */
114 	ret = str_to_ether_addr(mac, &pfid.addr);
115 
116 	if (ret != 0) {
117 		RTE_LOG(ERR, CHANNEL_MONITOR,
118 			"Invalid mac address received in JSON\n");
119 		pkt->vfid[idx] = 0;
120 		return -1;
121 	}
122 
123 	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124 			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125 			pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126 			pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127 			pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128 
129 	pkt->vfid[idx] = pfid.pfid;
130 	return 0;
131 }
132 
133 static char*
134 get_resource_name_from_chn_path(const char *channel_path)
135 {
136 	char *substr = NULL;
137 
138 	substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
139 
140 	return substr;
141 }
142 
143 static int
144 get_resource_id_from_vmname(const char *vm_name)
145 {
146 	int result = -1;
147 	int off = 0;
148 
149 	if (vm_name == NULL)
150 		return -1;
151 
152 	while (vm_name[off] != '\0') {
153 		if (isdigit(vm_name[off]))
154 			break;
155 		off++;
156 	}
157 	result = atoi(&vm_name[off]);
158 	if ((result == 0) && (vm_name[off] != '0'))
159 		return -1;
160 
161 	return result;
162 }
163 
164 static int
165 parse_json_to_pkt(json_t *element, struct channel_packet *pkt,
166 					const char *vm_name)
167 {
168 	const char *key;
169 	json_t *value;
170 	int ret;
171 	int resource_id;
172 
173 	memset(pkt, 0, sizeof(struct channel_packet));
174 
175 	pkt->nb_mac_to_monitor = 0;
176 	pkt->t_boost_status.tbEnabled = false;
177 	pkt->workload = LOW;
178 	pkt->policy_to_use = TIME;
179 	pkt->command = PKT_POLICY;
180 	pkt->core_type = CORE_TYPE_PHYSICAL;
181 
182 	if (vm_name == NULL) {
183 		RTE_LOG(ERR, CHANNEL_MONITOR,
184 			"vm_name is NULL, request rejected !\n");
185 		return -1;
186 	}
187 
188 	json_object_foreach(element, key, value) {
189 		if (!strcmp(key, "policy")) {
190 			/* Recurse in to get the contents of profile */
191 			ret = parse_json_to_pkt(value, pkt, vm_name);
192 			if (ret)
193 				return ret;
194 		} else if (!strcmp(key, "instruction")) {
195 			/* Recurse in to get the contents of instruction */
196 			ret = parse_json_to_pkt(value, pkt, vm_name);
197 			if (ret)
198 				return ret;
199 		} else if (!strcmp(key, "command")) {
200 			char command[32];
201 			strlcpy(command, json_string_value(value), 32);
202 			if (!strcmp(command, "power")) {
203 				pkt->command = CPU_POWER;
204 			} else if (!strcmp(command, "create")) {
205 				pkt->command = PKT_POLICY;
206 			} else if (!strcmp(command, "destroy")) {
207 				pkt->command = PKT_POLICY_REMOVE;
208 			} else {
209 				RTE_LOG(ERR, CHANNEL_MONITOR,
210 					"Invalid command received in JSON\n");
211 				return -1;
212 			}
213 		} else if (!strcmp(key, "policy_type")) {
214 			char command[32];
215 			strlcpy(command, json_string_value(value), 32);
216 			if (!strcmp(command, "TIME")) {
217 				pkt->policy_to_use = TIME;
218 			} else if (!strcmp(command, "TRAFFIC")) {
219 				pkt->policy_to_use = TRAFFIC;
220 			} else if (!strcmp(command, "WORKLOAD")) {
221 				pkt->policy_to_use = WORKLOAD;
222 			} else if (!strcmp(command, "BRANCH_RATIO")) {
223 				pkt->policy_to_use = BRANCH_RATIO;
224 			} else {
225 				RTE_LOG(ERR, CHANNEL_MONITOR,
226 					"Wrong policy_type received in JSON\n");
227 				return -1;
228 			}
229 		} else if (!strcmp(key, "workload")) {
230 			char command[32];
231 			strlcpy(command, json_string_value(value), 32);
232 			if (!strcmp(command, "HIGH")) {
233 				pkt->workload = HIGH;
234 			} else if (!strcmp(command, "MEDIUM")) {
235 				pkt->workload = MEDIUM;
236 			} else if (!strcmp(command, "LOW")) {
237 				pkt->workload = LOW;
238 			} else {
239 				RTE_LOG(ERR, CHANNEL_MONITOR,
240 					"Wrong workload received in JSON\n");
241 				return -1;
242 			}
243 		} else if (!strcmp(key, "busy_hours")) {
244 			unsigned int i;
245 			size_t size = json_array_size(value);
246 
247 			for (i = 0; i < size; i++) {
248 				int hour = (int)json_integer_value(
249 						json_array_get(value, i));
250 				pkt->timer_policy.busy_hours[i] = hour;
251 			}
252 		} else if (!strcmp(key, "quiet_hours")) {
253 			unsigned int i;
254 			size_t size = json_array_size(value);
255 
256 			for (i = 0; i < size; i++) {
257 				int hour = (int)json_integer_value(
258 						json_array_get(value, i));
259 				pkt->timer_policy.quiet_hours[i] = hour;
260 			}
261 		} else if (!strcmp(key, "mac_list")) {
262 			unsigned int i;
263 			size_t size = json_array_size(value);
264 
265 			for (i = 0; i < size; i++) {
266 				char mac[32];
267 				strlcpy(mac,
268 					json_string_value(json_array_get(value, i)),
269 					32);
270 				set_policy_mac(pkt, i, mac);
271 			}
272 			pkt->nb_mac_to_monitor = size;
273 		} else if (!strcmp(key, "avg_packet_thresh")) {
274 			pkt->traffic_policy.avg_max_packet_thresh =
275 					(uint32_t)json_integer_value(value);
276 		} else if (!strcmp(key, "max_packet_thresh")) {
277 			pkt->traffic_policy.max_max_packet_thresh =
278 					(uint32_t)json_integer_value(value);
279 		} else if (!strcmp(key, "unit")) {
280 			char unit[32];
281 			strlcpy(unit, json_string_value(value), 32);
282 			if (!strcmp(unit, "SCALE_UP")) {
283 				pkt->unit = CPU_POWER_SCALE_UP;
284 			} else if (!strcmp(unit, "SCALE_DOWN")) {
285 				pkt->unit = CPU_POWER_SCALE_DOWN;
286 			} else if (!strcmp(unit, "SCALE_MAX")) {
287 				pkt->unit = CPU_POWER_SCALE_MAX;
288 			} else if (!strcmp(unit, "SCALE_MIN")) {
289 				pkt->unit = CPU_POWER_SCALE_MIN;
290 			} else if (!strcmp(unit, "ENABLE_TURBO")) {
291 				pkt->unit = CPU_POWER_ENABLE_TURBO;
292 			} else if (!strcmp(unit, "DISABLE_TURBO")) {
293 				pkt->unit = CPU_POWER_DISABLE_TURBO;
294 			} else {
295 				RTE_LOG(ERR, CHANNEL_MONITOR,
296 					"Invalid command received in JSON\n");
297 				return -1;
298 			}
299 		} else {
300 			RTE_LOG(ERR, CHANNEL_MONITOR,
301 				"Unknown key received in JSON string: %s\n",
302 				key);
303 		}
304 
305 		resource_id = get_resource_id_from_vmname(vm_name);
306 		if (resource_id < 0) {
307 			RTE_LOG(ERR, CHANNEL_MONITOR,
308 				"Could not get resource_id from vm_name:%s\n",
309 				vm_name);
310 			return -1;
311 		}
312 		strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ);
313 		pkt->resource_id = resource_id;
314 	}
315 	return 0;
316 }
317 #endif
318 
319 void channel_monitor_exit(void)
320 {
321 	run_loop = 0;
322 	rte_free(global_events_list);
323 }
324 
325 static void
326 core_share(int pNo, int z, int x, int t)
327 {
328 	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
329 		if (strcmp(policies[pNo].pkt.vm_name,
330 				lvm_info[x].vm_name) != 0) {
331 			policies[pNo].core_share[z].status = 1;
332 			power_manager_scale_core_max(
333 					policies[pNo].core_share[z].pcpu);
334 		}
335 	}
336 }
337 
338 static void
339 core_share_status(int pNo)
340 {
341 
342 	int noVms = 0, noVcpus = 0, z, x, t;
343 
344 	get_all_vm(&noVms, &noVcpus);
345 
346 	/* Reset Core Share Status. */
347 	for (z = 0; z < noVcpus; z++)
348 		policies[pNo].core_share[z].status = 0;
349 
350 	/* Foreach vcpu in a policy. */
351 	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
352 		/* Foreach VM on the platform. */
353 		for (x = 0; x < noVms; x++) {
354 			/* Foreach vcpu of VMs on platform. */
355 			for (t = 0; t < lvm_info[x].num_cpus; t++)
356 				core_share(pNo, z, x, t);
357 		}
358 	}
359 }
360 
361 
362 static int
363 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
364 {
365 	int ret = 0;
366 
367 	if (pol->pkt.policy_to_use == BRANCH_RATIO) {
368 		ci->cd[pcpu].oob_enabled = 1;
369 		ret = add_core_to_monitor(pcpu);
370 		if (ret == 0)
371 			RTE_LOG(INFO, CHANNEL_MONITOR,
372 					"Monitoring pcpu %d OOB for %s\n",
373 					pcpu, pol->pkt.vm_name);
374 		else
375 			RTE_LOG(ERR, CHANNEL_MONITOR,
376 					"Error monitoring pcpu %d OOB for %s\n",
377 					pcpu, pol->pkt.vm_name);
378 
379 	} else {
380 		pol->core_share[count].pcpu = pcpu;
381 		RTE_LOG(INFO, CHANNEL_MONITOR,
382 				"Monitoring pcpu %d for %s\n",
383 				pcpu, pol->pkt.vm_name);
384 	}
385 	return ret;
386 }
387 
388 static void
389 get_pcpu_to_control(struct policy *pol)
390 {
391 
392 	/* Convert vcpu to pcpu. */
393 	struct vm_info info;
394 	int pcpu, count;
395 	struct core_info *ci;
396 
397 	ci = get_core_info();
398 
399 	RTE_LOG(DEBUG, CHANNEL_MONITOR,
400 			"Looking for pcpu for %s\n", pol->pkt.vm_name);
401 
402 	/*
403 	 * So now that we're handling virtual and physical cores, we need to
404 	 * differenciate between them when adding them to the branch monitor.
405 	 * Virtual cores need to be converted to physical cores.
406 	 */
407 	if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
408 		/*
409 		 * If the cores in the policy are virtual, we need to map them
410 		 * to physical core. We look up the vm info and use that for
411 		 * the mapping.
412 		 */
413 		get_info_vm(pol->pkt.vm_name, &info);
414 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
415 			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
416 			pcpu_monitor(pol, ci, pcpu, count);
417 		}
418 	} else {
419 		/*
420 		 * If the cores in the policy are physical, we just use
421 		 * those core id's directly.
422 		 */
423 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
424 			pcpu = pol->pkt.vcpu_to_control[count];
425 			pcpu_monitor(pol, ci, pcpu, count);
426 		}
427 	}
428 }
429 
430 static int
431 get_pfid(struct policy *pol)
432 {
433 
434 	int i, x, ret = 0;
435 
436 	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
437 
438 		RTE_ETH_FOREACH_DEV(x) {
439 			ret = rte_pmd_i40e_query_vfid_by_mac(x,
440 				(struct rte_ether_addr *)&(pol->pkt.vfid[i]));
441 			if (ret != -EINVAL) {
442 				pol->port[i] = x;
443 				break;
444 			}
445 		}
446 		if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
447 			RTE_LOG(INFO, CHANNEL_MONITOR,
448 				"Error with Policy. MAC not found on "
449 				"attached ports ");
450 			pol->enabled = 0;
451 			return ret;
452 		}
453 		pol->pfid[i] = ret;
454 	}
455 	return 1;
456 }
457 
458 static int
459 update_policy(struct channel_packet *pkt)
460 {
461 
462 	unsigned int updated = 0;
463 	unsigned int i;
464 
465 
466 	RTE_LOG(INFO, CHANNEL_MONITOR,
467 			"Applying policy for %s\n", pkt->vm_name);
468 
469 	for (i = 0; i < RTE_DIM(policies); i++) {
470 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
471 			/* Copy the contents of *pkt into the policy.pkt */
472 			policies[i].pkt = *pkt;
473 			get_pcpu_to_control(&policies[i]);
474 			/* Check Eth dev only for Traffic policy */
475 			if (policies[i].pkt.policy_to_use == TRAFFIC) {
476 				if (get_pfid(&policies[i]) < 0) {
477 					updated = 1;
478 					break;
479 				}
480 			}
481 			core_share_status(i);
482 			policies[i].enabled = 1;
483 			updated = 1;
484 		}
485 	}
486 	if (!updated) {
487 		for (i = 0; i < RTE_DIM(policies); i++) {
488 			if (policies[i].enabled == 0) {
489 				policies[i].pkt = *pkt;
490 				get_pcpu_to_control(&policies[i]);
491 				/* Check Eth dev only for Traffic policy */
492 				if (policies[i].pkt.policy_to_use == TRAFFIC) {
493 					if (get_pfid(&policies[i]) < 0) {
494 						updated = 1;
495 						break;
496 					}
497 				}
498 				core_share_status(i);
499 				policies[i].enabled = 1;
500 				break;
501 			}
502 		}
503 	}
504 	return 0;
505 }
506 
507 static int
508 remove_policy(struct channel_packet *pkt __rte_unused)
509 {
510 	unsigned int i;
511 
512 	/*
513 	 * Disabling the policy is simply a case of setting
514 	 * enabled to 0
515 	 */
516 	for (i = 0; i < RTE_DIM(policies); i++) {
517 		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
518 			policies[i].enabled = 0;
519 			return 0;
520 		}
521 	}
522 	return -1;
523 }
524 
525 static uint64_t
526 get_pkt_diff(struct policy *pol)
527 {
528 
529 	uint64_t vsi_pkt_count,
530 		vsi_pkt_total = 0,
531 		vsi_pkt_count_prev_total = 0;
532 	double rdtsc_curr, rdtsc_diff, diff;
533 	int x;
534 	struct rte_eth_stats vf_stats;
535 
536 	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
537 
538 		/*Read vsi stats*/
539 		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
540 			vsi_pkt_count = vf_stats.ipackets;
541 		else
542 			vsi_pkt_count = -1;
543 
544 		vsi_pkt_total += vsi_pkt_count;
545 
546 		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
547 		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
548 	}
549 
550 	rdtsc_curr = rte_rdtsc_precise();
551 	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
552 	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
553 
554 	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
555 			((double)rte_get_tsc_hz() / rdtsc_diff);
556 
557 	return diff;
558 }
559 
560 static void
561 apply_traffic_profile(struct policy *pol)
562 {
563 
564 	int count;
565 	uint64_t diff = 0;
566 
567 	diff = get_pkt_diff(pol);
568 
569 	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
570 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
571 			if (pol->core_share[count].status != 1)
572 				power_manager_scale_core_max(
573 						pol->core_share[count].pcpu);
574 		}
575 	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
576 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
577 			if (pol->core_share[count].status != 1)
578 				power_manager_scale_core_med(
579 						pol->core_share[count].pcpu);
580 		}
581 	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
582 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
583 			if (pol->core_share[count].status != 1)
584 				power_manager_scale_core_min(
585 						pol->core_share[count].pcpu);
586 		}
587 	}
588 }
589 
590 static void
591 apply_time_profile(struct policy *pol)
592 {
593 
594 	int count, x;
595 	struct timeval tv;
596 	struct tm *ptm;
597 	char time_string[40];
598 
599 	/* Obtain the time of day, and convert it to a tm struct. */
600 	gettimeofday(&tv, NULL);
601 	ptm = localtime(&tv.tv_sec);
602 	/* Format the date and time, down to a single second. */
603 	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
604 
605 	for (x = 0; x < HOURS; x++) {
606 
607 		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
608 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
609 				if (pol->core_share[count].status != 1) {
610 					power_manager_scale_core_max(
611 						pol->core_share[count].pcpu);
612 				}
613 			}
614 			break;
615 		} else if (ptm->tm_hour ==
616 				pol->pkt.timer_policy.quiet_hours[x]) {
617 			for (count = 0; count < pol->pkt.num_vcpu; count++) {
618 				if (pol->core_share[count].status != 1) {
619 					power_manager_scale_core_min(
620 						pol->core_share[count].pcpu);
621 			}
622 		}
623 			break;
624 		} else if (ptm->tm_hour ==
625 			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
626 			apply_traffic_profile(pol);
627 			break;
628 		}
629 	}
630 }
631 
632 static void
633 apply_workload_profile(struct policy *pol)
634 {
635 
636 	int count;
637 
638 	if (pol->pkt.workload == HIGH) {
639 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
640 			if (pol->core_share[count].status != 1)
641 				power_manager_scale_core_max(
642 						pol->core_share[count].pcpu);
643 		}
644 	} else if (pol->pkt.workload == MEDIUM) {
645 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
646 			if (pol->core_share[count].status != 1)
647 				power_manager_scale_core_med(
648 						pol->core_share[count].pcpu);
649 		}
650 	} else if (pol->pkt.workload == LOW) {
651 		for (count = 0; count < pol->pkt.num_vcpu; count++) {
652 			if (pol->core_share[count].status != 1)
653 				power_manager_scale_core_min(
654 						pol->core_share[count].pcpu);
655 		}
656 	}
657 }
658 
659 static void
660 apply_policy(struct policy *pol)
661 {
662 
663 	struct channel_packet *pkt = &pol->pkt;
664 
665 	/*Check policy to use*/
666 	if (pkt->policy_to_use == TRAFFIC)
667 		apply_traffic_profile(pol);
668 	else if (pkt->policy_to_use == TIME)
669 		apply_time_profile(pol);
670 	else if (pkt->policy_to_use == WORKLOAD)
671 		apply_workload_profile(pol);
672 }
673 
674 static int
675 write_binary_packet(struct channel_packet *pkt, struct channel_info *chan_info)
676 {
677 	int ret, buffer_len = sizeof(*pkt);
678 	void *buffer = pkt;
679 
680 	if (chan_info->fd < 0) {
681 		RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n");
682 		return -1;
683 	}
684 
685 	while (buffer_len > 0) {
686 		ret = write(chan_info->fd, buffer, buffer_len);
687 		if (ret == -1) {
688 			if (errno == EINTR)
689 				continue;
690 			RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n",
691 					strerror(errno));
692 			return -1;
693 		}
694 		buffer = (char *)buffer + ret;
695 		buffer_len -= ret;
696 	}
697 	return 0;
698 }
699 
700 static int
701 send_ack_for_received_cmd(struct channel_packet *pkt,
702 		struct channel_info *chan_info,
703 		uint32_t command)
704 {
705 	pkt->command = command;
706 	return write_binary_packet(pkt, chan_info);
707 }
708 
709 static int
710 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
711 {
712 	int ret;
713 
714 	if (chan_info == NULL)
715 		return -1;
716 
717 	if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
718 			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
719 		return -1;
720 
721 	if (pkt->command == CPU_POWER) {
722 		unsigned int core_num;
723 
724 		if (pkt->core_type == CORE_TYPE_VIRTUAL)
725 			core_num = get_pcpu(chan_info, pkt->resource_id);
726 		else
727 			core_num = pkt->resource_id;
728 
729 		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
730 			core_num);
731 
732 		bool valid_unit = true;
733 		int scale_res;
734 
735 		switch (pkt->unit) {
736 		case(CPU_POWER_SCALE_MIN):
737 			scale_res = power_manager_scale_core_min(core_num);
738 			break;
739 		case(CPU_POWER_SCALE_MAX):
740 			scale_res = power_manager_scale_core_max(core_num);
741 			break;
742 		case(CPU_POWER_SCALE_DOWN):
743 			scale_res = power_manager_scale_core_down(core_num);
744 			break;
745 		case(CPU_POWER_SCALE_UP):
746 			scale_res = power_manager_scale_core_up(core_num);
747 			break;
748 		case(CPU_POWER_ENABLE_TURBO):
749 			scale_res = power_manager_enable_turbo_core(core_num);
750 			break;
751 		case(CPU_POWER_DISABLE_TURBO):
752 			scale_res = power_manager_disable_turbo_core(core_num);
753 			break;
754 		default:
755 			valid_unit = false;
756 			break;
757 		}
758 
759 		if (valid_unit) {
760 			ret = send_ack_for_received_cmd(pkt,
761 					chan_info,
762 					scale_res > 0 ?
763 						CPU_POWER_CMD_ACK :
764 						CPU_POWER_CMD_NACK);
765 			if (ret < 0)
766 				RTE_LOG(DEBUG, CHANNEL_MONITOR, "Error during sending ack command.\n");
767 		} else
768 			RTE_LOG(DEBUG, CHANNEL_MONITOR, "Unexpected unit type.\n");
769 
770 	}
771 
772 	if (pkt->command == PKT_POLICY) {
773 		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
774 				pkt->vm_name);
775 		int ret = send_ack_for_received_cmd(pkt,
776 				chan_info,
777 				CPU_POWER_CMD_ACK);
778 		if (ret < 0)
779 			RTE_LOG(DEBUG, CHANNEL_MONITOR, "Error during sending ack command.\n");
780 		update_policy(pkt);
781 		policy_is_set = 1;
782 	}
783 
784 	if (pkt->command == PKT_POLICY_REMOVE) {
785 		ret = remove_policy(pkt);
786 		if (ret == 0)
787 			RTE_LOG(INFO, CHANNEL_MONITOR,
788 				 "Removed policy %s\n", pkt->vm_name);
789 		else
790 			RTE_LOG(INFO, CHANNEL_MONITOR,
791 				 "Policy %s does not exist\n", pkt->vm_name);
792 	}
793 
794 	/*
795 	 * Return is not checked as channel status may have been set to DISABLED
796 	 * from management thread
797 	 */
798 	rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
799 			CHANNEL_MGR_CHANNEL_CONNECTED);
800 	return 0;
801 
802 }
803 
804 int
805 add_channel_to_monitor(struct channel_info **chan_info)
806 {
807 	struct channel_info *info = *chan_info;
808 	struct epoll_event event;
809 
810 	event.events = EPOLLIN;
811 	event.data.ptr = info;
812 	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
813 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
814 				"to epoll\n", info->channel_path);
815 		return -1;
816 	}
817 	RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
818 			"to monitor\n", info->channel_path);
819 	return 0;
820 }
821 
822 int
823 remove_channel_from_monitor(struct channel_info *chan_info)
824 {
825 	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
826 			chan_info->fd, NULL) < 0) {
827 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
828 				"from epoll\n", chan_info->channel_path);
829 		return -1;
830 	}
831 	return 0;
832 }
833 
834 int
835 channel_monitor_init(void)
836 {
837 	global_event_fd = epoll_create1(0);
838 	if (global_event_fd == 0) {
839 		RTE_LOG(ERR, CHANNEL_MONITOR,
840 				"Error creating epoll context with error %s\n",
841 				strerror(errno));
842 		return -1;
843 	}
844 	global_events_list = rte_malloc("epoll_events",
845 			sizeof(*global_events_list)
846 			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
847 	if (global_events_list == NULL) {
848 		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
849 				"epoll events\n");
850 		return -1;
851 	}
852 	return 0;
853 }
854 
855 static void
856 read_binary_packet(struct channel_info *chan_info)
857 {
858 	struct channel_packet pkt;
859 	void *buffer = &pkt;
860 	int buffer_len = sizeof(pkt);
861 	int n_bytes, err = 0;
862 
863 	while (buffer_len > 0) {
864 		n_bytes = read(chan_info->fd,
865 				buffer, buffer_len);
866 		if (n_bytes == buffer_len)
867 			break;
868 		if (n_bytes < 0) {
869 			err = errno;
870 			RTE_LOG(DEBUG, CHANNEL_MONITOR,
871 				"Received error on "
872 				"channel '%s' read: %s\n",
873 				chan_info->channel_path,
874 				strerror(err));
875 			remove_channel(&chan_info);
876 			break;
877 		}
878 		buffer = (char *)buffer + n_bytes;
879 		buffer_len -= n_bytes;
880 	}
881 	if (!err)
882 		process_request(&pkt, chan_info);
883 }
884 
885 #ifdef USE_JANSSON
886 static void
887 read_json_packet(struct channel_info *chan_info)
888 {
889 	struct channel_packet pkt;
890 	int n_bytes, ret;
891 	json_t *root;
892 	json_error_t error;
893 	const char *resource_name;
894 	char *start, *end;
895 	uint32_t n;
896 
897 
898 	/* read opening brace to closing brace */
899 	do {
900 		int idx = 0;
901 		int indent = 0;
902 		do {
903 			n_bytes = read(chan_info->fd, &json_data[idx], 1);
904 			if (n_bytes == 0)
905 				break;
906 			if (json_data[idx] == '{')
907 				indent++;
908 			if (json_data[idx] == '}')
909 				indent--;
910 			if ((indent > 0) || (idx > 0))
911 				idx++;
912 			if (indent <= 0)
913 				json_data[idx] = 0;
914 			if (idx >= MAX_JSON_STRING_LEN-1)
915 				break;
916 		} while (indent > 0);
917 
918 		json_data[idx] = '\0';
919 
920 		if (strlen(json_data) == 0)
921 			continue;
922 
923 		printf("got [%s]\n", json_data);
924 
925 		root = json_loads(json_data, 0, &error);
926 
927 		if (root) {
928 			resource_name = get_resource_name_from_chn_path(
929 				chan_info->channel_path);
930 			/*
931 			 * Because our data is now in the json
932 			 * object, we can overwrite the pkt
933 			 * with a channel_packet struct, using
934 			 * parse_json_to_pkt()
935 			 */
936 			ret = parse_json_to_pkt(root, &pkt, resource_name);
937 			json_decref(root);
938 			if (ret) {
939 				RTE_LOG(ERR, CHANNEL_MONITOR,
940 					"Error validating JSON profile data\n");
941 				break;
942 			}
943 			start = strstr(pkt.vm_name,
944 					CHANNEL_MGR_FIFO_PATTERN_NAME);
945 			if (start != NULL) {
946 				/* move past pattern to start of fifo id */
947 				start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);
948 
949 				end = start;
950 				n = (uint32_t)strtoul(start, &end, 10);
951 
952 				if (end[0] == '\0') {
953 					/* Add core id to core list */
954 					pkt.num_vcpu = 1;
955 					pkt.vcpu_to_control[0] = n;
956 					process_request(&pkt, chan_info);
957 				} else {
958 					RTE_LOG(ERR, CHANNEL_MONITOR,
959 						"Cannot extract core id from fifo name\n");
960 				}
961 			} else {
962 				process_request(&pkt, chan_info);
963 			}
964 		} else {
965 			RTE_LOG(ERR, CHANNEL_MONITOR,
966 					"JSON error on line %d: %s\n",
967 					error.line, error.text);
968 		}
969 	} while (n_bytes > 0);
970 }
971 #endif
972 
973 void
974 run_channel_monitor(void)
975 {
976 	while (run_loop) {
977 		int n_events, i;
978 
979 		n_events = epoll_wait(global_event_fd, global_events_list,
980 				MAX_EVENTS, 1);
981 		if (!run_loop)
982 			break;
983 		for (i = 0; i < n_events; i++) {
984 			struct channel_info *chan_info = (struct channel_info *)
985 					global_events_list[i].data.ptr;
986 			if ((global_events_list[i].events & EPOLLERR) ||
987 				(global_events_list[i].events & EPOLLHUP)) {
988 				RTE_LOG(INFO, CHANNEL_MONITOR,
989 						"Remote closed connection for "
990 						"channel '%s'\n",
991 						chan_info->channel_path);
992 				remove_channel(&chan_info);
993 				continue;
994 			}
995 			if (global_events_list[i].events & EPOLLIN) {
996 
997 				switch (chan_info->type) {
998 				case CHANNEL_TYPE_BINARY:
999 					read_binary_packet(chan_info);
1000 					break;
1001 #ifdef USE_JANSSON
1002 				case CHANNEL_TYPE_JSON:
1003 					read_json_packet(chan_info);
1004 					break;
1005 #endif
1006 				default:
1007 					break;
1008 				}
1009 			}
1010 		}
1011 		rte_delay_us(time_period_ms*1000);
1012 		if (policy_is_set) {
1013 			unsigned int j;
1014 
1015 			for (j = 0; j < RTE_DIM(policies); j++) {
1016 				if (policies[j].enabled == 1)
1017 					apply_policy(&policies[j]);
1018 			}
1019 		}
1020 	}
1021 }
1022