1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2014 Intel Corporation 3 */ 4 5 #include <unistd.h> 6 #include <stdio.h> 7 #include <stdlib.h> 8 #include <stdint.h> 9 #include <signal.h> 10 #include <errno.h> 11 #include <string.h> 12 #include <fcntl.h> 13 #include <sys/types.h> 14 #include <sys/epoll.h> 15 #include <sys/queue.h> 16 #include <sys/time.h> 17 #include <sys/socket.h> 18 #include <sys/select.h> 19 #ifdef USE_JANSSON 20 #include <jansson.h> 21 #else 22 #pragma message "Jansson dev libs unavailable, not including JSON parsing" 23 #endif 24 #include <rte_string_fns.h> 25 #include <rte_log.h> 26 #include <rte_memory.h> 27 #include <rte_malloc.h> 28 #include <rte_atomic.h> 29 #include <rte_cycles.h> 30 #include <rte_ethdev.h> 31 #include <rte_pmd_i40e.h> 32 33 #include <libvirt/libvirt.h> 34 #include "channel_monitor.h" 35 #include "channel_commands.h" 36 #include "channel_manager.h" 37 #include "power_manager.h" 38 #include "oob_monitor.h" 39 40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1 41 42 #define MAX_EVENTS 256 43 44 uint64_t vsi_pkt_count_prev[384]; 45 uint64_t rdtsc_prev[384]; 46 #define MAX_JSON_STRING_LEN 1024 47 char json_data[MAX_JSON_STRING_LEN]; 48 49 double time_period_ms = 1; 50 static volatile unsigned run_loop = 1; 51 static int global_event_fd; 52 static unsigned int policy_is_set; 53 static struct epoll_event *global_events_list; 54 static struct policy policies[RTE_MAX_LCORE]; 55 56 #ifdef USE_JANSSON 57 58 union PFID { 59 struct rte_ether_addr addr; 60 uint64_t pfid; 61 }; 62 63 static int 64 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr) 65 { 66 int i; 67 char *end; 68 unsigned long o[RTE_ETHER_ADDR_LEN]; 69 70 i = 0; 71 do { 72 errno = 0; 73 o[i] = strtoul(a, &end, 16); 74 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0)) 75 return -1; 76 a = end + 1; 77 } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0); 78 79 /* Junk at the end of line */ 80 if (end[0] != 0) 81 return -1; 82 83 /* Support the format XX:XX:XX:XX:XX:XX */ 84 if (i == RTE_ETHER_ADDR_LEN) { 85 while (i-- != 0) { 86 if (o[i] > UINT8_MAX) 87 return -1; 88 ether_addr->addr_bytes[i] = (uint8_t)o[i]; 89 } 90 /* Support the format XXXX:XXXX:XXXX */ 91 } else if (i == RTE_ETHER_ADDR_LEN / 2) { 92 while (i-- != 0) { 93 if (o[i] > UINT16_MAX) 94 return -1; 95 ether_addr->addr_bytes[i * 2] = 96 (uint8_t)(o[i] >> 8); 97 ether_addr->addr_bytes[i * 2 + 1] = 98 (uint8_t)(o[i] & 0xff); 99 } 100 /* unknown format */ 101 } else 102 return -1; 103 104 return 0; 105 } 106 107 static int 108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac) 109 { 110 union PFID pfid; 111 int ret; 112 113 /* Use port MAC address as the vfid */ 114 ret = str_to_ether_addr(mac, &pfid.addr); 115 116 if (ret != 0) { 117 RTE_LOG(ERR, CHANNEL_MONITOR, 118 "Invalid mac address received in JSON\n"); 119 pkt->vfid[idx] = 0; 120 return -1; 121 } 122 123 printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":" 124 "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n", 125 pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1], 126 pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3], 127 pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]); 128 129 pkt->vfid[idx] = pfid.pfid; 130 return 0; 131 } 132 133 static char* 134 get_resource_name_from_chn_path(const char *channel_path) 135 { 136 char *substr = NULL; 137 138 substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME); 139 140 return substr; 141 } 142 143 static int 144 get_resource_id_from_vmname(const char *vm_name) 145 { 146 int result = -1; 147 int off = 0; 148 149 if (vm_name == NULL) 150 return -1; 151 152 while (vm_name[off] != '\0') { 153 if (isdigit(vm_name[off])) 154 break; 155 off++; 156 } 157 result = atoi(&vm_name[off]); 158 if ((result == 0) && (vm_name[off] != '0')) 159 return -1; 160 161 return result; 162 } 163 164 static int 165 parse_json_to_pkt(json_t *element, struct channel_packet *pkt, 166 const char *vm_name) 167 { 168 const char *key; 169 json_t *value; 170 int ret; 171 int resource_id; 172 173 memset(pkt, 0, sizeof(struct channel_packet)); 174 175 pkt->nb_mac_to_monitor = 0; 176 pkt->t_boost_status.tbEnabled = false; 177 pkt->workload = LOW; 178 pkt->policy_to_use = TIME; 179 pkt->command = PKT_POLICY; 180 pkt->core_type = CORE_TYPE_PHYSICAL; 181 182 if (vm_name == NULL) { 183 RTE_LOG(ERR, CHANNEL_MONITOR, 184 "vm_name is NULL, request rejected !\n"); 185 return -1; 186 } 187 188 json_object_foreach(element, key, value) { 189 if (!strcmp(key, "policy")) { 190 /* Recurse in to get the contents of profile */ 191 ret = parse_json_to_pkt(value, pkt, vm_name); 192 if (ret) 193 return ret; 194 } else if (!strcmp(key, "instruction")) { 195 /* Recurse in to get the contents of instruction */ 196 ret = parse_json_to_pkt(value, pkt, vm_name); 197 if (ret) 198 return ret; 199 } else if (!strcmp(key, "command")) { 200 char command[32]; 201 strlcpy(command, json_string_value(value), 32); 202 if (!strcmp(command, "power")) { 203 pkt->command = CPU_POWER; 204 } else if (!strcmp(command, "create")) { 205 pkt->command = PKT_POLICY; 206 } else if (!strcmp(command, "destroy")) { 207 pkt->command = PKT_POLICY_REMOVE; 208 } else { 209 RTE_LOG(ERR, CHANNEL_MONITOR, 210 "Invalid command received in JSON\n"); 211 return -1; 212 } 213 } else if (!strcmp(key, "policy_type")) { 214 char command[32]; 215 strlcpy(command, json_string_value(value), 32); 216 if (!strcmp(command, "TIME")) { 217 pkt->policy_to_use = TIME; 218 } else if (!strcmp(command, "TRAFFIC")) { 219 pkt->policy_to_use = TRAFFIC; 220 } else if (!strcmp(command, "WORKLOAD")) { 221 pkt->policy_to_use = WORKLOAD; 222 } else if (!strcmp(command, "BRANCH_RATIO")) { 223 pkt->policy_to_use = BRANCH_RATIO; 224 } else { 225 RTE_LOG(ERR, CHANNEL_MONITOR, 226 "Wrong policy_type received in JSON\n"); 227 return -1; 228 } 229 } else if (!strcmp(key, "workload")) { 230 char command[32]; 231 strlcpy(command, json_string_value(value), 32); 232 if (!strcmp(command, "HIGH")) { 233 pkt->workload = HIGH; 234 } else if (!strcmp(command, "MEDIUM")) { 235 pkt->workload = MEDIUM; 236 } else if (!strcmp(command, "LOW")) { 237 pkt->workload = LOW; 238 } else { 239 RTE_LOG(ERR, CHANNEL_MONITOR, 240 "Wrong workload received in JSON\n"); 241 return -1; 242 } 243 } else if (!strcmp(key, "busy_hours")) { 244 unsigned int i; 245 size_t size = json_array_size(value); 246 247 for (i = 0; i < size; i++) { 248 int hour = (int)json_integer_value( 249 json_array_get(value, i)); 250 pkt->timer_policy.busy_hours[i] = hour; 251 } 252 } else if (!strcmp(key, "quiet_hours")) { 253 unsigned int i; 254 size_t size = json_array_size(value); 255 256 for (i = 0; i < size; i++) { 257 int hour = (int)json_integer_value( 258 json_array_get(value, i)); 259 pkt->timer_policy.quiet_hours[i] = hour; 260 } 261 } else if (!strcmp(key, "mac_list")) { 262 unsigned int i; 263 size_t size = json_array_size(value); 264 265 for (i = 0; i < size; i++) { 266 char mac[32]; 267 strlcpy(mac, 268 json_string_value(json_array_get(value, i)), 269 32); 270 set_policy_mac(pkt, i, mac); 271 } 272 pkt->nb_mac_to_monitor = size; 273 } else if (!strcmp(key, "avg_packet_thresh")) { 274 pkt->traffic_policy.avg_max_packet_thresh = 275 (uint32_t)json_integer_value(value); 276 } else if (!strcmp(key, "max_packet_thresh")) { 277 pkt->traffic_policy.max_max_packet_thresh = 278 (uint32_t)json_integer_value(value); 279 } else if (!strcmp(key, "unit")) { 280 char unit[32]; 281 strlcpy(unit, json_string_value(value), 32); 282 if (!strcmp(unit, "SCALE_UP")) { 283 pkt->unit = CPU_POWER_SCALE_UP; 284 } else if (!strcmp(unit, "SCALE_DOWN")) { 285 pkt->unit = CPU_POWER_SCALE_DOWN; 286 } else if (!strcmp(unit, "SCALE_MAX")) { 287 pkt->unit = CPU_POWER_SCALE_MAX; 288 } else if (!strcmp(unit, "SCALE_MIN")) { 289 pkt->unit = CPU_POWER_SCALE_MIN; 290 } else if (!strcmp(unit, "ENABLE_TURBO")) { 291 pkt->unit = CPU_POWER_ENABLE_TURBO; 292 } else if (!strcmp(unit, "DISABLE_TURBO")) { 293 pkt->unit = CPU_POWER_DISABLE_TURBO; 294 } else { 295 RTE_LOG(ERR, CHANNEL_MONITOR, 296 "Invalid command received in JSON\n"); 297 return -1; 298 } 299 } else { 300 RTE_LOG(ERR, CHANNEL_MONITOR, 301 "Unknown key received in JSON string: %s\n", 302 key); 303 } 304 305 resource_id = get_resource_id_from_vmname(vm_name); 306 if (resource_id < 0) { 307 RTE_LOG(ERR, CHANNEL_MONITOR, 308 "Could not get resource_id from vm_name:%s\n", 309 vm_name); 310 return -1; 311 } 312 strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ); 313 pkt->resource_id = resource_id; 314 } 315 return 0; 316 } 317 #endif 318 319 void channel_monitor_exit(void) 320 { 321 run_loop = 0; 322 rte_free(global_events_list); 323 } 324 325 static void 326 core_share(int pNo, int z, int x, int t) 327 { 328 if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) { 329 if (strcmp(policies[pNo].pkt.vm_name, 330 lvm_info[x].vm_name) != 0) { 331 policies[pNo].core_share[z].status = 1; 332 power_manager_scale_core_max( 333 policies[pNo].core_share[z].pcpu); 334 } 335 } 336 } 337 338 static void 339 core_share_status(int pNo) 340 { 341 342 int noVms = 0, noVcpus = 0, z, x, t; 343 344 get_all_vm(&noVms, &noVcpus); 345 346 /* Reset Core Share Status. */ 347 for (z = 0; z < noVcpus; z++) 348 policies[pNo].core_share[z].status = 0; 349 350 /* Foreach vcpu in a policy. */ 351 for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) { 352 /* Foreach VM on the platform. */ 353 for (x = 0; x < noVms; x++) { 354 /* Foreach vcpu of VMs on platform. */ 355 for (t = 0; t < lvm_info[x].num_cpus; t++) 356 core_share(pNo, z, x, t); 357 } 358 } 359 } 360 361 362 static int 363 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count) 364 { 365 int ret = 0; 366 367 if (pol->pkt.policy_to_use == BRANCH_RATIO) { 368 ci->cd[pcpu].oob_enabled = 1; 369 ret = add_core_to_monitor(pcpu); 370 if (ret == 0) 371 RTE_LOG(INFO, CHANNEL_MONITOR, 372 "Monitoring pcpu %d OOB for %s\n", 373 pcpu, pol->pkt.vm_name); 374 else 375 RTE_LOG(ERR, CHANNEL_MONITOR, 376 "Error monitoring pcpu %d OOB for %s\n", 377 pcpu, pol->pkt.vm_name); 378 379 } else { 380 pol->core_share[count].pcpu = pcpu; 381 RTE_LOG(INFO, CHANNEL_MONITOR, 382 "Monitoring pcpu %d for %s\n", 383 pcpu, pol->pkt.vm_name); 384 } 385 return ret; 386 } 387 388 static void 389 get_pcpu_to_control(struct policy *pol) 390 { 391 392 /* Convert vcpu to pcpu. */ 393 struct vm_info info; 394 int pcpu, count; 395 struct core_info *ci; 396 397 ci = get_core_info(); 398 399 RTE_LOG(DEBUG, CHANNEL_MONITOR, 400 "Looking for pcpu for %s\n", pol->pkt.vm_name); 401 402 /* 403 * So now that we're handling virtual and physical cores, we need to 404 * differenciate between them when adding them to the branch monitor. 405 * Virtual cores need to be converted to physical cores. 406 */ 407 if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) { 408 /* 409 * If the cores in the policy are virtual, we need to map them 410 * to physical core. We look up the vm info and use that for 411 * the mapping. 412 */ 413 get_info_vm(pol->pkt.vm_name, &info); 414 for (count = 0; count < pol->pkt.num_vcpu; count++) { 415 pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]]; 416 pcpu_monitor(pol, ci, pcpu, count); 417 } 418 } else { 419 /* 420 * If the cores in the policy are physical, we just use 421 * those core id's directly. 422 */ 423 for (count = 0; count < pol->pkt.num_vcpu; count++) { 424 pcpu = pol->pkt.vcpu_to_control[count]; 425 pcpu_monitor(pol, ci, pcpu, count); 426 } 427 } 428 } 429 430 static int 431 get_pfid(struct policy *pol) 432 { 433 434 int i, x, ret = 0; 435 436 for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) { 437 438 RTE_ETH_FOREACH_DEV(x) { 439 ret = rte_pmd_i40e_query_vfid_by_mac(x, 440 (struct rte_ether_addr *)&(pol->pkt.vfid[i])); 441 if (ret != -EINVAL) { 442 pol->port[i] = x; 443 break; 444 } 445 } 446 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) { 447 RTE_LOG(INFO, CHANNEL_MONITOR, 448 "Error with Policy. MAC not found on " 449 "attached ports "); 450 pol->enabled = 0; 451 return ret; 452 } 453 pol->pfid[i] = ret; 454 } 455 return 1; 456 } 457 458 static int 459 update_policy(struct channel_packet *pkt) 460 { 461 462 unsigned int updated = 0; 463 unsigned int i; 464 465 466 RTE_LOG(INFO, CHANNEL_MONITOR, 467 "Applying policy for %s\n", pkt->vm_name); 468 469 for (i = 0; i < RTE_DIM(policies); i++) { 470 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) { 471 /* Copy the contents of *pkt into the policy.pkt */ 472 policies[i].pkt = *pkt; 473 get_pcpu_to_control(&policies[i]); 474 /* Check Eth dev only for Traffic policy */ 475 if (policies[i].pkt.policy_to_use == TRAFFIC) { 476 if (get_pfid(&policies[i]) < 0) { 477 updated = 1; 478 break; 479 } 480 } 481 core_share_status(i); 482 policies[i].enabled = 1; 483 updated = 1; 484 } 485 } 486 if (!updated) { 487 for (i = 0; i < RTE_DIM(policies); i++) { 488 if (policies[i].enabled == 0) { 489 policies[i].pkt = *pkt; 490 get_pcpu_to_control(&policies[i]); 491 /* Check Eth dev only for Traffic policy */ 492 if (policies[i].pkt.policy_to_use == TRAFFIC) { 493 if (get_pfid(&policies[i]) < 0) { 494 updated = 1; 495 break; 496 } 497 } 498 core_share_status(i); 499 policies[i].enabled = 1; 500 break; 501 } 502 } 503 } 504 return 0; 505 } 506 507 static int 508 remove_policy(struct channel_packet *pkt __rte_unused) 509 { 510 unsigned int i; 511 512 /* 513 * Disabling the policy is simply a case of setting 514 * enabled to 0 515 */ 516 for (i = 0; i < RTE_DIM(policies); i++) { 517 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) { 518 policies[i].enabled = 0; 519 return 0; 520 } 521 } 522 return -1; 523 } 524 525 static uint64_t 526 get_pkt_diff(struct policy *pol) 527 { 528 529 uint64_t vsi_pkt_count, 530 vsi_pkt_total = 0, 531 vsi_pkt_count_prev_total = 0; 532 double rdtsc_curr, rdtsc_diff, diff; 533 int x; 534 struct rte_eth_stats vf_stats; 535 536 for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) { 537 538 /*Read vsi stats*/ 539 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0) 540 vsi_pkt_count = vf_stats.ipackets; 541 else 542 vsi_pkt_count = -1; 543 544 vsi_pkt_total += vsi_pkt_count; 545 546 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]]; 547 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count; 548 } 549 550 rdtsc_curr = rte_rdtsc_precise(); 551 rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]]; 552 rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr; 553 554 diff = (vsi_pkt_total - vsi_pkt_count_prev_total) * 555 ((double)rte_get_tsc_hz() / rdtsc_diff); 556 557 return diff; 558 } 559 560 static void 561 apply_traffic_profile(struct policy *pol) 562 { 563 564 int count; 565 uint64_t diff = 0; 566 567 diff = get_pkt_diff(pol); 568 569 if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) { 570 for (count = 0; count < pol->pkt.num_vcpu; count++) { 571 if (pol->core_share[count].status != 1) 572 power_manager_scale_core_max( 573 pol->core_share[count].pcpu); 574 } 575 } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) { 576 for (count = 0; count < pol->pkt.num_vcpu; count++) { 577 if (pol->core_share[count].status != 1) 578 power_manager_scale_core_med( 579 pol->core_share[count].pcpu); 580 } 581 } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) { 582 for (count = 0; count < pol->pkt.num_vcpu; count++) { 583 if (pol->core_share[count].status != 1) 584 power_manager_scale_core_min( 585 pol->core_share[count].pcpu); 586 } 587 } 588 } 589 590 static void 591 apply_time_profile(struct policy *pol) 592 { 593 594 int count, x; 595 struct timeval tv; 596 struct tm *ptm; 597 char time_string[40]; 598 599 /* Obtain the time of day, and convert it to a tm struct. */ 600 gettimeofday(&tv, NULL); 601 ptm = localtime(&tv.tv_sec); 602 /* Format the date and time, down to a single second. */ 603 strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm); 604 605 for (x = 0; x < HOURS; x++) { 606 607 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) { 608 for (count = 0; count < pol->pkt.num_vcpu; count++) { 609 if (pol->core_share[count].status != 1) { 610 power_manager_scale_core_max( 611 pol->core_share[count].pcpu); 612 } 613 } 614 break; 615 } else if (ptm->tm_hour == 616 pol->pkt.timer_policy.quiet_hours[x]) { 617 for (count = 0; count < pol->pkt.num_vcpu; count++) { 618 if (pol->core_share[count].status != 1) { 619 power_manager_scale_core_min( 620 pol->core_share[count].pcpu); 621 } 622 } 623 break; 624 } else if (ptm->tm_hour == 625 pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) { 626 apply_traffic_profile(pol); 627 break; 628 } 629 } 630 } 631 632 static void 633 apply_workload_profile(struct policy *pol) 634 { 635 636 int count; 637 638 if (pol->pkt.workload == HIGH) { 639 for (count = 0; count < pol->pkt.num_vcpu; count++) { 640 if (pol->core_share[count].status != 1) 641 power_manager_scale_core_max( 642 pol->core_share[count].pcpu); 643 } 644 } else if (pol->pkt.workload == MEDIUM) { 645 for (count = 0; count < pol->pkt.num_vcpu; count++) { 646 if (pol->core_share[count].status != 1) 647 power_manager_scale_core_med( 648 pol->core_share[count].pcpu); 649 } 650 } else if (pol->pkt.workload == LOW) { 651 for (count = 0; count < pol->pkt.num_vcpu; count++) { 652 if (pol->core_share[count].status != 1) 653 power_manager_scale_core_min( 654 pol->core_share[count].pcpu); 655 } 656 } 657 } 658 659 static void 660 apply_policy(struct policy *pol) 661 { 662 663 struct channel_packet *pkt = &pol->pkt; 664 665 /*Check policy to use*/ 666 if (pkt->policy_to_use == TRAFFIC) 667 apply_traffic_profile(pol); 668 else if (pkt->policy_to_use == TIME) 669 apply_time_profile(pol); 670 else if (pkt->policy_to_use == WORKLOAD) 671 apply_workload_profile(pol); 672 } 673 674 static int 675 process_request(struct channel_packet *pkt, struct channel_info *chan_info) 676 { 677 int ret; 678 679 if (chan_info == NULL) 680 return -1; 681 682 if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED, 683 CHANNEL_MGR_CHANNEL_PROCESSING) == 0) 684 return -1; 685 686 if (pkt->command == CPU_POWER) { 687 unsigned int core_num; 688 689 if (pkt->core_type == CORE_TYPE_VIRTUAL) 690 core_num = get_pcpu(chan_info, pkt->resource_id); 691 else 692 core_num = pkt->resource_id; 693 694 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n", 695 core_num); 696 697 switch (pkt->unit) { 698 case(CPU_POWER_SCALE_MIN): 699 power_manager_scale_core_min(core_num); 700 break; 701 case(CPU_POWER_SCALE_MAX): 702 power_manager_scale_core_max(core_num); 703 break; 704 case(CPU_POWER_SCALE_DOWN): 705 power_manager_scale_core_down(core_num); 706 break; 707 case(CPU_POWER_SCALE_UP): 708 power_manager_scale_core_up(core_num); 709 break; 710 case(CPU_POWER_ENABLE_TURBO): 711 power_manager_enable_turbo_core(core_num); 712 break; 713 case(CPU_POWER_DISABLE_TURBO): 714 power_manager_disable_turbo_core(core_num); 715 break; 716 default: 717 break; 718 } 719 } 720 721 if (pkt->command == PKT_POLICY) { 722 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n", 723 pkt->vm_name); 724 update_policy(pkt); 725 policy_is_set = 1; 726 } 727 728 if (pkt->command == PKT_POLICY_REMOVE) { 729 ret = remove_policy(pkt); 730 if (ret == 0) 731 RTE_LOG(INFO, CHANNEL_MONITOR, 732 "Removed policy %s\n", pkt->vm_name); 733 else 734 RTE_LOG(INFO, CHANNEL_MONITOR, 735 "Policy %s does not exist\n", pkt->vm_name); 736 } 737 738 /* 739 * Return is not checked as channel status may have been set to DISABLED 740 * from management thread 741 */ 742 rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING, 743 CHANNEL_MGR_CHANNEL_CONNECTED); 744 return 0; 745 746 } 747 748 int 749 add_channel_to_monitor(struct channel_info **chan_info) 750 { 751 struct channel_info *info = *chan_info; 752 struct epoll_event event; 753 754 event.events = EPOLLIN; 755 event.data.ptr = info; 756 if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) { 757 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' " 758 "to epoll\n", info->channel_path); 759 return -1; 760 } 761 RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' " 762 "to monitor\n", info->channel_path); 763 return 0; 764 } 765 766 int 767 remove_channel_from_monitor(struct channel_info *chan_info) 768 { 769 if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL, 770 chan_info->fd, NULL) < 0) { 771 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' " 772 "from epoll\n", chan_info->channel_path); 773 return -1; 774 } 775 return 0; 776 } 777 778 int 779 channel_monitor_init(void) 780 { 781 global_event_fd = epoll_create1(0); 782 if (global_event_fd == 0) { 783 RTE_LOG(ERR, CHANNEL_MONITOR, 784 "Error creating epoll context with error %s\n", 785 strerror(errno)); 786 return -1; 787 } 788 global_events_list = rte_malloc("epoll_events", 789 sizeof(*global_events_list) 790 * MAX_EVENTS, RTE_CACHE_LINE_SIZE); 791 if (global_events_list == NULL) { 792 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for " 793 "epoll events\n"); 794 return -1; 795 } 796 return 0; 797 } 798 799 static void 800 read_binary_packet(struct channel_info *chan_info) 801 { 802 struct channel_packet pkt; 803 void *buffer = &pkt; 804 int buffer_len = sizeof(pkt); 805 int n_bytes, err = 0; 806 807 while (buffer_len > 0) { 808 n_bytes = read(chan_info->fd, 809 buffer, buffer_len); 810 if (n_bytes == buffer_len) 811 break; 812 if (n_bytes < 0) { 813 err = errno; 814 RTE_LOG(DEBUG, CHANNEL_MONITOR, 815 "Received error on " 816 "channel '%s' read: %s\n", 817 chan_info->channel_path, 818 strerror(err)); 819 remove_channel(&chan_info); 820 break; 821 } 822 buffer = (char *)buffer + n_bytes; 823 buffer_len -= n_bytes; 824 } 825 if (!err) 826 process_request(&pkt, chan_info); 827 } 828 829 #ifdef USE_JANSSON 830 static void 831 read_json_packet(struct channel_info *chan_info) 832 { 833 struct channel_packet pkt; 834 int n_bytes, ret; 835 json_t *root; 836 json_error_t error; 837 const char *resource_name; 838 char *start, *end; 839 uint32_t n; 840 841 842 /* read opening brace to closing brace */ 843 do { 844 int idx = 0; 845 int indent = 0; 846 do { 847 n_bytes = read(chan_info->fd, &json_data[idx], 1); 848 if (n_bytes == 0) 849 break; 850 if (json_data[idx] == '{') 851 indent++; 852 if (json_data[idx] == '}') 853 indent--; 854 if ((indent > 0) || (idx > 0)) 855 idx++; 856 if (indent <= 0) 857 json_data[idx] = 0; 858 if (idx >= MAX_JSON_STRING_LEN-1) 859 break; 860 } while (indent > 0); 861 862 json_data[idx] = '\0'; 863 864 if (strlen(json_data) == 0) 865 continue; 866 867 printf("got [%s]\n", json_data); 868 869 root = json_loads(json_data, 0, &error); 870 871 if (root) { 872 resource_name = get_resource_name_from_chn_path( 873 chan_info->channel_path); 874 /* 875 * Because our data is now in the json 876 * object, we can overwrite the pkt 877 * with a channel_packet struct, using 878 * parse_json_to_pkt() 879 */ 880 ret = parse_json_to_pkt(root, &pkt, resource_name); 881 json_decref(root); 882 if (ret) { 883 RTE_LOG(ERR, CHANNEL_MONITOR, 884 "Error validating JSON profile data\n"); 885 break; 886 } 887 start = strstr(pkt.vm_name, 888 CHANNEL_MGR_FIFO_PATTERN_NAME); 889 if (start != NULL) { 890 /* move past pattern to start of fifo id */ 891 start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME); 892 893 end = start; 894 n = (uint32_t)strtoul(start, &end, 10); 895 896 if (end[0] == '\0') { 897 /* Add core id to core list */ 898 pkt.num_vcpu = 1; 899 pkt.vcpu_to_control[0] = n; 900 process_request(&pkt, chan_info); 901 } else { 902 RTE_LOG(ERR, CHANNEL_MONITOR, 903 "Cannot extract core id from fifo name\n"); 904 } 905 } else { 906 process_request(&pkt, chan_info); 907 } 908 } else { 909 RTE_LOG(ERR, CHANNEL_MONITOR, 910 "JSON error on line %d: %s\n", 911 error.line, error.text); 912 } 913 } while (n_bytes > 0); 914 } 915 #endif 916 917 void 918 run_channel_monitor(void) 919 { 920 while (run_loop) { 921 int n_events, i; 922 923 n_events = epoll_wait(global_event_fd, global_events_list, 924 MAX_EVENTS, 1); 925 if (!run_loop) 926 break; 927 for (i = 0; i < n_events; i++) { 928 struct channel_info *chan_info = (struct channel_info *) 929 global_events_list[i].data.ptr; 930 if ((global_events_list[i].events & EPOLLERR) || 931 (global_events_list[i].events & EPOLLHUP)) { 932 RTE_LOG(INFO, CHANNEL_MONITOR, 933 "Remote closed connection for " 934 "channel '%s'\n", 935 chan_info->channel_path); 936 remove_channel(&chan_info); 937 continue; 938 } 939 if (global_events_list[i].events & EPOLLIN) { 940 941 switch (chan_info->type) { 942 case CHANNEL_TYPE_BINARY: 943 read_binary_packet(chan_info); 944 break; 945 #ifdef USE_JANSSON 946 case CHANNEL_TYPE_JSON: 947 read_json_packet(chan_info); 948 break; 949 #endif 950 default: 951 break; 952 } 953 } 954 } 955 rte_delay_us(time_period_ms*1000); 956 if (policy_is_set) { 957 unsigned int j; 958 959 for (j = 0; j < RTE_DIM(policies); j++) { 960 if (policies[j].enabled == 1) 961 apply_policy(&policies[j]); 962 } 963 } 964 } 965 } 966