|  | #!/usr/bin/env python3 | 
|  | # SPDX-License-Identifier: Apache-2.0 | 
|  | # Copyright 2012-2021 The Meson development team | 
|  |  | 
|  | from __future__ import annotations | 
|  |  | 
|  | # Work around some pathlib bugs... | 
|  | from mesonbuild import _pathlib | 
|  | import sys | 
|  | sys.modules['pathlib'] = _pathlib | 
|  |  | 
|  | from concurrent.futures import ProcessPoolExecutor, CancelledError | 
|  | from enum import Enum | 
|  | from io import StringIO | 
|  | from pathlib import Path, PurePath | 
|  | import argparse | 
|  | import functools | 
|  | import itertools | 
|  | import json | 
|  | import multiprocessing | 
|  | import os | 
|  | import re | 
|  | import shlex | 
|  | import shutil | 
|  | import signal | 
|  | import subprocess | 
|  | import tempfile | 
|  | import time | 
|  | import typing as T | 
|  | import xml.etree.ElementTree as ET | 
|  | import collections | 
|  | import importlib.util | 
|  |  | 
|  | from mesonbuild import build | 
|  | from mesonbuild import environment | 
|  | from mesonbuild import compilers | 
|  | from mesonbuild import mesonlib | 
|  | from mesonbuild import mlog | 
|  | from mesonbuild import mtest | 
|  | from mesonbuild.compilers import compiler_from_language | 
|  | from mesonbuild.build import ConfigurationData | 
|  | from mesonbuild.mesonlib import MachineChoice, Popen_safe, TemporaryDirectoryWinProof, setup_vsenv | 
|  | from mesonbuild.mlog import blue, bold, cyan, green, red, yellow, normal_green | 
|  | from mesonbuild.coredata import version as meson_version | 
|  | from mesonbuild.options import backendlist | 
|  | from mesonbuild.modules.python import PythonExternalProgram | 
|  | from run_tests import ( | 
|  | get_fake_options, run_configure, get_meson_script, get_backend_commands, | 
|  | get_backend_args_for_dir, Backend, | 
|  | guess_backend, handle_meson_skip_test, | 
|  | ) | 
|  |  | 
|  |  | 
|  | if T.TYPE_CHECKING: | 
|  | from types import FrameType | 
|  | from mesonbuild.environment import Environment | 
|  | from mesonbuild._typing import Protocol | 
|  | from concurrent.futures import Future | 
|  |  | 
|  | class CompilerArgumentType(Protocol): | 
|  | cross_file: str | 
|  | native_file: str | 
|  | use_tmpdir: bool | 
|  |  | 
|  |  | 
|  | class ArgumentType(CompilerArgumentType): | 
|  |  | 
|  | """Typing information for command line arguments.""" | 
|  |  | 
|  | extra_args: T.List[str] | 
|  | backend: str | 
|  | num_workers: int | 
|  | failfast: bool | 
|  | no_unittests: bool | 
|  | only: T.List[str] | 
|  | v: bool | 
|  |  | 
|  | ALL_TESTS = ['cmake', 'common', 'native', 'warning-meson', 'failing-meson', 'failing-build', 'failing-test', | 
|  | 'keyval', 'platform-osx', 'platform-windows', 'platform-linux', | 
|  | 'java', 'C#', 'vala', 'cython', 'rust', 'd', 'objective c', 'objective c++', | 
|  | 'fortran', 'swift', 'cuda', 'python3', 'python', 'fpga', 'frameworks', 'nasm', 'wasm', 'wayland', | 
|  | 'format', | 
|  | ] | 
|  |  | 
|  |  | 
|  | class BuildStep(Enum): | 
|  | configure = 1 | 
|  | build = 2 | 
|  | test = 3 | 
|  | install = 4 | 
|  | clean = 5 | 
|  | validate = 6 | 
|  |  | 
|  | verbose_output = False | 
|  |  | 
|  | class TestResult(BaseException): | 
|  | def __init__(self, cicmds: T.List[str]) -> None: | 
|  | self.msg    = ''  # empty msg indicates test success | 
|  | self.stdo   = '' | 
|  | self.stde   = '' | 
|  | self.mlog   = '' | 
|  | self.cicmds = cicmds | 
|  | self.conftime:  float = 0 | 
|  | self.buildtime: float = 0 | 
|  | self.testtime:  float = 0 | 
|  |  | 
|  | def add_step(self, step: BuildStep, stdo: str, stde: str, mlog: str = '', time: float = 0) -> None: | 
|  | self.step = step | 
|  | self.stdo += stdo | 
|  | self.stde += stde | 
|  | self.mlog += mlog | 
|  | if step == BuildStep.configure: | 
|  | self.conftime = time | 
|  | elif step == BuildStep.build: | 
|  | self.buildtime = time | 
|  | elif step == BuildStep.test: | 
|  | self.testtime = time | 
|  |  | 
|  | def fail(self, msg: str) -> None: | 
|  | self.msg = msg | 
|  |  | 
|  | python = PythonExternalProgram(sys.executable) | 
|  | python.sanity() | 
|  |  | 
|  | class InstalledFile: | 
|  | def __init__(self, raw: T.Dict[str, str]): | 
|  | self.path = raw['file'] | 
|  | self.typ = raw['type'] | 
|  | self.platform = raw.get('platform', None) | 
|  | self.language = raw.get('language', 'c') | 
|  |  | 
|  | version = raw.get('version', '') | 
|  | if version: | 
|  | self.version = version.split('.') | 
|  | else: | 
|  | # split on '' will return [''], we want an empty list though | 
|  | self.version = [] | 
|  |  | 
|  | def get_path(self, compiler: str, env: environment.Environment) -> T.Optional[Path]: | 
|  | p = Path(self.path) | 
|  | canonical_compiler = compiler | 
|  | if ((compiler in ['clang-cl', 'intel-cl']) or | 
|  | (env.machines.host.is_windows() and compiler in {'pgi', 'dmd', 'ldc'})): | 
|  | canonical_compiler = 'msvc' | 
|  |  | 
|  | python_suffix = python.info['suffix'] | 
|  | python_limited_suffix = python.info['limited_api_suffix'] | 
|  | has_pdb = False | 
|  | if self.language in {'c', 'cpp'}: | 
|  | has_pdb = canonical_compiler == 'msvc' | 
|  | elif self.language == 'd': | 
|  | # dmd's optlink does not generate pdb files | 
|  | has_pdb = env.coredata.compilers.host['d'].linker.id in {'link', 'lld-link'} | 
|  |  | 
|  | # Abort if the platform does not match | 
|  | matches = { | 
|  | 'msvc': canonical_compiler == 'msvc', | 
|  | 'gcc': canonical_compiler != 'msvc', | 
|  | 'cygwin': env.machines.host.is_cygwin(), | 
|  | '!cygwin': not env.machines.host.is_cygwin(), | 
|  | }.get(self.platform or '', True) | 
|  | if not matches: | 
|  | return None | 
|  |  | 
|  | # Handle the different types | 
|  | if self.typ in {'py_implib', 'py_limited_implib', 'python_lib', 'python_limited_lib', 'python_file', 'python_bytecode'}: | 
|  | val = p.as_posix() | 
|  | val = val.replace('@PYTHON_PLATLIB@', python.platlib) | 
|  | val = val.replace('@PYTHON_PURELIB@', python.purelib) | 
|  | p = Path(val) | 
|  | if self.typ == 'python_file': | 
|  | return p | 
|  | if self.typ == 'python_lib': | 
|  | return p.with_suffix(python_suffix) | 
|  | if self.typ == 'python_limited_lib': | 
|  | return p.with_suffix(python_limited_suffix) | 
|  | if self.typ == 'py_implib': | 
|  | p = p.with_suffix(python_suffix) | 
|  | if env.machines.host.is_windows() and canonical_compiler == 'msvc': | 
|  | return p.with_suffix('.lib') | 
|  | elif env.machines.host.is_windows() or env.machines.host.is_cygwin(): | 
|  | return p.with_suffix('.dll.a') | 
|  | else: | 
|  | return None | 
|  | if self.typ == 'py_limited_implib': | 
|  | p = p.with_suffix(python_limited_suffix) | 
|  | if env.machines.host.is_windows() and canonical_compiler == 'msvc': | 
|  | return p.with_suffix('.lib') | 
|  | elif env.machines.host.is_windows() or env.machines.host.is_cygwin(): | 
|  | return p.with_suffix('.dll.a') | 
|  | else: | 
|  | return None | 
|  | if self.typ == 'python_bytecode': | 
|  | return p.parent / importlib.util.cache_from_source(p.name) | 
|  | elif self.typ in {'file', 'dir', 'link'}: | 
|  | return p | 
|  | elif self.typ == 'shared_lib': | 
|  | if env.machines.host.is_windows() or env.machines.host.is_cygwin(): | 
|  | # Windows only has foo.dll and foo-X.dll | 
|  | if len(self.version) > 1: | 
|  | return None | 
|  | if self.version: | 
|  | p = p.with_name('{}-{}'.format(p.name, self.version[0])) | 
|  | return p.with_suffix('.dll') | 
|  |  | 
|  | p = p.with_name(f'lib{p.name}') | 
|  | if env.machines.host.is_darwin(): | 
|  | # MacOS only has libfoo.dylib and libfoo.X.dylib | 
|  | if len(self.version) > 1: | 
|  | return None | 
|  |  | 
|  | # pathlib.Path.with_suffix replaces, not appends | 
|  | suffix = '.dylib' | 
|  | if self.version: | 
|  | suffix = '.{}{}'.format(self.version[0], suffix) | 
|  | else: | 
|  | # pathlib.Path.with_suffix replaces, not appends | 
|  | suffix = '.so' | 
|  | if self.version: | 
|  | suffix = '{}.{}'.format(suffix, '.'.join(self.version)) | 
|  | return p.with_suffix(suffix) | 
|  | elif self.typ == 'exe': | 
|  | if 'mwcc' in canonical_compiler: | 
|  | return p.with_suffix('.nef') | 
|  | elif env.machines.host.is_windows() or env.machines.host.is_cygwin(): | 
|  | return p.with_suffix('.exe') | 
|  | elif self.typ == 'pdb': | 
|  | if self.version: | 
|  | p = p.with_name('{}-{}'.format(p.name, self.version[0])) | 
|  | return p.with_suffix('.pdb') if has_pdb else None | 
|  | elif self.typ in {'implib', 'implibempty'}: | 
|  | if env.machines.host.is_windows() and canonical_compiler == 'msvc': | 
|  | # only MSVC doesn't generate empty implibs | 
|  | if self.typ == 'implibempty' and compiler == 'msvc': | 
|  | return None | 
|  | return p.parent / (re.sub(r'^lib', '', p.name) + '.lib') | 
|  | elif env.machines.host.is_windows() or env.machines.host.is_cygwin(): | 
|  | return p.with_suffix('.dll.a') | 
|  | else: | 
|  | return None | 
|  | elif self.typ == 'expr': | 
|  | return Path(platform_fix_name(p.as_posix(), canonical_compiler, env)) | 
|  | else: | 
|  | raise RuntimeError(f'Invalid installed file type {self.typ}') | 
|  |  | 
|  | return p | 
|  |  | 
|  | def get_paths(self, compiler: str, env: environment.Environment, installdir: Path) -> T.List[Path]: | 
|  | p = self.get_path(compiler, env) | 
|  | if not p: | 
|  | return [] | 
|  | if self.typ == 'dir': | 
|  | abs_p = installdir / p | 
|  | if not abs_p.exists(): | 
|  | raise RuntimeError(f'{p} does not exist') | 
|  | if not abs_p.is_dir(): | 
|  | raise RuntimeError(f'{p} is not a directory') | 
|  | return [x.relative_to(installdir) for x in abs_p.rglob('*') if x.is_file() or x.is_symlink()] | 
|  | elif self.typ == 'link': | 
|  | abs_p = installdir / p | 
|  | if not abs_p.is_symlink(): | 
|  | raise RuntimeError(f'{p} is not a symlink') | 
|  | return [p] | 
|  | else: | 
|  | return [p] | 
|  |  | 
|  | @functools.total_ordering | 
|  | class TestDef: | 
|  | def __init__(self, path: Path, name: T.Optional[str], args: T.List[str], skip: bool = False, skip_category: bool = False): | 
|  | self.category = path.parts[1] | 
|  | self.path = path | 
|  | self.name = name | 
|  | self.args = args | 
|  | self.skip = skip | 
|  | self.env = os.environ.copy() | 
|  | self.installed_files: T.List[InstalledFile] = [] | 
|  | self.do_not_set_opts: T.List[str] = [] | 
|  | self.stdout: T.List[T.Dict[str, str]] = [] | 
|  | self.skip_category = skip_category | 
|  | self.skip_expected = False | 
|  | self.cleanup: T.List[str] = [] | 
|  |  | 
|  | # Always print a stack trace for Meson exceptions | 
|  | self.env['MESON_FORCE_BACKTRACE'] = '1' | 
|  |  | 
|  | def __repr__(self) -> str: | 
|  | return '<{}: {:<48} [{}: {}] -- {}>'.format(type(self).__name__, str(self.path), self.name, self.args, self.skip) | 
|  |  | 
|  | def display_name(self) -> mlog.TV_LoggableList: | 
|  | # Remove the redundant 'test cases' part | 
|  | section, id = self.path.parts[1:3] | 
|  | res: mlog.TV_LoggableList = [f'{section}:', bold(id)] | 
|  | if self.name: | 
|  | res += [f'   ({self.name})'] | 
|  | return res | 
|  |  | 
|  | def __lt__(self, other: object) -> bool: | 
|  | if isinstance(other, TestDef): | 
|  | # None is not sortable, so replace it with an empty string | 
|  | s_id = int(self.path.name.split(' ')[0]) | 
|  | o_id = int(other.path.name.split(' ')[0]) | 
|  | return (s_id, self.path, self.name or '') < (o_id, other.path, other.name or '') | 
|  | return NotImplemented | 
|  |  | 
|  | failing_testcases: T.List[str] = [] | 
|  | failing_logs: T.List[str] = [] | 
|  | print_debug = 'MESON_PRINT_TEST_OUTPUT' in os.environ | 
|  | under_ci = 'CI' in os.environ | 
|  | raw_ci_jobname = os.environ.get('MESON_CI_JOBNAME', None) | 
|  | ci_jobname = raw_ci_jobname if raw_ci_jobname != 'thirdparty' else None | 
|  | do_debug = under_ci or print_debug | 
|  | no_meson_log_msg = 'No meson-log.txt found.' | 
|  |  | 
|  | host_c_compiler: T.Optional[str]   = None | 
|  | compiler_id_map: T.Dict[str, str]  = {} | 
|  | tool_vers_map:   T.Dict[str, str]  = {} | 
|  |  | 
|  | compile_commands:   T.List[str] | 
|  | clean_commands:     T.List[str] | 
|  | test_commands:      T.List[str] | 
|  | install_commands:   T.List[str] | 
|  | uninstall_commands: T.List[str] | 
|  |  | 
|  | backend:      'Backend' | 
|  | backend_flags: T.List[str] | 
|  |  | 
|  | stop: bool = False | 
|  | is_worker_process: bool = False | 
|  |  | 
|  | # Let's have colors in our CI output | 
|  | if under_ci: | 
|  | def _ci_colorize_console() -> bool: | 
|  | return not is_worker_process | 
|  |  | 
|  | mlog.colorize_console = _ci_colorize_console | 
|  |  | 
|  | class StopException(Exception): | 
|  | def __init__(self) -> None: | 
|  | super().__init__('Stopped by user') | 
|  |  | 
|  | def stop_handler(signal: int, frame: T.Optional['FrameType']) -> None: | 
|  | global stop | 
|  | stop = True | 
|  | signal.signal(signal.SIGINT, stop_handler) | 
|  | signal.signal(signal.SIGTERM, stop_handler) | 
|  |  | 
|  | def setup_commands(optbackend: str) -> None: | 
|  | global do_debug, backend, backend_flags | 
|  | global compile_commands, clean_commands, test_commands, install_commands, uninstall_commands | 
|  | backend, backend_flags = guess_backend(optbackend, shutil.which('msbuild')) | 
|  | compile_commands, clean_commands, test_commands, install_commands, \ | 
|  | uninstall_commands = get_backend_commands(backend, do_debug) | 
|  |  | 
|  | # TODO try to eliminate or at least reduce this function | 
|  | def platform_fix_name(fname: str, canonical_compiler: str, env: environment.Environment) -> str: | 
|  | if '?lib' in fname: | 
|  | if env.machines.host.is_windows() and canonical_compiler == 'msvc': | 
|  | fname = re.sub(r'lib/\?lib(.*)\.', r'bin/\1.', fname) | 
|  | fname = re.sub(r'/\?lib/', r'/bin/', fname) | 
|  | elif env.machines.host.is_windows(): | 
|  | fname = re.sub(r'lib/\?lib(.*)\.', r'bin/lib\1.', fname) | 
|  | fname = re.sub(r'\?lib(.*)\.dll$', r'lib\1.dll', fname) | 
|  | fname = re.sub(r'/\?lib/', r'/bin/', fname) | 
|  | elif env.machines.host.is_cygwin(): | 
|  | fname = re.sub(r'lib/\?lib(.*)\.so$', r'bin/cyg\1.dll', fname) | 
|  | fname = re.sub(r'lib/\?lib(.*)\.', r'bin/cyg\1.', fname) | 
|  | fname = re.sub(r'\?lib(.*)\.dll$', r'cyg\1.dll', fname) | 
|  | fname = re.sub(r'/\?lib/', r'/bin/', fname) | 
|  | else: | 
|  | fname = re.sub(r'\?lib', 'lib', fname) | 
|  |  | 
|  | if fname.endswith('?so'): | 
|  | if env.machines.host.is_windows() and canonical_compiler == 'msvc': | 
|  | fname = re.sub(r'lib/([^/]*)\?so$', r'bin/\1.dll', fname) | 
|  | fname = re.sub(r'/(?:lib|)([^/]*?)\?so$', r'/\1.dll', fname) | 
|  | return fname | 
|  | elif env.machines.host.is_windows(): | 
|  | fname = re.sub(r'lib/([^/]*)\?so$', r'bin/\1.dll', fname) | 
|  | fname = re.sub(r'/([^/]*?)\?so$', r'/\1.dll', fname) | 
|  | return fname | 
|  | elif env.machines.host.is_cygwin(): | 
|  | fname = re.sub(r'lib/([^/]*)\?so$', r'bin/\1.dll', fname) | 
|  | fname = re.sub(r'/lib([^/]*?)\?so$', r'/cyg\1.dll', fname) | 
|  | fname = re.sub(r'/([^/]*?)\?so$', r'/\1.dll', fname) | 
|  | return fname | 
|  | elif env.machines.host.is_darwin(): | 
|  | return fname[:-3] + '.dylib' | 
|  | else: | 
|  | return fname[:-3] + '.so' | 
|  |  | 
|  | return fname | 
|  |  | 
|  | def validate_install(test: TestDef, installdir: Path, env: environment.Environment) -> str: | 
|  | ret_msg = '' | 
|  | expected_raw: T.List[Path] = [] | 
|  | for i in test.installed_files: | 
|  | try: | 
|  | expected_raw += i.get_paths(host_c_compiler, env, installdir) | 
|  | except RuntimeError as err: | 
|  | ret_msg += f'Expected path error: {err}\n' | 
|  | expected = {x: False for x in expected_raw} | 
|  | found = [x.relative_to(installdir) for x in installdir.rglob('*') if x.is_file() or x.is_symlink()] | 
|  | # Mark all found files as found and detect unexpected files | 
|  | for fname in found: | 
|  | if fname not in expected: | 
|  | ret_msg += f'Extra file {fname} found.\n' | 
|  | continue | 
|  | expected[fname] = True | 
|  | # Check if expected files were found | 
|  | for p, f in expected.items(): | 
|  | if not f: | 
|  | ret_msg += f'Expected file {p} missing.\n' | 
|  | # List dir content on error | 
|  | if ret_msg != '': | 
|  | ret_msg += '\nInstall dir contents:\n' | 
|  | for p in found: | 
|  | ret_msg += f'  - {p}\n' | 
|  | return ret_msg | 
|  |  | 
|  | def log_text_file(logfile: T.TextIO, testdir: Path, result: TestResult) -> None: | 
|  | logfile.write('%s\nstdout\n\n---\n' % testdir.as_posix()) | 
|  | logfile.write(result.stdo) | 
|  | logfile.write('\n\n---\n\nstderr\n\n---\n') | 
|  | logfile.write(result.stde) | 
|  | logfile.write('\n\n---\n\n') | 
|  | if print_debug: | 
|  | try: | 
|  | print(result.stdo) | 
|  | except UnicodeError: | 
|  | sanitized_out = result.stdo.encode('ascii', errors='replace').decode() | 
|  | print(sanitized_out) | 
|  | try: | 
|  | print(result.stde, file=sys.stderr) | 
|  | except UnicodeError: | 
|  | sanitized_err = result.stde.encode('ascii', errors='replace').decode() | 
|  | print(sanitized_err, file=sys.stderr) | 
|  |  | 
|  |  | 
|  | def _run_ci_include(args: T.List[str]) -> str: | 
|  | if not args: | 
|  | return 'At least one parameter required' | 
|  |  | 
|  | header = f'Included file {args[0]}:' | 
|  | try: | 
|  | return mlog.ci_fold_file(args[0], header, force=True) | 
|  | except Exception: | 
|  | return 'Failed to open {}\n'.format(args[0]) | 
|  |  | 
|  | ci_commands = { | 
|  | 'ci_include': _run_ci_include | 
|  | } | 
|  |  | 
|  | def run_ci_commands(raw_log: str) -> T.List[str]: | 
|  | res = [] | 
|  | for l in raw_log.splitlines(): | 
|  | if not l.startswith('!meson_ci!/'): | 
|  | continue | 
|  | cmd = shlex.split(l[11:]) | 
|  | if not cmd or cmd[0] not in ci_commands: | 
|  | continue | 
|  | res += ['CI COMMAND {}:\n{}'.format(cmd[0], ci_commands[cmd[0]](cmd[1:]))] | 
|  | return res | 
|  |  | 
|  | class OutputMatch: | 
|  | def __init__(self, how: str, expected: str, count: int) -> None: | 
|  | self.how = how | 
|  | self.expected = expected | 
|  | self.count = count | 
|  |  | 
|  | def match(self, actual: str) -> bool: | 
|  | if self.how == "re": | 
|  | return bool(re.match(self.expected, actual)) | 
|  | return self.expected == actual | 
|  |  | 
|  | def _compare_output(expected: T.List[T.Dict[str, str]], output: str, desc: str) -> str: | 
|  | if expected: | 
|  | matches:   T.List[OutputMatch] = [] | 
|  | nomatches: T.List[OutputMatch] = [] | 
|  | for item in expected: | 
|  | how = item.get('match', 'literal') | 
|  | expected_line = item.get('line') | 
|  | count = int(item.get('count', -1)) | 
|  |  | 
|  | # Simple heuristic to automatically convert path separators for | 
|  | # Windows: | 
|  | # | 
|  | # Any '/' appearing before 'WARNING' or 'ERROR' (i.e. a path in a | 
|  | # filename part of a location) is replaced with '\' (in a re: '\\' | 
|  | # which matches a literal '\') | 
|  | # | 
|  | # (There should probably be a way to turn this off for more complex | 
|  | # cases which don't fit this) | 
|  | if mesonlib.is_windows(): | 
|  | if how != "re": | 
|  | sub = r'\\' | 
|  | else: | 
|  | sub = r'\\\\' | 
|  | expected_line = re.sub(r'/(?=.*(WARNING|ERROR|DEPRECATION))', sub, expected_line) | 
|  |  | 
|  | m = OutputMatch(how, expected_line, count) | 
|  | if count == 0: | 
|  | nomatches.append(m) | 
|  | else: | 
|  | matches.append(m) | 
|  |  | 
|  |  | 
|  | i = 0 | 
|  | for actual in output.splitlines(): | 
|  | # Verify this line does not match any unexpected lines (item.count == 0) | 
|  | for match in nomatches: | 
|  | if match.match(actual): | 
|  | return f'unexpected "{match.expected}" found in {desc}' | 
|  | # If we matched all expected lines, continue to verify there are | 
|  | # no unexpected line. If nomatches is empty then we are done already. | 
|  | if i >= len(matches): | 
|  | if not nomatches: | 
|  | break | 
|  | continue | 
|  | # Check if this line match current expected line | 
|  | match = matches[i] | 
|  | if match.match(actual): | 
|  | if match.count < 0: | 
|  | # count was not specified, continue with next expected line, | 
|  | # it does not matter if this line will be matched again or | 
|  | # not. | 
|  | i += 1 | 
|  | else: | 
|  | # count was specified (must be >0), continue expecting this | 
|  | # same line. If count reached 0 we continue with next | 
|  | # expected line but remember that this one must not match | 
|  | # anymore. | 
|  | match.count -= 1 | 
|  | if match.count == 0: | 
|  | nomatches.append(match) | 
|  | i += 1 | 
|  |  | 
|  | if i < len(matches): | 
|  | # reached the end of output without finding expected | 
|  | return f'expected "{matches[i].expected}" not found in {desc}' | 
|  |  | 
|  | return '' | 
|  |  | 
|  | def validate_output(test: TestDef, stdo: str, stde: str) -> str: | 
|  | return _compare_output(test.stdout, stdo, 'stdout') | 
|  |  | 
|  | # There are some class variables and such that cache | 
|  | # information. Clear all of these. The better solution | 
|  | # would be to change the code so that no state is persisted | 
|  | # but that would be a lot of work given that Meson was originally | 
|  | # coded to run as a batch process. | 
|  | def clear_internal_caches() -> None: | 
|  | import mesonbuild.interpreterbase | 
|  | from mesonbuild.dependencies.cmake import CMakeDependency | 
|  | from mesonbuild.dependencies.pkgconfig import PkgConfigInterface | 
|  | from mesonbuild.mesonlib import PerMachine | 
|  | mesonbuild.interpreterbase.FeatureNew.feature_registry = {} | 
|  | CMakeDependency.class_cmakeinfo = PerMachine(None, None) | 
|  | PkgConfigInterface.class_impl = PerMachine(False, False) | 
|  | PkgConfigInterface.class_cli_impl = PerMachine(False, False) | 
|  | PkgConfigInterface.pkg_bin_per_machine = PerMachine(None, None) | 
|  |  | 
|  |  | 
|  | def run_test_inprocess(testdir: str) -> T.Tuple[int, str, str, str]: | 
|  | old_stdout = sys.stdout | 
|  | sys.stdout = mystdout = StringIO() | 
|  | old_stderr = sys.stderr | 
|  | sys.stderr = mystderr = StringIO() | 
|  | old_cwd = os.getcwd() | 
|  | os.chdir(testdir) | 
|  | test_log_fname = os.path.join('meson-logs', 'testlog.txt') | 
|  | try: | 
|  | returncode_test = mtest.run_with_args(['--no-rebuild']) | 
|  | if os.path.exists(test_log_fname): | 
|  | test_log = _run_ci_include([test_log_fname]) | 
|  | else: | 
|  | test_log = '' | 
|  | returncode_benchmark = mtest.run_with_args(['--no-rebuild', '--benchmark', '--logbase', 'benchmarklog']) | 
|  | finally: | 
|  | sys.stdout = old_stdout | 
|  | sys.stderr = old_stderr | 
|  | os.chdir(old_cwd) | 
|  | return max(returncode_test, returncode_benchmark), mystdout.getvalue(), mystderr.getvalue(), test_log | 
|  |  | 
|  | # Build directory name must be the same so Ccache works over | 
|  | # consecutive invocations. | 
|  | def create_deterministic_builddir(test: TestDef, use_tmpdir: bool) -> str: | 
|  | import hashlib | 
|  | src_dir = test.path.as_posix() | 
|  | if test.name: | 
|  | src_dir += test.name | 
|  | rel_dirname = 'b ' + hashlib.sha256(src_dir.encode(errors='ignore')).hexdigest()[0:10] | 
|  | abs_pathname = os.path.join(tempfile.gettempdir() if use_tmpdir else os.getcwd(), rel_dirname) | 
|  | if os.path.exists(abs_pathname): | 
|  | mesonlib.windows_proof_rmtree(abs_pathname) | 
|  | os.mkdir(abs_pathname) | 
|  | return abs_pathname | 
|  |  | 
|  | def format_parameter_file(file_basename: str, test: TestDef, test_build_dir: str) -> Path: | 
|  | confdata = ConfigurationData() | 
|  | confdata.values = {'MESON_TEST_ROOT': (str(test.path.absolute()), 'base directory of current test')} | 
|  |  | 
|  | template = test.path / (file_basename + '.in') | 
|  | destination = Path(test_build_dir) / file_basename | 
|  | mesonlib.do_conf_file(str(template), str(destination), confdata, 'meson') | 
|  |  | 
|  | return destination | 
|  |  | 
|  | def detect_parameter_files(test: TestDef, test_build_dir: str) -> T.Tuple[Path, Path]: | 
|  | nativefile = test.path / 'nativefile.ini' | 
|  | crossfile = test.path / 'crossfile.ini' | 
|  |  | 
|  | if os.path.exists(str(test.path / 'nativefile.ini.in')): | 
|  | nativefile = format_parameter_file('nativefile.ini', test, test_build_dir) | 
|  |  | 
|  | if os.path.exists(str(test.path / 'crossfile.ini.in')): | 
|  | crossfile = format_parameter_file('crossfile.ini', test, test_build_dir) | 
|  |  | 
|  | return nativefile, crossfile | 
|  |  | 
|  | # In previous python versions the global variables are lost in ProcessPoolExecutor. | 
|  | # So, we use this tuple to restore some of them | 
|  | class GlobalState(T.NamedTuple): | 
|  | compile_commands:   T.List[str] | 
|  | clean_commands:     T.List[str] | 
|  | test_commands:      T.List[str] | 
|  | install_commands:   T.List[str] | 
|  | uninstall_commands: T.List[str] | 
|  |  | 
|  | backend:      'Backend' | 
|  | backend_flags: T.List[str] | 
|  |  | 
|  | host_c_compiler: T.Optional[str] | 
|  |  | 
|  | def run_test(test: TestDef, | 
|  | extra_args: T.List[str], | 
|  | should_fail: str, | 
|  | use_tmp: bool, | 
|  | state: T.Optional[GlobalState] = None) -> T.Optional[TestResult]: | 
|  | # Unpack the global state | 
|  | global compile_commands, clean_commands, test_commands, install_commands, uninstall_commands, backend, backend_flags, host_c_compiler | 
|  | if state is not None: | 
|  | compile_commands, clean_commands, test_commands, install_commands, uninstall_commands, backend, backend_flags, host_c_compiler = state | 
|  | # Store that this is a worker process | 
|  | global is_worker_process | 
|  | is_worker_process = True | 
|  | # Setup the test environment | 
|  | assert not test.skip, 'Skipped test should not be run' | 
|  | build_dir = create_deterministic_builddir(test, use_tmp) | 
|  | try: | 
|  | with TemporaryDirectoryWinProof(prefix='i ', dir=None if use_tmp else os.getcwd()) as install_dir: | 
|  | try: | 
|  | return _run_test(test, build_dir, install_dir, extra_args, should_fail) | 
|  | except TestResult as r: | 
|  | return r | 
|  | finally: | 
|  | mlog.shutdown() # Close the log file because otherwise Windows wets itself. | 
|  | finally: | 
|  | mesonlib.windows_proof_rmtree(build_dir) | 
|  |  | 
|  | def _run_test(test: TestDef, | 
|  | test_build_dir: str, | 
|  | install_dir: str, | 
|  | extra_args: T.List[str], | 
|  | should_fail: str) -> TestResult: | 
|  | gen_start = time.time() | 
|  | # Configure in-process | 
|  | gen_args = ['setup'] | 
|  | if 'prefix' not in test.do_not_set_opts: | 
|  | gen_args += ['--prefix', 'x:/usr'] if mesonlib.is_windows() else ['--prefix', '/usr'] | 
|  | if 'libdir' not in test.do_not_set_opts: | 
|  | gen_args += ['--libdir', 'lib'] | 
|  | gen_args += [test.path.as_posix(), test_build_dir] + backend_flags + extra_args | 
|  |  | 
|  | nativefile, crossfile = detect_parameter_files(test, test_build_dir) | 
|  |  | 
|  | if nativefile.exists(): | 
|  | gen_args.extend(['--native-file', nativefile.as_posix()]) | 
|  | if crossfile.exists(): | 
|  | gen_args.extend(['--cross-file', crossfile.as_posix()]) | 
|  | inprocess, res = run_configure(gen_args, env=test.env, catch_exception=True) | 
|  | returncode, stdo, stde = res | 
|  | cmd = '(inprocess) $ ' if inprocess else '$ ' | 
|  | cmd += mesonlib.join_args(gen_args) | 
|  | logfile = os.path.join(test_build_dir, 'meson-logs', 'meson-log.txt') | 
|  | if os.path.exists(logfile): | 
|  | mesonlog = '\n'.join((cmd, _run_ci_include([logfile]))) | 
|  | else: | 
|  | mesonlog = no_meson_log_msg | 
|  | cicmds = run_ci_commands(mesonlog) | 
|  | testresult = TestResult(cicmds) | 
|  | testresult.add_step(BuildStep.configure, '\n'.join((cmd, stdo)), stde, mesonlog, time.time() - gen_start) | 
|  | output_msg = validate_output(test, stdo, stde) | 
|  | testresult.mlog += output_msg | 
|  | if output_msg: | 
|  | testresult.fail('Unexpected output while configuring.') | 
|  | return testresult | 
|  | if should_fail == 'meson': | 
|  | if returncode == 1: | 
|  | return testresult | 
|  | elif returncode != 0: | 
|  | testresult.fail(f'Test exited with unexpected status {returncode}.') | 
|  | return testresult | 
|  | else: | 
|  | testresult.fail('Test that should have failed succeeded.') | 
|  | return testresult | 
|  | if returncode != 0: | 
|  | testresult.fail('Generating the build system failed.') | 
|  | return testresult | 
|  | builddata = build.load(test_build_dir) | 
|  | dir_args = get_backend_args_for_dir(backend, test_build_dir) | 
|  |  | 
|  | # Build with subprocess | 
|  | def build_step() -> None: | 
|  | build_start = time.time() | 
|  | pc, o, _ = Popen_safe(compile_commands + dir_args, cwd=test_build_dir, stderr=subprocess.STDOUT) | 
|  | testresult.add_step(BuildStep.build, o, '', '', time.time() - build_start) | 
|  | if should_fail == 'build': | 
|  | if pc.returncode != 0: | 
|  | raise testresult | 
|  | testresult.fail('Test that should have failed to build succeeded.') | 
|  | raise testresult | 
|  | if pc.returncode != 0: | 
|  | testresult.fail('Compiling source code failed.') | 
|  | raise testresult | 
|  |  | 
|  | # Touch the meson.build file to force a regenerate | 
|  | def force_regenerate() -> None: | 
|  | os.utime(str(test.path / 'meson.build')) | 
|  |  | 
|  | # just test building | 
|  | build_step() | 
|  |  | 
|  | # test that regeneration works for build step | 
|  | force_regenerate() | 
|  | build_step()  # TBD: assert nothing gets built after the regenerate? | 
|  |  | 
|  | # test that regeneration works for test step | 
|  | force_regenerate() | 
|  |  | 
|  | # Test in-process | 
|  | clear_internal_caches() | 
|  | test_start = time.time() | 
|  | (returncode, tstdo, tstde, test_log) = run_test_inprocess(test_build_dir) | 
|  | testresult.add_step(BuildStep.test, tstdo, tstde, test_log, time.time() - test_start) | 
|  | if should_fail == 'test': | 
|  | if returncode != 0: | 
|  | return testresult | 
|  | testresult.fail('Test that should have failed to run unit tests succeeded.') | 
|  | return testresult | 
|  | if returncode != 0: | 
|  | testresult.fail('Running unit tests failed.') | 
|  | return testresult | 
|  |  | 
|  | # Do installation, if the backend supports it | 
|  | if install_commands: | 
|  | env = test.env.copy() | 
|  | env['DESTDIR'] = install_dir | 
|  | # Install with subprocess | 
|  | pi, o, e = Popen_safe(install_commands, cwd=test_build_dir, env=env) | 
|  | testresult.add_step(BuildStep.install, o, e) | 
|  | if pi.returncode != 0: | 
|  | testresult.fail('Running install failed.') | 
|  | return testresult | 
|  |  | 
|  | # Clean with subprocess | 
|  | env = test.env.copy() | 
|  | pi, o, e = Popen_safe(clean_commands + dir_args, cwd=test_build_dir, env=env) | 
|  | testresult.add_step(BuildStep.clean, o, e) | 
|  | if pi.returncode != 0: | 
|  | testresult.fail('Running clean failed.') | 
|  | return testresult | 
|  |  | 
|  | # Validate installed files | 
|  | testresult.add_step(BuildStep.install, '', '') | 
|  | if not install_commands: | 
|  | return testresult | 
|  | install_msg = validate_install(test, Path(install_dir), builddata.environment) | 
|  | if install_msg: | 
|  | testresult.fail('\n' + install_msg) | 
|  | return testresult | 
|  |  | 
|  | return testresult | 
|  |  | 
|  |  | 
|  | # processing of test.json 'skip_*' keys, which can appear at top level, or in | 
|  | # matrix: | 
|  | def _skip_keys(test_def: T.Dict) -> T.Tuple[bool, bool]: | 
|  | skip_expected = False | 
|  |  | 
|  | # Test is expected to skip if MESON_CI_JOBNAME contains any of the list of | 
|  | # substrings | 
|  | if ('expect_skip_on_jobname' in test_def) and (ci_jobname is not None): | 
|  | skip_expected = any(s in ci_jobname for s in test_def['expect_skip_on_jobname']) | 
|  |  | 
|  | # Test is expected to skip if os matches | 
|  | if 'expect_skip_on_os' in test_def: | 
|  | mesonenv = environment.Environment('', '', get_fake_options('/')) | 
|  | for skip_os in test_def['expect_skip_on_os']: | 
|  | if skip_os.startswith('!'): | 
|  | if mesonenv.machines.host.system != skip_os[1:]: | 
|  | skip_expected = True | 
|  | else: | 
|  | if mesonenv.machines.host.system == skip_os: | 
|  | skip_expected = True | 
|  |  | 
|  | # Skip if environment variable is present | 
|  | skip = False | 
|  | if 'skip_on_env' in test_def: | 
|  | for skip_env_var in test_def['skip_on_env']: | 
|  | if skip_env_var in os.environ: | 
|  | skip = True | 
|  |  | 
|  | return (skip, skip_expected) | 
|  |  | 
|  |  | 
|  | def load_test_json(t: TestDef, stdout_mandatory: bool, skip_category: bool = False) -> T.List[TestDef]: | 
|  | all_tests: T.List[TestDef] = [] | 
|  | test_def = {} | 
|  | test_def_file = t.path / 'test.json' | 
|  | if test_def_file.is_file(): | 
|  | test_def = json.loads(test_def_file.read_text(encoding='utf-8')) | 
|  |  | 
|  | # Handle additional environment variables | 
|  | env: T.Dict[str, str] = {} | 
|  | if 'env' in test_def: | 
|  | assert isinstance(test_def['env'], dict) | 
|  | env = test_def['env'] | 
|  | for key, val in env.items(): | 
|  | val = val.replace('@ROOT@', t.path.resolve().as_posix()) | 
|  | val = val.replace('@PATH@', t.env.get('PATH', '')) | 
|  | env[key] = val | 
|  |  | 
|  | # Handle installed files | 
|  | installed: T.List[InstalledFile] = [] | 
|  | if 'installed' in test_def: | 
|  | installed = [InstalledFile(x) for x in test_def['installed']] | 
|  |  | 
|  | # Handle expected output | 
|  | stdout = test_def.get('stdout', []) | 
|  | if stdout_mandatory and not stdout: | 
|  | raise RuntimeError(f"{test_def_file} must contain a non-empty stdout key") | 
|  |  | 
|  | # Handle the do_not_set_opts list | 
|  | do_not_set_opts: T.List[str] = test_def.get('do_not_set_opts', []) | 
|  |  | 
|  | (t.skip, t.skip_expected) = _skip_keys(test_def) | 
|  |  | 
|  | cleanup = test_def.get('cleanup', []) | 
|  |  | 
|  | # Skip tests if the tool requirements are not met | 
|  | if 'tools' in test_def: | 
|  | assert isinstance(test_def['tools'], dict) | 
|  | for tool, vers_req in test_def['tools'].items(): | 
|  | if tool not in tool_vers_map: | 
|  | t.skip = True | 
|  | elif not mesonlib.version_compare(tool_vers_map[tool], vers_req): | 
|  | t.skip = True | 
|  |  | 
|  | # Skip the matrix code and just update the existing test | 
|  | if 'matrix' not in test_def: | 
|  | t.env.update(env) | 
|  | t.installed_files = installed | 
|  | t.do_not_set_opts = do_not_set_opts | 
|  | t.stdout = stdout | 
|  | t.cleanup = cleanup | 
|  | return [t] | 
|  |  | 
|  | new_opt_list: T.List[T.List[T.Tuple[str, str, bool, bool]]] | 
|  |  | 
|  | # 'matrix; entry is present, so build multiple tests from matrix definition | 
|  | opt_list: T.List[T.List[T.Tuple[str, str, bool, bool]]] = [] | 
|  | matrix = test_def['matrix'] | 
|  | assert "options" in matrix | 
|  | for key, val in matrix["options"].items(): | 
|  | assert isinstance(val, list) | 
|  | tmp_opts: T.List[T.Tuple[str, str, bool, bool]] = [] | 
|  | for i in val: | 
|  | assert isinstance(i, dict) | 
|  | assert "val" in i | 
|  |  | 
|  | (skip, skip_expected) = _skip_keys(i) | 
|  |  | 
|  | # Only run the test if all compiler ID's match | 
|  | if 'compilers' in i: | 
|  | for lang, id_list in i['compilers'].items(): | 
|  | if lang not in compiler_id_map or compiler_id_map[lang] not in id_list: | 
|  | skip = True | 
|  | break | 
|  |  | 
|  | # Add an empty matrix entry | 
|  | if i['val'] is None: | 
|  | tmp_opts += [(key, None, skip, skip_expected)] | 
|  | continue | 
|  |  | 
|  | tmp_opts += [(key, i['val'], skip, skip_expected)] | 
|  |  | 
|  | if opt_list: | 
|  | new_opt_list = [] | 
|  | for i in opt_list: | 
|  | for j in tmp_opts: | 
|  | new_opt_list += [[*i, j]] | 
|  | opt_list = new_opt_list | 
|  | else: | 
|  | opt_list = [[x] for x in tmp_opts] | 
|  |  | 
|  | # Exclude specific configurations | 
|  | if 'exclude' in matrix: | 
|  | assert isinstance(matrix['exclude'], list) | 
|  | new_opt_list = [] | 
|  | for i in opt_list: | 
|  | exclude = False | 
|  | opt_tuple = [(x[0], x[1]) for x in i] | 
|  | for j in matrix['exclude']: | 
|  | ex_list = [(k, v) for k, v in j.items()] | 
|  | if all([x in opt_tuple for x in ex_list]): | 
|  | exclude = True | 
|  | break | 
|  |  | 
|  | if not exclude: | 
|  | new_opt_list += [i] | 
|  |  | 
|  | opt_list = new_opt_list | 
|  |  | 
|  | for i in opt_list: | 
|  | name = ' '.join([f'{x[0]}={x[1]}' for x in i if x[1] is not None]) | 
|  | opts = [f'-D{x[0]}={x[1]}' for x in i if x[1] is not None] | 
|  | skip = any([x[2] for x in i]) | 
|  | skip_expected = any([x[3] for x in i]) | 
|  | test = TestDef(t.path, name, opts, skip or t.skip, skip_category) | 
|  | test.env.update(env) | 
|  | test.installed_files = installed | 
|  | test.do_not_set_opts = do_not_set_opts | 
|  | test.stdout = stdout | 
|  | test.skip_expected = skip_expected or t.skip_expected | 
|  | test.cleanup = cleanup | 
|  |  | 
|  | all_tests.append(test) | 
|  |  | 
|  | return all_tests | 
|  |  | 
|  |  | 
|  | def gather_tests(testdir: Path, stdout_mandatory: bool, only: T.List[str], skip_category: bool) -> T.List[TestDef]: | 
|  | all_tests: T.List[TestDef] = [] | 
|  | for t in testdir.iterdir(): | 
|  | # Filter non-tests files (dot files, etc) | 
|  | if not t.is_dir() or t.name.startswith('.'): | 
|  | continue | 
|  | if t.name in {'18 includedirxyz'}: | 
|  | continue | 
|  | if only and not any(t.name.startswith(prefix) for prefix in only): | 
|  | continue | 
|  | test_def = TestDef(t, None, [], skip_category=skip_category) | 
|  | all_tests.extend(load_test_json(test_def, stdout_mandatory, skip_category)) | 
|  | return sorted(all_tests) | 
|  |  | 
|  |  | 
|  | def have_d_compiler() -> bool: | 
|  | if shutil.which("ldc2"): | 
|  | return True | 
|  | elif shutil.which("ldc"): | 
|  | return True | 
|  | elif shutil.which("gdc"): | 
|  | return True | 
|  | elif shutil.which("dmd"): | 
|  | # The Windows installer sometimes produces a DMD install | 
|  | # that exists but segfaults every time the compiler is run. | 
|  | # Don't know why. Don't know how to fix. Skip in this case. | 
|  | cp = subprocess.run(['dmd', '--version'], | 
|  | capture_output=True) | 
|  | if cp.stdout == b'': | 
|  | return False | 
|  | return True | 
|  | return False | 
|  |  | 
|  | def have_objc_compiler(use_tmp: bool) -> bool: | 
|  | return have_working_compiler('objc', use_tmp) | 
|  |  | 
|  | def have_objcpp_compiler(use_tmp: bool) -> bool: | 
|  | return have_working_compiler('objcpp', use_tmp) | 
|  |  | 
|  | def have_cython_compiler(use_tmp: bool) -> bool: | 
|  | return have_working_compiler('cython', use_tmp) | 
|  |  | 
|  | def have_working_compiler(lang: str, use_tmp: bool) -> bool: | 
|  | with TemporaryDirectoryWinProof(prefix='b ', dir=None if use_tmp else '.') as build_dir: | 
|  | env = environment.Environment('', build_dir, get_fake_options('/')) | 
|  | try: | 
|  | compiler = compiler_from_language(env, lang, MachineChoice.HOST) | 
|  | except mesonlib.MesonException: | 
|  | return False | 
|  | if not compiler: | 
|  | return False | 
|  | env.coredata.process_compiler_options(lang, compiler, env, '') | 
|  | try: | 
|  | compiler.sanity_check(env.get_scratch_dir(), env) | 
|  | except mesonlib.MesonException: | 
|  | return False | 
|  | return True | 
|  |  | 
|  | def have_java() -> bool: | 
|  | if shutil.which('javac') and shutil.which('java'): | 
|  | return True | 
|  | return False | 
|  |  | 
|  | def skip_dont_care(t: TestDef) -> bool: | 
|  | # Everything is optional when not running on CI | 
|  | if ci_jobname is None: | 
|  | return True | 
|  |  | 
|  | # Non-frameworks test are allowed to determine their own skipping under CI (currently) | 
|  | if not t.category.endswith('frameworks'): | 
|  | return True | 
|  |  | 
|  | if mesonlib.is_osx() and '6 gettext' in str(t.path): | 
|  | return True | 
|  |  | 
|  | return False | 
|  |  | 
|  | def skip_csharp(backend: Backend) -> bool: | 
|  | if backend is not Backend.ninja: | 
|  | return True | 
|  | if not shutil.which('resgen'): | 
|  | return True | 
|  | if shutil.which('mcs'): | 
|  | return False | 
|  | if shutil.which('csc'): | 
|  | # Only support VS2017 for now. Earlier versions fail | 
|  | # under CI in mysterious ways. | 
|  | try: | 
|  | stdo = subprocess.check_output(['csc', '/version']) | 
|  | except subprocess.CalledProcessError: | 
|  | return True | 
|  | # Having incrementing version numbers would be too easy. | 
|  | # Microsoft reset the versioning back to 1.0 (from 4.x) | 
|  | # when they got the Roslyn based compiler. Thus there | 
|  | # is NO WAY to reliably do version number comparisons. | 
|  | # Only support the version that ships with VS2017. | 
|  | return not stdo.startswith(b'2.') | 
|  | return True | 
|  |  | 
|  | # In Azure some setups have a broken rustc that will error out | 
|  | # on all compilation attempts. | 
|  |  | 
|  | def has_broken_rustc() -> bool: | 
|  | dirname = Path('brokenrusttest') | 
|  | if dirname.exists(): | 
|  | mesonlib.windows_proof_rmtree(dirname.as_posix()) | 
|  | dirname.mkdir() | 
|  | sanity_file = dirname / 'sanity.rs' | 
|  | sanity_file.write_text('fn main() {\n}\n', encoding='utf-8') | 
|  | pc = subprocess.run(['rustc', '-o', 'sanity.exe', 'sanity.rs'], | 
|  | cwd=dirname.as_posix(), | 
|  | stdout = subprocess.DEVNULL, | 
|  | stderr = subprocess.DEVNULL) | 
|  | mesonlib.windows_proof_rmtree(dirname.as_posix()) | 
|  | return pc.returncode != 0 | 
|  |  | 
|  | def should_skip_rust(backend: Backend) -> bool: | 
|  | if not shutil.which('rustc'): | 
|  | return True | 
|  | if backend is not Backend.ninja: | 
|  | return True | 
|  | if mesonlib.is_windows(): | 
|  | if has_broken_rustc(): | 
|  | return True | 
|  | return False | 
|  |  | 
|  | def should_skip_wayland() -> bool: | 
|  | if mesonlib.is_windows() or mesonlib.is_osx(): | 
|  | return True | 
|  | if not shutil.which('wayland-scanner'): | 
|  | return True | 
|  | return False | 
|  |  | 
|  | def detect_tests_to_run(only: T.Dict[str, T.List[str]], use_tmp: bool) -> T.List[T.Tuple[str, T.List[TestDef], bool]]: | 
|  | """ | 
|  | Parameters | 
|  | ---------- | 
|  | only: dict of categories and list of test cases, optional | 
|  | specify names of tests to run | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | gathered_tests: list of tuple of str, list of TestDef, bool | 
|  | tests to run | 
|  | """ | 
|  |  | 
|  | skip_fortran = not(shutil.which('gfortran') or | 
|  | shutil.which('flang-new') or | 
|  | shutil.which('flang') or | 
|  | shutil.which('pgfortran') or | 
|  | shutil.which('nagfor') or | 
|  | shutil.which('ifort') or | 
|  | shutil.which('ifx')) | 
|  |  | 
|  | skip_cmake = ((os.environ.get('compiler') == 'msvc2015' and under_ci) or | 
|  | 'cmake' not in tool_vers_map or | 
|  | not mesonlib.version_compare(tool_vers_map['cmake'], '>=3.14')) | 
|  |  | 
|  | class TestCategory: | 
|  | def __init__(self, category: str, subdir: str, skip: bool = False, stdout_mandatory: bool = False): | 
|  | self.category = category                  # category name | 
|  | self.subdir = subdir                      # subdirectory | 
|  | self.skip = skip                          # skip condition | 
|  | self.stdout_mandatory = stdout_mandatory  # expected stdout is mandatory for tests in this category | 
|  |  | 
|  | all_tests = [ | 
|  | TestCategory('cmake', 'cmake', skip_cmake), | 
|  | TestCategory('common', 'common'), | 
|  | TestCategory('native', 'native'), | 
|  | TestCategory('warning-meson', 'warning', stdout_mandatory=True), | 
|  | TestCategory('failing-meson', 'failing', stdout_mandatory=True), | 
|  | TestCategory('failing-build', 'failing build'), | 
|  | TestCategory('failing-test',  'failing test'), | 
|  | TestCategory('keyval', 'keyval'), | 
|  | TestCategory('platform-osx', 'osx', not mesonlib.is_osx()), | 
|  | TestCategory('platform-windows', 'windows', not mesonlib.is_windows() and not mesonlib.is_cygwin()), | 
|  | TestCategory('platform-linux', 'linuxlike', mesonlib.is_osx() or mesonlib.is_windows()), | 
|  | TestCategory('java', 'java', backend is not Backend.ninja or not have_java()), | 
|  | TestCategory('C#', 'csharp', skip_csharp(backend)), | 
|  | TestCategory('vala', 'vala', backend is not Backend.ninja or not shutil.which(os.environ.get('VALAC', 'valac'))), | 
|  | TestCategory('cython', 'cython', backend is not Backend.ninja or not have_cython_compiler(options.use_tmpdir)), | 
|  | TestCategory('rust', 'rust', should_skip_rust(backend)), | 
|  | TestCategory('d', 'd', backend is not Backend.ninja or not have_d_compiler()), | 
|  | TestCategory('objective c', 'objc', backend not in (Backend.ninja, Backend.xcode) or not have_objc_compiler(options.use_tmpdir)), | 
|  | TestCategory('objective c++', 'objcpp', backend not in (Backend.ninja, Backend.xcode) or not have_objcpp_compiler(options.use_tmpdir)), | 
|  | TestCategory('fortran', 'fortran', skip_fortran or backend != Backend.ninja), | 
|  | TestCategory('swift', 'swift', backend not in (Backend.ninja, Backend.xcode) or not shutil.which('swiftc')), | 
|  | # CUDA tests on Windows: use Ninja backend:  python run_project_tests.py --only cuda --backend ninja | 
|  | TestCategory('cuda', 'cuda', backend not in (Backend.ninja, Backend.xcode) or not shutil.which('nvcc')), | 
|  | TestCategory('python3', 'python3', backend is not Backend.ninja or 'python3' not in sys.executable), | 
|  | TestCategory('python', 'python'), | 
|  | TestCategory('fpga', 'fpga', shutil.which('yosys') is None), | 
|  | TestCategory('frameworks', 'frameworks'), | 
|  | TestCategory('nasm', 'nasm'), | 
|  | TestCategory('wasm', 'wasm', shutil.which('emcc') is None or backend is not Backend.ninja), | 
|  | TestCategory('wayland', 'wayland', should_skip_wayland()), | 
|  | TestCategory('format', 'format'), | 
|  | ] | 
|  |  | 
|  | categories = [t.category for t in all_tests] | 
|  | assert categories == ALL_TESTS, 'argparse("--only", choices=ALL_TESTS) need to be updated to match all_tests categories' | 
|  |  | 
|  | if only: | 
|  | for key in only.keys(): | 
|  | assert key in categories, f'key `{key}` is not a recognized category' | 
|  | all_tests = [t for t in all_tests if t.category in only.keys()] | 
|  |  | 
|  | gathered_tests = [(t.category, gather_tests(Path('test cases', t.subdir), t.stdout_mandatory, only[t.category], t.skip), t.skip) for t in all_tests] | 
|  | return gathered_tests | 
|  |  | 
|  | def run_tests(all_tests: T.List[T.Tuple[str, T.List[TestDef], bool]], | 
|  | log_name_base: str, | 
|  | failfast: bool, | 
|  | extra_args: T.List[str], | 
|  | use_tmp: bool, | 
|  | num_workers: int) -> T.Tuple[int, int, int]: | 
|  | txtname = log_name_base + '.txt' | 
|  | with open(txtname, 'w', encoding='utf-8', errors='ignore') as lf: | 
|  | return _run_tests(all_tests, log_name_base, failfast, extra_args, use_tmp, num_workers, lf) | 
|  |  | 
|  | class TestStatus(Enum): | 
|  | OK = normal_green(' [SUCCESS] ') | 
|  | SKIP = yellow(' [SKIPPED] ') | 
|  | ERROR = red('  [ERROR]  ') | 
|  | UNEXSKIP = red('[UNEXSKIP] ') | 
|  | UNEXRUN = red(' [UNEXRUN] ') | 
|  | CANCELED = cyan('[CANCELED] ') | 
|  | RUNNING = blue(' [RUNNING] ')  # Should never be actually printed | 
|  | LOG = bold('   [LOG]   ')      # Should never be actually printed | 
|  |  | 
|  | def default_print(*args: mlog.TV_Loggable, sep: str = ' ') -> None: | 
|  | print(*args, sep=sep) | 
|  |  | 
|  | safe_print = default_print | 
|  |  | 
|  | class TestRunFuture: | 
|  | def __init__(self, name: str, testdef: TestDef, future: T.Optional['Future[T.Optional[TestResult]]']) -> None: | 
|  | super().__init__() | 
|  | self.name = name | 
|  | self.testdef = testdef | 
|  | self.future = future | 
|  | self.status = TestStatus.RUNNING if self.future is not None else TestStatus.SKIP | 
|  |  | 
|  | @property | 
|  | def result(self) -> T.Optional[TestResult]: | 
|  | return self.future.result() if self.future else None | 
|  |  | 
|  | def log(self) -> None: | 
|  | if verbose_output or self.status.value != TestStatus.OK.value: | 
|  | without_install = '' if install_commands else '(without install)' | 
|  | safe_print(self.status.value, without_install, *self.testdef.display_name()) | 
|  |  | 
|  | def update_log(self, new_status: TestStatus) -> None: | 
|  | self.status = new_status | 
|  | self.log() | 
|  |  | 
|  | def cancel(self) -> None: | 
|  | if self.future is not None and self.future.cancel(): | 
|  | self.status = TestStatus.CANCELED | 
|  |  | 
|  | class LogRunFuture: | 
|  | def __init__(self, msgs: mlog.TV_LoggableList) -> None: | 
|  | self.msgs = msgs | 
|  | self.status = TestStatus.LOG | 
|  |  | 
|  | def log(self) -> None: | 
|  | safe_print(*self.msgs, sep='') | 
|  |  | 
|  | def cancel(self) -> None: | 
|  | pass | 
|  |  | 
|  | RunFutureUnion = T.Union[TestRunFuture, LogRunFuture] | 
|  |  | 
|  | def _run_tests(all_tests: T.List[T.Tuple[str, T.List[TestDef], bool]], | 
|  | log_name_base: str, | 
|  | failfast: bool, | 
|  | extra_args: T.List[str], | 
|  | use_tmp: bool, | 
|  | num_workers: int, | 
|  | logfile: T.TextIO) -> T.Tuple[int, int, int]: | 
|  | global stop, host_c_compiler | 
|  | xmlname = log_name_base + '.xml' | 
|  | junit_root = ET.Element('testsuites') | 
|  | conf_time:  float = 0 | 
|  | build_time: float = 0 | 
|  | test_time:  float = 0 | 
|  | passing_tests = 0 | 
|  | failing_tests = 0 | 
|  | skipped_tests = 0 | 
|  |  | 
|  | print(f'\nRunning tests with {num_workers} workers') | 
|  |  | 
|  | # Pack the global state | 
|  | state = GlobalState(compile_commands, clean_commands, test_commands, install_commands, uninstall_commands, backend, backend_flags, host_c_compiler) | 
|  | executor = ProcessPoolExecutor(max_workers=num_workers) | 
|  |  | 
|  | futures: T.List[RunFutureUnion] = [] | 
|  |  | 
|  | # First, collect and start all tests and also queue log messages | 
|  | for name, test_cases, skipped in all_tests: | 
|  | current_suite = ET.SubElement(junit_root, 'testsuite', {'name': name, 'tests': str(len(test_cases))}) | 
|  | if skipped: | 
|  | futures += [LogRunFuture(['\n', bold(f'Not running {name} tests.'), '\n'])] | 
|  | continue | 
|  | else: | 
|  | futures += [LogRunFuture(['\n', bold(f'Running {name} tests.'), '\n'])] | 
|  |  | 
|  | for t in test_cases: | 
|  | # Jenkins screws us over by automatically sorting test cases by name | 
|  | # and getting it wrong by not doing logical number sorting. | 
|  | (testnum, testbase) = t.path.name.split(' ', 1) | 
|  | testname = '%.3d %s' % (int(testnum), testbase) | 
|  | if t.name: | 
|  | testname += f' ({t.name})' | 
|  | should_fail = '' | 
|  | suite_args = [] | 
|  | if name.startswith('failing'): | 
|  | should_fail = name.split('failing-')[1] | 
|  | if name.startswith('warning'): | 
|  | suite_args = ['--fatal-meson-warnings'] | 
|  | should_fail = name.split('warning-')[1] | 
|  |  | 
|  | if skipped or t.skip: | 
|  | futures += [TestRunFuture(testname, t, None)] | 
|  | continue | 
|  | result_future = executor.submit(run_test, t, extra_args + suite_args + t.args, should_fail, use_tmp, state=state) | 
|  | futures += [TestRunFuture(testname, t, result_future)] | 
|  |  | 
|  | # Ensure we only cancel once | 
|  | tests_canceled = False | 
|  |  | 
|  | # Optionally enable the tqdm progress bar, but only if there is at least | 
|  | # one LogRunFuture and one TestRunFuture | 
|  | global safe_print | 
|  | futures_iter: T.Iterable[RunFutureUnion] = futures | 
|  | if len(futures) > 2 and sys.stdout.isatty(): | 
|  | try: | 
|  | from tqdm import tqdm | 
|  | futures_iter = tqdm(futures, desc='Running tests', unit='test') | 
|  |  | 
|  | def tqdm_print(*args: mlog.TV_Loggable, sep: str = ' ') -> None: | 
|  | tqdm.write(sep.join([str(x) for x in args])) | 
|  |  | 
|  | safe_print = tqdm_print | 
|  | except ImportError: | 
|  | pass | 
|  |  | 
|  | # Wait and handle the test results and print the stored log output | 
|  | for f in futures_iter: | 
|  | # Just a log entry to print something to stdout | 
|  | sys.stdout.flush() | 
|  | if isinstance(f, LogRunFuture): | 
|  | f.log() | 
|  | continue | 
|  |  | 
|  | # Actual Test run | 
|  | testname = f.name | 
|  | t        = f.testdef | 
|  | try: | 
|  | result = f.result | 
|  | except (CancelledError, KeyboardInterrupt): | 
|  | f.status = TestStatus.CANCELED | 
|  |  | 
|  | if stop and not tests_canceled: | 
|  | num_running = sum(1 if f2.status is TestStatus.RUNNING  else 0 for f2 in futures) | 
|  | for f2 in futures: | 
|  | f2.cancel() | 
|  | executor.shutdown() | 
|  | num_canceled = sum(1 if f2.status is TestStatus.CANCELED else 0 for f2 in futures) | 
|  | safe_print(f'\nCanceled {num_canceled} out of {num_running} running tests.') | 
|  | safe_print(f'Finishing the remaining {num_running - num_canceled} tests.\n') | 
|  | tests_canceled = True | 
|  |  | 
|  | # Handle canceled tests | 
|  | if f.status is TestStatus.CANCELED: | 
|  | f.log() | 
|  | continue | 
|  |  | 
|  | # Handle skipped tests | 
|  | if result is None: | 
|  | # skipped due to skipped category skip or 'tools:' or 'skip_on_env:' | 
|  | is_skipped = True | 
|  | skip_reason = 'not run because preconditions were not met' | 
|  | skip_as_expected = True | 
|  | else: | 
|  | # skipped due to test outputting 'MESON_SKIP_TEST' | 
|  | is_skipped, skip_reason = handle_meson_skip_test(result.stdo) | 
|  | if not skip_dont_care(t): | 
|  | skip_as_expected = (is_skipped == t.skip_expected) | 
|  | else: | 
|  | skip_as_expected = True | 
|  |  | 
|  | if is_skipped: | 
|  | skipped_tests += 1 | 
|  |  | 
|  | if is_skipped and skip_as_expected: | 
|  | f.update_log(TestStatus.SKIP) | 
|  | if not t.skip_category: | 
|  | safe_print(bold('Reason:'), skip_reason) | 
|  | current_test = ET.SubElement(current_suite, 'testcase', {'name': testname, 'classname': t.category}) | 
|  | ET.SubElement(current_test, 'skipped', {}) | 
|  | continue | 
|  |  | 
|  | if not skip_as_expected: | 
|  | failing_tests += 1 | 
|  | if is_skipped: | 
|  | skip_msg = f'Test asked to be skipped ({skip_reason}), but was not expected to' | 
|  | status = TestStatus.UNEXSKIP | 
|  | else: | 
|  | skip_msg = 'Test ran, but was expected to be skipped' | 
|  | status = TestStatus.UNEXRUN | 
|  | result.msg = f"{skip_msg} for MESON_CI_JOBNAME '{ci_jobname}'" | 
|  |  | 
|  | f.update_log(status) | 
|  | safe_print(bold('Reason:'), result.msg) | 
|  | current_test = ET.SubElement(current_suite, 'testcase', {'name': testname, 'classname': t.category}) | 
|  | ET.SubElement(current_test, 'failure', {'message': result.msg}) | 
|  | continue | 
|  |  | 
|  | # Handle Failed tests | 
|  | if result.msg != '': | 
|  | f.update_log(TestStatus.ERROR) | 
|  | safe_print(bold('During:'), result.step.name) | 
|  | safe_print(bold('Reason:'), result.msg) | 
|  | failing_tests += 1 | 
|  | # Append a visual separator for the different test cases | 
|  | cols = shutil.get_terminal_size((100, 20)).columns | 
|  | name_str = ' '.join([str(x) for x in f.testdef.display_name()]) | 
|  | name_len = len(re.sub(r'\x1B[^m]+m', '', name_str))  # Do not count escape sequences | 
|  | left_w = (cols // 2) - (name_len // 2) - 1 | 
|  | left_w = max(3, left_w) | 
|  | right_w = cols - left_w - name_len - 2 | 
|  | right_w = max(3, right_w) | 
|  | failing_testcases.append(name_str) | 
|  | failing_logs.append(f'\n\x1b[31m{"="*left_w}\x1b[0m {name_str} \x1b[31m{"="*right_w}\x1b[0m\n') | 
|  | _during = bold('Failed during:') | 
|  | _reason = bold('Reason:') | 
|  | failing_logs.append(f'{_during} {result.step.name}\n{_reason} {result.msg}\n') | 
|  | if result.step == BuildStep.configure and result.mlog != no_meson_log_msg: | 
|  | # For configure failures, instead of printing stdout, | 
|  | # print the meson log if available since it's a superset | 
|  | # of stdout and often has very useful information. | 
|  | failing_logs.append(result.mlog) | 
|  | elif under_ci: | 
|  | # Always print the complete meson log when running in | 
|  | # a CI. This helps debugging issues that only occur in | 
|  | # a hard to reproduce environment | 
|  | failing_logs.append(result.mlog) | 
|  | failing_logs.append(result.stdo) | 
|  | else: | 
|  | failing_logs.append(result.stdo) | 
|  | for cmd_res in result.cicmds: | 
|  | failing_logs.append(cmd_res) | 
|  | failing_logs.append(result.stde) | 
|  | if failfast: | 
|  | safe_print("Cancelling the rest of the tests") | 
|  | for f2 in futures: | 
|  | f2.cancel() | 
|  | else: | 
|  | f.update_log(TestStatus.OK) | 
|  | passing_tests += 1 | 
|  | for cleanup_path in t.cleanup: | 
|  | assert not os.path.isabs(cleanup_path) | 
|  | abspath = t.path / cleanup_path | 
|  | if abspath.is_file(): | 
|  | mesonlib.windows_proof_rm(abspath) | 
|  | else: | 
|  | mesonlib.windows_proof_rmtree(abspath) | 
|  | conf_time += result.conftime | 
|  | build_time += result.buildtime | 
|  | test_time += result.testtime | 
|  | total_time = conf_time + build_time + test_time | 
|  | log_text_file(logfile, t.path, result) | 
|  | current_test = ET.SubElement( | 
|  | current_suite, | 
|  | 'testcase', | 
|  | {'name': testname, 'classname': t.category, 'time': '%.3f' % total_time} | 
|  | ) | 
|  | if result.msg != '': | 
|  | ET.SubElement(current_test, 'failure', {'message': result.msg}) | 
|  | stdoel = ET.SubElement(current_test, 'system-out') | 
|  | stdoel.text = result.stdo | 
|  | stdeel = ET.SubElement(current_test, 'system-err') | 
|  | stdeel.text = result.stde | 
|  |  | 
|  | # Reset, just in case | 
|  | safe_print = default_print | 
|  |  | 
|  | print() | 
|  | print("Total configuration time: %.2fs" % conf_time) | 
|  | print("Total build time:         %.2fs" % build_time) | 
|  | print("Total test time:          %.2fs" % test_time) | 
|  | ET.ElementTree(element=junit_root).write(xmlname, xml_declaration=True, encoding='UTF-8') | 
|  | return passing_tests, failing_tests, skipped_tests | 
|  |  | 
|  | def check_meson_commands_work(use_tmpdir: bool, extra_args: T.List[str]) -> None: | 
|  | global backend, compile_commands, test_commands, install_commands | 
|  | testdir = PurePath('test cases', 'common', '1 trivial').as_posix() | 
|  | meson_commands = mesonlib.python_command + [get_meson_script()] | 
|  | with TemporaryDirectoryWinProof(prefix='b ', dir=None if use_tmpdir else '.') as build_dir: | 
|  | print('Checking that configuring works...') | 
|  | gen_cmd = meson_commands + ['setup' , testdir, build_dir] + backend_flags + extra_args | 
|  | pc, o, e = Popen_safe(gen_cmd) | 
|  | if pc.returncode != 0: | 
|  | raise RuntimeError(f'Failed to configure {testdir!r}:\n{e}\n{o}') | 
|  | print('Checking that introspect works...') | 
|  | pc, o, e = Popen_safe(meson_commands + ['introspect', '--targets'], cwd=build_dir) | 
|  | json.loads(o) | 
|  | if pc.returncode != 0: | 
|  | raise RuntimeError(f'Failed to introspect --targets {testdir!r}:\n{e}\n{o}') | 
|  | print('Checking that building works...') | 
|  | dir_args = get_backend_args_for_dir(backend, build_dir) | 
|  | pc, o, e = Popen_safe(compile_commands + dir_args, cwd=build_dir) | 
|  | if pc.returncode != 0: | 
|  | raise RuntimeError(f'Failed to build {testdir!r}:\n{e}\n{o}') | 
|  | print('Checking that testing works...') | 
|  | pc, o, e = Popen_safe(test_commands, cwd=build_dir) | 
|  | if pc.returncode != 0: | 
|  | raise RuntimeError(f'Failed to test {testdir!r}:\n{e}\n{o}') | 
|  | if install_commands: | 
|  | print('Checking that installing works...') | 
|  | pc, o, e = Popen_safe(install_commands, cwd=build_dir) | 
|  | if pc.returncode != 0: | 
|  | raise RuntimeError(f'Failed to install {testdir!r}:\n{e}\n{o}') | 
|  |  | 
|  |  | 
|  | def detect_system_compiler(options: 'CompilerArgumentType') -> None: | 
|  | global host_c_compiler, compiler_id_map | 
|  |  | 
|  | fake_opts = get_fake_options('/') | 
|  | if options.cross_file: | 
|  | fake_opts.cross_file = [options.cross_file] | 
|  | if options.native_file: | 
|  | fake_opts.native_file = [options.native_file] | 
|  |  | 
|  | env = environment.Environment('', '', fake_opts) | 
|  |  | 
|  | print_compilers(env, MachineChoice.HOST) | 
|  | if options.cross_file: | 
|  | print_compilers(env, MachineChoice.BUILD) | 
|  |  | 
|  | for lang in sorted(compilers.all_languages): | 
|  | try: | 
|  | comp = compiler_from_language(env, lang, MachineChoice.HOST) | 
|  | # note compiler id for later use with test.json matrix | 
|  | compiler_id_map[lang] = comp.get_id() | 
|  | except mesonlib.MesonException: | 
|  | comp = None | 
|  |  | 
|  | # note C compiler for later use by platform_fix_name() | 
|  | if lang == 'c': | 
|  | if comp: | 
|  | host_c_compiler = comp.get_id() | 
|  | else: | 
|  | raise RuntimeError("Could not find C compiler.") | 
|  |  | 
|  |  | 
|  | def print_compilers(env: 'Environment', machine: MachineChoice) -> None: | 
|  | print() | 
|  | print(f'{machine.get_lower_case_name()} machine compilers') | 
|  | print() | 
|  | for lang in sorted(compilers.all_languages): | 
|  | try: | 
|  | comp = compiler_from_language(env, lang, machine) | 
|  | details = '{:<10} {} {}'.format('[' + comp.get_id() + ']', ' '.join(comp.get_exelist()), comp.get_version_string()) | 
|  | except mesonlib.MesonException: | 
|  | details = '[not found]' | 
|  | print(f'{lang:<7}: {details}') | 
|  |  | 
|  | class ToolInfo(T.NamedTuple): | 
|  | tool: str | 
|  | args: T.List[str] | 
|  | regex: T.Pattern | 
|  | match_group: int | 
|  |  | 
|  | def detect_tools(report: bool = True) -> None: | 
|  | tools: T.List[ToolInfo] = [ | 
|  | ToolInfo( | 
|  | 'ninja', | 
|  | ['--version'], | 
|  | re.compile(r'^([0-9]+(\.[0-9]+)*(-[a-z0-9]+)?)$'), | 
|  | 1, | 
|  | ), | 
|  | ToolInfo( | 
|  | 'cmake', | 
|  | ['--version'], | 
|  | re.compile(r'^cmake version ([0-9]+(\.[0-9]+)*(-[a-z0-9]+)?)$'), | 
|  | 1, | 
|  | ), | 
|  | ToolInfo( | 
|  | 'hotdoc', | 
|  | ['--version'], | 
|  | re.compile(r'^([0-9]+(\.[0-9]+)*(-[a-z0-9]+)?)$'), | 
|  | 1, | 
|  | ), | 
|  | ] | 
|  |  | 
|  | def get_version(t: ToolInfo) -> str: | 
|  | exe = shutil.which(t.tool) | 
|  | if not exe: | 
|  | return 'not found' | 
|  |  | 
|  | args = [t.tool] + t.args | 
|  | pc, o, e = Popen_safe(args) | 
|  | if pc.returncode != 0: | 
|  | return f'{exe} (invalid {t.tool} executable)' | 
|  | for i in o.split('\n'): | 
|  | i = i.strip('\n\r\t ') | 
|  | m = t.regex.match(i) | 
|  | if m is not None: | 
|  | tool_vers_map[t.tool] = m.group(t.match_group) | 
|  | return '{} ({})'.format(exe, m.group(t.match_group)) | 
|  |  | 
|  | return f'{exe} (unknown)' | 
|  |  | 
|  | if not report: | 
|  | for tool in tools: | 
|  | get_version(tool) | 
|  | return | 
|  |  | 
|  | print() | 
|  | print('tools') | 
|  | print() | 
|  |  | 
|  | max_width = max([len(x.tool) for x in tools] + [7]) | 
|  | for tool in tools: | 
|  | print('{0:<{2}}: {1}'.format(tool.tool, get_version(tool), max_width)) | 
|  | print() | 
|  |  | 
|  | symlink_test_dir1 = None | 
|  | symlink_test_dir2 = None | 
|  | symlink_file1 = None | 
|  | symlink_file2 = None | 
|  | symlink_file3 = None | 
|  |  | 
|  | def scan_test_data_symlinks() -> None: | 
|  | global symlink_test_dir1, symlink_test_dir2, symlink_file1, symlink_file2, symlink_file3 | 
|  | tmpdir1 = list(Path('.').glob('test cases/**/*install functions and follow symlinks')) | 
|  | tmpdir2 = list(Path('.').glob('test cases/frameworks/*boost symlinks')) | 
|  | assert len(tmpdir1) == 1 | 
|  | assert len(tmpdir2) == 1 | 
|  | symlink_test_dir1 = tmpdir1[0] | 
|  | symlink_test_dir2 = tmpdir2[0] / 'boost/include' | 
|  | symlink_file1 = symlink_test_dir1 / 'foo/link1' | 
|  | symlink_file2 = symlink_test_dir1 / 'foo/link2.h' | 
|  | symlink_file3 = symlink_test_dir2 / 'boost' | 
|  |  | 
|  | def clear_transitive_files() -> None: | 
|  | a = Path('test cases/common') | 
|  | for d in a.glob('*subproject subdir/subprojects/subsubsub*'): | 
|  | if d.is_dir(): | 
|  | mesonlib.windows_proof_rmtree(str(d)) | 
|  | else: | 
|  | mesonlib.windows_proof_rm(str(d)) | 
|  | try: | 
|  | if symlink_file1 is not None: | 
|  | symlink_file1.unlink() | 
|  | except FileNotFoundError: | 
|  | pass | 
|  | try: | 
|  | if symlink_file2 is not None: | 
|  | symlink_file2.unlink() | 
|  | except FileNotFoundError: | 
|  | pass | 
|  | try: | 
|  | if symlink_file3 is not None: | 
|  | symlink_file3.unlink() | 
|  | symlink_test_dir2.rmdir() | 
|  | except FileNotFoundError: | 
|  | pass | 
|  |  | 
|  | def setup_symlinks() -> None: | 
|  | try: | 
|  | symlink_file1.symlink_to('file1') | 
|  | symlink_file2.symlink_to('file1') | 
|  | symlink_test_dir2.mkdir(parents=True, exist_ok=True) | 
|  | symlink_file3.symlink_to('../Cellar/boost/0.3.0/include/boost') | 
|  | except OSError: | 
|  | print('symlinks are not supported on this system') | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | if under_ci and not raw_ci_jobname: | 
|  | raise SystemExit('Running under CI but $MESON_CI_JOBNAME is not set (set to "thirdparty" if you are running outside of the github org)') | 
|  |  | 
|  | setup_vsenv() | 
|  | try: | 
|  | # This fails in some CI environments for unknown reasons. | 
|  | num_workers = multiprocessing.cpu_count() | 
|  | except Exception as e: | 
|  | print('Could not determine number of CPUs due to the following reason:', str(e)) | 
|  | print('Defaulting to using only two processes') | 
|  | num_workers = 2 | 
|  |  | 
|  | if num_workers > 64: | 
|  | # Too much parallelism seems to trigger a potential Python bug: | 
|  | # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=1004107 | 
|  | num_workers = 64 | 
|  |  | 
|  | parser = argparse.ArgumentParser(description="Run the test suite of Meson.") | 
|  | parser.add_argument('extra_args', nargs='*', | 
|  | help='arguments that are passed directly to Meson (remember to have -- before these).') | 
|  | parser.add_argument('--backend', dest='backend', choices=backendlist) | 
|  | parser.add_argument('-j', dest='num_workers', type=int, default=num_workers, | 
|  | help=f'Maximum number of parallel tests (default {num_workers})') | 
|  | parser.add_argument('--failfast', action='store_true', | 
|  | help='Stop running if test case fails') | 
|  | parser.add_argument('--no-unittests', action='store_true', | 
|  | help='Not used, only here to simplify run_tests.py') | 
|  | parser.add_argument('--only', default=[], | 
|  | help='name of test(s) to run, in format "category[/name]" where category is one of: ' + ', '.join(ALL_TESTS), nargs='+') | 
|  | parser.add_argument('-v', default=False, action='store_true', | 
|  | help='Verbose mode') | 
|  | parser.add_argument('--cross-file', action='store', help='File describing cross compilation environment.') | 
|  | parser.add_argument('--native-file', action='store', help='File describing native compilation environment.') | 
|  | parser.add_argument('--use-tmpdir', action='store_true', help='Use tmp directory for temporary files.') | 
|  | options = T.cast('ArgumentType', parser.parse_args()) | 
|  | verbose_output = options.v | 
|  |  | 
|  | if options.cross_file: | 
|  | options.extra_args += ['--cross-file', options.cross_file] | 
|  | if options.native_file: | 
|  | options.extra_args += ['--native-file', options.native_file] | 
|  |  | 
|  | if not mesonlib.is_windows(): | 
|  | scan_test_data_symlinks() | 
|  | clear_transitive_files() | 
|  | if not mesonlib.is_windows(): | 
|  | setup_symlinks() | 
|  | mesonlib.set_meson_command(get_meson_script()) | 
|  |  | 
|  | print('Meson build system', meson_version, 'Project Tests') | 
|  | print('Using python', sys.version.split('\n')[0], f'({sys.executable!r})') | 
|  | if 'VSCMD_VER' in os.environ: | 
|  | print('VSCMD version', os.environ['VSCMD_VER']) | 
|  | setup_commands(options.backend) | 
|  | detect_system_compiler(options) | 
|  | detect_tools() | 
|  | script_dir = os.path.split(__file__)[0] | 
|  | if script_dir != '': | 
|  | os.chdir(script_dir) | 
|  | check_meson_commands_work(options.use_tmpdir, options.extra_args) | 
|  | only = collections.defaultdict(list) | 
|  | for i in options.only: | 
|  | try: | 
|  | cat, case = i.split('/') | 
|  | only[cat].append(case) | 
|  | except ValueError: | 
|  | only[i].append('') | 
|  | try: | 
|  | all_tests = detect_tests_to_run(only, options.use_tmpdir) | 
|  | res = run_tests(all_tests, 'meson-test-run', options.failfast, options.extra_args, options.use_tmpdir, options.num_workers) | 
|  | (passing_tests, failing_tests, skipped_tests) = res | 
|  | except StopException: | 
|  | pass | 
|  | if failing_tests > 0: | 
|  | print('\nMesonlogs of failing tests\n') | 
|  | for l in failing_logs: | 
|  | try: | 
|  | print(l, '\n') | 
|  | except UnicodeError: | 
|  | print(l.encode('ascii', errors='replace').decode(), '\n') | 
|  | print() | 
|  | print('Total passed tests: ', green(str(passing_tests))) | 
|  | print('Total failed tests: ', red(str(failing_tests))) | 
|  | print('Total skipped tests:', yellow(str(skipped_tests))) | 
|  | if failing_tests > 0: | 
|  | print('\nAll failures:') | 
|  | for c in failing_testcases: | 
|  | print(f'  -> {c}') | 
|  | for name, dirs, _ in all_tests: | 
|  | dir_names = list({x.path.name for x in dirs}) | 
|  | for k, g in itertools.groupby(dir_names, key=lambda x: x.split()[0]): | 
|  | tests = list(g) | 
|  | if len(tests) != 1: | 
|  | print('WARNING: The {} suite contains duplicate "{}" tests: "{}"'.format(name, k, '", "'.join(tests))) | 
|  | clear_transitive_files() | 
|  | raise SystemExit(failing_tests) |