1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2014 Intel Corporation 3 */ 4 5 #include <unistd.h> 6 #include <stdio.h> 7 #include <stdlib.h> 8 #include <stdint.h> 9 #include <signal.h> 10 #include <errno.h> 11 #include <string.h> 12 #include <fcntl.h> 13 #include <sys/types.h> 14 #include <sys/epoll.h> 15 #include <sys/queue.h> 16 #include <sys/time.h> 17 #include <sys/socket.h> 18 #include <sys/select.h> 19 #ifdef USE_JANSSON 20 #include <jansson.h> 21 #else 22 #pragma message "Jansson dev libs unavailable, not including JSON parsing" 23 #endif 24 #include <rte_string_fns.h> 25 #include <rte_log.h> 26 #include <rte_memory.h> 27 #include <rte_malloc.h> 28 #include <rte_atomic.h> 29 #include <rte_cycles.h> 30 #include <rte_ethdev.h> 31 #include <rte_pmd_i40e.h> 32 33 #include <libvirt/libvirt.h> 34 #include "channel_monitor.h" 35 #include "channel_commands.h" 36 #include "channel_manager.h" 37 #include "power_manager.h" 38 #include "oob_monitor.h" 39 40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1 41 42 #define MAX_EVENTS 256 43 44 uint64_t vsi_pkt_count_prev[384]; 45 uint64_t rdtsc_prev[384]; 46 #define MAX_JSON_STRING_LEN 1024 47 char json_data[MAX_JSON_STRING_LEN]; 48 49 double time_period_ms = 1; 50 static volatile unsigned run_loop = 1; 51 static int global_event_fd; 52 static unsigned int policy_is_set; 53 static struct epoll_event *global_events_list; 54 static struct policy policies[RTE_MAX_LCORE]; 55 56 #ifdef USE_JANSSON 57 58 union PFID { 59 struct rte_ether_addr addr; 60 uint64_t pfid; 61 }; 62 63 static int 64 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr) 65 { 66 int i; 67 char *end; 68 unsigned long o[RTE_ETHER_ADDR_LEN]; 69 70 i = 0; 71 do { 72 errno = 0; 73 o[i] = strtoul(a, &end, 16); 74 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0)) 75 return -1; 76 a = end + 1; 77 } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0); 78 79 /* Junk at the end of line */ 80 if (end[0] != 0) 81 return -1; 82 83 /* Support the format XX:XX:XX:XX:XX:XX */ 84 if (i == RTE_ETHER_ADDR_LEN) { 85 while (i-- != 0) { 86 if (o[i] > UINT8_MAX) 87 return -1; 88 ether_addr->addr_bytes[i] = (uint8_t)o[i]; 89 } 90 /* Support the format XXXX:XXXX:XXXX */ 91 } else if (i == RTE_ETHER_ADDR_LEN / 2) { 92 while (i-- != 0) { 93 if (o[i] > UINT16_MAX) 94 return -1; 95 ether_addr->addr_bytes[i * 2] = 96 (uint8_t)(o[i] >> 8); 97 ether_addr->addr_bytes[i * 2 + 1] = 98 (uint8_t)(o[i] & 0xff); 99 } 100 /* unknown format */ 101 } else 102 return -1; 103 104 return 0; 105 } 106 107 static int 108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac) 109 { 110 union PFID pfid; 111 int ret; 112 113 /* Use port MAC address as the vfid */ 114 ret = str_to_ether_addr(mac, &pfid.addr); 115 116 if (ret != 0) { 117 RTE_LOG(ERR, CHANNEL_MONITOR, 118 "Invalid mac address received in JSON\n"); 119 pkt->vfid[idx] = 0; 120 return -1; 121 } 122 123 printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":" 124 "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n", 125 pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1], 126 pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3], 127 pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]); 128 129 pkt->vfid[idx] = pfid.pfid; 130 return 0; 131 } 132 133 static char* 134 get_resource_name_from_chn_path(const char *channel_path) 135 { 136 char *substr = NULL; 137 138 substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME); 139 140 return substr; 141 } 142 143 static int 144 get_resource_id_from_vmname(const char *vm_name) 145 { 146 int result = -1; 147 int off = 0; 148 149 if (vm_name == NULL) 150 return -1; 151 152 while (vm_name[off] != '\0') { 153 if (isdigit(vm_name[off])) 154 break; 155 off++; 156 } 157 result = atoi(&vm_name[off]); 158 if ((result == 0) && (vm_name[off] != '0')) 159 return -1; 160 161 return result; 162 } 163 164 static int 165 parse_json_to_pkt(json_t *element, struct channel_packet *pkt, 166 const char *vm_name) 167 { 168 const char *key; 169 json_t *value; 170 int ret; 171 int resource_id; 172 173 memset(pkt, 0, sizeof(struct channel_packet)); 174 175 pkt->nb_mac_to_monitor = 0; 176 pkt->t_boost_status.tbEnabled = false; 177 pkt->workload = LOW; 178 pkt->policy_to_use = TIME; 179 pkt->command = PKT_POLICY; 180 pkt->core_type = CORE_TYPE_PHYSICAL; 181 182 if (vm_name == NULL) { 183 RTE_LOG(ERR, CHANNEL_MONITOR, 184 "vm_name is NULL, request rejected !\n"); 185 return -1; 186 } 187 188 json_object_foreach(element, key, value) { 189 if (!strcmp(key, "policy")) { 190 /* Recurse in to get the contents of profile */ 191 ret = parse_json_to_pkt(value, pkt, vm_name); 192 if (ret) 193 return ret; 194 } else if (!strcmp(key, "instruction")) { 195 /* Recurse in to get the contents of instruction */ 196 ret = parse_json_to_pkt(value, pkt, vm_name); 197 if (ret) 198 return ret; 199 } else if (!strcmp(key, "command")) { 200 char command[32]; 201 strlcpy(command, json_string_value(value), 32); 202 if (!strcmp(command, "power")) { 203 pkt->command = CPU_POWER; 204 } else if (!strcmp(command, "create")) { 205 pkt->command = PKT_POLICY; 206 } else if (!strcmp(command, "destroy")) { 207 pkt->command = PKT_POLICY_REMOVE; 208 } else { 209 RTE_LOG(ERR, CHANNEL_MONITOR, 210 "Invalid command received in JSON\n"); 211 return -1; 212 } 213 } else if (!strcmp(key, "policy_type")) { 214 char command[32]; 215 strlcpy(command, json_string_value(value), 32); 216 if (!strcmp(command, "TIME")) { 217 pkt->policy_to_use = TIME; 218 } else if (!strcmp(command, "TRAFFIC")) { 219 pkt->policy_to_use = TRAFFIC; 220 } else if (!strcmp(command, "WORKLOAD")) { 221 pkt->policy_to_use = WORKLOAD; 222 } else if (!strcmp(command, "BRANCH_RATIO")) { 223 pkt->policy_to_use = BRANCH_RATIO; 224 } else { 225 RTE_LOG(ERR, CHANNEL_MONITOR, 226 "Wrong policy_type received in JSON\n"); 227 return -1; 228 } 229 } else if (!strcmp(key, "workload")) { 230 char command[32]; 231 strlcpy(command, json_string_value(value), 32); 232 if (!strcmp(command, "HIGH")) { 233 pkt->workload = HIGH; 234 } else if (!strcmp(command, "MEDIUM")) { 235 pkt->workload = MEDIUM; 236 } else if (!strcmp(command, "LOW")) { 237 pkt->workload = LOW; 238 } else { 239 RTE_LOG(ERR, CHANNEL_MONITOR, 240 "Wrong workload received in JSON\n"); 241 return -1; 242 } 243 } else if (!strcmp(key, "busy_hours")) { 244 unsigned int i; 245 size_t size = json_array_size(value); 246 247 for (i = 0; i < size; i++) { 248 int hour = (int)json_integer_value( 249 json_array_get(value, i)); 250 pkt->timer_policy.busy_hours[i] = hour; 251 } 252 } else if (!strcmp(key, "quiet_hours")) { 253 unsigned int i; 254 size_t size = json_array_size(value); 255 256 for (i = 0; i < size; i++) { 257 int hour = (int)json_integer_value( 258 json_array_get(value, i)); 259 pkt->timer_policy.quiet_hours[i] = hour; 260 } 261 } else if (!strcmp(key, "mac_list")) { 262 unsigned int i; 263 size_t size = json_array_size(value); 264 265 for (i = 0; i < size; i++) { 266 char mac[32]; 267 strlcpy(mac, 268 json_string_value(json_array_get(value, i)), 269 32); 270 set_policy_mac(pkt, i, mac); 271 } 272 pkt->nb_mac_to_monitor = size; 273 } else if (!strcmp(key, "avg_packet_thresh")) { 274 pkt->traffic_policy.avg_max_packet_thresh = 275 (uint32_t)json_integer_value(value); 276 } else if (!strcmp(key, "max_packet_thresh")) { 277 pkt->traffic_policy.max_max_packet_thresh = 278 (uint32_t)json_integer_value(value); 279 } else if (!strcmp(key, "unit")) { 280 char unit[32]; 281 strlcpy(unit, json_string_value(value), 32); 282 if (!strcmp(unit, "SCALE_UP")) { 283 pkt->unit = CPU_POWER_SCALE_UP; 284 } else if (!strcmp(unit, "SCALE_DOWN")) { 285 pkt->unit = CPU_POWER_SCALE_DOWN; 286 } else if (!strcmp(unit, "SCALE_MAX")) { 287 pkt->unit = CPU_POWER_SCALE_MAX; 288 } else if (!strcmp(unit, "SCALE_MIN")) { 289 pkt->unit = CPU_POWER_SCALE_MIN; 290 } else if (!strcmp(unit, "ENABLE_TURBO")) { 291 pkt->unit = CPU_POWER_ENABLE_TURBO; 292 } else if (!strcmp(unit, "DISABLE_TURBO")) { 293 pkt->unit = CPU_POWER_DISABLE_TURBO; 294 } else { 295 RTE_LOG(ERR, CHANNEL_MONITOR, 296 "Invalid command received in JSON\n"); 297 return -1; 298 } 299 } else { 300 RTE_LOG(ERR, CHANNEL_MONITOR, 301 "Unknown key received in JSON string: %s\n", 302 key); 303 } 304 305 resource_id = get_resource_id_from_vmname(vm_name); 306 if (resource_id < 0) { 307 RTE_LOG(ERR, CHANNEL_MONITOR, 308 "Could not get resource_id from vm_name:%s\n", 309 vm_name); 310 return -1; 311 } 312 strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ); 313 pkt->resource_id = resource_id; 314 } 315 return 0; 316 } 317 #endif 318 319 void channel_monitor_exit(void) 320 { 321 run_loop = 0; 322 rte_free(global_events_list); 323 } 324 325 static void 326 core_share(int pNo, int z, int x, int t) 327 { 328 if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) { 329 if (strcmp(policies[pNo].pkt.vm_name, 330 lvm_info[x].vm_name) != 0) { 331 policies[pNo].core_share[z].status = 1; 332 power_manager_scale_core_max( 333 policies[pNo].core_share[z].pcpu); 334 } 335 } 336 } 337 338 static void 339 core_share_status(int pNo) 340 { 341 342 int noVms = 0, noVcpus = 0, z, x, t; 343 344 get_all_vm(&noVms, &noVcpus); 345 346 /* Reset Core Share Status. */ 347 for (z = 0; z < noVcpus; z++) 348 policies[pNo].core_share[z].status = 0; 349 350 /* Foreach vcpu in a policy. */ 351 for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) { 352 /* Foreach VM on the platform. */ 353 for (x = 0; x < noVms; x++) { 354 /* Foreach vcpu of VMs on platform. */ 355 for (t = 0; t < lvm_info[x].num_cpus; t++) 356 core_share(pNo, z, x, t); 357 } 358 } 359 } 360 361 362 static int 363 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count) 364 { 365 int ret = 0; 366 367 if (pol->pkt.policy_to_use == BRANCH_RATIO) { 368 ci->cd[pcpu].oob_enabled = 1; 369 ret = add_core_to_monitor(pcpu); 370 if (ret == 0) 371 RTE_LOG(INFO, CHANNEL_MONITOR, 372 "Monitoring pcpu %d OOB for %s\n", 373 pcpu, pol->pkt.vm_name); 374 else 375 RTE_LOG(ERR, CHANNEL_MONITOR, 376 "Error monitoring pcpu %d OOB for %s\n", 377 pcpu, pol->pkt.vm_name); 378 379 } else { 380 pol->core_share[count].pcpu = pcpu; 381 RTE_LOG(INFO, CHANNEL_MONITOR, 382 "Monitoring pcpu %d for %s\n", 383 pcpu, pol->pkt.vm_name); 384 } 385 return ret; 386 } 387 388 static void 389 get_pcpu_to_control(struct policy *pol) 390 { 391 392 /* Convert vcpu to pcpu. */ 393 struct vm_info info; 394 int pcpu, count; 395 struct core_info *ci; 396 397 ci = get_core_info(); 398 399 RTE_LOG(DEBUG, CHANNEL_MONITOR, 400 "Looking for pcpu for %s\n", pol->pkt.vm_name); 401 402 /* 403 * So now that we're handling virtual and physical cores, we need to 404 * differenciate between them when adding them to the branch monitor. 405 * Virtual cores need to be converted to physical cores. 406 */ 407 if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) { 408 /* 409 * If the cores in the policy are virtual, we need to map them 410 * to physical core. We look up the vm info and use that for 411 * the mapping. 412 */ 413 get_info_vm(pol->pkt.vm_name, &info); 414 for (count = 0; count < pol->pkt.num_vcpu; count++) { 415 pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]]; 416 pcpu_monitor(pol, ci, pcpu, count); 417 } 418 } else { 419 /* 420 * If the cores in the policy are physical, we just use 421 * those core id's directly. 422 */ 423 for (count = 0; count < pol->pkt.num_vcpu; count++) { 424 pcpu = pol->pkt.vcpu_to_control[count]; 425 pcpu_monitor(pol, ci, pcpu, count); 426 } 427 } 428 } 429 430 static int 431 get_pfid(struct policy *pol) 432 { 433 434 int i, x, ret = 0; 435 436 for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) { 437 438 RTE_ETH_FOREACH_DEV(x) { 439 ret = rte_pmd_i40e_query_vfid_by_mac(x, 440 (struct rte_ether_addr *)&(pol->pkt.vfid[i])); 441 if (ret != -EINVAL) { 442 pol->port[i] = x; 443 break; 444 } 445 } 446 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) { 447 RTE_LOG(INFO, CHANNEL_MONITOR, 448 "Error with Policy. MAC not found on " 449 "attached ports "); 450 pol->enabled = 0; 451 return ret; 452 } 453 pol->pfid[i] = ret; 454 } 455 return 1; 456 } 457 458 static int 459 update_policy(struct channel_packet *pkt) 460 { 461 462 unsigned int updated = 0; 463 unsigned int i; 464 465 466 RTE_LOG(INFO, CHANNEL_MONITOR, 467 "Applying policy for %s\n", pkt->vm_name); 468 469 for (i = 0; i < RTE_DIM(policies); i++) { 470 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) { 471 /* Copy the contents of *pkt into the policy.pkt */ 472 policies[i].pkt = *pkt; 473 get_pcpu_to_control(&policies[i]); 474 /* Check Eth dev only for Traffic policy */ 475 if (policies[i].pkt.policy_to_use == TRAFFIC) { 476 if (get_pfid(&policies[i]) < 0) { 477 updated = 1; 478 break; 479 } 480 } 481 core_share_status(i); 482 policies[i].enabled = 1; 483 updated = 1; 484 } 485 } 486 if (!updated) { 487 for (i = 0; i < RTE_DIM(policies); i++) { 488 if (policies[i].enabled == 0) { 489 policies[i].pkt = *pkt; 490 get_pcpu_to_control(&policies[i]); 491 /* Check Eth dev only for Traffic policy */ 492 if (policies[i].pkt.policy_to_use == TRAFFIC) { 493 if (get_pfid(&policies[i]) < 0) { 494 updated = 1; 495 break; 496 } 497 } 498 core_share_status(i); 499 policies[i].enabled = 1; 500 break; 501 } 502 } 503 } 504 return 0; 505 } 506 507 static int 508 remove_policy(struct channel_packet *pkt __rte_unused) 509 { 510 unsigned int i; 511 512 /* 513 * Disabling the policy is simply a case of setting 514 * enabled to 0 515 */ 516 for (i = 0; i < RTE_DIM(policies); i++) { 517 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) { 518 policies[i].enabled = 0; 519 return 0; 520 } 521 } 522 return -1; 523 } 524 525 static uint64_t 526 get_pkt_diff(struct policy *pol) 527 { 528 529 uint64_t vsi_pkt_count, 530 vsi_pkt_total = 0, 531 vsi_pkt_count_prev_total = 0; 532 double rdtsc_curr, rdtsc_diff, diff; 533 int x; 534 struct rte_eth_stats vf_stats; 535 536 for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) { 537 538 /*Read vsi stats*/ 539 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0) 540 vsi_pkt_count = vf_stats.ipackets; 541 else 542 vsi_pkt_count = -1; 543 544 vsi_pkt_total += vsi_pkt_count; 545 546 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]]; 547 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count; 548 } 549 550 rdtsc_curr = rte_rdtsc_precise(); 551 rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]]; 552 rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr; 553 554 diff = (vsi_pkt_total - vsi_pkt_count_prev_total) * 555 ((double)rte_get_tsc_hz() / rdtsc_diff); 556 557 return diff; 558 } 559 560 static void 561 apply_traffic_profile(struct policy *pol) 562 { 563 564 int count; 565 uint64_t diff = 0; 566 567 diff = get_pkt_diff(pol); 568 569 if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) { 570 for (count = 0; count < pol->pkt.num_vcpu; count++) { 571 if (pol->core_share[count].status != 1) 572 power_manager_scale_core_max( 573 pol->core_share[count].pcpu); 574 } 575 } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) { 576 for (count = 0; count < pol->pkt.num_vcpu; count++) { 577 if (pol->core_share[count].status != 1) 578 power_manager_scale_core_med( 579 pol->core_share[count].pcpu); 580 } 581 } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) { 582 for (count = 0; count < pol->pkt.num_vcpu; count++) { 583 if (pol->core_share[count].status != 1) 584 power_manager_scale_core_min( 585 pol->core_share[count].pcpu); 586 } 587 } 588 } 589 590 static void 591 apply_time_profile(struct policy *pol) 592 { 593 594 int count, x; 595 struct timeval tv; 596 struct tm *ptm; 597 char time_string[40]; 598 599 /* Obtain the time of day, and convert it to a tm struct. */ 600 gettimeofday(&tv, NULL); 601 ptm = localtime(&tv.tv_sec); 602 /* Format the date and time, down to a single second. */ 603 strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm); 604 605 for (x = 0; x < HOURS; x++) { 606 607 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) { 608 for (count = 0; count < pol->pkt.num_vcpu; count++) { 609 if (pol->core_share[count].status != 1) { 610 power_manager_scale_core_max( 611 pol->core_share[count].pcpu); 612 } 613 } 614 break; 615 } else if (ptm->tm_hour == 616 pol->pkt.timer_policy.quiet_hours[x]) { 617 for (count = 0; count < pol->pkt.num_vcpu; count++) { 618 if (pol->core_share[count].status != 1) { 619 power_manager_scale_core_min( 620 pol->core_share[count].pcpu); 621 } 622 } 623 break; 624 } else if (ptm->tm_hour == 625 pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) { 626 apply_traffic_profile(pol); 627 break; 628 } 629 } 630 } 631 632 static void 633 apply_workload_profile(struct policy *pol) 634 { 635 636 int count; 637 638 if (pol->pkt.workload == HIGH) { 639 for (count = 0; count < pol->pkt.num_vcpu; count++) { 640 if (pol->core_share[count].status != 1) 641 power_manager_scale_core_max( 642 pol->core_share[count].pcpu); 643 } 644 } else if (pol->pkt.workload == MEDIUM) { 645 for (count = 0; count < pol->pkt.num_vcpu; count++) { 646 if (pol->core_share[count].status != 1) 647 power_manager_scale_core_med( 648 pol->core_share[count].pcpu); 649 } 650 } else if (pol->pkt.workload == LOW) { 651 for (count = 0; count < pol->pkt.num_vcpu; count++) { 652 if (pol->core_share[count].status != 1) 653 power_manager_scale_core_min( 654 pol->core_share[count].pcpu); 655 } 656 } 657 } 658 659 static void 660 apply_policy(struct policy *pol) 661 { 662 663 struct channel_packet *pkt = &pol->pkt; 664 665 /*Check policy to use*/ 666 if (pkt->policy_to_use == TRAFFIC) 667 apply_traffic_profile(pol); 668 else if (pkt->policy_to_use == TIME) 669 apply_time_profile(pol); 670 else if (pkt->policy_to_use == WORKLOAD) 671 apply_workload_profile(pol); 672 } 673 674 static int 675 write_binary_packet(struct channel_packet *pkt, struct channel_info *chan_info) 676 { 677 int ret, buffer_len = sizeof(*pkt); 678 void *buffer = pkt; 679 680 if (chan_info->fd < 0) { 681 RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n"); 682 return -1; 683 } 684 685 while (buffer_len > 0) { 686 ret = write(chan_info->fd, buffer, buffer_len); 687 if (ret == -1) { 688 if (errno == EINTR) 689 continue; 690 RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n", 691 strerror(errno)); 692 return -1; 693 } 694 buffer = (char *)buffer + ret; 695 buffer_len -= ret; 696 } 697 return 0; 698 } 699 700 static int 701 send_ack_for_received_cmd(struct channel_packet *pkt, 702 struct channel_info *chan_info, 703 uint32_t command) 704 { 705 pkt->command = command; 706 return write_binary_packet(pkt, chan_info); 707 } 708 709 static int 710 process_request(struct channel_packet *pkt, struct channel_info *chan_info) 711 { 712 int ret; 713 714 if (chan_info == NULL) 715 return -1; 716 717 if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED, 718 CHANNEL_MGR_CHANNEL_PROCESSING) == 0) 719 return -1; 720 721 if (pkt->command == CPU_POWER) { 722 unsigned int core_num; 723 724 if (pkt->core_type == CORE_TYPE_VIRTUAL) 725 core_num = get_pcpu(chan_info, pkt->resource_id); 726 else 727 core_num = pkt->resource_id; 728 729 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n", 730 core_num); 731 732 bool valid_unit = true; 733 int scale_res; 734 735 switch (pkt->unit) { 736 case(CPU_POWER_SCALE_MIN): 737 scale_res = power_manager_scale_core_min(core_num); 738 break; 739 case(CPU_POWER_SCALE_MAX): 740 scale_res = power_manager_scale_core_max(core_num); 741 break; 742 case(CPU_POWER_SCALE_DOWN): 743 scale_res = power_manager_scale_core_down(core_num); 744 break; 745 case(CPU_POWER_SCALE_UP): 746 scale_res = power_manager_scale_core_up(core_num); 747 break; 748 case(CPU_POWER_ENABLE_TURBO): 749 scale_res = power_manager_enable_turbo_core(core_num); 750 break; 751 case(CPU_POWER_DISABLE_TURBO): 752 scale_res = power_manager_disable_turbo_core(core_num); 753 break; 754 default: 755 valid_unit = false; 756 break; 757 } 758 759 if (valid_unit) { 760 ret = send_ack_for_received_cmd(pkt, 761 chan_info, 762 scale_res > 0 ? 763 CPU_POWER_CMD_ACK : 764 CPU_POWER_CMD_NACK); 765 if (ret < 0) 766 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Error during sending ack command.\n"); 767 } else 768 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Unexpected unit type.\n"); 769 770 } 771 772 if (pkt->command == PKT_POLICY) { 773 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n", 774 pkt->vm_name); 775 int ret = send_ack_for_received_cmd(pkt, 776 chan_info, 777 CPU_POWER_CMD_ACK); 778 if (ret < 0) 779 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Error during sending ack command.\n"); 780 update_policy(pkt); 781 policy_is_set = 1; 782 } 783 784 if (pkt->command == PKT_POLICY_REMOVE) { 785 ret = remove_policy(pkt); 786 if (ret == 0) 787 RTE_LOG(INFO, CHANNEL_MONITOR, 788 "Removed policy %s\n", pkt->vm_name); 789 else 790 RTE_LOG(INFO, CHANNEL_MONITOR, 791 "Policy %s does not exist\n", pkt->vm_name); 792 } 793 794 /* 795 * Return is not checked as channel status may have been set to DISABLED 796 * from management thread 797 */ 798 rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING, 799 CHANNEL_MGR_CHANNEL_CONNECTED); 800 return 0; 801 802 } 803 804 int 805 add_channel_to_monitor(struct channel_info **chan_info) 806 { 807 struct channel_info *info = *chan_info; 808 struct epoll_event event; 809 810 event.events = EPOLLIN; 811 event.data.ptr = info; 812 if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) { 813 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' " 814 "to epoll\n", info->channel_path); 815 return -1; 816 } 817 RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' " 818 "to monitor\n", info->channel_path); 819 return 0; 820 } 821 822 int 823 remove_channel_from_monitor(struct channel_info *chan_info) 824 { 825 if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL, 826 chan_info->fd, NULL) < 0) { 827 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' " 828 "from epoll\n", chan_info->channel_path); 829 return -1; 830 } 831 return 0; 832 } 833 834 int 835 channel_monitor_init(void) 836 { 837 global_event_fd = epoll_create1(0); 838 if (global_event_fd == 0) { 839 RTE_LOG(ERR, CHANNEL_MONITOR, 840 "Error creating epoll context with error %s\n", 841 strerror(errno)); 842 return -1; 843 } 844 global_events_list = rte_malloc("epoll_events", 845 sizeof(*global_events_list) 846 * MAX_EVENTS, RTE_CACHE_LINE_SIZE); 847 if (global_events_list == NULL) { 848 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for " 849 "epoll events\n"); 850 return -1; 851 } 852 return 0; 853 } 854 855 static void 856 read_binary_packet(struct channel_info *chan_info) 857 { 858 struct channel_packet pkt; 859 void *buffer = &pkt; 860 int buffer_len = sizeof(pkt); 861 int n_bytes, err = 0; 862 863 while (buffer_len > 0) { 864 n_bytes = read(chan_info->fd, 865 buffer, buffer_len); 866 if (n_bytes == buffer_len) 867 break; 868 if (n_bytes < 0) { 869 err = errno; 870 RTE_LOG(DEBUG, CHANNEL_MONITOR, 871 "Received error on " 872 "channel '%s' read: %s\n", 873 chan_info->channel_path, 874 strerror(err)); 875 remove_channel(&chan_info); 876 break; 877 } 878 buffer = (char *)buffer + n_bytes; 879 buffer_len -= n_bytes; 880 } 881 if (!err) 882 process_request(&pkt, chan_info); 883 } 884 885 #ifdef USE_JANSSON 886 static void 887 read_json_packet(struct channel_info *chan_info) 888 { 889 struct channel_packet pkt; 890 int n_bytes, ret; 891 json_t *root; 892 json_error_t error; 893 const char *resource_name; 894 char *start, *end; 895 uint32_t n; 896 897 898 /* read opening brace to closing brace */ 899 do { 900 int idx = 0; 901 int indent = 0; 902 do { 903 n_bytes = read(chan_info->fd, &json_data[idx], 1); 904 if (n_bytes == 0) 905 break; 906 if (json_data[idx] == '{') 907 indent++; 908 if (json_data[idx] == '}') 909 indent--; 910 if ((indent > 0) || (idx > 0)) 911 idx++; 912 if (indent <= 0) 913 json_data[idx] = 0; 914 if (idx >= MAX_JSON_STRING_LEN-1) 915 break; 916 } while (indent > 0); 917 918 json_data[idx] = '\0'; 919 920 if (strlen(json_data) == 0) 921 continue; 922 923 printf("got [%s]\n", json_data); 924 925 root = json_loads(json_data, 0, &error); 926 927 if (root) { 928 resource_name = get_resource_name_from_chn_path( 929 chan_info->channel_path); 930 /* 931 * Because our data is now in the json 932 * object, we can overwrite the pkt 933 * with a channel_packet struct, using 934 * parse_json_to_pkt() 935 */ 936 ret = parse_json_to_pkt(root, &pkt, resource_name); 937 json_decref(root); 938 if (ret) { 939 RTE_LOG(ERR, CHANNEL_MONITOR, 940 "Error validating JSON profile data\n"); 941 break; 942 } 943 start = strstr(pkt.vm_name, 944 CHANNEL_MGR_FIFO_PATTERN_NAME); 945 if (start != NULL) { 946 /* move past pattern to start of fifo id */ 947 start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME); 948 949 end = start; 950 n = (uint32_t)strtoul(start, &end, 10); 951 952 if (end[0] == '\0') { 953 /* Add core id to core list */ 954 pkt.num_vcpu = 1; 955 pkt.vcpu_to_control[0] = n; 956 process_request(&pkt, chan_info); 957 } else { 958 RTE_LOG(ERR, CHANNEL_MONITOR, 959 "Cannot extract core id from fifo name\n"); 960 } 961 } else { 962 process_request(&pkt, chan_info); 963 } 964 } else { 965 RTE_LOG(ERR, CHANNEL_MONITOR, 966 "JSON error on line %d: %s\n", 967 error.line, error.text); 968 } 969 } while (n_bytes > 0); 970 } 971 #endif 972 973 void 974 run_channel_monitor(void) 975 { 976 while (run_loop) { 977 int n_events, i; 978 979 n_events = epoll_wait(global_event_fd, global_events_list, 980 MAX_EVENTS, 1); 981 if (!run_loop) 982 break; 983 for (i = 0; i < n_events; i++) { 984 struct channel_info *chan_info = (struct channel_info *) 985 global_events_list[i].data.ptr; 986 if ((global_events_list[i].events & EPOLLERR) || 987 (global_events_list[i].events & EPOLLHUP)) { 988 RTE_LOG(INFO, CHANNEL_MONITOR, 989 "Remote closed connection for " 990 "channel '%s'\n", 991 chan_info->channel_path); 992 remove_channel(&chan_info); 993 continue; 994 } 995 if (global_events_list[i].events & EPOLLIN) { 996 997 switch (chan_info->type) { 998 case CHANNEL_TYPE_BINARY: 999 read_binary_packet(chan_info); 1000 break; 1001 #ifdef USE_JANSSON 1002 case CHANNEL_TYPE_JSON: 1003 read_json_packet(chan_info); 1004 break; 1005 #endif 1006 default: 1007 break; 1008 } 1009 } 1010 } 1011 rte_delay_us(time_period_ms*1000); 1012 if (policy_is_set) { 1013 unsigned int j; 1014 1015 for (j = 0; j < RTE_DIM(policies); j++) { 1016 if (policies[j].enabled == 1) 1017 apply_policy(&policies[j]); 1018 } 1019 } 1020 } 1021 } 1022