xref: /netbsd-src/external/gpl3/binutils/dist/gprofng/src/ipcio.cc (revision f8cf1a9151c7af1cb0bd8b09c13c66bca599c027)
1 /* Copyright (C) 2021-2024 Free Software Foundation, Inc.
2    Contributed by Oracle.
3 
4    This file is part of GNU Binutils.
5 
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3, or (at your option)
9    any later version.
10 
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, 51 Franklin Street - Fifth Floor, Boston,
19    MA 02110-1301, USA.  */
20 
21 #include "config.h"
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <signal.h>
25 #include <unistd.h>
26 #include <iostream>
27 #include <iomanip>
28 #include <sstream>
29 #include <queue>
30 #include "vec.h"
31 #include "util.h"
32 #include "ipcio.h"
33 #include "DbeThread.h"
34 #include "Experiment.h"
35 
36 #define ipc_trace               if (ipc_flags) ipc_default_log
37 #define ipc_request_trace       if (ipc_flags) ipc_request_log
38 #define ipc_response_trace      if (ipc_flags) ipc_response_log
39 
40 using namespace std;
41 
42 // IPC implementation
43 static const int L_PROGRESS = 0;
44 static const int L_INTEGER  = 1;
45 static const int L_BOOLEAN  = 2;
46 static const int L_LONG     = 3;
47 static const int L_STRING   = 4;
48 static const int L_DOUBLE   = 5;
49 static const int L_ARRAY    = 6;
50 static const int L_OBJECT   = 7;
51 static const int L_CHAR     = 8;
52 
53 int currentRequestID;
54 int currentChannelID;
55 IPCresponse *IPCresponseGlobal;
56 
57 BufferPool *responseBufferPool;
58 
59 IPCrequest::IPCrequest (int sz, int reqID, int chID)
60 {
61   size = sz;
62   requestID = reqID;
63   channelID = chID;
64   status = INITIALIZED;
65   idx = 0;
66   buf = (char *) malloc (size);
67   cancelImmediate = false;
68 }
69 
70 IPCrequest::~IPCrequest ()
71 {
72   free (buf);
73 }
74 
75 void
76 IPCrequest::read (void)
77 {
78   for (int i = 0; i < size; i++)
79     {
80       int c = getc (stdin);
81       ipc_request_trace (TRACE_LVL_4, "  IPCrequest:getc(stdin): %02x\n", c);
82       buf[i] = c;
83     }
84 }
85 
86 IPCrequestStatus
87 IPCrequest::getStatus (void)
88 {
89   return status;
90 }
91 
92 void
93 IPCrequest::setStatus (IPCrequestStatus newStatus)
94 {
95   status = newStatus;
96 }
97 
98 static int
99 readByte (IPCrequest* req)
100 {
101   int c;
102   int val = 0;
103   for (int i = 0; i < 2; i++)
104     {
105       if (req == NULL)
106 	{
107 	  c = getc (stdin);
108 	  ipc_request_trace (TRACE_LVL_4, "  readByte:getc(stdin): %02x\n", c);
109 	}
110       else
111 	c = req->rgetc ();
112       switch (c)
113 	{
114 	case '0': case '1': case '2': case '3':
115 	case '4': case '5': case '6': case '7':
116 	case '8': case '9':
117 	  val = val * 16 + c - '0';
118 	  break;
119 	case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
120 	  val = val * 16 + c - 'a' + 10;
121 	  break;
122 	case EOF:
123 	  val = EOF;
124 	  break;
125 	default:
126 	  fprintf (stderr, "readByte: Unknown byte: %d\n", c);
127 	  break;
128 	}
129     }
130   return val;
131 }
132 
133 static int
134 readIVal (IPCrequest *req)
135 {
136   int val = readByte (req);
137   for (int i = 0; i < 3; i++)
138     val = val * 256 + readByte (req);
139   ipc_trace ("  readIVal: %d\n", val);
140   return val;
141 }
142 
143 static String
144 readSVal (IPCrequest *req)
145 {
146   int len = readIVal (req);
147   if (len == -1)
148     {
149       ipc_trace ("  readSVal: <NULL>\n");
150       return NULL;
151     }
152   char *str = (char *) malloc (len + 1);
153   char *s = str;
154   *s = (char) 0;
155   while (len--)
156     *s++ = req->rgetc ();
157   *s = (char) 0;
158   ipc_trace ("  readSVal: '%s'\n", str);
159   return str;
160 }
161 
162 static long long
163 readLVal (IPCrequest *req)
164 {
165   long long val = readByte (req);
166   for (int i = 0; i < 7; i++)
167     val = val * 256 + readByte (req);
168   ipc_trace ("  readLVal: %lld\n", val);
169   return val;
170 }
171 
172 static bool
173 readBVal (IPCrequest *req)
174 {
175   int val = readByte (req);
176   ipc_trace ("  readBVal: %s\n", val == 0 ? "true" : "false");
177   return val != 0;
178 }
179 
180 static char
181 readCVal (IPCrequest *req)
182 {
183   int val = readByte (req);
184   ipc_trace ("  readCVal: %d\n", val);
185   return (char) val;
186 }
187 
188 static double
189 readDVal (IPCrequest *req)
190 {
191   String s = readSVal (req);
192   double d = atof (s);
193   free (s);
194   return d;
195 }
196 
197 static Object
198 readAVal (IPCrequest *req)
199 {
200   bool twoD = false;
201   int type = readByte (req);
202   if (type == L_ARRAY)
203     {
204       twoD = true;
205       type = readByte (req);
206     }
207   ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type);
208 
209   int len = readIVal (req);
210   if (len == -1)
211     return NULL;
212   switch (type)
213     {
214     case L_INTEGER:
215       if (twoD)
216 	{
217 	  Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len);
218 	  for (int i = 0; i < len; i++)
219 	    array->store (i, (Vector<int>*)readAVal (req));
220 	  return array;
221 	}
222       else
223 	{
224 	  Vector<int> *array = new Vector<int>(len);
225 	  for (int i = 0; i < len; i++)
226 	    array->store (i, readIVal (req));
227 	  return array;
228 	}
229       //break;
230     case L_LONG:
231       if (twoD)
232 	{
233 	  Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len);
234 	  for (int i = 0; i < len; i++)
235 	    array->store (i, (Vector<long long>*)readAVal (req));
236 	  return array;
237 	}
238       else
239 	{
240 	  Vector<long long> *array = new Vector<long long>(len);
241 	  for (int i = 0; i < len; i++)
242 	    array->store (i, readLVal (req));
243 	  return array;
244 	}
245       //break;
246     case L_DOUBLE:
247       if (twoD)
248 	{
249 	  Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len);
250 	  for (int i = 0; i < len; i++)
251 	    array->store (i, (Vector<double>*)readAVal (req));
252 	  return array;
253 	}
254       else
255 	{
256 	  Vector<double> *array = new Vector<double>(len);
257 	  for (int i = 0; i < len; i++)
258 	    array->store (i, readDVal (req));
259 	  return array;
260 	}
261       //break;
262     case L_BOOLEAN:
263       if (twoD)
264 	{
265 	  Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len);
266 	  for (int i = 0; i < len; i++)
267 	    array->store (i, (Vector<bool>*)readAVal (req));
268 	  return array;
269 	}
270       else
271 	{
272 	  Vector<bool> *array = new Vector<bool>(len);
273 	  for (int i = 0; i < len; i++)
274 	    array->store (i, readBVal (req));
275 	  return array;
276 	}
277       //break;
278     case L_CHAR:
279       if (twoD)
280 	{
281 	  Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len);
282 	  for (int i = 0; i < len; i++)
283 	    array->store (i, (Vector<char>*)readAVal (req));
284 	  return array;
285 	}
286       else
287 	{
288 	  Vector<char> *array = new Vector<char>(len);
289 	  for (int i = 0; i < len; i++)
290 	    array->store (i, readCVal (req));
291 	  return array;
292 	}
293       //break;
294     case L_STRING:
295       if (twoD)
296 	{
297 	  Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len);
298 	  for (int i = 0; i < len; i++)
299 	    array->store (i, (Vector<String>*)readAVal (req));
300 	  return array;
301 	}
302       else
303 	{
304 	  Vector<String> *array = new Vector<String>(len);
305 	  for (int i = 0; i < len; i++)
306 	    array->store (i, readSVal (req));
307 	  return array;
308 	}
309       //break;
310     case L_OBJECT:
311       if (twoD)
312 	{
313 	  Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len);
314 	  for (int i = 0; i < len; i++)
315 	    array->store (i, (Vector<Object>*)readAVal (req));
316 	  return array;
317 	}
318       else
319 	{
320 	  Vector<Object> *array = new Vector<Object>(len);
321 	  for (int i = 0; i < len; i++)
322 	    array->store (i, readAVal (req));
323 	  return array;
324 	}
325       //break;
326     default:
327       fprintf (stderr, "readAVal: Unknown code: %d\n", type);
328       break;
329     }
330   return NULL;
331 }
332 
333 static int iVal;
334 static bool bVal;
335 static long long lVal;
336 static String sVal;
337 static double dVal;
338 static Object aVal;
339 
340 static void
341 readResult (int type, IPCrequest *req)
342 {
343   int tVal = readByte (req);
344   switch (tVal)
345     {
346     case L_INTEGER:
347       iVal = readIVal (req);
348       break;
349     case L_LONG:
350       lVal = readLVal (req);
351       break;
352     case L_BOOLEAN:
353       bVal = readBVal (req);
354       break;
355     case L_DOUBLE:
356       dVal = readDVal (req);
357       break;
358     case L_STRING:
359       sVal = readSVal (req);
360       break;
361     case L_ARRAY:
362       aVal = readAVal (req);
363       break;
364     case EOF:
365       fprintf (stderr, "EOF read in readResult\n");
366       sVal = NULL;
367       return;
368     default:
369       fprintf (stderr, "Unknown code: %d\n", tVal);
370       abort ();
371     }
372   if (type != tVal)
373     {
374       fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type);
375       abort ();
376     }
377 }
378 
379 int
380 readInt (IPCrequest *req)
381 {
382   readResult (L_INTEGER, req);
383   return iVal;
384 }
385 
386 String
387 readString (IPCrequest *req)
388 {
389   readResult (L_STRING, req);
390   return sVal;
391 }
392 
393 long long
394 readLong (IPCrequest *req)
395 {
396   readResult (L_LONG, req);
397   return lVal;
398 }
399 
400 double
401 readDouble (IPCrequest *req)
402 {
403   readResult (L_DOUBLE, req);
404   return dVal;
405 }
406 
407 bool
408 readBoolean (IPCrequest *req)
409 {
410   readResult (L_BOOLEAN, req);
411   return bVal;
412 }
413 
414 DbeObj
415 readObject (IPCrequest *req)
416 {
417   readResult (L_LONG, req);
418   return (DbeObj) lVal;
419 }
420 
421 Object
422 readArray (IPCrequest *req)
423 {
424   readResult (L_ARRAY, req);
425   return aVal;
426 }
427 
428 // Write
429 IPCresponse::IPCresponse (int sz)
430 {
431   requestID = -1;
432   channelID = -1;
433   responseType = -1;
434   responseStatus = RESPONSE_STATUS_SUCCESS;
435   sb = new StringBuilder (sz);
436   next = NULL;
437 }
438 
439 IPCresponse::~IPCresponse ()
440 {
441   delete sb;
442 }
443 
444 void
445 IPCresponse::reset ()
446 {
447   requestID = -1;
448   channelID = -1;
449   responseType = -1;
450   responseStatus = RESPONSE_STATUS_SUCCESS;
451   sb->setLength (0);
452 }
453 
454 void
455 IPCresponse::sendByte (int b)
456 {
457   ipc_trace ("sendByte: %02x %d\n", b, b);
458   sb->appendf ("%02x", b);
459 }
460 
461 void
462 IPCresponse::sendIVal (int i)
463 {
464   ipc_trace ("sendIVal: %08x %d\n", i, i);
465   sb->appendf ("%08x", i);
466 }
467 
468 void
469 IPCresponse::sendLVal (long long l)
470 {
471   ipc_trace ("sendLVal: %016llx %lld\n", l, l);
472   sb->appendf ("%016llx", l);
473 }
474 
475 void
476 IPCresponse::sendSVal (const char *s)
477 {
478   if (s == NULL)
479     {
480       sendIVal (-1);
481       return;
482     }
483   sendIVal ((int) strlen (s));
484   ipc_trace ("sendSVal: %s\n", s);
485   sb->appendf ("%s", s);
486 }
487 
488 void
489 IPCresponse::sendBVal (bool b)
490 {
491   sendByte (b ? 1 : 0);
492 }
493 
494 void
495 IPCresponse::sendCVal (char c)
496 {
497   sendByte (c);
498 }
499 
500 void
501 IPCresponse::sendDVal (double d)
502 {
503   char str[32];
504   snprintf (str, sizeof (str), "%.12f", d);
505   sendSVal (str);
506 }
507 
508 void
509 IPCresponse::sendAVal (void *ptr)
510 {
511   if (ptr == NULL)
512     {
513       sendByte (L_INTEGER);
514       sendIVal (-1);
515       return;
516     }
517 
518   VecType type = ((Vector<void*>*)ptr)->type ();
519   switch (type)
520     {
521     case VEC_INTEGER:
522       {
523 	sendByte (L_INTEGER);
524 	Vector<int> *array = (Vector<int>*)ptr;
525 	sendIVal (array->size ());
526 	for (int i = 0; i < array->size (); i++)
527 	  sendIVal (array->fetch (i));
528 	break;
529       }
530     case VEC_BOOL:
531       {
532 	sendByte (L_BOOLEAN);
533 	Vector<bool> *array = (Vector<bool>*)ptr;
534 	sendIVal (array->size ());
535 	for (int i = 0; i < array->size (); i++)
536 	  sendBVal (array->fetch (i));
537 	break;
538       }
539     case VEC_CHAR:
540       {
541 	sendByte (L_CHAR);
542 	Vector<char> *array = (Vector<char>*)ptr;
543 	sendIVal (array->size ());
544 	for (int i = 0; i < array->size (); i++)
545 	  sendCVal (array->fetch (i));
546 	break;
547       }
548     case VEC_LLONG:
549       {
550 	sendByte (L_LONG);
551 	Vector<long long> *array = (Vector<long long>*)ptr;
552 	sendIVal (array->size ());
553 	for (int i = 0; i < array->size (); i++)
554 	  sendLVal (array->fetch (i));
555 	break;
556       }
557     case VEC_DOUBLE:
558       {
559 	sendByte (L_DOUBLE);
560 	Vector<double> *array = (Vector<double>*)ptr;
561 	sendIVal (array->size ());
562 	for (int i = 0; i < array->size (); i++)
563 	  sendDVal (array->fetch (i));
564 	break;
565       }
566     case VEC_STRING:
567       {
568 	sendByte (L_STRING);
569 	Vector<String> *array = (Vector<String>*)ptr;
570 	sendIVal (array->size ());
571 	for (int i = 0; i < array->size (); i++)
572 	  sendSVal (array->fetch (i));
573 	break;
574       }
575     case VEC_STRINGARR:
576       {
577 	sendByte (L_ARRAY);
578 	sendByte (L_STRING);
579 	Vector<void*> *array = (Vector<void*>*)ptr;
580 	sendIVal (array->size ());
581 	for (int i = 0; i < array->size (); i++)
582 	  sendAVal (array->fetch (i));
583 	break;
584       }
585     case VEC_INTARR:
586       {
587 	sendByte (L_ARRAY);
588 	sendByte (L_INTEGER);
589 	Vector<void*> *array = (Vector<void*>*)ptr;
590 	sendIVal (array->size ());
591 	for (int i = 0; i < array->size (); i++)
592 	  sendAVal (array->fetch (i));
593 	break;
594       }
595     case VEC_LLONGARR:
596       {
597 	sendByte (L_ARRAY);
598 	sendByte (L_LONG);
599 	Vector<void*> *array = (Vector<void*>*)ptr;
600 	sendIVal (array->size ());
601 	for (int i = 0; i < array->size (); i++)
602 	  sendAVal (array->fetch (i));
603 	break;
604       }
605     case VEC_VOIDARR:
606       {
607 	sendByte (L_OBJECT);
608 	Vector<void*> *array = (Vector<void*>*)ptr;
609 	sendIVal (array->size ());
610 	for (int i = 0; i < array->size (); i++)
611 	  sendAVal (array->fetch (i));
612 	break;
613       }
614     default:
615       fprintf (stderr, "sendAVal: Unknown type: %d\n", type);
616       abort ();
617     }
618 }
619 
620 bool
621 cancelNeeded (int chID)
622 {
623   if (chID == cancellableChannelID && chID == cancelRequestedChannelID)
624     return true;
625   else
626     return false;
627 }
628 
629 static void
630 writeResponseWithHeader (int requestID, int channelID, int responseType,
631 			 int responseStatus, IPCresponse* os)
632 {
633   if (cancelNeeded (channelID))
634     {
635       responseStatus = RESPONSE_STATUS_CANCELLED;
636       ipc_trace ("CANCELLING %d %d\n", requestID, channelID);
637       // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
638     }
639   os->setRequestID (requestID);
640   os->setChannelID (channelID);
641   os->setResponseType (responseType);
642   os->setResponseStatus (responseStatus);
643   os->print ();
644   os->reset ();
645   responseBufferPool->recycle (os);
646 }
647 
648 void
649 writeAck (int requestID, int channelID)
650 {
651 #if DEBUG
652   char *s = getenv (NTXT ("SP_NO_IPC_ACK"));
653 #else /* ^DEBUG */
654   char *s = NULL;
655 #endif /* ^DEBUG */
656   if (s)
657     {
658       int i = requestID;
659       int j = channelID;
660       ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j);
661     }
662   else
663     {
664       IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
665       writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK,
666 			       RESPONSE_STATUS_SUCCESS, OUTS);
667     }
668 }
669 
670 void
671 writeHandshake (int requestID, int channelID)
672 {
673   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
674   writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
675 }
676 
677 void
678 writeResponseGeneric (int responseStatus, int requestID, int channelID)
679 {
680   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
681   writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS);
682 }
683 
684 BufferPool::BufferPool ()
685 {
686   pthread_mutex_init (&p_mutex, NULL);
687   smallBuf = NULL;
688   largeBuf = NULL;
689 }
690 
691 BufferPool::~BufferPool ()
692 {
693   for (IPCresponse *p = smallBuf; p;)
694     {
695       IPCresponse *tmp = p;
696       p = tmp->next;
697       delete tmp;
698     }
699   for (IPCresponse *p = largeBuf; p;)
700     {
701       IPCresponse *tmp = p;
702       p = tmp->next;
703       delete tmp;
704     }
705 }
706 
707 IPCresponse*
708 BufferPool::getNewResponse (int size)
709 {
710   pthread_mutex_lock (&p_mutex);
711   if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE)
712     size = BUFFER_SIZE_LARGE;
713   IPCresponse *newResponse = NULL;
714   if (size >= BUFFER_SIZE_LARGE)
715     {
716       if (largeBuf)
717 	{
718 	  newResponse = largeBuf;
719 	  largeBuf = largeBuf->next;
720 	}
721     }
722   else if (smallBuf)
723     {
724       newResponse = smallBuf;
725       smallBuf = smallBuf->next;
726     }
727   if (newResponse)
728     newResponse->reset ();
729   else
730     {
731       newResponse = new IPCresponse (size);
732       ipc_trace ("GETNEWBUFFER %d\n", size);
733     }
734   pthread_mutex_unlock (&p_mutex);
735   return newResponse;
736 }
737 
738 void
739 BufferPool::recycle (IPCresponse *respB)
740 {
741   pthread_mutex_lock (&p_mutex);
742   if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE)
743     {
744       respB->next = largeBuf;
745       largeBuf = respB;
746     }
747   else
748     {
749       respB->next = smallBuf;
750       smallBuf = respB;
751     }
752   pthread_mutex_unlock (&p_mutex);
753 }
754 
755 void
756 writeArray (void *ptr, IPCrequest* req)
757 {
758   if (req->getStatus () == CANCELLED_IMMEDIATE)
759     return;
760   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
761   OUTS->sendByte (L_ARRAY);
762   OUTS->sendAVal (ptr);
763   writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
764 			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
765 }
766 
767 void
768 writeString (const char *s, IPCrequest* req)
769 {
770   if (req->getStatus () == CANCELLED_IMMEDIATE)
771     return;
772   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
773   OUTS->sendByte (L_STRING);
774   OUTS->sendSVal (s);
775   writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
776 			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
777 }
778 
779 void
780 writeObject (DbeObj obj, IPCrequest* req)
781 {
782   writeLong ((long long) obj, req);
783 }
784 
785 void
786 writeBoolean (bool b, IPCrequest* req)
787 {
788   if (req->getStatus () == CANCELLED_IMMEDIATE)
789     return;
790   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
791   OUTS->sendByte (L_BOOLEAN);
792   OUTS->sendBVal (b);
793   writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
794 			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
795 }
796 
797 void
798 writeInt (int i, IPCrequest* req)
799 {
800   if (req->getStatus () == CANCELLED_IMMEDIATE)
801     return;
802   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
803   OUTS->sendByte (L_INTEGER);
804   OUTS->sendIVal (i);
805   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
806 }
807 
808 void
809 writeChar (char c, IPCrequest* req)
810 {
811   if (req->getStatus () == CANCELLED_IMMEDIATE)
812     return;
813   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
814   OUTS->sendByte (L_CHAR);
815   OUTS->sendCVal (c);
816   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
817 }
818 
819 void
820 writeLong (long long l, IPCrequest* req)
821 {
822   if (req->getStatus () == CANCELLED_IMMEDIATE)
823     return;
824   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
825   OUTS->sendByte (L_LONG);
826   OUTS->sendLVal (l);
827   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
828 }
829 
830 void
831 writeDouble (double d, IPCrequest* req)
832 {
833   if (req->getStatus () == CANCELLED_IMMEDIATE) return;
834   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
835   OUTS->sendByte (L_DOUBLE);
836   OUTS->sendDVal (d);
837   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
838 }
839 
840 int
841 setProgress (int percentage, const char *proc_str)
842 {
843   if (cancelNeeded (currentChannelID))
844     {
845       // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
846       // throw (e1);
847       return 1;
848     }
849   if (NULL == proc_str)
850     return 1;
851   int size = strlen (proc_str) + 100; // 100 bytes for additional data
852   int bs = BUFFER_SIZE_MEDIUM;
853   if (size > BUFFER_SIZE_MEDIUM)
854     {
855       if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen
856       bs = BUFFER_SIZE_LARGE;
857     }
858   IPCresponse *OUTS = responseBufferPool->getNewResponse (bs);
859   OUTS->sendByte (L_PROGRESS);
860   OUTS->sendIVal (percentage);
861   OUTS->sendSVal (proc_str);
862   writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS);
863   return 0;
864 }
865 
866 static pthread_mutex_t responce_lock = PTHREAD_MUTEX_INITIALIZER;
867 
868 void
869 IPCresponse::print (void)
870 {
871   char buf[23];
872   int sz = responseType == RESPONSE_TYPE_HANDSHAKE ?
873       IPC_VERSION_NUMBER : sb->length ();
874   snprintf (buf, sizeof (buf), "%02x%08x%02x%02x%08x", HEADER_MARKER,
875 	    requestID, responseType, responseStatus, sz);
876   pthread_mutex_lock (&responce_lock);
877   ipc_response_trace (TRACE_LVL_1,
878 		      "IPCresponse: ID=%08x type=%02x status=%02x sz:%6d\n",
879 		      requestID, responseType, responseStatus, sz);
880   write (1, buf, 22);
881   sb->write (1);
882   pthread_mutex_unlock (&responce_lock);
883 }
884 
885 void
886 setCancelRequestedCh (int chID)
887 {
888   cancelRequestedChannelID = chID;
889 }
890 
891 void
892 readRequestHeader ()
893 {
894   int marker = readByte (NULL);
895   if (marker != HEADER_MARKER)
896     {
897       fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker);
898       error_flag = 1;
899       return;
900     }
901   else
902     ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n");
903   int requestID = readIVal (NULL);
904   int requestType = readByte (NULL);
905   int channelID = readIVal (NULL);
906   int nBytes = readIVal (NULL);
907   if (requestType == REQUEST_TYPE_HANDSHAKE)
908     {
909       // write the ack directly to the wire, not through the response queue
910       writeAck (requestID, channelID);
911       writeHandshake (requestID, channelID);
912       ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
913     }
914   else if (requestType == REQUEST_TYPE_CANCEL)
915     {
916       writeAck (requestID, channelID);
917       ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH:  %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
918       if (channelID == cancellableChannelID)
919 	{
920 	  // we have worked on at least one request belonging to this channel
921 	  writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID);
922 	  setCancelRequestedCh (channelID);
923 	  ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID);
924 	  if (channelID == currentChannelID)
925 	    //  request for this channel is currently in progress
926 	    ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION");
927 	    //              ssp_post_cond(waitingToFinish);
928 	}
929       else
930 	{
931 	  // FIXME:
932 	  // it is possible that a request for this channel is on the requestQ
933 	  // or has been submitted to the work group queue but is waiting for a thread to pick it up
934 	  writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID);
935 	  setCancelRequestedCh (channelID);
936 	  ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID);
937 	}
938     }
939   else
940     {
941       writeAck (requestID, channelID);
942       ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
943       IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID);
944       nreq->read ();
945       ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID);
946       if (cancelNeeded (channelID))
947 	{
948 	  ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID);
949 	  writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID);
950 	  delete nreq;
951 	  return;
952 	}
953       DbeQueue *q = new DbeQueue (ipc_doWork, nreq);
954       ipcThreadPool->put_queue (q);
955     }
956 }
957