xref: /llvm-project/llvm/utils/lit/lit/run.py (revision eec1ee8ef10820c61c03b00b68d242d8c87d478a)
1import multiprocessing
2import os
3import platform
4import time
5
6import lit.Test
7import lit.util
8import lit.worker
9
10
class MaxFailuresError(Exception):
    """Signal that the run hit its configured maximum number of failures."""
13
14
class TimeoutError(Exception):
    """Signal that the overall run deadline expired before all tests finished.

    NOTE: within this module the name shadows the builtin ``TimeoutError``;
    this class derives directly from ``Exception``, not from ``OSError``.
    """
17
18
class Run(object):
    """A concrete, configured testing run.

    The run executes every test in ``tests`` on a ``multiprocessing.Pool`` and
    copies the result computed by the worker process back onto the local test
    object.
    """

    def __init__(
        self, tests, lit_config, workers, progress_callback, max_failures, timeout
    ):
        """
        tests: sequence of test objects to execute; each one has its result
            filled in by execute().
        lit_config: configuration object; this class reads its
            ``parallelism_groups`` mapping and reports through its ``note`` and
            ``warning`` methods.
        workers: number of worker processes in the pool (must be > 0).
        progress_callback: passed to the pool as the per-task completion
            callback.
        max_failures: the run is aborted with MaxFailuresError as soon as the
            number of failing tests equals this value.
        timeout: overall time budget in seconds; a falsy value (None or 0)
            selects a default of one week.
        """
        self.tests = tests
        self.lit_config = lit_config
        self.workers = workers
        self.progress_callback = progress_callback
        self.max_failures = max_failures
        self.timeout = timeout
        assert workers > 0

    def execute(self):
        """
        Execute the tests in the run using up to the specified number of
        parallel tasks, and inform the caller of each individual result.

        The progress_callback will be invoked for each completed test.

        If timeout is non-None, it should be a time in seconds after which to
        stop executing tests.

        Returns nothing; callers that need the elapsed testing time must
        measure it themselves.

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason) will
        be marked SKIPPED. This also holds when the run is aborted by an
        exception (e.g. MaxFailuresError or TimeoutError), which is then
        propagated to the caller.
        """
        self.failures = 0

        # Larger timeouts (one year, positive infinity) don't work on Windows.
        one_week = 7 * 24 * 60 * 60  # days * hours * minutes * seconds
        timeout = self.timeout or one_week
        deadline = time.time() + timeout

        try:
            self._execute(deadline)
        finally:
            # Runs on success and on abort alike, so no test is left without
            # a result.
            skipped = lit.Test.Result(lit.Test.SKIPPED)
            for test in self.tests:
                if test.result is None:
                    test.setResult(skipped)

    def _execute(self, deadline):
        """Run all tests on a worker pool, waiting until `deadline` at most.

        `deadline` is an absolute time.time() timestamp.
        """
        self._increase_process_limit()

        # One bounded semaphore per parallelism group that declares a limit;
        # groups whose limit is None get no semaphore.
        semaphores = {
            k: multiprocessing.BoundedSemaphore(v)
            for k, v in self.lit_config.parallelism_groups.items()
            if v is not None
        }

        pool = multiprocessing.Pool(
            self.workers, lit.worker.initialize, (self.lit_config, semaphores)
        )

        async_results = [
            pool.apply_async(
                lit.worker.execute, args=[test], callback=self.progress_callback
            )
            for test in self.tests
        ]
        # No further tasks will be submitted.
        pool.close()

        try:
            self._wait_for(async_results, deadline)
        except BaseException:
            # Deliberately catches everything (including KeyboardInterrupt) so
            # the workers are stopped before the exception propagates.
            pool.terminate()
            raise
        finally:
            pool.join()

    def _wait_for(self, async_results, deadline):
        """Collect worker results in submission order until `deadline`.

        Raises TimeoutError if a result is not available by `deadline`, and
        MaxFailuresError once the number of failures equals max_failures.
        """
        for idx, ar in enumerate(async_results):
            # Recompute the remaining budget for every result: reusing one
            # value computed up front would let each wait last the full
            # original timeout and overshoot the overall deadline.
            remaining = max(0, deadline - time.time())
            try:
                test = ar.get(remaining)
            except multiprocessing.TimeoutError:
                raise TimeoutError()
            else:
                self._update_test(self.tests[idx], test)
                if test.isFailure():
                    self.failures += 1
                    if self.failures == self.max_failures:
                        raise MaxFailuresError()

    # Update local test object "in place" from remote test object.  This
    # ensures that the original test object which is used for printing test
    # results reflects the changes.
    def _update_test(self, local_test, remote_test):
        # Needed for getMissingRequiredFeatures()
        local_test.requires = remote_test.requires
        local_test.result = remote_test.result

    # TODO(yln): interferes with progress bar
    # Some tests use threads internally, and at least on Linux each of these
    # threads counts toward the current process limit. Try to raise the (soft)
    # process limit so that tests don't fail due to resource exhaustion.
    def _increase_process_limit(self):
        """Best-effort raise of the soft RLIMIT_NPROC limit for this process.

        Any failure is reported as a warning (and not at all on Windows or
        z/OS); it is never propagated to the caller.
        """
        ncpus = lit.util.usable_core_count()
        desired_limit = self.workers * ncpus * 2  # the 2 is a safety factor

        # Importing the resource module will likely fail on Windows.
        try:
            import resource

            NPROC = resource.RLIMIT_NPROC
            unlimited = resource.RLIM_INFINITY

            soft_limit, hard_limit = resource.getrlimit(NPROC)

            # RLIM_INFINITY may be represented as -1, so it must not take part
            # in ordinary numeric comparisons: an unlimited hard limit would
            # otherwise turn the desired limit into -1 and the soft limit
            # would never be raised.
            if hard_limit != unlimited:
                desired_limit = min(desired_limit, hard_limit)

            if soft_limit != unlimited and soft_limit < desired_limit:
                resource.setrlimit(NPROC, (desired_limit, hard_limit))
                self.lit_config.note(
                    "Raised process limit from %d to %d" % (soft_limit, desired_limit)
                )
        except Exception as ex:
            # Warn, unless this is Windows or z/OS, in which case this is expected.
            if os.name != "nt" and platform.system() != "OS/390":
                self.lit_config.warning("Failed to raise process limit: %s" % ex)
143