1 /* Copyright (C) 2021-2024 Free Software Foundation, Inc. 2 Contributed by Oracle. 3 4 This file is part of GNU Binutils. 5 6 This program is free software; you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation; either version 3, or (at your option) 9 any later version. 10 11 This program is distributed in the hope that it will be useful, 12 but WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 GNU General Public License for more details. 15 16 You should have received a copy of the GNU General Public License 17 along with this program; if not, write to the Free Software 18 Foundation, 51 Franklin Street - Fifth Floor, Boston, 19 MA 02110-1301, USA. */ 20 21 #include "config.h" 22 #include <stdio.h> 23 #include <stdlib.h> 24 #include <signal.h> 25 #include <unistd.h> 26 #include <iostream> 27 #include <iomanip> 28 #include <sstream> 29 #include <queue> 30 #include "vec.h" 31 #include "util.h" 32 #include "ipcio.h" 33 #include "DbeThread.h" 34 #include "Experiment.h" 35 36 #define ipc_trace if (ipc_flags) ipc_default_log 37 #define ipc_request_trace if (ipc_flags) ipc_request_log 38 #define ipc_response_trace if (ipc_flags) ipc_response_log 39 40 using namespace std; 41 42 // IPC implementation 43 static const int L_PROGRESS = 0; 44 static const int L_INTEGER = 1; 45 static const int L_BOOLEAN = 2; 46 static const int L_LONG = 3; 47 static const int L_STRING = 4; 48 static const int L_DOUBLE = 5; 49 static const int L_ARRAY = 6; 50 static const int L_OBJECT = 7; 51 static const int L_CHAR = 8; 52 53 int currentRequestID; 54 int currentChannelID; 55 IPCresponse *IPCresponseGlobal; 56 57 BufferPool *responseBufferPool; 58 59 IPCrequest::IPCrequest (int sz, int reqID, int chID) 60 { 61 size = sz; 62 requestID = reqID; 63 channelID = chID; 64 status = INITIALIZED; 65 idx = 0; 66 buf = (char *) malloc (size); 67 cancelImmediate = false; 68 } 69 70 IPCrequest::~IPCrequest () 71 { 72 free (buf); 73 } 74 75 void 76 IPCrequest::read (void) 77 { 78 for (int i = 0; i < size; i++) 79 { 80 int c = getc (stdin); 81 ipc_request_trace (TRACE_LVL_4, " IPCrequest:getc(stdin): %02x\n", c); 82 buf[i] = c; 83 } 84 } 85 86 IPCrequestStatus 87 IPCrequest::getStatus (void) 88 { 89 return status; 90 } 91 92 void 93 IPCrequest::setStatus (IPCrequestStatus newStatus) 94 { 95 status = newStatus; 96 } 97 98 static int 99 readByte (IPCrequest* req) 100 { 101 int c; 102 int val = 0; 103 for (int i = 0; i < 2; i++) 104 { 105 if (req == NULL) 106 { 107 c = getc (stdin); 108 ipc_request_trace (TRACE_LVL_4, " readByte:getc(stdin): %02x\n", c); 109 } 110 else 111 c = req->rgetc (); 112 switch (c) 113 { 114 case '0': case '1': case '2': case '3': 115 case '4': case '5': case '6': case '7': 116 case '8': case '9': 117 val = val * 16 + c - '0'; 118 break; 119 case 'a': case 'b': case 'c': case 'd': case 'e': case 'f': 120 val = val * 16 + c - 'a' + 10; 121 break; 122 case EOF: 123 val = EOF; 124 break; 125 default: 126 fprintf (stderr, "readByte: Unknown byte: %d\n", c); 127 break; 128 } 129 } 130 return val; 131 } 132 133 static int 134 readIVal (IPCrequest *req) 135 { 136 int val = readByte (req); 137 for (int i = 0; i < 3; i++) 138 val = val * 256 + readByte (req); 139 ipc_trace (" readIVal: %d\n", val); 140 return val; 141 } 142 143 static String 144 readSVal (IPCrequest *req) 145 { 146 int len = readIVal (req); 147 if (len == -1) 148 { 149 ipc_trace (" readSVal: <NULL>\n"); 150 return NULL; 151 } 152 char *str = (char *) malloc (len + 1); 153 char *s = str; 154 *s = (char) 0; 155 while (len--) 156 *s++ = req->rgetc (); 157 *s = (char) 0; 158 ipc_trace (" readSVal: '%s'\n", str); 159 return str; 160 } 161 162 static long long 163 readLVal (IPCrequest *req) 164 { 165 long long val = readByte (req); 166 for (int i = 0; i < 7; i++) 167 val = val * 256 + readByte (req); 168 ipc_trace (" readLVal: %lld\n", val); 169 return val; 170 } 171 172 static bool 173 readBVal (IPCrequest *req) 174 { 175 int val = readByte (req); 176 ipc_trace (" readBVal: %s\n", val == 0 ? "true" : "false"); 177 return val != 0; 178 } 179 180 static char 181 readCVal (IPCrequest *req) 182 { 183 int val = readByte (req); 184 ipc_trace (" readCVal: %d\n", val); 185 return (char) val; 186 } 187 188 static double 189 readDVal (IPCrequest *req) 190 { 191 String s = readSVal (req); 192 double d = atof (s); 193 free (s); 194 return d; 195 } 196 197 static Object 198 readAVal (IPCrequest *req) 199 { 200 bool twoD = false; 201 int type = readByte (req); 202 if (type == L_ARRAY) 203 { 204 twoD = true; 205 type = readByte (req); 206 } 207 ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type); 208 209 int len = readIVal (req); 210 if (len == -1) 211 return NULL; 212 switch (type) 213 { 214 case L_INTEGER: 215 if (twoD) 216 { 217 Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len); 218 for (int i = 0; i < len; i++) 219 array->store (i, (Vector<int>*)readAVal (req)); 220 return array; 221 } 222 else 223 { 224 Vector<int> *array = new Vector<int>(len); 225 for (int i = 0; i < len; i++) 226 array->store (i, readIVal (req)); 227 return array; 228 } 229 //break; 230 case L_LONG: 231 if (twoD) 232 { 233 Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len); 234 for (int i = 0; i < len; i++) 235 array->store (i, (Vector<long long>*)readAVal (req)); 236 return array; 237 } 238 else 239 { 240 Vector<long long> *array = new Vector<long long>(len); 241 for (int i = 0; i < len; i++) 242 array->store (i, readLVal (req)); 243 return array; 244 } 245 //break; 246 case L_DOUBLE: 247 if (twoD) 248 { 249 Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len); 250 for (int i = 0; i < len; i++) 251 array->store (i, (Vector<double>*)readAVal (req)); 252 return array; 253 } 254 else 255 { 256 Vector<double> *array = new Vector<double>(len); 257 for (int i = 0; i < len; i++) 258 array->store (i, readDVal (req)); 259 return array; 260 } 261 //break; 262 case L_BOOLEAN: 263 if (twoD) 264 { 265 Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len); 266 for (int i = 0; i < len; i++) 267 array->store (i, (Vector<bool>*)readAVal (req)); 268 return array; 269 } 270 else 271 { 272 Vector<bool> *array = new Vector<bool>(len); 273 for (int i = 0; i < len; i++) 274 array->store (i, readBVal (req)); 275 return array; 276 } 277 //break; 278 case L_CHAR: 279 if (twoD) 280 { 281 Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len); 282 for (int i = 0; i < len; i++) 283 array->store (i, (Vector<char>*)readAVal (req)); 284 return array; 285 } 286 else 287 { 288 Vector<char> *array = new Vector<char>(len); 289 for (int i = 0; i < len; i++) 290 array->store (i, readCVal (req)); 291 return array; 292 } 293 //break; 294 case L_STRING: 295 if (twoD) 296 { 297 Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len); 298 for (int i = 0; i < len; i++) 299 array->store (i, (Vector<String>*)readAVal (req)); 300 return array; 301 } 302 else 303 { 304 Vector<String> *array = new Vector<String>(len); 305 for (int i = 0; i < len; i++) 306 array->store (i, readSVal (req)); 307 return array; 308 } 309 //break; 310 case L_OBJECT: 311 if (twoD) 312 { 313 Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len); 314 for (int i = 0; i < len; i++) 315 array->store (i, (Vector<Object>*)readAVal (req)); 316 return array; 317 } 318 else 319 { 320 Vector<Object> *array = new Vector<Object>(len); 321 for (int i = 0; i < len; i++) 322 array->store (i, readAVal (req)); 323 return array; 324 } 325 //break; 326 default: 327 fprintf (stderr, "readAVal: Unknown code: %d\n", type); 328 break; 329 } 330 return NULL; 331 } 332 333 static int iVal; 334 static bool bVal; 335 static long long lVal; 336 static String sVal; 337 static double dVal; 338 static Object aVal; 339 340 static void 341 readResult (int type, IPCrequest *req) 342 { 343 int tVal = readByte (req); 344 switch (tVal) 345 { 346 case L_INTEGER: 347 iVal = readIVal (req); 348 break; 349 case L_LONG: 350 lVal = readLVal (req); 351 break; 352 case L_BOOLEAN: 353 bVal = readBVal (req); 354 break; 355 case L_DOUBLE: 356 dVal = readDVal (req); 357 break; 358 case L_STRING: 359 sVal = readSVal (req); 360 break; 361 case L_ARRAY: 362 aVal = readAVal (req); 363 break; 364 case EOF: 365 fprintf (stderr, "EOF read in readResult\n"); 366 sVal = NULL; 367 return; 368 default: 369 fprintf (stderr, "Unknown code: %d\n", tVal); 370 abort (); 371 } 372 if (type != tVal) 373 { 374 fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type); 375 abort (); 376 } 377 } 378 379 int 380 readInt (IPCrequest *req) 381 { 382 readResult (L_INTEGER, req); 383 return iVal; 384 } 385 386 String 387 readString (IPCrequest *req) 388 { 389 readResult (L_STRING, req); 390 return sVal; 391 } 392 393 long long 394 readLong (IPCrequest *req) 395 { 396 readResult (L_LONG, req); 397 return lVal; 398 } 399 400 double 401 readDouble (IPCrequest *req) 402 { 403 readResult (L_DOUBLE, req); 404 return dVal; 405 } 406 407 bool 408 readBoolean (IPCrequest *req) 409 { 410 readResult (L_BOOLEAN, req); 411 return bVal; 412 } 413 414 DbeObj 415 readObject (IPCrequest *req) 416 { 417 readResult (L_LONG, req); 418 return (DbeObj) lVal; 419 } 420 421 Object 422 readArray (IPCrequest *req) 423 { 424 readResult (L_ARRAY, req); 425 return aVal; 426 } 427 428 // Write 429 IPCresponse::IPCresponse (int sz) 430 { 431 requestID = -1; 432 channelID = -1; 433 responseType = -1; 434 responseStatus = RESPONSE_STATUS_SUCCESS; 435 sb = new StringBuilder (sz); 436 next = NULL; 437 } 438 439 IPCresponse::~IPCresponse () 440 { 441 delete sb; 442 } 443 444 void 445 IPCresponse::reset () 446 { 447 requestID = -1; 448 channelID = -1; 449 responseType = -1; 450 responseStatus = RESPONSE_STATUS_SUCCESS; 451 sb->setLength (0); 452 } 453 454 void 455 IPCresponse::sendByte (int b) 456 { 457 ipc_trace ("sendByte: %02x %d\n", b, b); 458 sb->appendf ("%02x", b); 459 } 460 461 void 462 IPCresponse::sendIVal (int i) 463 { 464 ipc_trace ("sendIVal: %08x %d\n", i, i); 465 sb->appendf ("%08x", i); 466 } 467 468 void 469 IPCresponse::sendLVal (long long l) 470 { 471 ipc_trace ("sendLVal: %016llx %lld\n", l, l); 472 sb->appendf ("%016llx", l); 473 } 474 475 void 476 IPCresponse::sendSVal (const char *s) 477 { 478 if (s == NULL) 479 { 480 sendIVal (-1); 481 return; 482 } 483 sendIVal ((int) strlen (s)); 484 ipc_trace ("sendSVal: %s\n", s); 485 sb->appendf ("%s", s); 486 } 487 488 void 489 IPCresponse::sendBVal (bool b) 490 { 491 sendByte (b ? 1 : 0); 492 } 493 494 void 495 IPCresponse::sendCVal (char c) 496 { 497 sendByte (c); 498 } 499 500 void 501 IPCresponse::sendDVal (double d) 502 { 503 char str[32]; 504 snprintf (str, sizeof (str), "%.12f", d); 505 sendSVal (str); 506 } 507 508 void 509 IPCresponse::sendAVal (void *ptr) 510 { 511 if (ptr == NULL) 512 { 513 sendByte (L_INTEGER); 514 sendIVal (-1); 515 return; 516 } 517 518 VecType type = ((Vector<void*>*)ptr)->type (); 519 switch (type) 520 { 521 case VEC_INTEGER: 522 { 523 sendByte (L_INTEGER); 524 Vector<int> *array = (Vector<int>*)ptr; 525 sendIVal (array->size ()); 526 for (int i = 0; i < array->size (); i++) 527 sendIVal (array->fetch (i)); 528 break; 529 } 530 case VEC_BOOL: 531 { 532 sendByte (L_BOOLEAN); 533 Vector<bool> *array = (Vector<bool>*)ptr; 534 sendIVal (array->size ()); 535 for (int i = 0; i < array->size (); i++) 536 sendBVal (array->fetch (i)); 537 break; 538 } 539 case VEC_CHAR: 540 { 541 sendByte (L_CHAR); 542 Vector<char> *array = (Vector<char>*)ptr; 543 sendIVal (array->size ()); 544 for (int i = 0; i < array->size (); i++) 545 sendCVal (array->fetch (i)); 546 break; 547 } 548 case VEC_LLONG: 549 { 550 sendByte (L_LONG); 551 Vector<long long> *array = (Vector<long long>*)ptr; 552 sendIVal (array->size ()); 553 for (int i = 0; i < array->size (); i++) 554 sendLVal (array->fetch (i)); 555 break; 556 } 557 case VEC_DOUBLE: 558 { 559 sendByte (L_DOUBLE); 560 Vector<double> *array = (Vector<double>*)ptr; 561 sendIVal (array->size ()); 562 for (int i = 0; i < array->size (); i++) 563 sendDVal (array->fetch (i)); 564 break; 565 } 566 case VEC_STRING: 567 { 568 sendByte (L_STRING); 569 Vector<String> *array = (Vector<String>*)ptr; 570 sendIVal (array->size ()); 571 for (int i = 0; i < array->size (); i++) 572 sendSVal (array->fetch (i)); 573 break; 574 } 575 case VEC_STRINGARR: 576 { 577 sendByte (L_ARRAY); 578 sendByte (L_STRING); 579 Vector<void*> *array = (Vector<void*>*)ptr; 580 sendIVal (array->size ()); 581 for (int i = 0; i < array->size (); i++) 582 sendAVal (array->fetch (i)); 583 break; 584 } 585 case VEC_INTARR: 586 { 587 sendByte (L_ARRAY); 588 sendByte (L_INTEGER); 589 Vector<void*> *array = (Vector<void*>*)ptr; 590 sendIVal (array->size ()); 591 for (int i = 0; i < array->size (); i++) 592 sendAVal (array->fetch (i)); 593 break; 594 } 595 case VEC_LLONGARR: 596 { 597 sendByte (L_ARRAY); 598 sendByte (L_LONG); 599 Vector<void*> *array = (Vector<void*>*)ptr; 600 sendIVal (array->size ()); 601 for (int i = 0; i < array->size (); i++) 602 sendAVal (array->fetch (i)); 603 break; 604 } 605 case VEC_VOIDARR: 606 { 607 sendByte (L_OBJECT); 608 Vector<void*> *array = (Vector<void*>*)ptr; 609 sendIVal (array->size ()); 610 for (int i = 0; i < array->size (); i++) 611 sendAVal (array->fetch (i)); 612 break; 613 } 614 default: 615 fprintf (stderr, "sendAVal: Unknown type: %d\n", type); 616 abort (); 617 } 618 } 619 620 bool 621 cancelNeeded (int chID) 622 { 623 if (chID == cancellableChannelID && chID == cancelRequestedChannelID) 624 return true; 625 else 626 return false; 627 } 628 629 static void 630 writeResponseWithHeader (int requestID, int channelID, int responseType, 631 int responseStatus, IPCresponse* os) 632 { 633 if (cancelNeeded (channelID)) 634 { 635 responseStatus = RESPONSE_STATUS_CANCELLED; 636 ipc_trace ("CANCELLING %d %d\n", requestID, channelID); 637 // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here 638 } 639 os->setRequestID (requestID); 640 os->setChannelID (channelID); 641 os->setResponseType (responseType); 642 os->setResponseStatus (responseStatus); 643 os->print (); 644 os->reset (); 645 responseBufferPool->recycle (os); 646 } 647 648 void 649 writeAck (int requestID, int channelID) 650 { 651 #if DEBUG 652 char *s = getenv (NTXT ("SP_NO_IPC_ACK")); 653 #else /* ^DEBUG */ 654 char *s = NULL; 655 #endif /* ^DEBUG */ 656 if (s) 657 { 658 int i = requestID; 659 int j = channelID; 660 ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j); 661 } 662 else 663 { 664 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); 665 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK, 666 RESPONSE_STATUS_SUCCESS, OUTS); 667 } 668 } 669 670 void 671 writeHandshake (int requestID, int channelID) 672 { 673 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); 674 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS); 675 } 676 677 void 678 writeResponseGeneric (int responseStatus, int requestID, int channelID) 679 { 680 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); 681 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS); 682 } 683 684 BufferPool::BufferPool () 685 { 686 pthread_mutex_init (&p_mutex, NULL); 687 smallBuf = NULL; 688 largeBuf = NULL; 689 } 690 691 BufferPool::~BufferPool () 692 { 693 for (IPCresponse *p = smallBuf; p;) 694 { 695 IPCresponse *tmp = p; 696 p = tmp->next; 697 delete tmp; 698 } 699 for (IPCresponse *p = largeBuf; p;) 700 { 701 IPCresponse *tmp = p; 702 p = tmp->next; 703 delete tmp; 704 } 705 } 706 707 IPCresponse* 708 BufferPool::getNewResponse (int size) 709 { 710 pthread_mutex_lock (&p_mutex); 711 if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE) 712 size = BUFFER_SIZE_LARGE; 713 IPCresponse *newResponse = NULL; 714 if (size >= BUFFER_SIZE_LARGE) 715 { 716 if (largeBuf) 717 { 718 newResponse = largeBuf; 719 largeBuf = largeBuf->next; 720 } 721 } 722 else if (smallBuf) 723 { 724 newResponse = smallBuf; 725 smallBuf = smallBuf->next; 726 } 727 if (newResponse) 728 newResponse->reset (); 729 else 730 { 731 newResponse = new IPCresponse (size); 732 ipc_trace ("GETNEWBUFFER %d\n", size); 733 } 734 pthread_mutex_unlock (&p_mutex); 735 return newResponse; 736 } 737 738 void 739 BufferPool::recycle (IPCresponse *respB) 740 { 741 pthread_mutex_lock (&p_mutex); 742 if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE) 743 { 744 respB->next = largeBuf; 745 largeBuf = respB; 746 } 747 else 748 { 749 respB->next = smallBuf; 750 smallBuf = respB; 751 } 752 pthread_mutex_unlock (&p_mutex); 753 } 754 755 void 756 writeArray (void *ptr, IPCrequest* req) 757 { 758 if (req->getStatus () == CANCELLED_IMMEDIATE) 759 return; 760 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE); 761 OUTS->sendByte (L_ARRAY); 762 OUTS->sendAVal (ptr); 763 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), 764 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 765 } 766 767 void 768 writeString (const char *s, IPCrequest* req) 769 { 770 if (req->getStatus () == CANCELLED_IMMEDIATE) 771 return; 772 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE); 773 OUTS->sendByte (L_STRING); 774 OUTS->sendSVal (s); 775 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), 776 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 777 } 778 779 void 780 writeObject (DbeObj obj, IPCrequest* req) 781 { 782 writeLong ((long long) obj, req); 783 } 784 785 void 786 writeBoolean (bool b, IPCrequest* req) 787 { 788 if (req->getStatus () == CANCELLED_IMMEDIATE) 789 return; 790 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 791 OUTS->sendByte (L_BOOLEAN); 792 OUTS->sendBVal (b); 793 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), 794 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 795 } 796 797 void 798 writeInt (int i, IPCrequest* req) 799 { 800 if (req->getStatus () == CANCELLED_IMMEDIATE) 801 return; 802 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 803 OUTS->sendByte (L_INTEGER); 804 OUTS->sendIVal (i); 805 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 806 } 807 808 void 809 writeChar (char c, IPCrequest* req) 810 { 811 if (req->getStatus () == CANCELLED_IMMEDIATE) 812 return; 813 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 814 OUTS->sendByte (L_CHAR); 815 OUTS->sendCVal (c); 816 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 817 } 818 819 void 820 writeLong (long long l, IPCrequest* req) 821 { 822 if (req->getStatus () == CANCELLED_IMMEDIATE) 823 return; 824 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 825 OUTS->sendByte (L_LONG); 826 OUTS->sendLVal (l); 827 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 828 } 829 830 void 831 writeDouble (double d, IPCrequest* req) 832 { 833 if (req->getStatus () == CANCELLED_IMMEDIATE) return; 834 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 835 OUTS->sendByte (L_DOUBLE); 836 OUTS->sendDVal (d); 837 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 838 } 839 840 int 841 setProgress (int percentage, const char *proc_str) 842 { 843 if (cancelNeeded (currentChannelID)) 844 { 845 // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException(); 846 // throw (e1); 847 return 1; 848 } 849 if (NULL == proc_str) 850 return 1; 851 int size = strlen (proc_str) + 100; // 100 bytes for additional data 852 int bs = BUFFER_SIZE_MEDIUM; 853 if (size > BUFFER_SIZE_MEDIUM) 854 { 855 if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen 856 bs = BUFFER_SIZE_LARGE; 857 } 858 IPCresponse *OUTS = responseBufferPool->getNewResponse (bs); 859 OUTS->sendByte (L_PROGRESS); 860 OUTS->sendIVal (percentage); 861 OUTS->sendSVal (proc_str); 862 writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS); 863 return 0; 864 } 865 866 static pthread_mutex_t responce_lock = PTHREAD_MUTEX_INITIALIZER; 867 868 void 869 IPCresponse::print (void) 870 { 871 char buf[23]; 872 int sz = responseType == RESPONSE_TYPE_HANDSHAKE ? 873 IPC_VERSION_NUMBER : sb->length (); 874 snprintf (buf, sizeof (buf), "%02x%08x%02x%02x%08x", HEADER_MARKER, 875 requestID, responseType, responseStatus, sz); 876 pthread_mutex_lock (&responce_lock); 877 ipc_response_trace (TRACE_LVL_1, 878 "IPCresponse: ID=%08x type=%02x status=%02x sz:%6d\n", 879 requestID, responseType, responseStatus, sz); 880 write (1, buf, 22); 881 sb->write (1); 882 pthread_mutex_unlock (&responce_lock); 883 } 884 885 void 886 setCancelRequestedCh (int chID) 887 { 888 cancelRequestedChannelID = chID; 889 } 890 891 void 892 readRequestHeader () 893 { 894 int marker = readByte (NULL); 895 if (marker != HEADER_MARKER) 896 { 897 fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker); 898 error_flag = 1; 899 return; 900 } 901 else 902 ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n"); 903 int requestID = readIVal (NULL); 904 int requestType = readByte (NULL); 905 int channelID = readIVal (NULL); 906 int nBytes = readIVal (NULL); 907 if (requestType == REQUEST_TYPE_HANDSHAKE) 908 { 909 // write the ack directly to the wire, not through the response queue 910 writeAck (requestID, channelID); 911 writeHandshake (requestID, channelID); 912 ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); 913 } 914 else if (requestType == REQUEST_TYPE_CANCEL) 915 { 916 writeAck (requestID, channelID); 917 ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); 918 if (channelID == cancellableChannelID) 919 { 920 // we have worked on at least one request belonging to this channel 921 writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID); 922 setCancelRequestedCh (channelID); 923 ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID); 924 if (channelID == currentChannelID) 925 // request for this channel is currently in progress 926 ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION"); 927 // ssp_post_cond(waitingToFinish); 928 } 929 else 930 { 931 // FIXME: 932 // it is possible that a request for this channel is on the requestQ 933 // or has been submitted to the work group queue but is waiting for a thread to pick it up 934 writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID); 935 setCancelRequestedCh (channelID); 936 ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID); 937 } 938 } 939 else 940 { 941 writeAck (requestID, channelID); 942 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); 943 IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID); 944 nreq->read (); 945 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID); 946 if (cancelNeeded (channelID)) 947 { 948 ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID); 949 writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID); 950 delete nreq; 951 return; 952 } 953 DbeQueue *q = new DbeQueue (ipc_doWork, nreq); 954 ipcThreadPool->put_queue (q); 955 } 956 } 957