from lldbsuite.test.lldbtest import * import os import time import json ADDRESS_REGEX = "0x[0-9a-fA-F]*" # Decorator that runs a test with both modes of USE_SB_API. # It assumes that no tests can be executed in parallel. def testSBAPIAndCommands(func): def wrapper(*args, **kwargs): TraceIntelPTTestCaseBase.USE_SB_API = True func(*args, **kwargs) TraceIntelPTTestCaseBase.USE_SB_API = False func(*args, **kwargs) return wrapper # Class that should be used by all python Intel PT tests. # # It has a handy check that skips the test if the intel-pt plugin is not enabled. # # It also contains many functions that can test both the SB API or the command line version # of the most important tracing actions. class TraceIntelPTTestCaseBase(TestBase): NO_DEBUG_INFO_TESTCASE = True # If True, the trace test methods will use the SB API, otherwise they'll use raw commands. USE_SB_API = False def setUp(self): TestBase.setUp(self) if "intel-pt" not in configuration.enabled_plugins: self.skipTest("The intel-pt test plugin is not enabled") def skipIfPerCpuTracingIsNotSupported(self): def is_supported(): try: with open("/proc/sys/kernel/perf_event_paranoid", "r") as permissions: value = int(permissions.readlines()[0]) if value <= 0: return True except: return False if not is_supported(): self.skipTest( "Per cpu tracing is not supported. You need " "/proc/sys/kernel/perf_event_paranoid to be 0 or -1. " "You can use `sudo sysctl -w kernel.perf_event_paranoid=-1` for that." ) def getTraceOrCreate(self): if not self.target().GetTrace().IsValid(): error = lldb.SBError() self.target().CreateTrace(error) return self.target().GetTrace() def assertSBError(self, sberror, error=False): if error: self.assertTrue(sberror.Fail()) else: self.assertSuccess(sberror) def createConfiguration( self, iptTraceSize=None, processBufferSizeLimit=None, enableTsc=False, psbPeriod=None, perCpuTracing=False, ): obj = {} if processBufferSizeLimit is not None: obj["processBufferSizeLimit"] = processBufferSizeLimit if iptTraceSize is not None: obj["iptTraceSize"] = iptTraceSize if psbPeriod is not None: obj["psbPeriod"] = psbPeriod obj["enableTsc"] = enableTsc obj["perCpuTracing"] = perCpuTracing configuration = lldb.SBStructuredData() configuration.SetFromJSON(json.dumps(obj)) return configuration def traceStartThread( self, thread=None, error=False, substrs=None, iptTraceSize=None, enableTsc=False, psbPeriod=None, ): if self.USE_SB_API: trace = self.getTraceOrCreate() thread = thread if thread is not None else self.thread() configuration = self.createConfiguration( iptTraceSize=iptTraceSize, enableTsc=enableTsc, psbPeriod=psbPeriod ) self.assertSBError(trace.Start(thread, configuration), error) else: command = "thread trace start" if thread is not None: command += " " + str(thread.GetIndexID()) if iptTraceSize is not None: command += " -s " + str(iptTraceSize) if enableTsc: command += " --tsc" if psbPeriod is not None: command += " --psb-period " + str(psbPeriod) self.expect(command, error=error, substrs=substrs) def traceStartProcess( self, processBufferSizeLimit=None, error=False, substrs=None, enableTsc=False, psbPeriod=None, perCpuTracing=False, ): if self.USE_SB_API: trace = self.getTraceOrCreate() configuration = self.createConfiguration( processBufferSizeLimit=processBufferSizeLimit, enableTsc=enableTsc, psbPeriod=psbPeriod, perCpuTracing=perCpuTracing, ) self.assertSBError(trace.Start(configuration), error=error) else: command = "process trace start" if processBufferSizeLimit is not None: command += " -l " + str(processBufferSizeLimit) if enableTsc: command += " --tsc" if psbPeriod is not None: command += " --psb-period " + str(psbPeriod) if perCpuTracing: command += " --per-cpu-tracing" self.expect(command, error=error, substrs=substrs) def traceStopProcess(self): if self.USE_SB_API: self.assertSuccess(self.target().GetTrace().Stop()) else: self.expect("process trace stop") def traceStopThread(self, thread=None, error=False, substrs=None): if self.USE_SB_API: thread = thread if thread is not None else self.thread() self.assertSBError(self.target().GetTrace().Stop(thread), error) else: command = "thread trace stop" if thread is not None: command += " " + str(thread.GetIndexID()) self.expect(command, error=error, substrs=substrs) def traceLoad(self, traceDescriptionFilePath, error=False, substrs=None): if self.USE_SB_API: traceDescriptionFile = lldb.SBFileSpec(traceDescriptionFilePath, True) loadTraceError = lldb.SBError() self.dbg.LoadTraceFromFile(loadTraceError, traceDescriptionFile) self.assertSBError(loadTraceError, error) else: command = f"trace load -v {traceDescriptionFilePath}" self.expect(command, error=error, substrs=substrs) def traceSave(self, traceBundleDir, compact=False, error=False, substrs=None): if self.USE_SB_API: save_error = lldb.SBError() self.target().GetTrace().SaveToDisk( save_error, lldb.SBFileSpec(traceBundleDir), compact ) self.assertSBError(save_error, error) else: command = f"trace save {traceBundleDir}" if compact: command += " -c" self.expect(command, error=error, substrs=substrs)