/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/queue.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/select.h>
#ifdef USE_JANSSON
#include <jansson.h>
#else
#pragma message "Jansson dev libs unavailable, not including JSON parsing"
#endif
#include <rte_string_fns.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_malloc.h>
#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_pmd_i40e.h>

#include <libvirt/libvirt.h>
#include "channel_monitor.h"
#include "channel_commands.h"
#include "channel_manager.h"
#include "power_manager.h"
#include "oob_monitor.h"

#define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1

#define MAX_EVENTS 256

uint64_t vsi_pkt_count_prev[384];
uint64_t rdtsc_prev[384];
#define MAX_JSON_STRING_LEN 1024
char json_data[MAX_JSON_STRING_LEN];

double time_period_ms = 1;
static volatile unsigned run_loop = 1;
static int global_event_fd;
static unsigned int policy_is_set;
static struct epoll_event *global_events_list;
static struct policy policies[MAX_CLIENTS];

#ifdef USE_JANSSON

union PFID {
	struct ether_addr addr;
	uint64_t pfid;
};

static int
str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
{
	int i;
	char *end;
	unsigned long o[ETHER_ADDR_LEN];

	i = 0;
	do {
		errno = 0;
		o[i] = strtoul(a, &end, 16);
		if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
			return -1;
		a = end + 1;
	} while (++i != RTE_DIM(o) && end[0] != 0);

	/* Junk at the end of line */
	if (end[0] != 0)
		return -1;

	/* Support the format XX:XX:XX:XX:XX:XX */
	if (i == ETHER_ADDR_LEN) {
		while (i-- != 0) {
			if (o[i] > UINT8_MAX)
				return -1;
			ether_addr->addr_bytes[i] = (uint8_t)o[i];
		}
	/* Support the format XXXX:XXXX:XXXX */
	} else if (i == ETHER_ADDR_LEN / 2) {
		while (i-- != 0) {
			if (o[i] > UINT16_MAX)
				return -1;
			ether_addr->addr_bytes[i * 2] =
					(uint8_t)(o[i] >> 8);
			ether_addr->addr_bytes[i * 2 + 1] =
					(uint8_t)(o[i] & 0xff);
		}
	/* unknown format */
	} else
		return -1;

	return 0;
}

static int
set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
{
	union PFID pfid;
	int ret;

	/* Use port MAC address as the vfid */
	ret = str_to_ether_addr(mac, &pfid.addr);

	if (ret != 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR,
			"Invalid mac address received in JSON\n");
		pkt->vfid[idx] = 0;
		return -1;
	}

	printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
			"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
			pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
			pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
			pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);

	pkt->vfid[idx] = pfid.pfid;
	return 0;
}
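
/*
 * Illustrative only: a JSON policy of the shape that parse_json_to_pkt()
 * below accepts, assembled from the keys it handles. The field values are
 * made up for the example; what a VM or management agent actually sends
 * may differ.
 *
 * {"policy": {
 *	"name": "ubuntu2",
 *	"command": "create",
 *	"policy_type": "TIME",
 *	"busy_hours": [17, 18, 19, 20, 21, 22, 23],
 *	"quiet_hours": [2, 3, 4, 5, 6],
 *	"core_list": [11, 12, 13],
 *	"mac_list": ["de:ad:be:ef:01:01", "de:ad:be:ef:01:02"],
 *	"avg_packet_thresh": 100000,
 *	"max_packet_thresh": 500000
 * }}
 */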

static int
parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
{
	const char *key;
	json_t *value;
	int ret;

	memset(pkt, 0, sizeof(struct channel_packet));

	pkt->nb_mac_to_monitor = 0;
	pkt->t_boost_status.tbEnabled = false;
	pkt->workload = LOW;
	pkt->policy_to_use = TIME;
	pkt->command = PKT_POLICY;
	pkt->core_type = CORE_TYPE_PHYSICAL;

	json_object_foreach(element, key, value) {
		if (!strcmp(key, "policy")) {
			/* Recurse in to get the contents of profile */
			ret = parse_json_to_pkt(value, pkt);
			if (ret)
				return ret;
		} else if (!strcmp(key, "instruction")) {
			/* Recurse in to get the contents of instruction */
			ret = parse_json_to_pkt(value, pkt);
			if (ret)
				return ret;
		} else if (!strcmp(key, "name")) {
			strlcpy(pkt->vm_name, json_string_value(value),
					sizeof(pkt->vm_name));
		} else if (!strcmp(key, "command")) {
			char command[32];
			strlcpy(command, json_string_value(value), 32);
			if (!strcmp(command, "power")) {
				pkt->command = CPU_POWER;
			} else if (!strcmp(command, "create")) {
				pkt->command = PKT_POLICY;
			} else if (!strcmp(command, "destroy")) {
				pkt->command = PKT_POLICY_REMOVE;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Invalid command received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "policy_type")) {
			char command[32];
			strlcpy(command, json_string_value(value), 32);
			if (!strcmp(command, "TIME")) {
				pkt->policy_to_use = TIME;
			} else if (!strcmp(command, "TRAFFIC")) {
				pkt->policy_to_use = TRAFFIC;
			} else if (!strcmp(command, "WORKLOAD")) {
				pkt->policy_to_use = WORKLOAD;
			} else if (!strcmp(command, "BRANCH_RATIO")) {
				pkt->policy_to_use = BRANCH_RATIO;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Wrong policy_type received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "workload")) {
			char command[32];
			strlcpy(command, json_string_value(value), 32);
			if (!strcmp(command, "HIGH")) {
				pkt->workload = HIGH;
			} else if (!strcmp(command, "MEDIUM")) {
				pkt->workload = MEDIUM;
			} else if (!strcmp(command, "LOW")) {
				pkt->workload = LOW;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Wrong workload received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "busy_hours")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				int hour = (int)json_integer_value(
						json_array_get(value, i));
				pkt->timer_policy.busy_hours[i] = hour;
			}
		} else if (!strcmp(key, "quiet_hours")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				int hour = (int)json_integer_value(
						json_array_get(value, i));
				pkt->timer_policy.quiet_hours[i] = hour;
			}
		} else if (!strcmp(key, "core_list")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				int core = (int)json_integer_value(
						json_array_get(value, i));
				pkt->vcpu_to_control[i] = core;
			}
			pkt->num_vcpu = size;
		} else if (!strcmp(key, "mac_list")) {
			unsigned int i;
			size_t size = json_array_size(value);

			for (i = 0; i < size; i++) {
				char mac[32];
				strlcpy(mac,
					json_string_value(json_array_get(value, i)),
					32);
				set_policy_mac(pkt, i, mac);
			}
			pkt->nb_mac_to_monitor = size;
		} else if (!strcmp(key, "avg_packet_thresh")) {
			pkt->traffic_policy.avg_max_packet_thresh =
					(uint32_t)json_integer_value(value);
		} else if (!strcmp(key, "max_packet_thresh")) {
			pkt->traffic_policy.max_max_packet_thresh =
					(uint32_t)json_integer_value(value);
		} else if (!strcmp(key, "unit")) {
			char unit[32];
			strlcpy(unit, json_string_value(value), 32);
			if (!strcmp(unit, "SCALE_UP")) {
				pkt->unit = CPU_POWER_SCALE_UP;
			} else if (!strcmp(unit, "SCALE_DOWN")) {
				pkt->unit = CPU_POWER_SCALE_DOWN;
			} else if (!strcmp(unit, "SCALE_MAX")) {
				pkt->unit = CPU_POWER_SCALE_MAX;
			} else if (!strcmp(unit, "SCALE_MIN")) {
				pkt->unit = CPU_POWER_SCALE_MIN;
			} else if (!strcmp(unit, "ENABLE_TURBO")) {
				pkt->unit = CPU_POWER_ENABLE_TURBO;
			} else if (!strcmp(unit, "DISABLE_TURBO")) {
				pkt->unit = CPU_POWER_DISABLE_TURBO;
			} else {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Invalid unit received in JSON\n");
				return -1;
			}
		} else if (!strcmp(key, "resource_id")) {
			pkt->resource_id = (uint32_t)json_integer_value(value);
		} else {
			RTE_LOG(ERR, CHANNEL_MONITOR,
				"Unknown key received in JSON string: %s\n",
				key);
		}
	}
	return 0;
}
#endif

void channel_monitor_exit(void)
{
	run_loop = 0;
	rte_free(global_events_list);
}

static void
core_share(int pNo, int z, int x, int t)
{
	if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
		if (strcmp(policies[pNo].pkt.vm_name,
				lvm_info[x].vm_name) != 0) {
			policies[pNo].core_share[z].status = 1;
			power_manager_scale_core_max(
					policies[pNo].core_share[z].pcpu);
		}
	}
}

static void
core_share_status(int pNo)
{
	int noVms = 0, noVcpus = 0, z, x, t;

	get_all_vm(&noVms, &noVcpus);

	/* Reset Core Share Status. */
	for (z = 0; z < noVcpus; z++)
		policies[pNo].core_share[z].status = 0;

	/* Foreach vcpu in a policy. */
	for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
		/* Foreach VM on the platform. */
		for (x = 0; x < noVms; x++) {
			/* Foreach vcpu of VMs on platform. */
			for (t = 0; t < lvm_info[x].num_cpus; t++)
				core_share(pNo, z, x, t);
		}
	}
}

static int
pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
{
	int ret = 0;

	if (pol->pkt.policy_to_use == BRANCH_RATIO) {
		ci->cd[pcpu].oob_enabled = 1;
		ret = add_core_to_monitor(pcpu);
		if (ret == 0)
			RTE_LOG(INFO, CHANNEL_MONITOR,
					"Monitoring pcpu %d OOB for %s\n",
					pcpu, pol->pkt.vm_name);
		else
			RTE_LOG(ERR, CHANNEL_MONITOR,
					"Error monitoring pcpu %d OOB for %s\n",
					pcpu, pol->pkt.vm_name);
	} else {
		pol->core_share[count].pcpu = pcpu;
		RTE_LOG(INFO, CHANNEL_MONITOR,
				"Monitoring pcpu %d for %s\n",
				pcpu, pol->pkt.vm_name);
	}
	return ret;
}

static void
get_pcpu_to_control(struct policy *pol)
{
	/* Convert vcpu to pcpu. */
	struct vm_info info;
	int pcpu, count;
	struct core_info *ci;

	ci = get_core_info();

	RTE_LOG(DEBUG, CHANNEL_MONITOR,
			"Looking for pcpu for %s\n", pol->pkt.vm_name);

	/*
	 * So now that we're handling virtual and physical cores, we need to
	 * differentiate between them when adding them to the branch monitor.
	 * Virtual cores need to be converted to physical cores.
	 */
	if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
		/*
		 * If the cores in the policy are virtual, we need to map them
		 * to physical cores. We look up the vm info and use that for
		 * the mapping.
		 */
		get_info_vm(pol->pkt.vm_name, &info);
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
			pcpu_monitor(pol, ci, pcpu, count);
		}
	} else {
		/*
		 * If the cores in the policy are physical, we just use
		 * those core id's directly.
		 */
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			pcpu = pol->pkt.vcpu_to_control[count];
			pcpu_monitor(pol, ci, pcpu, count);
		}
	}
}

static int
get_pfid(struct policy *pol)
{
	int i, x, ret = 0;

	for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {

		RTE_ETH_FOREACH_DEV(x) {
			ret = rte_pmd_i40e_query_vfid_by_mac(x,
					(struct ether_addr *)&(pol->pkt.vfid[i]));
			if (ret != -EINVAL) {
				pol->port[i] = x;
				break;
			}
		}
		if (ret == -EINVAL || ret == -ENOTSUP || ret == -ENODEV) {
			RTE_LOG(INFO, CHANNEL_MONITOR,
				"Error with Policy. MAC not found on "
				"attached ports\n");
			pol->enabled = 0;
			return ret;
		}
		pol->pfid[i] = ret;
	}
	return 1;
}

static int
update_policy(struct channel_packet *pkt)
{
	unsigned int updated = 0;
	int i;

	RTE_LOG(INFO, CHANNEL_MONITOR,
			"Applying policy for %s\n", pkt->vm_name);

	for (i = 0; i < MAX_CLIENTS; i++) {
		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
			/* Copy the contents of *pkt into the policy.pkt */
			policies[i].pkt = *pkt;
			get_pcpu_to_control(&policies[i]);
			if (get_pfid(&policies[i]) < 0) {
				updated = 1;
				break;
			}
			core_share_status(i);
			policies[i].enabled = 1;
			updated = 1;
		}
	}
	if (!updated) {
		for (i = 0; i < MAX_CLIENTS; i++) {
			if (policies[i].enabled == 0) {
				policies[i].pkt = *pkt;
				get_pcpu_to_control(&policies[i]);
				if (get_pfid(&policies[i]) < 0)
					break;
				core_share_status(i);
				policies[i].enabled = 1;
				break;
			}
		}
	}
	return 0;
}

static int
remove_policy(struct channel_packet *pkt)
{
	int i;

	/*
	 * Disabling the policy is simply a case of setting
	 * enabled to 0
	 */
	for (i = 0; i < MAX_CLIENTS; i++) {
		if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
			policies[i].enabled = 0;
			return 0;
		}
	}
	return -1;
}

/*
 * Estimate the packet rate (packets per second) seen since the previous call
 * for the VFs monitored by this policy, using the VF rx counters and the TSC.
 */
static uint64_t
get_pkt_diff(struct policy *pol)
{
	uint64_t vsi_pkt_count,
		vsi_pkt_total = 0,
		vsi_pkt_count_prev_total = 0;
	double rdtsc_curr, rdtsc_diff, diff;
	int x;
	struct rte_eth_stats vf_stats;

	for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {

		/* Read vsi stats */
		if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
			vsi_pkt_count = vf_stats.ipackets;
		else
			vsi_pkt_count = -1;

		vsi_pkt_total += vsi_pkt_count;

		vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
		vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
	}

	rdtsc_curr = rte_rdtsc_precise();
	rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
	rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;

	diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
			((double)rte_get_tsc_hz() / rdtsc_diff);

	return diff;
}
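
/*
 * A rough summary of the traffic profile below (not normative, just a
 * reading of the code): get_pkt_diff() above estimates the aggregate packet
 * rate for the policy's monitored VFs, and the thresholds carried in the
 * policy select the frequency band:
 *
 *	rate >= max_max_packet_thresh  -> scale monitored cores to max
 *	rate >= avg_max_packet_thresh  -> scale monitored cores to medium
 *	otherwise                      -> scale monitored cores to min
 *
 * Cores whose core_share status is set (shared with another VM) are skipped
 * here; core_share() already pinned them at max frequency.
 */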

static void
apply_traffic_profile(struct policy *pol)
{
	int count;
	uint64_t diff = 0;

	diff = get_pkt_diff(pol);

	if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_max(
						pol->core_share[count].pcpu);
		}
	} else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_med(
						pol->core_share[count].pcpu);
		}
	} else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_min(
						pol->core_share[count].pcpu);
		}
	}
}

static void
apply_time_profile(struct policy *pol)
{
	int count, x;
	struct timeval tv;
	struct tm *ptm;
	char time_string[40];

	/* Obtain the time of day, and convert it to a tm struct. */
	gettimeofday(&tv, NULL);
	ptm = localtime(&tv.tv_sec);
	/* Format the date and time, down to a single second. */
	strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);

	for (x = 0; x < HOURS; x++) {

		if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
			for (count = 0; count < pol->pkt.num_vcpu; count++) {
				if (pol->core_share[count].status != 1) {
					power_manager_scale_core_max(
						pol->core_share[count].pcpu);
				}
			}
			break;
		} else if (ptm->tm_hour ==
				pol->pkt.timer_policy.quiet_hours[x]) {
			for (count = 0; count < pol->pkt.num_vcpu; count++) {
				if (pol->core_share[count].status != 1) {
					power_manager_scale_core_min(
						pol->core_share[count].pcpu);
				}
			}
			break;
		} else if (ptm->tm_hour ==
			pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
			apply_traffic_profile(pol);
			break;
		}
	}
}

static void
apply_workload_profile(struct policy *pol)
{
	int count;

	if (pol->pkt.workload == HIGH) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_max(
						pol->core_share[count].pcpu);
		}
	} else if (pol->pkt.workload == MEDIUM) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_med(
						pol->core_share[count].pcpu);
		}
	} else if (pol->pkt.workload == LOW) {
		for (count = 0; count < pol->pkt.num_vcpu; count++) {
			if (pol->core_share[count].status != 1)
				power_manager_scale_core_min(
						pol->core_share[count].pcpu);
		}
	}
}

static void
apply_policy(struct policy *pol)
{
	struct channel_packet *pkt = &pol->pkt;

	/* Check policy to use */
	if (pkt->policy_to_use == TRAFFIC)
		apply_traffic_profile(pol);
	else if (pkt->policy_to_use == TIME)
		apply_time_profile(pol);
	else if (pkt->policy_to_use == WORKLOAD)
		apply_workload_profile(pol);
}
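
/*
 * Illustrative only: a JSON "instruction" of the shape that maps onto the
 * CPU_POWER branch of process_request() below (field values are made up):
 *
 * {"instruction": {
 *	"name": "ubuntu2",
 *	"command": "power",
 *	"unit": "SCALE_MAX",
 *	"resource_id": 10
 * }}
 *
 * Binary channels carry the same information as a raw struct channel_packet
 * instead of JSON text.
 */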

static int
process_request(struct channel_packet *pkt, struct channel_info *chan_info)
{
	int ret;

	if (chan_info == NULL)
		return -1;

	if (rte_atomic32_cmpset(&(chan_info->status),
			CHANNEL_MGR_CHANNEL_CONNECTED,
			CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
		return -1;

	if (pkt->command == CPU_POWER) {
		unsigned int core_num;

		if (pkt->core_type == CORE_TYPE_VIRTUAL)
			core_num = get_pcpu(chan_info, pkt->resource_id);
		else
			core_num = pkt->resource_id;

		RTE_LOG(DEBUG, CHANNEL_MONITOR,
				"Processing requested cmd for cpu:%d\n",
				core_num);

		switch (pkt->unit) {
		case(CPU_POWER_SCALE_MIN):
			power_manager_scale_core_min(core_num);
			break;
		case(CPU_POWER_SCALE_MAX):
			power_manager_scale_core_max(core_num);
			break;
		case(CPU_POWER_SCALE_DOWN):
			power_manager_scale_core_down(core_num);
			break;
		case(CPU_POWER_SCALE_UP):
			power_manager_scale_core_up(core_num);
			break;
		case(CPU_POWER_ENABLE_TURBO):
			power_manager_enable_turbo_core(core_num);
			break;
		case(CPU_POWER_DISABLE_TURBO):
			power_manager_disable_turbo_core(core_num);
			break;
		default:
			break;
		}
	}

	if (pkt->command == PKT_POLICY) {
		RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
				pkt->vm_name);
		update_policy(pkt);
		policy_is_set = 1;
	}

	if (pkt->command == PKT_POLICY_REMOVE) {
		ret = remove_policy(pkt);
		if (ret == 0)
			RTE_LOG(INFO, CHANNEL_MONITOR,
					"Removed policy %s\n", pkt->vm_name);
		else
			RTE_LOG(INFO, CHANNEL_MONITOR,
					"Policy %s does not exist\n",
					pkt->vm_name);
	}

	/*
	 * Return is not checked as channel status may have been set to
	 * DISABLED from management thread.
	 */
	rte_atomic32_cmpset(&(chan_info->status),
			CHANNEL_MGR_CHANNEL_PROCESSING,
			CHANNEL_MGR_CHANNEL_CONNECTED);
	return 0;
}

int
add_channel_to_monitor(struct channel_info **chan_info)
{
	struct channel_info *info = *chan_info;
	struct epoll_event event;

	event.events = EPOLLIN;
	event.data.ptr = info;
	if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
				"to epoll\n", info->channel_path);
		return -1;
	}
	RTE_LOG(INFO, CHANNEL_MONITOR, "Added channel '%s' "
			"to monitor\n", info->channel_path);
	return 0;
}

int
remove_channel_from_monitor(struct channel_info *chan_info)
{
	if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
			chan_info->fd, NULL) < 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
				"from epoll\n", chan_info->channel_path);
		return -1;
	}
	return 0;
}

int
channel_monitor_init(void)
{
	global_event_fd = epoll_create1(0);
	if (global_event_fd < 0) {
		RTE_LOG(ERR, CHANNEL_MONITOR,
				"Error creating epoll context with error %s\n",
				strerror(errno));
		return -1;
	}
	global_events_list = rte_malloc("epoll_events",
			sizeof(*global_events_list)
			* MAX_EVENTS, RTE_CACHE_LINE_SIZE);
	if (global_events_list == NULL) {
		RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
				"epoll events\n");
		return -1;
	}
	return 0;
}

static void
read_binary_packet(struct channel_info *chan_info)
{
	struct channel_packet pkt;
	void *buffer = &pkt;
	int buffer_len = sizeof(pkt);
	int n_bytes, err = 0;

	while (buffer_len > 0) {
		n_bytes = read(chan_info->fd,
				buffer, buffer_len);
		if (n_bytes == buffer_len)
			break;
		if (n_bytes == 0) {
			/* EOF before a full packet arrived; drop the partial read. */
			err = -1;
			break;
		}
		if (n_bytes < 0) {
			err = errno;
			RTE_LOG(DEBUG, CHANNEL_MONITOR,
				"Received error on "
				"channel '%s' read: %s\n",
				chan_info->channel_path,
				strerror(err));
			remove_channel(&chan_info);
			break;
		}
		buffer = (char *)buffer + n_bytes;
		buffer_len -= n_bytes;
	}
	if (!err)
		process_request(&pkt, chan_info);
}
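
/*
 * JSON channels carry a stream of top-level JSON objects with no framing
 * other than the braces themselves: read_json_packet() below extracts one
 * request at a time by counting '{' and '}' characters, up to
 * MAX_JSON_STRING_LEN bytes. The shared json_data buffer appears safe here
 * because, as used in this file, every channel is serviced from the single
 * run_channel_monitor() loop.
 */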

#ifdef USE_JANSSON
static void
read_json_packet(struct channel_info *chan_info)
{
	struct channel_packet pkt;
	int n_bytes, ret;
	json_t *root;
	json_error_t error;

	/* read opening brace to closing brace */
	do {
		int idx = 0;
		int indent = 0;
		do {
			n_bytes = read(chan_info->fd, &json_data[idx], 1);
			if (n_bytes == 0)
				break;
			if (json_data[idx] == '{')
				indent++;
			if (json_data[idx] == '}')
				indent--;
			if ((indent > 0) || (idx > 0))
				idx++;
			if (indent == 0)
				json_data[idx] = 0;
			if (idx >= MAX_JSON_STRING_LEN - 1)
				break;
		} while (indent > 0);

		if (indent > 0)
			/*
			 * We've broken out of the read loop without getting
			 * a closing brace, so throw away the data
			 */
			json_data[idx] = 0;

		if (strlen(json_data) == 0)
			continue;

		printf("got [%s]\n", json_data);

		root = json_loads(json_data, 0, &error);

		if (root) {
			/*
			 * Because our data is now in the json
			 * object, we can overwrite the pkt
			 * with a channel_packet struct, using
			 * parse_json_to_pkt()
			 */
			ret = parse_json_to_pkt(root, &pkt);
			json_decref(root);
			if (ret) {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Error validating JSON profile data\n");
				break;
			}
			process_request(&pkt, chan_info);
		} else {
			RTE_LOG(ERR, CHANNEL_MONITOR,
				"JSON error on line %d: %s\n",
				error.line, error.text);
		}
	} while (n_bytes > 0);
}
#endif

void
run_channel_monitor(void)
{
	while (run_loop) {
		int n_events, i;

		n_events = epoll_wait(global_event_fd, global_events_list,
				MAX_EVENTS, 1);
		if (!run_loop)
			break;
		for (i = 0; i < n_events; i++) {
			struct channel_info *chan_info = (struct channel_info *)
					global_events_list[i].data.ptr;
			if ((global_events_list[i].events & EPOLLERR) ||
					(global_events_list[i].events & EPOLLHUP)) {
				RTE_LOG(INFO, CHANNEL_MONITOR,
						"Remote closed connection for "
						"channel '%s'\n",
						chan_info->channel_path);
				remove_channel(&chan_info);
				continue;
			}
			if (global_events_list[i].events & EPOLLIN) {

				switch (chan_info->type) {
				case CHANNEL_TYPE_BINARY:
					read_binary_packet(chan_info);
					break;
#ifdef USE_JANSSON
				case CHANNEL_TYPE_JSON:
					read_json_packet(chan_info);
					break;
#endif
				default:
					break;
				}
			}
		}
		rte_delay_us(time_period_ms * 1000);
		if (policy_is_set) {
			int j;

			for (j = 0; j < MAX_CLIENTS; j++) {
				if (policies[j].enabled == 1)
					apply_policy(&policies[j]);
			}
		}
	}
}