1 /* Copyright (C) 2021-2024 Free Software Foundation, Inc.
2 Contributed by Oracle.
3
4 This file is part of GNU Binutils.
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
9 any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, 51 Franklin Street - Fifth Floor, Boston,
19 MA 02110-1301, USA. */
20
21 #include "config.h"
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <signal.h>
25 #include <unistd.h>
26 #include <iostream>
27 #include <iomanip>
28 #include <sstream>
29 #include <queue>
30 #include "vec.h"
31 #include "util.h"
32 #include "ipcio.h"
33 #include "DbeThread.h"
34 #include "Experiment.h"
35
36 #define ipc_trace if (ipc_flags) ipc_default_log
37 #define ipc_request_trace if (ipc_flags) ipc_request_log
38 #define ipc_response_trace if (ipc_flags) ipc_response_log
39
40 using namespace std;
41
42 // IPC implementation
43 static const int L_PROGRESS = 0;
44 static const int L_INTEGER = 1;
45 static const int L_BOOLEAN = 2;
46 static const int L_LONG = 3;
47 static const int L_STRING = 4;
48 static const int L_DOUBLE = 5;
49 static const int L_ARRAY = 6;
50 static const int L_OBJECT = 7;
51 static const int L_CHAR = 8;
52
53 int currentRequestID;
54 int currentChannelID;
55 IPCresponse *IPCresponseGlobal;
56
57 BufferPool *responseBufferPool;
58
IPCrequest(int sz,int reqID,int chID)59 IPCrequest::IPCrequest (int sz, int reqID, int chID)
60 {
61 size = sz;
62 requestID = reqID;
63 channelID = chID;
64 status = INITIALIZED;
65 idx = 0;
66 buf = (char *) malloc (size);
67 cancelImmediate = false;
68 }
69
~IPCrequest()70 IPCrequest::~IPCrequest ()
71 {
72 free (buf);
73 }
74
75 void
read(void)76 IPCrequest::read (void)
77 {
78 for (int i = 0; i < size; i++)
79 {
80 int c = getc (stdin);
81 ipc_request_trace (TRACE_LVL_4, " IPCrequest:getc(stdin): %02x\n", c);
82 buf[i] = c;
83 }
84 }
85
86 IPCrequestStatus
getStatus(void)87 IPCrequest::getStatus (void)
88 {
89 return status;
90 }
91
92 void
setStatus(IPCrequestStatus newStatus)93 IPCrequest::setStatus (IPCrequestStatus newStatus)
94 {
95 status = newStatus;
96 }
97
98 static int
readByte(IPCrequest * req)99 readByte (IPCrequest* req)
100 {
101 int c;
102 int val = 0;
103 for (int i = 0; i < 2; i++)
104 {
105 if (req == NULL)
106 {
107 c = getc (stdin);
108 ipc_request_trace (TRACE_LVL_4, " readByte:getc(stdin): %02x\n", c);
109 }
110 else
111 c = req->rgetc ();
112 switch (c)
113 {
114 case '0': case '1': case '2': case '3':
115 case '4': case '5': case '6': case '7':
116 case '8': case '9':
117 val = val * 16 + c - '0';
118 break;
119 case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
120 val = val * 16 + c - 'a' + 10;
121 break;
122 case EOF:
123 val = EOF;
124 break;
125 default:
126 fprintf (stderr, "readByte: Unknown byte: %d\n", c);
127 break;
128 }
129 }
130 return val;
131 }
132
133 static int
readIVal(IPCrequest * req)134 readIVal (IPCrequest *req)
135 {
136 int val = readByte (req);
137 for (int i = 0; i < 3; i++)
138 val = val * 256 + readByte (req);
139 ipc_trace (" readIVal: %d\n", val);
140 return val;
141 }
142
143 static String
readSVal(IPCrequest * req)144 readSVal (IPCrequest *req)
145 {
146 int len = readIVal (req);
147 if (len == -1)
148 {
149 ipc_trace (" readSVal: <NULL>\n");
150 return NULL;
151 }
152 char *str = (char *) malloc (len + 1);
153 char *s = str;
154 *s = (char) 0;
155 while (len--)
156 *s++ = req->rgetc ();
157 *s = (char) 0;
158 ipc_trace (" readSVal: '%s'\n", str);
159 return str;
160 }
161
162 static long long
readLVal(IPCrequest * req)163 readLVal (IPCrequest *req)
164 {
165 long long val = readByte (req);
166 for (int i = 0; i < 7; i++)
167 val = val * 256 + readByte (req);
168 ipc_trace (" readLVal: %lld\n", val);
169 return val;
170 }
171
172 static bool
readBVal(IPCrequest * req)173 readBVal (IPCrequest *req)
174 {
175 int val = readByte (req);
176 ipc_trace (" readBVal: %s\n", val == 0 ? "true" : "false");
177 return val != 0;
178 }
179
180 static char
readCVal(IPCrequest * req)181 readCVal (IPCrequest *req)
182 {
183 int val = readByte (req);
184 ipc_trace (" readCVal: %d\n", val);
185 return (char) val;
186 }
187
188 static double
readDVal(IPCrequest * req)189 readDVal (IPCrequest *req)
190 {
191 String s = readSVal (req);
192 double d = atof (s);
193 free (s);
194 return d;
195 }
196
197 static Object
readAVal(IPCrequest * req)198 readAVal (IPCrequest *req)
199 {
200 bool twoD = false;
201 int type = readByte (req);
202 if (type == L_ARRAY)
203 {
204 twoD = true;
205 type = readByte (req);
206 }
207 ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type);
208
209 int len = readIVal (req);
210 if (len == -1)
211 return NULL;
212 switch (type)
213 {
214 case L_INTEGER:
215 if (twoD)
216 {
217 Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len);
218 for (int i = 0; i < len; i++)
219 array->store (i, (Vector<int>*)readAVal (req));
220 return array;
221 }
222 else
223 {
224 Vector<int> *array = new Vector<int>(len);
225 for (int i = 0; i < len; i++)
226 array->store (i, readIVal (req));
227 return array;
228 }
229 //break;
230 case L_LONG:
231 if (twoD)
232 {
233 Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len);
234 for (int i = 0; i < len; i++)
235 array->store (i, (Vector<long long>*)readAVal (req));
236 return array;
237 }
238 else
239 {
240 Vector<long long> *array = new Vector<long long>(len);
241 for (int i = 0; i < len; i++)
242 array->store (i, readLVal (req));
243 return array;
244 }
245 //break;
246 case L_DOUBLE:
247 if (twoD)
248 {
249 Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len);
250 for (int i = 0; i < len; i++)
251 array->store (i, (Vector<double>*)readAVal (req));
252 return array;
253 }
254 else
255 {
256 Vector<double> *array = new Vector<double>(len);
257 for (int i = 0; i < len; i++)
258 array->store (i, readDVal (req));
259 return array;
260 }
261 //break;
262 case L_BOOLEAN:
263 if (twoD)
264 {
265 Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len);
266 for (int i = 0; i < len; i++)
267 array->store (i, (Vector<bool>*)readAVal (req));
268 return array;
269 }
270 else
271 {
272 Vector<bool> *array = new Vector<bool>(len);
273 for (int i = 0; i < len; i++)
274 array->store (i, readBVal (req));
275 return array;
276 }
277 //break;
278 case L_CHAR:
279 if (twoD)
280 {
281 Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len);
282 for (int i = 0; i < len; i++)
283 array->store (i, (Vector<char>*)readAVal (req));
284 return array;
285 }
286 else
287 {
288 Vector<char> *array = new Vector<char>(len);
289 for (int i = 0; i < len; i++)
290 array->store (i, readCVal (req));
291 return array;
292 }
293 //break;
294 case L_STRING:
295 if (twoD)
296 {
297 Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len);
298 for (int i = 0; i < len; i++)
299 array->store (i, (Vector<String>*)readAVal (req));
300 return array;
301 }
302 else
303 {
304 Vector<String> *array = new Vector<String>(len);
305 for (int i = 0; i < len; i++)
306 array->store (i, readSVal (req));
307 return array;
308 }
309 //break;
310 case L_OBJECT:
311 if (twoD)
312 {
313 Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len);
314 for (int i = 0; i < len; i++)
315 array->store (i, (Vector<Object>*)readAVal (req));
316 return array;
317 }
318 else
319 {
320 Vector<Object> *array = new Vector<Object>(len);
321 for (int i = 0; i < len; i++)
322 array->store (i, readAVal (req));
323 return array;
324 }
325 //break;
326 default:
327 fprintf (stderr, "readAVal: Unknown code: %d\n", type);
328 break;
329 }
330 return NULL;
331 }
332
333 static int iVal;
334 static bool bVal;
335 static long long lVal;
336 static String sVal;
337 static double dVal;
338 static Object aVal;
339
340 static void
readResult(int type,IPCrequest * req)341 readResult (int type, IPCrequest *req)
342 {
343 int tVal = readByte (req);
344 switch (tVal)
345 {
346 case L_INTEGER:
347 iVal = readIVal (req);
348 break;
349 case L_LONG:
350 lVal = readLVal (req);
351 break;
352 case L_BOOLEAN:
353 bVal = readBVal (req);
354 break;
355 case L_DOUBLE:
356 dVal = readDVal (req);
357 break;
358 case L_STRING:
359 sVal = readSVal (req);
360 break;
361 case L_ARRAY:
362 aVal = readAVal (req);
363 break;
364 case EOF:
365 fprintf (stderr, "EOF read in readResult\n");
366 sVal = NULL;
367 return;
368 default:
369 fprintf (stderr, "Unknown code: %d\n", tVal);
370 abort ();
371 }
372 if (type != tVal)
373 {
374 fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type);
375 abort ();
376 }
377 }
378
379 int
readInt(IPCrequest * req)380 readInt (IPCrequest *req)
381 {
382 readResult (L_INTEGER, req);
383 return iVal;
384 }
385
386 String
readString(IPCrequest * req)387 readString (IPCrequest *req)
388 {
389 readResult (L_STRING, req);
390 return sVal;
391 }
392
393 long long
readLong(IPCrequest * req)394 readLong (IPCrequest *req)
395 {
396 readResult (L_LONG, req);
397 return lVal;
398 }
399
400 double
readDouble(IPCrequest * req)401 readDouble (IPCrequest *req)
402 {
403 readResult (L_DOUBLE, req);
404 return dVal;
405 }
406
407 bool
readBoolean(IPCrequest * req)408 readBoolean (IPCrequest *req)
409 {
410 readResult (L_BOOLEAN, req);
411 return bVal;
412 }
413
414 DbeObj
readObject(IPCrequest * req)415 readObject (IPCrequest *req)
416 {
417 readResult (L_LONG, req);
418 return (DbeObj) lVal;
419 }
420
421 Object
readArray(IPCrequest * req)422 readArray (IPCrequest *req)
423 {
424 readResult (L_ARRAY, req);
425 return aVal;
426 }
427
428 // Write
IPCresponse(int sz)429 IPCresponse::IPCresponse (int sz)
430 {
431 requestID = -1;
432 channelID = -1;
433 responseType = -1;
434 responseStatus = RESPONSE_STATUS_SUCCESS;
435 sb = new StringBuilder (sz);
436 next = NULL;
437 }
438
~IPCresponse()439 IPCresponse::~IPCresponse ()
440 {
441 delete sb;
442 }
443
444 void
reset()445 IPCresponse::reset ()
446 {
447 requestID = -1;
448 channelID = -1;
449 responseType = -1;
450 responseStatus = RESPONSE_STATUS_SUCCESS;
451 sb->setLength (0);
452 }
453
454 void
sendByte(int b)455 IPCresponse::sendByte (int b)
456 {
457 ipc_trace ("sendByte: %02x %d\n", b, b);
458 sb->appendf ("%02x", b);
459 }
460
461 void
sendIVal(int i)462 IPCresponse::sendIVal (int i)
463 {
464 ipc_trace ("sendIVal: %08x %d\n", i, i);
465 sb->appendf ("%08x", i);
466 }
467
468 void
sendLVal(long long l)469 IPCresponse::sendLVal (long long l)
470 {
471 ipc_trace ("sendLVal: %016llx %lld\n", l, l);
472 sb->appendf ("%016llx", l);
473 }
474
475 void
sendSVal(const char * s)476 IPCresponse::sendSVal (const char *s)
477 {
478 if (s == NULL)
479 {
480 sendIVal (-1);
481 return;
482 }
483 sendIVal ((int) strlen (s));
484 ipc_trace ("sendSVal: %s\n", s);
485 sb->appendf ("%s", s);
486 }
487
488 void
sendBVal(bool b)489 IPCresponse::sendBVal (bool b)
490 {
491 sendByte (b ? 1 : 0);
492 }
493
494 void
sendCVal(char c)495 IPCresponse::sendCVal (char c)
496 {
497 sendByte (c);
498 }
499
500 void
sendDVal(double d)501 IPCresponse::sendDVal (double d)
502 {
503 char str[32];
504 snprintf (str, sizeof (str), "%.12f", d);
505 sendSVal (str);
506 }
507
508 void
sendAVal(void * ptr)509 IPCresponse::sendAVal (void *ptr)
510 {
511 if (ptr == NULL)
512 {
513 sendByte (L_INTEGER);
514 sendIVal (-1);
515 return;
516 }
517
518 VecType type = ((Vector<void*>*)ptr)->type ();
519 switch (type)
520 {
521 case VEC_INTEGER:
522 {
523 sendByte (L_INTEGER);
524 Vector<int> *array = (Vector<int>*)ptr;
525 sendIVal (array->size ());
526 for (int i = 0; i < array->size (); i++)
527 sendIVal (array->fetch (i));
528 break;
529 }
530 case VEC_BOOL:
531 {
532 sendByte (L_BOOLEAN);
533 Vector<bool> *array = (Vector<bool>*)ptr;
534 sendIVal (array->size ());
535 for (int i = 0; i < array->size (); i++)
536 sendBVal (array->fetch (i));
537 break;
538 }
539 case VEC_CHAR:
540 {
541 sendByte (L_CHAR);
542 Vector<char> *array = (Vector<char>*)ptr;
543 sendIVal (array->size ());
544 for (int i = 0; i < array->size (); i++)
545 sendCVal (array->fetch (i));
546 break;
547 }
548 case VEC_LLONG:
549 {
550 sendByte (L_LONG);
551 Vector<long long> *array = (Vector<long long>*)ptr;
552 sendIVal (array->size ());
553 for (int i = 0; i < array->size (); i++)
554 sendLVal (array->fetch (i));
555 break;
556 }
557 case VEC_DOUBLE:
558 {
559 sendByte (L_DOUBLE);
560 Vector<double> *array = (Vector<double>*)ptr;
561 sendIVal (array->size ());
562 for (int i = 0; i < array->size (); i++)
563 sendDVal (array->fetch (i));
564 break;
565 }
566 case VEC_STRING:
567 {
568 sendByte (L_STRING);
569 Vector<String> *array = (Vector<String>*)ptr;
570 sendIVal (array->size ());
571 for (int i = 0; i < array->size (); i++)
572 sendSVal (array->fetch (i));
573 break;
574 }
575 case VEC_STRINGARR:
576 {
577 sendByte (L_ARRAY);
578 sendByte (L_STRING);
579 Vector<void*> *array = (Vector<void*>*)ptr;
580 sendIVal (array->size ());
581 for (int i = 0; i < array->size (); i++)
582 sendAVal (array->fetch (i));
583 break;
584 }
585 case VEC_INTARR:
586 {
587 sendByte (L_ARRAY);
588 sendByte (L_INTEGER);
589 Vector<void*> *array = (Vector<void*>*)ptr;
590 sendIVal (array->size ());
591 for (int i = 0; i < array->size (); i++)
592 sendAVal (array->fetch (i));
593 break;
594 }
595 case VEC_LLONGARR:
596 {
597 sendByte (L_ARRAY);
598 sendByte (L_LONG);
599 Vector<void*> *array = (Vector<void*>*)ptr;
600 sendIVal (array->size ());
601 for (int i = 0; i < array->size (); i++)
602 sendAVal (array->fetch (i));
603 break;
604 }
605 case VEC_VOIDARR:
606 {
607 sendByte (L_OBJECT);
608 Vector<void*> *array = (Vector<void*>*)ptr;
609 sendIVal (array->size ());
610 for (int i = 0; i < array->size (); i++)
611 sendAVal (array->fetch (i));
612 break;
613 }
614 default:
615 fprintf (stderr, "sendAVal: Unknown type: %d\n", type);
616 abort ();
617 }
618 }
619
620 bool
cancelNeeded(int chID)621 cancelNeeded (int chID)
622 {
623 if (chID == cancellableChannelID && chID == cancelRequestedChannelID)
624 return true;
625 else
626 return false;
627 }
628
629 static void
writeResponseWithHeader(int requestID,int channelID,int responseType,int responseStatus,IPCresponse * os)630 writeResponseWithHeader (int requestID, int channelID, int responseType,
631 int responseStatus, IPCresponse* os)
632 {
633 if (cancelNeeded (channelID))
634 {
635 responseStatus = RESPONSE_STATUS_CANCELLED;
636 ipc_trace ("CANCELLING %d %d\n", requestID, channelID);
637 // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
638 }
639 os->setRequestID (requestID);
640 os->setChannelID (channelID);
641 os->setResponseType (responseType);
642 os->setResponseStatus (responseStatus);
643 os->print ();
644 os->reset ();
645 responseBufferPool->recycle (os);
646 }
647
648 void
writeAck(int requestID,int channelID)649 writeAck (int requestID, int channelID)
650 {
651 #if DEBUG
652 char *s = getenv (NTXT ("SP_NO_IPC_ACK"));
653 #else /* ^DEBUG */
654 char *s = NULL;
655 #endif /* ^DEBUG */
656 if (s)
657 {
658 int i = requestID;
659 int j = channelID;
660 ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j);
661 }
662 else
663 {
664 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
665 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK,
666 RESPONSE_STATUS_SUCCESS, OUTS);
667 }
668 }
669
670 void
writeHandshake(int requestID,int channelID)671 writeHandshake (int requestID, int channelID)
672 {
673 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
674 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
675 }
676
677 void
writeResponseGeneric(int responseStatus,int requestID,int channelID)678 writeResponseGeneric (int responseStatus, int requestID, int channelID)
679 {
680 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
681 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS);
682 }
683
BufferPool()684 BufferPool::BufferPool ()
685 {
686 pthread_mutex_init (&p_mutex, NULL);
687 smallBuf = NULL;
688 largeBuf = NULL;
689 }
690
~BufferPool()691 BufferPool::~BufferPool ()
692 {
693 for (IPCresponse *p = smallBuf; p;)
694 {
695 IPCresponse *tmp = p;
696 p = tmp->next;
697 delete tmp;
698 }
699 for (IPCresponse *p = largeBuf; p;)
700 {
701 IPCresponse *tmp = p;
702 p = tmp->next;
703 delete tmp;
704 }
705 }
706
707 IPCresponse*
getNewResponse(int size)708 BufferPool::getNewResponse (int size)
709 {
710 pthread_mutex_lock (&p_mutex);
711 if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE)
712 size = BUFFER_SIZE_LARGE;
713 IPCresponse *newResponse = NULL;
714 if (size >= BUFFER_SIZE_LARGE)
715 {
716 if (largeBuf)
717 {
718 newResponse = largeBuf;
719 largeBuf = largeBuf->next;
720 }
721 }
722 else if (smallBuf)
723 {
724 newResponse = smallBuf;
725 smallBuf = smallBuf->next;
726 }
727 if (newResponse)
728 newResponse->reset ();
729 else
730 {
731 newResponse = new IPCresponse (size);
732 ipc_trace ("GETNEWBUFFER %d\n", size);
733 }
734 pthread_mutex_unlock (&p_mutex);
735 return newResponse;
736 }
737
738 void
recycle(IPCresponse * respB)739 BufferPool::recycle (IPCresponse *respB)
740 {
741 pthread_mutex_lock (&p_mutex);
742 if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE)
743 {
744 respB->next = largeBuf;
745 largeBuf = respB;
746 }
747 else
748 {
749 respB->next = smallBuf;
750 smallBuf = respB;
751 }
752 pthread_mutex_unlock (&p_mutex);
753 }
754
755 void
writeArray(void * ptr,IPCrequest * req)756 writeArray (void *ptr, IPCrequest* req)
757 {
758 if (req->getStatus () == CANCELLED_IMMEDIATE)
759 return;
760 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
761 OUTS->sendByte (L_ARRAY);
762 OUTS->sendAVal (ptr);
763 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
764 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
765 }
766
767 void
writeString(const char * s,IPCrequest * req)768 writeString (const char *s, IPCrequest* req)
769 {
770 if (req->getStatus () == CANCELLED_IMMEDIATE)
771 return;
772 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
773 OUTS->sendByte (L_STRING);
774 OUTS->sendSVal (s);
775 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
776 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
777 }
778
779 void
writeObject(DbeObj obj,IPCrequest * req)780 writeObject (DbeObj obj, IPCrequest* req)
781 {
782 writeLong ((long long) obj, req);
783 }
784
785 void
writeBoolean(bool b,IPCrequest * req)786 writeBoolean (bool b, IPCrequest* req)
787 {
788 if (req->getStatus () == CANCELLED_IMMEDIATE)
789 return;
790 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
791 OUTS->sendByte (L_BOOLEAN);
792 OUTS->sendBVal (b);
793 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
794 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
795 }
796
797 void
writeInt(int i,IPCrequest * req)798 writeInt (int i, IPCrequest* req)
799 {
800 if (req->getStatus () == CANCELLED_IMMEDIATE)
801 return;
802 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
803 OUTS->sendByte (L_INTEGER);
804 OUTS->sendIVal (i);
805 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
806 }
807
808 void
writeChar(char c,IPCrequest * req)809 writeChar (char c, IPCrequest* req)
810 {
811 if (req->getStatus () == CANCELLED_IMMEDIATE)
812 return;
813 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
814 OUTS->sendByte (L_CHAR);
815 OUTS->sendCVal (c);
816 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
817 }
818
819 void
writeLong(long long l,IPCrequest * req)820 writeLong (long long l, IPCrequest* req)
821 {
822 if (req->getStatus () == CANCELLED_IMMEDIATE)
823 return;
824 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
825 OUTS->sendByte (L_LONG);
826 OUTS->sendLVal (l);
827 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
828 }
829
830 void
writeDouble(double d,IPCrequest * req)831 writeDouble (double d, IPCrequest* req)
832 {
833 if (req->getStatus () == CANCELLED_IMMEDIATE) return;
834 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
835 OUTS->sendByte (L_DOUBLE);
836 OUTS->sendDVal (d);
837 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
838 }
839
840 int
setProgress(int percentage,const char * proc_str)841 setProgress (int percentage, const char *proc_str)
842 {
843 if (cancelNeeded (currentChannelID))
844 {
845 // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
846 // throw (e1);
847 return 1;
848 }
849 if (NULL == proc_str)
850 return 1;
851 int size = strlen (proc_str) + 100; // 100 bytes for additional data
852 int bs = BUFFER_SIZE_MEDIUM;
853 if (size > BUFFER_SIZE_MEDIUM)
854 {
855 if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen
856 bs = BUFFER_SIZE_LARGE;
857 }
858 IPCresponse *OUTS = responseBufferPool->getNewResponse (bs);
859 OUTS->sendByte (L_PROGRESS);
860 OUTS->sendIVal (percentage);
861 OUTS->sendSVal (proc_str);
862 writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS);
863 return 0;
864 }
865
866 static pthread_mutex_t responce_lock = PTHREAD_MUTEX_INITIALIZER;
867
868 void
print(void)869 IPCresponse::print (void)
870 {
871 char buf[23];
872 int sz = responseType == RESPONSE_TYPE_HANDSHAKE ?
873 IPC_VERSION_NUMBER : sb->length ();
874 snprintf (buf, sizeof (buf), "%02x%08x%02x%02x%08x", HEADER_MARKER,
875 requestID, responseType, responseStatus, sz);
876 pthread_mutex_lock (&responce_lock);
877 ipc_response_trace (TRACE_LVL_1,
878 "IPCresponse: ID=%08x type=%02x status=%02x sz:%6d\n",
879 requestID, responseType, responseStatus, sz);
880 write (1, buf, 22);
881 sb->write (1);
882 pthread_mutex_unlock (&responce_lock);
883 }
884
885 void
setCancelRequestedCh(int chID)886 setCancelRequestedCh (int chID)
887 {
888 cancelRequestedChannelID = chID;
889 }
890
891 void
readRequestHeader()892 readRequestHeader ()
893 {
894 int marker = readByte (NULL);
895 if (marker != HEADER_MARKER)
896 {
897 fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker);
898 error_flag = 1;
899 return;
900 }
901 else
902 ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n");
903 int requestID = readIVal (NULL);
904 int requestType = readByte (NULL);
905 int channelID = readIVal (NULL);
906 int nBytes = readIVal (NULL);
907 if (requestType == REQUEST_TYPE_HANDSHAKE)
908 {
909 // write the ack directly to the wire, not through the response queue
910 writeAck (requestID, channelID);
911 writeHandshake (requestID, channelID);
912 ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
913 }
914 else if (requestType == REQUEST_TYPE_CANCEL)
915 {
916 writeAck (requestID, channelID);
917 ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
918 if (channelID == cancellableChannelID)
919 {
920 // we have worked on at least one request belonging to this channel
921 writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID);
922 setCancelRequestedCh (channelID);
923 ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID);
924 if (channelID == currentChannelID)
925 // request for this channel is currently in progress
926 ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION");
927 // ssp_post_cond(waitingToFinish);
928 }
929 else
930 {
931 // FIXME:
932 // it is possible that a request for this channel is on the requestQ
933 // or has been submitted to the work group queue but is waiting for a thread to pick it up
934 writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID);
935 setCancelRequestedCh (channelID);
936 ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID);
937 }
938 }
939 else
940 {
941 writeAck (requestID, channelID);
942 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
943 IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID);
944 nreq->read ();
945 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID);
946 if (cancelNeeded (channelID))
947 {
948 ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID);
949 writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID);
950 delete nreq;
951 return;
952 }
953 DbeQueue *q = new DbeQueue (ipc_doWork, nreq);
954 ipcThreadPool->put_queue (q);
955 }
956 }
957