xref: /netbsd-src/external/gpl3/binutils.old/dist/gprofng/src/ipcio.cc (revision c42dbd0ed2e61fe6eda8590caa852ccf34719964)
1 /* Copyright (C) 2021 Free Software Foundation, Inc.
2    Contributed by Oracle.
3 
4    This file is part of GNU Binutils.
5 
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3, or (at your option)
9    any later version.
10 
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, 51 Franklin Street - Fifth Floor, Boston,
19    MA 02110-1301, USA.  */
20 
21 #include "config.h"
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <signal.h>
25 #include <unistd.h>
26 #include <iostream>
27 #include <iomanip>
28 #include <sstream>
29 #include <queue>
30 #include "vec.h"
31 #include "util.h"
32 #include "ipcio.h"
33 #include "DbeThread.h"
34 #include "Experiment.h"
35 
36 #define ipc_trace               if (ipc_flags) ipc_default_log
37 #define ipc_request_trace       if (ipc_flags) ipc_request_log
38 #define ipc_response_trace      if (ipc_flags) ipc_response_log
39 
40 using namespace std;
41 
42 // IPC implementation
43 static const int L_PROGRESS = 0;
44 static const int L_INTEGER  = 1;
45 static const int L_BOOLEAN  = 2;
46 static const int L_LONG     = 3;
47 static const int L_STRING   = 4;
48 static const int L_DOUBLE   = 5;
49 static const int L_ARRAY    = 6;
50 static const int L_OBJECT   = 7;
51 static const int L_CHAR     = 8;
52 
53 int currentRequestID;
54 int currentChannelID;
55 static long maxSize;
56 
57 extern int cancellableChannelID;
58 extern int error_flag;
59 extern int ipc_delay_microsec;
60 extern FILE *responseLogFileP;
61 
62 IPCresponse *IPCresponseGlobal;
63 
64 BufferPool *responseBufferPool;
65 
IPCrequest(int sz,int reqID,int chID)66 IPCrequest::IPCrequest (int sz, int reqID, int chID)
67 {
68   size = sz;
69   requestID = reqID;
70   channelID = chID;
71   status = INITIALIZED;
72   idx = 0;
73   buf = (char *) malloc (size);
74   cancelImmediate = false;
75 }
76 
~IPCrequest()77 IPCrequest::~IPCrequest ()
78 {
79   free (buf);
80 }
81 
82 void
read(void)83 IPCrequest::read (void)
84 {
85   for (int i = 0; i < size; i++)
86     {
87       int c = getc (stdin);
88       ipc_request_trace (TRACE_LVL_4, "  IPCrequest:getc(stdin): %02x\n", c);
89       buf[i] = c;
90     }
91 }
92 
93 IPCrequestStatus
getStatus(void)94 IPCrequest::getStatus (void)
95 {
96   return status;
97 }
98 
99 void
setStatus(IPCrequestStatus newStatus)100 IPCrequest::setStatus (IPCrequestStatus newStatus)
101 {
102   status = newStatus;
103 }
104 
105 static int
readByte(IPCrequest * req)106 readByte (IPCrequest* req)
107 {
108   int c;
109   int val = 0;
110   for (int i = 0; i < 2; i++)
111     {
112       if (req == NULL)
113 	{
114 	  c = getc (stdin);
115 	  ipc_request_trace (TRACE_LVL_4, "  readByte:getc(stdin): %02x\n", c);
116 	}
117       else
118 	c = req->rgetc ();
119       switch (c)
120 	{
121 	case '0': case '1': case '2': case '3':
122 	case '4': case '5': case '6': case '7':
123 	case '8': case '9':
124 	  val = val * 16 + c - '0';
125 	  break;
126 	case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
127 	  val = val * 16 + c - 'a' + 10;
128 	  break;
129 	case EOF:
130 	  val = EOF;
131 	  break;
132 	default:
133 	  fprintf (stderr, "readByte: Unknown byte: %d\n", c);
134 	  break;
135 	}
136     }
137   return val;
138 }
139 
140 static int
readIVal(IPCrequest * req)141 readIVal (IPCrequest *req)
142 {
143   int val = readByte (req);
144   for (int i = 0; i < 3; i++)
145     val = val * 256 + readByte (req);
146   ipc_trace ("  readIVal: %d\n", val);
147   return val;
148 }
149 
150 static String
readSVal(IPCrequest * req)151 readSVal (IPCrequest *req)
152 {
153   int len = readIVal (req);
154   if (len == -1)
155     {
156       ipc_trace ("  readSVal: <NULL>\n");
157       return NULL;
158     }
159   char *str = (char *) malloc (len + 1);
160   char *s = str;
161   *s = (char) 0;
162   while (len--)
163     *s++ = req->rgetc ();
164   *s = (char) 0;
165   ipc_trace ("  readSVal: '%s'\n", str);
166   return str;
167 }
168 
169 static long long
readLVal(IPCrequest * req)170 readLVal (IPCrequest *req)
171 {
172   long long val = readByte (req);
173   for (int i = 0; i < 7; i++)
174     val = val * 256 + readByte (req);
175   ipc_trace ("  readLVal: %lld\n", val);
176   return val;
177 }
178 
179 static bool
readBVal(IPCrequest * req)180 readBVal (IPCrequest *req)
181 {
182   int val = readByte (req);
183   ipc_trace ("  readBVal: %s\n", val == 0 ? "true" : "false");
184   return val != 0;
185 }
186 
187 static char
readCVal(IPCrequest * req)188 readCVal (IPCrequest *req)
189 {
190   int val = readByte (req);
191   ipc_trace ("  readCVal: %d\n", val);
192   return (char) val;
193 }
194 
195 static double
readDVal(IPCrequest * req)196 readDVal (IPCrequest *req)
197 {
198   String s = readSVal (req);
199   double d = atof (s);
200   free (s);
201   return d;
202 }
203 
204 static Object
readAVal(IPCrequest * req)205 readAVal (IPCrequest *req)
206 {
207   bool twoD = false;
208   int type = readByte (req);
209   if (type == L_ARRAY)
210     {
211       twoD = true;
212       type = readByte (req);
213     }
214   ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type);
215 
216   int len = readIVal (req);
217   if (len == -1)
218     return NULL;
219   switch (type)
220     {
221     case L_INTEGER:
222       if (twoD)
223 	{
224 	  Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len);
225 	  for (int i = 0; i < len; i++)
226 	    array->store (i, (Vector<int>*)readAVal (req));
227 	  return array;
228 	}
229       else
230 	{
231 	  Vector<int> *array = new Vector<int>(len);
232 	  for (int i = 0; i < len; i++)
233 	    array->store (i, readIVal (req));
234 	  return array;
235 	}
236       //break;
237     case L_LONG:
238       if (twoD)
239 	{
240 	  Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len);
241 	  for (int i = 0; i < len; i++)
242 	    array->store (i, (Vector<long long>*)readAVal (req));
243 	  return array;
244 	}
245       else
246 	{
247 	  Vector<long long> *array = new Vector<long long>(len);
248 	  for (int i = 0; i < len; i++)
249 	    array->store (i, readLVal (req));
250 	  return array;
251 	}
252       //break;
253     case L_DOUBLE:
254       if (twoD)
255 	{
256 	  Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len);
257 	  for (int i = 0; i < len; i++)
258 	    array->store (i, (Vector<double>*)readAVal (req));
259 	  return array;
260 	}
261       else
262 	{
263 	  Vector<double> *array = new Vector<double>(len);
264 	  for (int i = 0; i < len; i++)
265 	    array->store (i, readDVal (req));
266 	  return array;
267 	}
268       //break;
269     case L_BOOLEAN:
270       if (twoD)
271 	{
272 	  Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len);
273 	  for (int i = 0; i < len; i++)
274 	    array->store (i, (Vector<bool>*)readAVal (req));
275 	  return array;
276 	}
277       else
278 	{
279 	  Vector<bool> *array = new Vector<bool>(len);
280 	  for (int i = 0; i < len; i++)
281 	    array->store (i, readBVal (req));
282 	  return array;
283 	}
284       //break;
285     case L_CHAR:
286       if (twoD)
287 	{
288 	  Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len);
289 	  for (int i = 0; i < len; i++)
290 	    array->store (i, (Vector<char>*)readAVal (req));
291 	  return array;
292 	}
293       else
294 	{
295 	  Vector<char> *array = new Vector<char>(len);
296 	  for (int i = 0; i < len; i++)
297 	    array->store (i, readCVal (req));
298 	  return array;
299 	}
300       //break;
301     case L_STRING:
302       if (twoD)
303 	{
304 	  Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len);
305 	  for (int i = 0; i < len; i++)
306 	    array->store (i, (Vector<String>*)readAVal (req));
307 	  return array;
308 	}
309       else
310 	{
311 	  Vector<String> *array = new Vector<String>(len);
312 	  for (int i = 0; i < len; i++)
313 	    array->store (i, readSVal (req));
314 	  return array;
315 	}
316       //break;
317     case L_OBJECT:
318       if (twoD)
319 	{
320 	  Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len);
321 	  for (int i = 0; i < len; i++)
322 	    array->store (i, (Vector<Object>*)readAVal (req));
323 	  return array;
324 	}
325       else
326 	{
327 	  Vector<Object> *array = new Vector<Object>(len);
328 	  for (int i = 0; i < len; i++)
329 	    array->store (i, readAVal (req));
330 	  return array;
331 	}
332       //break;
333     default:
334       fprintf (stderr, "readAVal: Unknown code: %d\n", type);
335       break;
336     }
337   return NULL;
338 }
339 
340 static int iVal;
341 static bool bVal;
342 static long long lVal;
343 static String sVal;
344 static double dVal;
345 static Object aVal;
346 
347 static void
readResult(int type,IPCrequest * req)348 readResult (int type, IPCrequest *req)
349 {
350   int tVal = readByte (req);
351   switch (tVal)
352     {
353     case L_INTEGER:
354       iVal = readIVal (req);
355       break;
356     case L_LONG:
357       lVal = readLVal (req);
358       break;
359     case L_BOOLEAN:
360       bVal = readBVal (req);
361       break;
362     case L_DOUBLE:
363       dVal = readDVal (req);
364       break;
365     case L_STRING:
366       sVal = readSVal (req);
367       break;
368     case L_ARRAY:
369       aVal = readAVal (req);
370       break;
371     case EOF:
372       fprintf (stderr, "EOF read in readResult\n");
373       sVal = NULL;
374       return;
375     default:
376       fprintf (stderr, "Unknown code: %d\n", tVal);
377       abort ();
378     }
379   if (type != tVal)
380     {
381       fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type);
382       abort ();
383     }
384 }
385 
386 int
readInt(IPCrequest * req)387 readInt (IPCrequest *req)
388 {
389   readResult (L_INTEGER, req);
390   return iVal;
391 }
392 
393 String
readString(IPCrequest * req)394 readString (IPCrequest *req)
395 {
396   readResult (L_STRING, req);
397   return sVal;
398 }
399 
400 long long
readLong(IPCrequest * req)401 readLong (IPCrequest *req)
402 {
403   readResult (L_LONG, req);
404   return lVal;
405 }
406 
407 double
readDouble(IPCrequest * req)408 readDouble (IPCrequest *req)
409 {
410   readResult (L_DOUBLE, req);
411   return dVal;
412 }
413 
414 bool
readBoolean(IPCrequest * req)415 readBoolean (IPCrequest *req)
416 {
417   readResult (L_BOOLEAN, req);
418   return bVal;
419 }
420 
421 DbeObj
readObject(IPCrequest * req)422 readObject (IPCrequest *req)
423 {
424   readResult (L_LONG, req);
425   return (DbeObj) lVal;
426 }
427 
428 Object
readArray(IPCrequest * req)429 readArray (IPCrequest *req)
430 {
431   readResult (L_ARRAY, req);
432   return aVal;
433 }
434 
435 // Write
IPCresponse(int sz)436 IPCresponse::IPCresponse (int sz)
437 {
438   requestID = -1;
439   channelID = -1;
440   responseType = -1;
441   responseStatus = RESPONSE_STATUS_SUCCESS;
442   sb = new StringBuilder (sz);
443   next = NULL;
444 }
445 
~IPCresponse()446 IPCresponse::~IPCresponse ()
447 {
448   delete sb;
449 }
450 
451 void
reset()452 IPCresponse::reset ()
453 {
454   requestID = -1;
455   channelID = -1;
456   responseType = -1;
457   responseStatus = RESPONSE_STATUS_SUCCESS;
458   sb->setLength (0);
459 }
460 
461 void
sendByte(int b)462 IPCresponse::sendByte (int b)
463 {
464   ipc_trace ("sendByte: %02x %d\n", b, b);
465   sb->appendf ("%02x", b);
466 }
467 
468 void
sendIVal(int i)469 IPCresponse::sendIVal (int i)
470 {
471   ipc_trace ("sendIVal: %08x %d\n", i, i);
472   sb->appendf ("%08x", i);
473 }
474 
475 void
sendLVal(long long l)476 IPCresponse::sendLVal (long long l)
477 {
478   ipc_trace ("sendLVal: %016llx %lld\n", l, l);
479   sb->appendf ("%016llx", l);
480 }
481 
482 void
sendSVal(const char * s)483 IPCresponse::sendSVal (const char *s)
484 {
485   if (s == NULL)
486     {
487       sendIVal (-1);
488       return;
489     }
490   sendIVal ((int) strlen (s));
491   ipc_trace ("sendSVal: %s\n", s);
492   sb->appendf ("%s", s);
493 }
494 
495 void
sendBVal(bool b)496 IPCresponse::sendBVal (bool b)
497 {
498   sendByte (b ? 1 : 0);
499 }
500 
501 void
sendCVal(char c)502 IPCresponse::sendCVal (char c)
503 {
504   sendByte (c);
505 }
506 
507 void
sendDVal(double d)508 IPCresponse::sendDVal (double d)
509 {
510   char str[32];
511   snprintf (str, sizeof (str), "%.12f", d);
512   sendSVal (str);
513 }
514 
515 void
sendAVal(void * ptr)516 IPCresponse::sendAVal (void *ptr)
517 {
518   if (ptr == NULL)
519     {
520       sendByte (L_INTEGER);
521       sendIVal (-1);
522       return;
523     }
524 
525   VecType type = ((Vector<void*>*)ptr)->type ();
526   switch (type)
527     {
528     case VEC_INTEGER:
529       {
530 	sendByte (L_INTEGER);
531 	Vector<int> *array = (Vector<int>*)ptr;
532 	sendIVal (array->size ());
533 	for (int i = 0; i < array->size (); i++)
534 	  sendIVal (array->fetch (i));
535 	break;
536       }
537     case VEC_BOOL:
538       {
539 	sendByte (L_BOOLEAN);
540 	Vector<bool> *array = (Vector<bool>*)ptr;
541 	sendIVal (array->size ());
542 	for (int i = 0; i < array->size (); i++)
543 	  sendBVal (array->fetch (i));
544 	break;
545       }
546     case VEC_CHAR:
547       {
548 	sendByte (L_CHAR);
549 	Vector<char> *array = (Vector<char>*)ptr;
550 	sendIVal (array->size ());
551 	for (int i = 0; i < array->size (); i++)
552 	  sendCVal (array->fetch (i));
553 	break;
554       }
555     case VEC_LLONG:
556       {
557 	sendByte (L_LONG);
558 	Vector<long long> *array = (Vector<long long>*)ptr;
559 	sendIVal (array->size ());
560 	for (int i = 0; i < array->size (); i++)
561 	  sendLVal (array->fetch (i));
562 	break;
563       }
564     case VEC_DOUBLE:
565       {
566 	sendByte (L_DOUBLE);
567 	Vector<double> *array = (Vector<double>*)ptr;
568 	sendIVal (array->size ());
569 	for (int i = 0; i < array->size (); i++)
570 	  sendDVal (array->fetch (i));
571 	break;
572       }
573     case VEC_STRING:
574       {
575 	sendByte (L_STRING);
576 	Vector<String> *array = (Vector<String>*)ptr;
577 	sendIVal (array->size ());
578 	for (int i = 0; i < array->size (); i++)
579 	  sendSVal (array->fetch (i));
580 	break;
581       }
582     case VEC_STRINGARR:
583       {
584 	sendByte (L_ARRAY);
585 	sendByte (L_STRING);
586 	Vector<void*> *array = (Vector<void*>*)ptr;
587 	sendIVal (array->size ());
588 	for (int i = 0; i < array->size (); i++)
589 	  sendAVal (array->fetch (i));
590 	break;
591       }
592     case VEC_INTARR:
593       {
594 	sendByte (L_ARRAY);
595 	sendByte (L_INTEGER);
596 	Vector<void*> *array = (Vector<void*>*)ptr;
597 	sendIVal (array->size ());
598 	for (int i = 0; i < array->size (); i++)
599 	  sendAVal (array->fetch (i));
600 	break;
601       }
602     case VEC_LLONGARR:
603       {
604 	sendByte (L_ARRAY);
605 	sendByte (L_LONG);
606 	Vector<void*> *array = (Vector<void*>*)ptr;
607 	sendIVal (array->size ());
608 	for (int i = 0; i < array->size (); i++)
609 	  sendAVal (array->fetch (i));
610 	break;
611       }
612     case VEC_VOIDARR:
613       {
614 	sendByte (L_OBJECT);
615 	Vector<void*> *array = (Vector<void*>*)ptr;
616 	sendIVal (array->size ());
617 	for (int i = 0; i < array->size (); i++)
618 	  sendAVal (array->fetch (i));
619 	break;
620       }
621     default:
622       fprintf (stderr, "sendAVal: Unknown type: %d\n", type);
623       abort ();
624     }
625 }
626 
627 static void
writeResponseHeader(int requestID,int responseType,int responseStatus,int nBytes)628 writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes)
629 {
630   if (responseType == RESPONSE_TYPE_HANDSHAKE)
631     nBytes = IPC_VERSION_NUMBER;
632   int use_write = 2;
633   ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes);
634   if (use_write)
635     {
636       char buf[23];
637       if (use_write == 1)
638 	{
639 	  int i = 0;
640 	  snprintf (buf + i, 3, "%2x", HEADER_MARKER);
641 	  i += 2;
642 	  snprintf (buf + i, 9, "%8x", requestID);
643 	  i += 8;
644 	  snprintf (buf + i, 3, "%2x", responseType);
645 	  i += 2;
646 	  snprintf (buf + i, 3, "%2x", responseStatus);
647 	  i += 2;
648 	  snprintf (buf + i, 9, "%8x", nBytes);
649 	}
650       else
651 	snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID,
652 		  responseType, responseStatus, nBytes);
653       buf[22] = 0;
654       write (1, buf, 22);
655     }
656   else
657     {
658       cout << setfill ('0') << setw (2) << hex << HEADER_MARKER;
659       cout << setfill ('0') << setw (8) << hex << requestID;
660       cout << setfill ('0') << setw (2) << hex << responseType;
661       cout << setfill ('0') << setw (2) << hex << responseStatus;
662       cout << setfill ('0') << setw (8) << hex << nBytes;
663       cout.flush ();
664     }
665   ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n");
666   if (nBytes > maxSize)
667     {
668       maxSize = nBytes;
669       ipc_trace ("New maxsize %ld\n", maxSize);
670     }
671 }
672 
673 bool
cancelNeeded(int chID)674 cancelNeeded (int chID)
675 {
676   if (chID == cancellableChannelID && chID == cancelRequestedChannelID)
677     return true;
678   else
679     return false;
680 }
681 
682 static void
writeResponseWithHeader(int requestID,int channelID,int responseType,int responseStatus,IPCresponse * os)683 writeResponseWithHeader (int requestID, int channelID, int responseType,
684 			 int responseStatus, IPCresponse* os)
685 {
686   if (cancelNeeded (channelID))
687     {
688       responseStatus = RESPONSE_STATUS_CANCELLED;
689       ipc_trace ("CANCELLING %d %d\n", requestID, channelID);
690       // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
691     }
692   os->setRequestID (requestID);
693   os->setChannelID (channelID);
694   os->setResponseType (responseType);
695   os->setResponseStatus (responseStatus);
696   os->print ();
697   os->reset ();
698   responseBufferPool->recycle (os);
699 }
700 
701 void
writeAckFast(int requestID)702 writeAckFast (int requestID)
703 {
704   writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0);
705 }
706 
707 void
writeAck(int requestID,int channelID)708 writeAck (int requestID, int channelID)
709 {
710 #if DEBUG
711   char *s = getenv (NTXT ("SP_NO_IPC_ACK"));
712 #else /* ^DEBUG */
713   char *s = NULL;
714 #endif /* ^DEBUG */
715   if (s)
716     {
717       int i = requestID;
718       int j = channelID;
719       ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j);
720     }
721   else
722     {
723       IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
724       writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK,
725 			       RESPONSE_STATUS_SUCCESS, OUTS);
726     }
727 }
728 
729 void
writeHandshake(int requestID,int channelID)730 writeHandshake (int requestID, int channelID)
731 {
732   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
733   writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
734   // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER);
735 }
736 
737 void
writeResponseGeneric(int responseStatus,int requestID,int channelID)738 writeResponseGeneric (int responseStatus, int requestID, int channelID)
739 {
740   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
741   writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS);
742 }
743 
BufferPool()744 BufferPool::BufferPool ()
745 {
746   pthread_mutex_init (&p_mutex, NULL);
747   smallBuf = NULL;
748   largeBuf = NULL;
749 }
750 
~BufferPool()751 BufferPool::~BufferPool ()
752 {
753   for (IPCresponse *p = smallBuf; p;)
754     {
755       IPCresponse *tmp = p;
756       p = tmp->next;
757       delete tmp;
758     }
759   for (IPCresponse *p = largeBuf; p;)
760     {
761       IPCresponse *tmp = p;
762       p = tmp->next;
763       delete tmp;
764     }
765 }
766 
767 IPCresponse*
getNewResponse(int size)768 BufferPool::getNewResponse (int size)
769 {
770   pthread_mutex_lock (&p_mutex);
771   if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE)
772     size = BUFFER_SIZE_LARGE;
773   IPCresponse *newResponse = NULL;
774   if (size >= BUFFER_SIZE_LARGE)
775     {
776       if (largeBuf)
777 	{
778 	  newResponse = largeBuf;
779 	  largeBuf = largeBuf->next;
780 	}
781     }
782   else if (smallBuf)
783     {
784       newResponse = smallBuf;
785       smallBuf = smallBuf->next;
786     }
787   if (newResponse)
788     newResponse->reset ();
789   else
790     {
791       newResponse = new IPCresponse (size);
792       ipc_trace ("GETNEWBUFFER %d\n", size);
793     }
794   pthread_mutex_unlock (&p_mutex);
795   return newResponse;
796 }
797 
798 void
recycle(IPCresponse * respB)799 BufferPool::recycle (IPCresponse *respB)
800 {
801   pthread_mutex_lock (&p_mutex);
802   if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE)
803     {
804       respB->next = largeBuf;
805       largeBuf = respB;
806     }
807   else
808     {
809       respB->next = smallBuf;
810       smallBuf = respB;
811     }
812   pthread_mutex_unlock (&p_mutex);
813 }
814 
815 void
writeArray(void * ptr,IPCrequest * req)816 writeArray (void *ptr, IPCrequest* req)
817 {
818   if (req->getStatus () == CANCELLED_IMMEDIATE)
819     return;
820   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
821   OUTS->sendByte (L_ARRAY);
822   OUTS->sendAVal (ptr);
823   writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
824 			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
825 }
826 
827 void
writeString(const char * s,IPCrequest * req)828 writeString (const char *s, IPCrequest* req)
829 {
830   if (req->getStatus () == CANCELLED_IMMEDIATE)
831     return;
832   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
833   OUTS->sendByte (L_STRING);
834   OUTS->sendSVal (s);
835   writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
836 			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
837 }
838 
839 void
writeObject(DbeObj obj,IPCrequest * req)840 writeObject (DbeObj obj, IPCrequest* req)
841 {
842   writeLong ((long long) obj, req);
843 }
844 
845 void
writeBoolean(bool b,IPCrequest * req)846 writeBoolean (bool b, IPCrequest* req)
847 {
848   if (req->getStatus () == CANCELLED_IMMEDIATE)
849     return;
850   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
851   OUTS->sendByte (L_BOOLEAN);
852   OUTS->sendBVal (b);
853   writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
854 			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
855 }
856 
857 void
writeInt(int i,IPCrequest * req)858 writeInt (int i, IPCrequest* req)
859 {
860   if (req->getStatus () == CANCELLED_IMMEDIATE)
861     return;
862   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
863   OUTS->sendByte (L_INTEGER);
864   OUTS->sendIVal (i);
865   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
866 }
867 
868 void
writeChar(char c,IPCrequest * req)869 writeChar (char c, IPCrequest* req)
870 {
871   if (req->getStatus () == CANCELLED_IMMEDIATE)
872     return;
873   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
874   OUTS->sendByte (L_CHAR);
875   OUTS->sendCVal (c);
876   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
877 }
878 
879 void
writeLong(long long l,IPCrequest * req)880 writeLong (long long l, IPCrequest* req)
881 {
882   if (req->getStatus () == CANCELLED_IMMEDIATE)
883     return;
884   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
885   OUTS->sendByte (L_LONG);
886   OUTS->sendLVal (l);
887   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
888 }
889 
890 void
writeDouble(double d,IPCrequest * req)891 writeDouble (double d, IPCrequest* req)
892 {
893   if (req->getStatus () == CANCELLED_IMMEDIATE) return;
894   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
895   OUTS->sendByte (L_DOUBLE);
896   OUTS->sendDVal (d);
897   writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
898 }
899 
900 int
setProgress(int percentage,const char * proc_str)901 setProgress (int percentage, const char *proc_str)
902 {
903   if (cancelNeeded (currentChannelID))
904     {
905       // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
906       // throw (e1);
907       return 1;
908     }
909   if (NULL == proc_str)
910     return 1;
911   int size = strlen (proc_str) + 100; // 100 bytes for additional data
912   int bs = BUFFER_SIZE_MEDIUM;
913   if (size > BUFFER_SIZE_MEDIUM)
914     {
915       if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen
916       bs = BUFFER_SIZE_LARGE;
917     }
918   IPCresponse *OUTS = responseBufferPool->getNewResponse (bs);
919   OUTS->sendByte (L_PROGRESS);
920   OUTS->sendIVal (percentage);
921   OUTS->sendSVal (proc_str);
922   writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS);
923   return 0;
924 }
925 
926 void
print(void)927 IPCresponse::print (void)
928 {
929   if (ipc_delay_microsec)
930     usleep (ipc_delay_microsec);
931   int stringSize = sb->length ();
932   writeResponseHeader (requestID, responseType, responseStatus, stringSize);
933   if (stringSize > 0)
934     {
935       char *s = sb->toString ();
936       hrtime_t start_time = gethrtime ();
937       int use_write = 1;
938       if (use_write)
939 	write (1, s, stringSize); // write(1, sb->toString(), stringSize);
940       else
941 	{
942 	  cout << s;
943 	  cout.flush ();
944 	}
945       hrtime_t end_time = gethrtime ();
946       unsigned long long time_stamp = end_time - start_time;
947       ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu  nanosec \n", requestID, time_stamp);
948       free (s);
949     }
950 }
951 
952 void
setCancelRequestedCh(int chID)953 setCancelRequestedCh (int chID)
954 {
955   cancelRequestedChannelID = chID;
956 }
957 
958 void
readRequestHeader()959 readRequestHeader ()
960 {
961   int marker = readByte (NULL);
962   if (marker != HEADER_MARKER)
963     {
964       fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker);
965       error_flag = 1;
966       return;
967     }
968   else
969     ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n");
970   int requestID = readIVal (NULL);
971   int requestType = readByte (NULL);
972   int channelID = readIVal (NULL);
973   int nBytes = readIVal (NULL);
974   if (requestType == REQUEST_TYPE_HANDSHAKE)
975     {
976       // write the ack directly to the wire, not through the response queue
977       // writeAckFast(requestID);
978       writeAck (requestID, channelID);
979       maxSize = 0;
980       writeHandshake (requestID, channelID);
981       ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
982     }
983   else if (requestType == REQUEST_TYPE_CANCEL)
984     {
985       writeAck (requestID, channelID);
986       ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH:  %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
987       if (channelID == cancellableChannelID)
988 	{
989 	  // we have worked on at least one request belonging to this channel
990 	  writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID);
991 	  setCancelRequestedCh (channelID);
992 	  ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID);
993 	  if (channelID == currentChannelID)
994 	    //  request for this channel is currently in progress
995 	    ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION");
996 	    //              ssp_post_cond(waitingToFinish);
997 	}
998       else
999 	{
1000 	  // FIXME:
1001 	  // it is possible that a request for this channel is on the requestQ
1002 	  // or has been submitted to the work group queue but is waiting for a thread to pick it up
1003 	  writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID);
1004 	  setCancelRequestedCh (channelID);
1005 	  ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID);
1006 	}
1007     }
1008   else
1009     {
1010       writeAck (requestID, channelID);
1011       ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
1012       IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID);
1013       nreq->read ();
1014       ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID);
1015       if (cancelNeeded (channelID))
1016 	{
1017 	  ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID);
1018 	  writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID);
1019 	  delete nreq;
1020 	  return;
1021 	}
1022       DbeQueue *q = new DbeQueue (ipc_doWork, nreq);
1023       ipcThreadPool->put_queue (q);
1024     }
1025 }
1026