/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/queue.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/select.h>
#ifdef USE_JANSSON
#include <jansson.h>
#else
#pragma message "Jansson dev libs unavailable, not including JSON parsing"
#endif
#include <rte_string_fns.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_malloc.h>
#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_pmd_i40e.h>

#include <libvirt/libvirt.h>
#include "channel_monitor.h"
#include "channel_commands.h"
#include "channel_manager.h"
#include "power_manager.h"
#include "oob_monitor.h"

#define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1

#define MAX_EVENTS 256

uint64_t vsi_pkt_count_prev[384];
uint64_t rdtsc_prev[384];
#define MAX_JSON_STRING_LEN 1024
char json_data[MAX_JSON_STRING_LEN];

double time_period_ms = 1;
static volatile unsigned run_loop = 1;
static int global_event_fd;
static unsigned int policy_is_set;
static struct epoll_event *global_events_list;
static struct policy policies[MAX_CLIENTS];

#ifdef USE_JANSSON

union PFID {
	struct rte_ether_addr addr;
	uint64_t pfid;
};

static int
str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
{
	int i;
	char *end;
	unsigned long o[RTE_ETHER_ADDR_LEN];

	i = 0;
	do {
		errno = 0;
		o[i] = strtoul(a, &end, 16);
		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
			return -1;
		a = end + 1;
	} while (++i != RTE_DIM(o) && end[0] != 0);

	/* Junk at the end of line */
	if (end[0] != 0)
		return -1;

	/* Support the format XX:XX:XX:XX:XX:XX */
	if (i == RTE_ETHER_ADDR_LEN) {
		while (i-- != 0) {
			if (o[i] > UINT8_MAX)
				return -1;
			ether_addr->addr_bytes[i] = (uint8_t)o[i];
		}
	/* Support the format XXXX:XXXX:XXXX */
	} else if (i == RTE_ETHER_ADDR_LEN / 2) {
		while (i-- != 0) {
			if (o[i] > UINT16_MAX)
				return -1;
			ether_addr->addr_bytes[i * 2] =
					(uint8_t)(o[i] >> 8);
			ether_addr->addr_bytes[i * 2 + 1] =
					(uint8_t)(o[i] & 0xff);
		}
	/* unknown format */
	} else
		return -1;

	return 0;
}

static int
set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
{
	union PFID pfid;
	int ret;

	/* Use port MAC address as the vfid */
	ret = str_to_ether_addr(mac, &pfid.addr);

	if (ret != 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR,
			"Invalid mac address received in JSON\n");
		pkt->vfid[idx] = 0;
		return -1;
	}

	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
			pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
			pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
			pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);

	pkt->vfid[idx] = pfid.pfid;
	return 0;
}

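/*
 * Example of the JSON accepted by parse_json_to_pkt() below. This is an
 * illustrative sketch only -- the field values are made up -- but the keys
 * shown are among those the parser handles, wrapped in either a "policy"
 * or an "instruction" object:
 *
 *   {"policy": {
 *      "name": "vm_name_here",
 *      "command": "create",
 *      "policy_type": "TRAFFIC",
 *      "workload": "MEDIUM",
 *      "core_list": [10, 11],
 *      "mac_list": ["3C:FD:FE:DC:BA:98"],
 *      "avg_packet_thresh": 100000,
 *      "max_packet_thresh": 500000
 *   }}
 *
 * Unknown keys are logged and ignored; an invalid value for a known key
 * causes the whole request to be rejected.
 */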
static int
parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
{
	const char *key;
	json_t *value;
	int ret;

	memset(pkt, 0, sizeof(struct channel_packet));

	pkt->nb_mac_to_monitor = 0;
	pkt->t_boost_status.tbEnabled = false;
	pkt->workload = LOW;
	pkt->policy_to_use = TIME;
	pkt->command = PKT_POLICY;
	pkt->core_type = CORE_TYPE_PHYSICAL;

	json_object_foreach(element, key, value) {
		if (!strcmp(key, "policy")) {
			/* Recurse in to get the contents of profile */
			ret = parse_json_to_pkt(value, pkt);
			if (ret)
				return ret;
		} else if (!strcmp(key, "instruction")) {
			/* Recurse in to get the contents of instruction */
			ret = parse_json_to_pkt(value, pkt);
			if (ret)
				return ret;
		} else if (!strcmp(key, "name")) {
			strlcpy(pkt->vm_name, json_string_value(value),
					sizeof(pkt->vm_name));
		} else if (!strcmp(key, "command")) {
			char command[32];
			strlcpy(command, json_string_value(value), 32);
			if (!strcmp(command, "power")) {
				pkt->command = CPU_POWER;
			} else if (!strcmp(command, "create")) {
				pkt->command = PKT_POLICY;
			} else if (!strcmp(command, "destroy")) {
				pkt->command = PKT_POLICY_REMOVE;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Invalid command received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "policy_type")) {
			char command[32];
			strlcpy(command, json_string_value(value), 32);
			if (!strcmp(command, "TIME")) {
				pkt->policy_to_use = TIME;
			} else if (!strcmp(command, "TRAFFIC")) {
				pkt->policy_to_use = TRAFFIC;
			} else if (!strcmp(command, "WORKLOAD")) {
				pkt->policy_to_use = WORKLOAD;
			} else if (!strcmp(command, "BRANCH_RATIO")) {
				pkt->policy_to_use = BRANCH_RATIO;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Wrong policy_type received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "workload")) {
			char command[32];
			strlcpy(command, json_string_value(value), 32);
			if (!strcmp(command, "HIGH")) {
				pkt->workload = HIGH;
			} else if (!strcmp(command, "MEDIUM")) {
				pkt->workload = MEDIUM;
			} else if (!strcmp(command, "LOW")) {
				pkt->workload = LOW;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Wrong workload received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "busy_hours")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				int hour = (int)json_integer_value(
						json_array_get(value, i));
				pkt->timer_policy.busy_hours[i] = hour;
			}
		} else if (!strcmp(key, "quiet_hours")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				int hour = (int)json_integer_value(
						json_array_get(value, i));
				pkt->timer_policy.quiet_hours[i] = hour;
			}
		} else if (!strcmp(key, "core_list")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				int core = (int)json_integer_value(
						json_array_get(value, i));
				pkt->vcpu_to_control[i] = core;
			}
			pkt->num_vcpu = size;
		} else if (!strcmp(key, "mac_list")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				char mac[32];
				strlcpy(mac,
					json_string_value(json_array_get(value, i)),
					32);
				set_policy_mac(pkt, i, mac);
			}
			pkt->nb_mac_to_monitor = size;
		} else if (!strcmp(key, "avg_packet_thresh")) {
			pkt->traffic_policy.avg_max_packet_thresh =
					(uint32_t)json_integer_value(value);
		} else if (!strcmp(key, "max_packet_thresh")) {
			pkt->traffic_policy.max_max_packet_thresh =
					(uint32_t)json_integer_value(value);
		} else if (!strcmp(key, "unit")) {
			char unit[32];
			strlcpy(unit, json_string_value(value), 32);
			if (!strcmp(unit, "SCALE_UP")) {
				pkt->unit = CPU_POWER_SCALE_UP;
			} else if (!strcmp(unit, "SCALE_DOWN")) {
				pkt->unit = CPU_POWER_SCALE_DOWN;
			} else if (!strcmp(unit, "SCALE_MAX")) {
				pkt->unit = CPU_POWER_SCALE_MAX;
			} else if (!strcmp(unit, "SCALE_MIN")) {
				pkt->unit = CPU_POWER_SCALE_MIN;
			} else if (!strcmp(unit, "ENABLE_TURBO")) {
				pkt->unit = CPU_POWER_ENABLE_TURBO;
			} else if (!strcmp(unit, "DISABLE_TURBO")) {
				pkt->unit = CPU_POWER_DISABLE_TURBO;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Invalid command received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "resource_id")) {
			pkt->resource_id = (uint32_t)json_integer_value(value);
		} else {
			RTE_LOG(ERR, CHANNEL_MONITOR,
				"Unknown key received in JSON string: %s\n",
				key);
		}
	}
	return 0;
}
#endif

void channel_monitor_exit(void)
{
	run_loop = 0;
	rte_free(global_events_list);
}

static void
core_share(int pNo, int z, int x, int t)
{
	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
		if (strcmp(policies[pNo].pkt.vm_name,
				lvm_info[x].vm_name) != 0) {
			policies[pNo].core_share[z].status = 1;
			power_manager_scale_core_max(
					policies[pNo].core_share[z].pcpu);
		}
	}
}

static void
core_share_status(int pNo)
{
	int noVms = 0, noVcpus = 0, z, x, t;

	get_all_vm(&noVms, &noVcpus);

	/* Reset Core Share Status. */
	for (z = 0; z < noVcpus; z++)
		policies[pNo].core_share[z].status = 0;

	/* Foreach vcpu in a policy. */
	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
		/* Foreach VM on the platform. */
		for (x = 0; x < noVms; x++) {
			/* Foreach vcpu of VMs on platform. */
			for (t = 0; t < lvm_info[x].num_cpus; t++)
				core_share(pNo, z, x, t);
		}
	}
}

static int
pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
{
	int ret = 0;

	if (pol->pkt.policy_to_use == BRANCH_RATIO) {
		ci->cd[pcpu].oob_enabled = 1;
		ret = add_core_to_monitor(pcpu);
		if (ret == 0)
			RTE_LOG(INFO, CHANNEL_MONITOR,
					"Monitoring pcpu %d OOB for %s\n",
					pcpu, pol->pkt.vm_name);
		else
			RTE_LOG(ERR, CHANNEL_MONITOR,
					"Error monitoring pcpu %d OOB for %s\n",
					pcpu, pol->pkt.vm_name);
	} else {
		pol->core_share[count].pcpu = pcpu;
		RTE_LOG(INFO, CHANNEL_MONITOR,
				"Monitoring pcpu %d for %s\n",
				pcpu, pol->pkt.vm_name);
	}
	return ret;
}

static void
get_pcpu_to_control(struct policy *pol)
{
	/* Convert vcpu to pcpu. */
	struct vm_info info;
	int pcpu, count;
	struct core_info *ci;

	ci = get_core_info();

	RTE_LOG(DEBUG, CHANNEL_MONITOR,
			"Looking for pcpu for %s\n", pol->pkt.vm_name);

	/*
	 * So now that we're handling virtual and physical cores, we need to
	 * differentiate between them when adding them to the branch monitor.
	 * Virtual cores need to be converted to physical cores.
	 */
	if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
		/*
		 * If the cores in the policy are virtual, we need to map them
		 * to physical core. We look up the vm info and use that for
		 * the mapping.
		 */
		get_info_vm(pol->pkt.vm_name, &info);
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
			pcpu_monitor(pol, ci, pcpu, count);
		}
	} else {
		/*
		 * If the cores in the policy are physical, we just use
		 * those core id's directly.
		 */
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			pcpu = pol->pkt.vcpu_to_control[count];
			pcpu_monitor(pol, ci, pcpu, count);
		}
	}
}

static int
get_pfid(struct policy *pol)
{
	int i, x, ret = 0;

	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {

		RTE_ETH_FOREACH_DEV(x) {
			ret = rte_pmd_i40e_query_vfid_by_mac(x,
				(struct rte_ether_addr *)&(pol->pkt.vfid[i]));
			if (ret != -EINVAL) {
				pol->port[i] = x;
				break;
			}
		}
		if (ret == -EINVAL || ret == -ENOTSUP || ret == -ENODEV) {
			RTE_LOG(INFO, CHANNEL_MONITOR,
				"Error with Policy. MAC not found on "
				"attached ports\n");
			pol->enabled = 0;
			return ret;
		}
		pol->pfid[i] = ret;
	}
	return 1;
}

/*
 * Apply a policy packet: reuse the slot whose vm_name already matches,
 * otherwise take the first disabled slot in policies[].
 */
static int
update_policy(struct channel_packet *pkt)
{
	unsigned int updated = 0;
	int i;

	RTE_LOG(INFO, CHANNEL_MONITOR,
			"Applying policy for %s\n", pkt->vm_name);

	for (i = 0; i < MAX_CLIENTS; i++) {
		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
			/* Copy the contents of *pkt into the policy.pkt */
			policies[i].pkt = *pkt;
			get_pcpu_to_control(&policies[i]);
			/* Check Eth dev only for Traffic policy */
			if (policies[i].pkt.policy_to_use == TRAFFIC) {
				if (get_pfid(&policies[i]) < 0) {
					updated = 1;
					break;
				}
			}
			core_share_status(i);
			policies[i].enabled = 1;
			updated = 1;
		}
	}
	if (!updated) {
		for (i = 0; i < MAX_CLIENTS; i++) {
			if (policies[i].enabled == 0) {
				policies[i].pkt = *pkt;
				get_pcpu_to_control(&policies[i]);
				/* Check Eth dev only for Traffic policy */
				if (policies[i].pkt.policy_to_use == TRAFFIC) {
					if (get_pfid(&policies[i]) < 0) {
						updated = 1;
						break;
					}
				}
				core_share_status(i);
				policies[i].enabled = 1;
				break;
			}
		}
	}
	return 0;
}

static int
remove_policy(struct channel_packet *pkt)
{
	int i;

	/*
	 * Disabling the policy is simply a case of setting
	 * enabled to 0
	 */
	for (i = 0; i < MAX_CLIENTS; i++) {
		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
			policies[i].enabled = 0;
			return 0;
		}
	}
	return -1;
}

static uint64_t
get_pkt_diff(struct policy *pol)
{
	uint64_t vsi_pkt_count,
		vsi_pkt_total = 0,
		vsi_pkt_count_prev_total = 0;
	double rdtsc_curr, rdtsc_diff, diff;
	int x;
	struct rte_eth_stats vf_stats;

	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {

		/* Read vsi stats */
		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
			vsi_pkt_count = vf_stats.ipackets;
		else
			vsi_pkt_count = -1;

		vsi_pkt_total += vsi_pkt_count;

		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
	}

	rdtsc_curr = rte_rdtsc_precise();
	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;

	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
			((double)rte_get_tsc_hz() / rdtsc_diff);

	return diff;
}

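/*
 * Traffic policy: compare the measured packet rate against the two
 * thresholds carried in the policy. At or above max_max_packet_thresh the
 * policy's cores are scaled to maximum frequency, at or above
 * avg_max_packet_thresh they are scaled to medium, and below that they are
 * scaled to minimum. Cores marked as shared with another VM
 * (core_share[].status == 1) are left alone.
 */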
static void
apply_traffic_profile(struct policy *pol)
{
	int count;
	uint64_t diff = 0;

	diff = get_pkt_diff(pol);

	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_max(
						pol->core_share[count].pcpu);
		}
	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_med(
						pol->core_share[count].pcpu);
		}
	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_min(
						pol->core_share[count].pcpu);
		}
	}
}

static void
apply_time_profile(struct policy *pol)
{
	int count, x;
	struct timeval tv;
	struct tm *ptm;
	char time_string[40];

	/* Obtain the time of day, and convert it to a tm struct. */
	gettimeofday(&tv, NULL);
	ptm = localtime(&tv.tv_sec);
	/* Format the date and time, down to a single second. */
	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);

	for (x = 0; x < HOURS; x++) {

		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
			for (count = 0; count < pol->pkt.num_vcpu; count++) {
				if (pol->core_share[count].status != 1) {
					power_manager_scale_core_max(
						pol->core_share[count].pcpu);
				}
			}
			break;
		} else if (ptm->tm_hour ==
				pol->pkt.timer_policy.quiet_hours[x]) {
			for (count = 0; count < pol->pkt.num_vcpu; count++) {
				if (pol->core_share[count].status != 1) {
					power_manager_scale_core_min(
						pol->core_share[count].pcpu);
				}
			}
			break;
		} else if (ptm->tm_hour ==
			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
			apply_traffic_profile(pol);
			break;
		}
	}
}

static void
apply_workload_profile(struct policy *pol)
{
	int count;

	if (pol->pkt.workload == HIGH) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_max(
						pol->core_share[count].pcpu);
		}
	} else if (pol->pkt.workload == MEDIUM) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_med(
						pol->core_share[count].pcpu);
		}
	} else if (pol->pkt.workload == LOW) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_min(
						pol->core_share[count].pcpu);
		}
	}
}

static void
apply_policy(struct policy *pol)
{
	struct channel_packet *pkt = &pol->pkt;

	/* Check policy to use */
	if (pkt->policy_to_use == TRAFFIC)
		apply_traffic_profile(pol);
	else if (pkt->policy_to_use == TIME)
		apply_time_profile(pol);
	else if (pkt->policy_to_use == WORKLOAD)
		apply_workload_profile(pol);
}

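/*
 * Handle a single request received on a channel. The channel status is
 * moved from CONNECTED to PROCESSING with a compare-and-set before the
 * request is acted on, and moved back to CONNECTED afterwards; the result
 * of the second compare-and-set is deliberately ignored because the
 * management thread may have set the channel to DISABLED in the meantime.
 */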
static int
process_request(struct channel_packet *pkt, struct channel_info *chan_info)
{
	int ret;

	if (chan_info == NULL)
		return -1;

	if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
		return -1;

	if (pkt->command == CPU_POWER) {
		unsigned int core_num;

		if (pkt->core_type == CORE_TYPE_VIRTUAL)
			core_num = get_pcpu(chan_info, pkt->resource_id);
		else
			core_num = pkt->resource_id;

		RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
			core_num);

		switch (pkt->unit) {
		case(CPU_POWER_SCALE_MIN):
			power_manager_scale_core_min(core_num);
			break;
		case(CPU_POWER_SCALE_MAX):
			power_manager_scale_core_max(core_num);
			break;
		case(CPU_POWER_SCALE_DOWN):
			power_manager_scale_core_down(core_num);
			break;
		case(CPU_POWER_SCALE_UP):
			power_manager_scale_core_up(core_num);
			break;
		case(CPU_POWER_ENABLE_TURBO):
			power_manager_enable_turbo_core(core_num);
			break;
		case(CPU_POWER_DISABLE_TURBO):
			power_manager_disable_turbo_core(core_num);
			break;
		default:
			break;
		}
	}

	if (pkt->command == PKT_POLICY) {
		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
				pkt->vm_name);
		update_policy(pkt);
		policy_is_set = 1;
	}

	if (pkt->command == PKT_POLICY_REMOVE) {
		ret = remove_policy(pkt);
		if (ret == 0)
			RTE_LOG(INFO, CHANNEL_MONITOR,
				"Removed policy %s\n", pkt->vm_name);
		else
			RTE_LOG(INFO, CHANNEL_MONITOR,
				"Policy %s does not exist\n", pkt->vm_name);
	}

	/*
	 * Return is not checked as channel status may have been set to DISABLED
	 * from management thread
	 */
	rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
			CHANNEL_MGR_CHANNEL_CONNECTED);
	return 0;
}

int
add_channel_to_monitor(struct channel_info **chan_info)
{
	struct channel_info *info = *chan_info;
	struct epoll_event event;

	event.events = EPOLLIN;
	event.data.ptr = info;
	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
				"to epoll\n", info->channel_path);
		return -1;
	}
	RTE_LOG(INFO, CHANNEL_MONITOR, "Added channel '%s' "
			"to monitor\n", info->channel_path);
	return 0;
}

int
remove_channel_from_monitor(struct channel_info *chan_info)
{
	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
			chan_info->fd, NULL) < 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
				"from epoll\n", chan_info->channel_path);
		return -1;
	}
	return 0;
}

int
channel_monitor_init(void)
{
	global_event_fd = epoll_create1(0);
	if (global_event_fd < 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR,
				"Error creating epoll context with error %s\n",
				strerror(errno));
		return -1;
	}
	global_events_list = rte_malloc("epoll_events",
			sizeof(*global_events_list)
			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
	if (global_events_list == NULL) {
		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
				"epoll events\n");
		return -1;
	}
	return 0;
}

static void
read_binary_packet(struct channel_info *chan_info)
{
	struct channel_packet pkt;
	void *buffer = &pkt;
	int buffer_len = sizeof(pkt);
	int n_bytes, err = 0;

	while (buffer_len > 0) {
		n_bytes = read(chan_info->fd,
				buffer, buffer_len);
		if (n_bytes == buffer_len)
			break;
		if (n_bytes < 0) {
			err = errno;
			RTE_LOG(DEBUG, CHANNEL_MONITOR,
				"Received error on "
				"channel '%s' read: %s\n",
				chan_info->channel_path,
				strerror(err));
			remove_channel(&chan_info);
			break;
		}
		buffer = (char *)buffer + n_bytes;
		buffer_len -= n_bytes;
	}
	if (!err)
		process_request(&pkt, chan_info);
}

#ifdef USE_JANSSON
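/*
 * Read a JSON request from the channel. Requests are framed by brace
 * matching: bytes are read one at a time into json_data until the
 * '{'/'}' nesting level returns to zero (or MAX_JSON_STRING_LEN is
 * reached), then the buffer is parsed with Jansson and handed to
 * process_request().
 */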
static void
read_json_packet(struct channel_info *chan_info)
{
	struct channel_packet pkt;
	int n_bytes, ret;
	json_t *root;
	json_error_t error;

	/* read opening brace to closing brace */
	do {
		int idx = 0;
		int indent = 0;
		do {
			n_bytes = read(chan_info->fd, &json_data[idx], 1);
			if (n_bytes == 0)
				break;
			if (json_data[idx] == '{')
				indent++;
			if (json_data[idx] == '}')
				indent--;
			if ((indent > 0) || (idx > 0))
				idx++;
			if (indent <= 0)
				json_data[idx] = 0;
			if (idx >= MAX_JSON_STRING_LEN-1)
				break;
		} while (indent > 0);

		json_data[idx] = '\0';

		if (strlen(json_data) == 0)
			continue;

		printf("got [%s]\n", json_data);

		root = json_loads(json_data, 0, &error);

		if (root) {
			/*
			 * Because our data is now in the json
			 * object, we can overwrite the pkt
			 * with a channel_packet struct, using
			 * parse_json_to_pkt()
			 */
			ret = parse_json_to_pkt(root, &pkt);
			json_decref(root);
			if (ret) {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Error validating JSON profile data\n");
				break;
			}
			process_request(&pkt, chan_info);
		} else {
			RTE_LOG(ERR, CHANNEL_MONITOR,
				"JSON error on line %d: %s\n",
				error.line, error.text);
		}
	} while (n_bytes > 0);
}
#endif

void
run_channel_monitor(void)
{
	while (run_loop) {
		int n_events, i;

		n_events = epoll_wait(global_event_fd, global_events_list,
				MAX_EVENTS, 1);
		if (!run_loop)
			break;
		for (i = 0; i < n_events; i++) {
			struct channel_info *chan_info = (struct channel_info *)
					global_events_list[i].data.ptr;
			if ((global_events_list[i].events & EPOLLERR) ||
				(global_events_list[i].events & EPOLLHUP)) {
				RTE_LOG(INFO, CHANNEL_MONITOR,
						"Remote closed connection for "
						"channel '%s'\n",
						chan_info->channel_path);
				remove_channel(&chan_info);
				continue;
			}
			if (global_events_list[i].events & EPOLLIN) {

				switch (chan_info->type) {
				case CHANNEL_TYPE_BINARY:
					read_binary_packet(chan_info);
					break;
#ifdef USE_JANSSON
				case CHANNEL_TYPE_JSON:
					read_json_packet(chan_info);
					break;
#endif
				default:
					break;
				}
			}
		}
		rte_delay_us(time_period_ms*1000);
		if (policy_is_set) {
			int j;

			for (j = 0; j < MAX_CLIENTS; j++) {
				if (policies[j].enabled == 1)
					apply_policy(&policies[j]);
			}
		}
	}
}