| # SPDX-License-Identifier: Apache-2.0 |
| # Copyright 2016 The Meson development team |
| |
| # This tool is used to manipulate an existing Meson build definition. |
| # |
| # - add a file to a target |
| # - remove files from a target |
| # - move targets |
| # - reindent? |
| from __future__ import annotations |
| |
| from .ast import IntrospectionInterpreter, BUILD_TARGET_FUNCTIONS, AstConditionLevel, AstIDGenerator, AstIndentationGenerator, AstPrinter |
| from mesonbuild.mesonlib import MesonException, setup_vsenv |
| from . import mlog, environment |
| from functools import wraps |
| from .mparser import Token, ArrayNode, ArgumentNode, AssignmentNode, StringNode, BooleanNode, ElementaryNode, IdNode, FunctionNode, SymbolNode |
| import json, os, re, sys |
| import typing as T |
| |
| if T.TYPE_CHECKING: |
| from argparse import ArgumentParser, HelpFormatter |
| from .mparser import BaseNode |
| |
| class RewriterException(MesonException): |
| pass |
| |
| # Note: when adding arguments, please also add them to the completion |
| # scripts in $MESONSRC/data/shell-completions/ |
| def add_arguments(parser: ArgumentParser, formatter: T.Callable[[str], HelpFormatter]) -> None: |
| parser.add_argument('-s', '--sourcedir', type=str, default='.', metavar='SRCDIR', help='Path to source directory.') |
| parser.add_argument('-V', '--verbose', action='store_true', default=False, help='Enable verbose output') |
| parser.add_argument('-S', '--skip-errors', dest='skip', action='store_true', default=False, help='Skip errors instead of aborting') |
| subparsers = parser.add_subparsers(dest='type', title='Rewriter commands', description='Rewrite command to execute') |
| |
| # Target |
| tgt_parser = subparsers.add_parser('target', aliases=['tgt'], help='Modify a target', formatter_class=formatter) |
| tgt_parser.add_argument('-s', '--subdir', default='', dest='subdir', help='Subdirectory of the new target (only for the "add_target" action)') |
| tgt_parser.add_argument('--type', dest='tgt_type', choices=rewriter_keys['target']['target_type'][2], default='executable', |
| help='Type of the target to add (only for the "add_target" action)') |
| tgt_parser.add_argument('target', help='Name or ID of the target') |
| tgt_parser.add_argument('operation', choices=['add', 'rm', 'add_target', 'rm_target', 'add_extra_files', 'rm_extra_files', 'info'], |
| help='Action to execute') |
| tgt_parser.add_argument('sources', nargs='*', help='Sources to add/remove') |
| |
| # KWARGS |
| kw_parser = subparsers.add_parser('kwargs', help='Modify keyword arguments', formatter_class=formatter) |
| kw_parser.add_argument('operation', choices=rewriter_keys['kwargs']['operation'][2], |
| help='Action to execute') |
| kw_parser.add_argument('function', choices=list(rewriter_func_kwargs.keys()), |
| help='Function type to modify') |
| kw_parser.add_argument('id', help='ID of the function to modify (can be anything for "project")') |
| kw_parser.add_argument('kwargs', nargs='*', help='Pairs of keyword and value') |
| |
| # Default options |
| def_parser = subparsers.add_parser('default-options', aliases=['def'], help='Modify the project default options', formatter_class=formatter) |
| def_parser.add_argument('operation', choices=rewriter_keys['default_options']['operation'][2], |
| help='Action to execute') |
| def_parser.add_argument('options', nargs='*', help='Key, value pairs of configuration option') |
| |
| # JSON file/command |
| cmd_parser = subparsers.add_parser('command', aliases=['cmd'], help='Execute a JSON array of commands', formatter_class=formatter) |
| cmd_parser.add_argument('json', help='JSON string or file to execute') |
| |
| class RequiredKeys: |
| def __init__(self, keys): |
| self.keys = keys |
| |
| def __call__(self, f): |
| @wraps(f) |
| def wrapped(*wrapped_args, **wrapped_kwargs): |
| assert len(wrapped_args) >= 2 |
| cmd = wrapped_args[1] |
| for key, val in self.keys.items(): |
| typ = val[0] # The type of the value |
| default = val[1] # The default value -- None is required |
| choices = val[2] # Valid choices -- None is for everything |
| if key not in cmd: |
| if default is not None: |
| cmd[key] = default |
| else: |
| raise RewriterException('Key "{}" is missing in object for {}' |
| .format(key, f.__name__)) |
| if not isinstance(cmd[key], typ): |
| raise RewriterException('Invalid type of "{}". Required is {} but provided was {}' |
| .format(key, typ.__name__, type(cmd[key]).__name__)) |
| if choices is not None: |
| assert isinstance(choices, list) |
| if cmd[key] not in choices: |
| raise RewriterException('Invalid value of "{}": Possible values are {} but provided was "{}"' |
| .format(key, choices, cmd[key])) |
| return f(*wrapped_args, **wrapped_kwargs) |
| |
| return wrapped |
| |
| def _symbol(val: str) -> SymbolNode: |
| return SymbolNode(Token('', '', 0, 0, 0, (0, 0), val)) |
| |
| class MTypeBase: |
| def __init__(self, node: T.Optional[BaseNode] = None): |
| if node is None: |
| self.node = self.new_node() |
| else: |
| self.node = node |
| self.node_type = None |
| for i in self.supported_nodes(): |
| if isinstance(self.node, i): |
| self.node_type = i |
| |
| @classmethod |
| def new_node(cls, value=None): |
| # Overwrite in derived class |
| raise RewriterException('Internal error: new_node of MTypeBase was called') |
| |
| @classmethod |
| def supported_nodes(cls): |
| # Overwrite in derived class |
| return [] |
| |
| def can_modify(self): |
| return self.node_type is not None |
| |
| def get_node(self): |
| return self.node |
| |
| def add_value(self, value): |
| # Overwrite in derived class |
| mlog.warning('Cannot add a value of type', mlog.bold(type(self).__name__), '--> skipping') |
| |
| def remove_value(self, value): |
| # Overwrite in derived class |
| mlog.warning('Cannot remove a value of type', mlog.bold(type(self).__name__), '--> skipping') |
| |
| def remove_regex(self, value): |
| # Overwrite in derived class |
| mlog.warning('Cannot remove a regex in type', mlog.bold(type(self).__name__), '--> skipping') |
| |
| class MTypeStr(MTypeBase): |
| def __init__(self, node: T.Optional[BaseNode] = None): |
| super().__init__(node) |
| |
| @classmethod |
| def new_node(cls, value=None): |
| if value is None: |
| value = '' |
| return StringNode(Token('string', '', 0, 0, 0, None, str(value))) |
| |
| @classmethod |
| def supported_nodes(cls): |
| return [StringNode] |
| |
| class MTypeBool(MTypeBase): |
| def __init__(self, node: T.Optional[BaseNode] = None): |
| super().__init__(node) |
| |
| @classmethod |
| def new_node(cls, value=None): |
| return BooleanNode(Token('', '', 0, 0, 0, None, bool(value))) |
| |
| @classmethod |
| def supported_nodes(cls): |
| return [BooleanNode] |
| |
| class MTypeID(MTypeBase): |
| def __init__(self, node: T.Optional[BaseNode] = None): |
| super().__init__(node) |
| |
| @classmethod |
| def new_node(cls, value=None): |
| if value is None: |
| value = '' |
| return IdNode(Token('', '', 0, 0, 0, None, str(value))) |
| |
| @classmethod |
| def supported_nodes(cls): |
| return [IdNode] |
| |
| class MTypeList(MTypeBase): |
| def __init__(self, node: T.Optional[BaseNode] = None): |
| super().__init__(node) |
| |
| @classmethod |
| def new_node(cls, value=None): |
| if value is None: |
| value = [] |
| elif not isinstance(value, list): |
| return cls._new_element_node(value) |
| args = ArgumentNode(Token('', '', 0, 0, 0, None, '')) |
| args.arguments = [cls._new_element_node(i) for i in value] |
| return ArrayNode(_symbol('['), args, _symbol(']')) |
| |
| @classmethod |
| def _new_element_node(cls, value): |
| # Overwrite in derived class |
| raise RewriterException('Internal error: _new_element_node of MTypeList was called') |
| |
| def _ensure_array_node(self): |
| if not isinstance(self.node, ArrayNode): |
| tmp = self.node |
| self.node = self.new_node() |
| self.node.args.arguments = [tmp] |
| |
| @staticmethod |
| def _check_is_equal(node, value) -> bool: |
| # Overwrite in derived class |
| return False |
| |
| @staticmethod |
| def _check_regex_matches(node, regex: str) -> bool: |
| # Overwrite in derived class |
| return False |
| |
| def get_node(self): |
| if isinstance(self.node, ArrayNode): |
| if len(self.node.args.arguments) == 1: |
| return self.node.args.arguments[0] |
| return self.node |
| |
| @classmethod |
| def supported_element_nodes(cls): |
| # Overwrite in derived class |
| return [] |
| |
| @classmethod |
| def supported_nodes(cls): |
| return [ArrayNode] + cls.supported_element_nodes() |
| |
| def add_value(self, value): |
| if not isinstance(value, list): |
| value = [value] |
| self._ensure_array_node() |
| for i in value: |
| self.node.args.arguments += [self._new_element_node(i)] |
| |
| def _remove_helper(self, value, equal_func): |
| def check_remove_node(node): |
| for j in value: |
| if equal_func(i, j): |
| return True |
| return False |
| |
| if not isinstance(value, list): |
| value = [value] |
| self._ensure_array_node() |
| removed_list = [] |
| for i in self.node.args.arguments: |
| if not check_remove_node(i): |
| removed_list += [i] |
| self.node.args.arguments = removed_list |
| |
| def remove_value(self, value): |
| self._remove_helper(value, self._check_is_equal) |
| |
| def remove_regex(self, regex: str): |
| self._remove_helper(regex, self._check_regex_matches) |
| |
| class MTypeStrList(MTypeList): |
| def __init__(self, node: T.Optional[BaseNode] = None): |
| super().__init__(node) |
| |
| @classmethod |
| def _new_element_node(cls, value): |
| return StringNode(Token('string', '', 0, 0, 0, None, str(value))) |
| |
| @staticmethod |
| def _check_is_equal(node, value) -> bool: |
| if isinstance(node, StringNode): |
| return node.value == value |
| return False |
| |
| @staticmethod |
| def _check_regex_matches(node, regex: str) -> bool: |
| if isinstance(node, StringNode): |
| return re.match(regex, node.value) is not None |
| return False |
| |
| @classmethod |
| def supported_element_nodes(cls): |
| return [StringNode] |
| |
| class MTypeIDList(MTypeList): |
| def __init__(self, node: T.Optional[BaseNode] = None): |
| super().__init__(node) |
| |
| @classmethod |
| def _new_element_node(cls, value): |
| return IdNode(Token('', '', 0, 0, 0, None, str(value))) |
| |
| @staticmethod |
| def _check_is_equal(node, value) -> bool: |
| if isinstance(node, IdNode): |
| return node.value == value |
| return False |
| |
| @staticmethod |
| def _check_regex_matches(node, regex: str) -> bool: |
| if isinstance(node, StringNode): |
| return re.match(regex, node.value) is not None |
| return False |
| |
| @classmethod |
| def supported_element_nodes(cls): |
| return [IdNode] |
| |
| rewriter_keys = { |
| 'default_options': { |
| 'operation': (str, None, ['set', 'delete']), |
| 'options': (dict, {}, None) |
| }, |
| 'kwargs': { |
| 'function': (str, None, None), |
| 'id': (str, None, None), |
| 'operation': (str, None, ['set', 'delete', 'add', 'remove', 'remove_regex', 'info']), |
| 'kwargs': (dict, {}, None) |
| }, |
| 'target': { |
| 'target': (str, None, None), |
| 'operation': (str, None, ['src_add', 'src_rm', 'target_rm', 'target_add', 'extra_files_add', 'extra_files_rm', 'info']), |
| 'sources': (list, [], None), |
| 'subdir': (str, '', None), |
| 'target_type': (str, 'executable', ['both_libraries', 'executable', 'jar', 'library', 'shared_library', 'shared_module', 'static_library']), |
| } |
| } |
| |
| rewriter_func_kwargs = { |
| 'dependency': { |
| 'language': MTypeStr, |
| 'method': MTypeStr, |
| 'native': MTypeBool, |
| 'not_found_message': MTypeStr, |
| 'required': MTypeBool, |
| 'static': MTypeBool, |
| 'version': MTypeStrList, |
| 'modules': MTypeStrList |
| }, |
| 'target': { |
| 'build_by_default': MTypeBool, |
| 'build_rpath': MTypeStr, |
| 'dependencies': MTypeIDList, |
| 'gui_app': MTypeBool, |
| 'link_with': MTypeIDList, |
| 'export_dynamic': MTypeBool, |
| 'implib': MTypeBool, |
| 'install': MTypeBool, |
| 'install_dir': MTypeStr, |
| 'install_rpath': MTypeStr, |
| 'pie': MTypeBool |
| }, |
| 'project': { |
| 'default_options': MTypeStrList, |
| 'meson_version': MTypeStr, |
| 'license': MTypeStrList, |
| 'subproject_dir': MTypeStr, |
| 'version': MTypeStr |
| } |
| } |
| |
| class Rewriter: |
| def __init__(self, sourcedir: str, generator: str = 'ninja', skip_errors: bool = False): |
| self.sourcedir = sourcedir |
| self.interpreter = IntrospectionInterpreter(sourcedir, '', generator, visitors = [AstIDGenerator(), AstIndentationGenerator(), AstConditionLevel()]) |
| self.skip_errors = skip_errors |
| self.modified_nodes = [] |
| self.to_remove_nodes = [] |
| self.to_add_nodes = [] |
| self.functions = { |
| 'default_options': self.process_default_options, |
| 'kwargs': self.process_kwargs, |
| 'target': self.process_target, |
| } |
| self.info_dump = None |
| |
| def analyze_meson(self): |
| mlog.log('Analyzing meson file:', mlog.bold(os.path.join(self.sourcedir, environment.build_filename))) |
| self.interpreter.analyze() |
| mlog.log(' -- Project:', mlog.bold(self.interpreter.project_data['descriptive_name'])) |
| mlog.log(' -- Version:', mlog.cyan(self.interpreter.project_data['version'])) |
| |
| def add_info(self, cmd_type: str, cmd_id: str, data: dict): |
| if self.info_dump is None: |
| self.info_dump = {} |
| if cmd_type not in self.info_dump: |
| self.info_dump[cmd_type] = {} |
| self.info_dump[cmd_type][cmd_id] = data |
| |
| def print_info(self): |
| if self.info_dump is None: |
| return |
| sys.stderr.write(json.dumps(self.info_dump, indent=2)) |
| |
| def on_error(self): |
| if self.skip_errors: |
| return mlog.cyan('-->'), mlog.yellow('skipping') |
| return mlog.cyan('-->'), mlog.red('aborting') |
| |
| def handle_error(self): |
| if self.skip_errors: |
| return None |
| raise MesonException('Rewriting the meson.build failed') |
| |
| def find_target(self, target: str): |
| def check_list(name: str) -> T.List[BaseNode]: |
| result = [] |
| for i in self.interpreter.targets: |
| if name in {i['name'], i['id']}: |
| result += [i] |
| return result |
| |
| targets = check_list(target) |
| if targets: |
| if len(targets) == 1: |
| return targets[0] |
| else: |
| mlog.error('There are multiple targets matching', mlog.bold(target)) |
| for i in targets: |
| mlog.error(' -- Target name', mlog.bold(i['name']), 'with ID', mlog.bold(i['id'])) |
| mlog.error('Please try again with the unique ID of the target', *self.on_error()) |
| self.handle_error() |
| return None |
| |
| # Check the assignments |
| tgt = None |
| if target in self.interpreter.assignments: |
| node = self.interpreter.assignments[target] |
| if isinstance(node, FunctionNode): |
| if node.func_name.value in {'executable', 'jar', 'library', 'shared_library', 'shared_module', 'static_library', 'both_libraries'}: |
| tgt = self.interpreter.assign_vals[target] |
| |
| return tgt |
| |
| def find_dependency(self, dependency: str): |
| def check_list(name: str): |
| for i in self.interpreter.dependencies: |
| if name == i['name']: |
| return i |
| return None |
| |
| dep = check_list(dependency) |
| if dep is not None: |
| return dep |
| |
| # Check the assignments |
| if dependency in self.interpreter.assignments: |
| node = self.interpreter.assignments[dependency] |
| if isinstance(node, FunctionNode): |
| if node.func_name.value == 'dependency': |
| name = self.interpreter.flatten_args(node.args)[0] |
| dep = check_list(name) |
| |
| return dep |
| |
| @RequiredKeys(rewriter_keys['default_options']) |
| def process_default_options(self, cmd): |
| # First, remove the old values |
| kwargs_cmd = { |
| 'function': 'project', |
| 'id': "/", |
| 'operation': 'remove_regex', |
| 'kwargs': { |
| 'default_options': [f'{x}=.*' for x in cmd['options'].keys()] |
| } |
| } |
| self.process_kwargs(kwargs_cmd) |
| |
| # Then add the new values |
| if cmd['operation'] != 'set': |
| return |
| |
| kwargs_cmd['operation'] = 'add' |
| kwargs_cmd['kwargs']['default_options'] = [] |
| |
| cdata = self.interpreter.coredata |
| options = { |
| **{str(k): v for k, v in cdata.optstore.items()}, |
| **{str(k): v for k, v in cdata.optstore.items()}, |
| **{str(k): v for k, v in cdata.optstore.items()}, |
| **{str(k): v for k, v in cdata.optstore.items()}, |
| **{str(k): v for k, v in cdata.optstore.items()}, |
| } |
| |
| for key, val in sorted(cmd['options'].items()): |
| if key not in options: |
| mlog.error('Unknown options', mlog.bold(key), *self.on_error()) |
| self.handle_error() |
| continue |
| |
| try: |
| val = options[key].validate_value(val) |
| except MesonException as e: |
| mlog.error('Unable to set', mlog.bold(key), mlog.red(str(e)), *self.on_error()) |
| self.handle_error() |
| continue |
| |
| kwargs_cmd['kwargs']['default_options'] += [f'{key}={val}'] |
| |
| self.process_kwargs(kwargs_cmd) |
| |
| @RequiredKeys(rewriter_keys['kwargs']) |
| def process_kwargs(self, cmd): |
| mlog.log('Processing function type', mlog.bold(cmd['function']), 'with id', mlog.cyan("'" + cmd['id'] + "'")) |
| if cmd['function'] not in rewriter_func_kwargs: |
| mlog.error('Unknown function type', cmd['function'], *self.on_error()) |
| return self.handle_error() |
| kwargs_def = rewriter_func_kwargs[cmd['function']] |
| |
| # Find the function node to modify |
| node = None |
| arg_node = None |
| if cmd['function'] == 'project': |
| # msys bash may expand '/' to a path. It will mangle '//' to '/' |
| # but in order to keep usage shell-agnostic, also allow `//` as |
| # the function ID such that it will work in both msys bash and |
| # other shells. |
| if {'/', '//'}.isdisjoint({cmd['id']}): |
| mlog.error('The ID for the function type project must be "/" or "//" not "' + cmd['id'] + '"', *self.on_error()) |
| return self.handle_error() |
| node = self.interpreter.project_node |
| arg_node = node.args |
| elif cmd['function'] == 'target': |
| tmp = self.find_target(cmd['id']) |
| if tmp: |
| node = tmp['node'] |
| arg_node = node.args |
| elif cmd['function'] == 'dependency': |
| tmp = self.find_dependency(cmd['id']) |
| if tmp: |
| node = tmp['node'] |
| arg_node = node.args |
| if not node: |
| mlog.error('Unable to find the function node') |
| assert isinstance(node, FunctionNode) |
| assert isinstance(arg_node, ArgumentNode) |
| # Transform the key nodes to plain strings |
| arg_node.kwargs = {k.value: v for k, v in arg_node.kwargs.items()} |
| |
| # Print kwargs info |
| if cmd['operation'] == 'info': |
| info_data = {} |
| for key, val in sorted(arg_node.kwargs.items()): |
| info_data[key] = None |
| if isinstance(val, ElementaryNode): |
| info_data[key] = val.value |
| elif isinstance(val, ArrayNode): |
| data_list = [] |
| for i in val.args.arguments: |
| element = None |
| if isinstance(i, ElementaryNode): |
| element = i.value |
| data_list += [element] |
| info_data[key] = data_list |
| |
| self.add_info('kwargs', '{}#{}'.format(cmd['function'], cmd['id']), info_data) |
| return # Nothing else to do |
| |
| # Modify the kwargs |
| num_changed = 0 |
| for key, val in sorted(cmd['kwargs'].items()): |
| if key not in kwargs_def: |
| mlog.error('Cannot modify unknown kwarg', mlog.bold(key), *self.on_error()) |
| self.handle_error() |
| continue |
| |
| if cmd['operation'] == 'delete': |
| # Remove the key from the kwargs |
| if key not in arg_node.kwargs: |
| mlog.log(' -- Key', mlog.bold(key), 'is already deleted') |
| continue |
| mlog.log(' -- Deleting', mlog.bold(key), 'from the kwargs') |
| del arg_node.kwargs[key] |
| elif cmd['operation'] == 'set': |
| # Replace the key from the kwargs |
| mlog.log(' -- Setting', mlog.bold(key), 'to', mlog.yellow(str(val))) |
| arg_node.kwargs[key] = kwargs_def[key].new_node(val) |
| else: |
| # Modify the value from the kwargs |
| |
| if key not in arg_node.kwargs: |
| arg_node.kwargs[key] = None |
| modifier = kwargs_def[key](arg_node.kwargs[key]) |
| if not modifier.can_modify(): |
| mlog.log(' -- Skipping', mlog.bold(key), 'because it is too complex to modify') |
| continue |
| |
| # Apply the operation |
| val_str = str(val) |
| if cmd['operation'] == 'add': |
| mlog.log(' -- Adding', mlog.yellow(val_str), 'to', mlog.bold(key)) |
| modifier.add_value(val) |
| elif cmd['operation'] == 'remove': |
| mlog.log(' -- Removing', mlog.yellow(val_str), 'from', mlog.bold(key)) |
| modifier.remove_value(val) |
| elif cmd['operation'] == 'remove_regex': |
| mlog.log(' -- Removing all values matching', mlog.yellow(val_str), 'from', mlog.bold(key)) |
| modifier.remove_regex(val) |
| |
| # Write back the result |
| arg_node.kwargs[key] = modifier.get_node() |
| |
| num_changed += 1 |
| |
| # Convert the keys back to IdNode's |
| arg_node.kwargs = {IdNode(Token('', '', 0, 0, 0, None, k)): v for k, v in arg_node.kwargs.items()} |
| for k, v in arg_node.kwargs.items(): |
| k.level = v.level |
| if num_changed > 0 and node not in self.modified_nodes: |
| self.modified_nodes += [node] |
| |
| def find_assignment_node(self, node: BaseNode) -> AssignmentNode: |
| if node.ast_id and node.ast_id in self.interpreter.reverse_assignment: |
| return self.interpreter.reverse_assignment[node.ast_id] |
| return None |
| |
| @RequiredKeys(rewriter_keys['target']) |
| def process_target(self, cmd): |
| mlog.log('Processing target', mlog.bold(cmd['target']), 'operation', mlog.cyan(cmd['operation'])) |
| target = self.find_target(cmd['target']) |
| if target is None and cmd['operation'] != 'target_add': |
| mlog.error('Unknown target', mlog.bold(cmd['target']), *self.on_error()) |
| return self.handle_error() |
| |
| # Make source paths relative to the current subdir |
| def rel_source(src: str) -> str: |
| subdir = os.path.abspath(os.path.join(self.sourcedir, target['subdir'])) |
| if os.path.isabs(src): |
| return os.path.relpath(src, subdir) |
| elif not os.path.exists(src): |
| return src # Trust the user when the source doesn't exist |
| # Make sure that the path is relative to the subdir |
| return os.path.relpath(os.path.abspath(src), subdir) |
| |
| if target is not None: |
| cmd['sources'] = [rel_source(x) for x in cmd['sources']] |
| |
| # Utility function to get a list of the sources from a node |
| def arg_list_from_node(n): |
| args = [] |
| if isinstance(n, FunctionNode): |
| args = list(n.args.arguments) |
| if n.func_name.value in BUILD_TARGET_FUNCTIONS: |
| args.pop(0) |
| elif isinstance(n, ArrayNode): |
| args = n.args.arguments |
| elif isinstance(n, ArgumentNode): |
| args = n.arguments |
| return args |
| |
| to_sort_nodes = [] |
| |
| if cmd['operation'] == 'src_add': |
| node = None |
| if target['sources']: |
| node = target['sources'][0] |
| else: |
| node = target['node'] |
| assert node is not None |
| |
| # Generate the current source list |
| src_list = [] |
| for i in target['sources']: |
| for j in arg_list_from_node(i): |
| if isinstance(j, StringNode): |
| src_list += [j.value] |
| |
| # Generate the new String nodes |
| to_append = [] |
| for i in sorted(set(cmd['sources'])): |
| if i in src_list: |
| mlog.log(' -- Source', mlog.green(i), 'is already defined for the target --> skipping') |
| continue |
| mlog.log(' -- Adding source', mlog.green(i), 'at', |
| mlog.yellow(f'{node.filename}:{node.lineno}')) |
| token = Token('string', node.filename, 0, 0, 0, None, i) |
| to_append += [StringNode(token)] |
| |
| # Append to the AST at the right place |
| arg_node = None |
| if isinstance(node, (FunctionNode, ArrayNode)): |
| arg_node = node.args |
| elif isinstance(node, ArgumentNode): |
| arg_node = node |
| assert arg_node is not None |
| arg_node.arguments += to_append |
| |
| # Mark the node as modified |
| if arg_node not in to_sort_nodes and not isinstance(node, FunctionNode): |
| to_sort_nodes += [arg_node] |
| if node not in self.modified_nodes: |
| self.modified_nodes += [node] |
| |
| elif cmd['operation'] == 'src_rm': |
| # Helper to find the exact string node and its parent |
| def find_node(src): |
| for i in target['sources']: |
| for j in arg_list_from_node(i): |
| if isinstance(j, StringNode): |
| if j.value == src: |
| return i, j |
| return None, None |
| |
| for i in cmd['sources']: |
| # Try to find the node with the source string |
| root, string_node = find_node(i) |
| if root is None: |
| mlog.warning(' -- Unable to find source', mlog.green(i), 'in the target') |
| continue |
| |
| # Remove the found string node from the argument list |
| arg_node = None |
| if isinstance(root, (FunctionNode, ArrayNode)): |
| arg_node = root.args |
| elif isinstance(root, ArgumentNode): |
| arg_node = root |
| assert arg_node is not None |
| mlog.log(' -- Removing source', mlog.green(i), 'from', |
| mlog.yellow(f'{string_node.filename}:{string_node.lineno}')) |
| arg_node.arguments.remove(string_node) |
| |
| # Mark the node as modified |
| if arg_node not in to_sort_nodes and not isinstance(root, FunctionNode): |
| to_sort_nodes += [arg_node] |
| if root not in self.modified_nodes: |
| self.modified_nodes += [root] |
| |
| elif cmd['operation'] == 'extra_files_add': |
| tgt_function: FunctionNode = target['node'] |
| mark_array = True |
| try: |
| node = target['extra_files'][0] |
| except IndexError: |
| # Specifying `extra_files` with a list that flattens to empty gives an empty |
| # target['extra_files'] list, account for that. |
| try: |
| extra_files_key = next(k for k in tgt_function.args.kwargs.keys() if isinstance(k, IdNode) and k.value == 'extra_files') |
| node = tgt_function.args.kwargs[extra_files_key] |
| except StopIteration: |
| # Target has no extra_files kwarg, create one |
| node = ArrayNode(_symbol('['), ArgumentNode(Token('', tgt_function.filename, 0, 0, 0, None, '[]')), _symbol(']')) |
| tgt_function.args.kwargs[IdNode(Token('string', tgt_function.filename, 0, 0, 0, None, 'extra_files'))] = node |
| mark_array = False |
| if tgt_function not in self.modified_nodes: |
| self.modified_nodes += [tgt_function] |
| target['extra_files'] = [node] |
| if isinstance(node, IdNode): |
| node = self.interpreter.assignments[node.value] |
| target['extra_files'] = [node] |
| if not isinstance(node, ArrayNode): |
| mlog.error('Target', mlog.bold(cmd['target']), 'extra_files argument must be a list', *self.on_error()) |
| return self.handle_error() |
| |
| # Generate the current extra files list |
| extra_files_list = [] |
| for i in target['extra_files']: |
| for j in arg_list_from_node(i): |
| if isinstance(j, StringNode): |
| extra_files_list += [j.value] |
| |
| # Generate the new String nodes |
| to_append = [] |
| for i in sorted(set(cmd['sources'])): |
| if i in extra_files_list: |
| mlog.log(' -- Extra file', mlog.green(i), 'is already defined for the target --> skipping') |
| continue |
| mlog.log(' -- Adding extra file', mlog.green(i), 'at', |
| mlog.yellow(f'{node.filename}:{node.lineno}')) |
| token = Token('string', node.filename, 0, 0, 0, None, i) |
| to_append += [StringNode(token)] |
| |
| # Append to the AST at the right place |
| arg_node = node.args |
| arg_node.arguments += to_append |
| |
| # Mark the node as modified |
| if arg_node not in to_sort_nodes: |
| to_sort_nodes += [arg_node] |
| # If the extra_files array is newly created, don't mark it as its parent function node already is, |
| # otherwise this would cause double modification. |
| if mark_array and node not in self.modified_nodes: |
| self.modified_nodes += [node] |
| |
| elif cmd['operation'] == 'extra_files_rm': |
| # Helper to find the exact string node and its parent |
| def find_node(src): |
| for i in target['extra_files']: |
| for j in arg_list_from_node(i): |
| if isinstance(j, StringNode): |
| if j.value == src: |
| return i, j |
| return None, None |
| |
| for i in cmd['sources']: |
| # Try to find the node with the source string |
| root, string_node = find_node(i) |
| if root is None: |
| mlog.warning(' -- Unable to find extra file', mlog.green(i), 'in the target') |
| continue |
| |
| # Remove the found string node from the argument list |
| arg_node = root.args |
| mlog.log(' -- Removing extra file', mlog.green(i), 'from', |
| mlog.yellow(f'{string_node.filename}:{string_node.lineno}')) |
| arg_node.arguments.remove(string_node) |
| |
| # Mark the node as modified |
| if arg_node not in to_sort_nodes and not isinstance(root, FunctionNode): |
| to_sort_nodes += [arg_node] |
| if root not in self.modified_nodes: |
| self.modified_nodes += [root] |
| |
| elif cmd['operation'] == 'target_add': |
| if target is not None: |
| mlog.error('Can not add target', mlog.bold(cmd['target']), 'because it already exists', *self.on_error()) |
| return self.handle_error() |
| |
| id_base = re.sub(r'[- ]', '_', cmd['target']) |
| target_id = id_base + '_exe' if cmd['target_type'] == 'executable' else '_lib' |
| source_id = id_base + '_sources' |
| filename = os.path.join(cmd['subdir'], environment.build_filename) |
| |
| # Build src list |
| src_arg_node = ArgumentNode(Token('string', filename, 0, 0, 0, None, '')) |
| src_arr_node = ArrayNode(_symbol('['), src_arg_node, _symbol(']')) |
| src_far_node = ArgumentNode(Token('string', filename, 0, 0, 0, None, '')) |
| src_fun_node = FunctionNode(IdNode(Token('id', filename, 0, 0, 0, (0, 0), 'files')), _symbol('('), src_far_node, _symbol(')')) |
| src_ass_node = AssignmentNode(IdNode(Token('id', filename, 0, 0, 0, (0, 0), source_id)), _symbol('='), src_fun_node) |
| src_arg_node.arguments = [StringNode(Token('string', filename, 0, 0, 0, None, x)) for x in cmd['sources']] |
| src_far_node.arguments = [src_arr_node] |
| |
| # Build target |
| tgt_arg_node = ArgumentNode(Token('string', filename, 0, 0, 0, None, '')) |
| tgt_fun_node = FunctionNode(IdNode(Token('id', filename, 0, 0, 0, (0, 0), cmd['target_type'])), _symbol('('), tgt_arg_node, _symbol(')')) |
| tgt_ass_node = AssignmentNode(IdNode(Token('id', filename, 0, 0, 0, (0, 0), target_id)), _symbol('='), tgt_fun_node) |
| tgt_arg_node.arguments = [ |
| StringNode(Token('string', filename, 0, 0, 0, None, cmd['target'])), |
| IdNode(Token('string', filename, 0, 0, 0, None, source_id)) |
| ] |
| |
| src_ass_node.accept(AstIndentationGenerator()) |
| tgt_ass_node.accept(AstIndentationGenerator()) |
| self.to_add_nodes += [src_ass_node, tgt_ass_node] |
| |
| elif cmd['operation'] == 'target_rm': |
| to_remove = self.find_assignment_node(target['node']) |
| if to_remove is None: |
| to_remove = target['node'] |
| self.to_remove_nodes += [to_remove] |
| mlog.log(' -- Removing target', mlog.green(cmd['target']), 'at', |
| mlog.yellow(f'{to_remove.filename}:{to_remove.lineno}')) |
| |
| elif cmd['operation'] == 'info': |
| # T.List all sources in the target |
| src_list = [] |
| for i in target['sources']: |
| for j in arg_list_from_node(i): |
| if isinstance(j, StringNode): |
| src_list += [j.value] |
| extra_files_list = [] |
| for i in target['extra_files']: |
| for j in arg_list_from_node(i): |
| if isinstance(j, StringNode): |
| extra_files_list += [j.value] |
| test_data = { |
| 'name': target['name'], |
| 'sources': src_list, |
| 'extra_files': extra_files_list |
| } |
| self.add_info('target', target['id'], test_data) |
| |
| # Sort files |
| for i in to_sort_nodes: |
| convert = lambda text: int(text) if text.isdigit() else text.lower() |
| alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] |
| path_sorter = lambda key: ([(key.count('/') <= idx, alphanum_key(x)) for idx, x in enumerate(key.split('/'))]) |
| |
| unknown = [x for x in i.arguments if not isinstance(x, StringNode)] |
| sources = [x for x in i.arguments if isinstance(x, StringNode)] |
| sources = sorted(sources, key=lambda x: path_sorter(x.value)) |
| i.arguments = unknown + sources |
| |
| def process(self, cmd): |
| if 'type' not in cmd: |
| raise RewriterException('Command has no key "type"') |
| if cmd['type'] not in self.functions: |
| raise RewriterException('Unknown command "{}". Supported commands are: {}' |
| .format(cmd['type'], list(self.functions.keys()))) |
| self.functions[cmd['type']](cmd) |
| |
| def apply_changes(self): |
| assert all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'filename') for x in self.modified_nodes) |
| assert all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'filename') for x in self.to_remove_nodes) |
| assert all(isinstance(x, (ArrayNode, FunctionNode)) for x in self.modified_nodes) |
| assert all(isinstance(x, (ArrayNode, AssignmentNode, FunctionNode)) for x in self.to_remove_nodes) |
| # Sort based on line and column in reversed order |
| work_nodes = [{'node': x, 'action': 'modify'} for x in self.modified_nodes] |
| work_nodes += [{'node': x, 'action': 'rm'} for x in self.to_remove_nodes] |
| work_nodes = sorted(work_nodes, key=lambda x: (x['node'].lineno, x['node'].colno), reverse=True) |
| work_nodes += [{'node': x, 'action': 'add'} for x in self.to_add_nodes] |
| |
| # Generating the new replacement string |
| str_list = [] |
| for i in work_nodes: |
| new_data = '' |
| if i['action'] == 'modify' or i['action'] == 'add': |
| printer = AstPrinter() |
| i['node'].accept(printer) |
| printer.post_process() |
| new_data = printer.result.strip() |
| data = { |
| 'file': i['node'].filename, |
| 'str': new_data, |
| 'node': i['node'], |
| 'action': i['action'] |
| } |
| str_list += [data] |
| |
| # Load build files |
| files = {} |
| for i in str_list: |
| if i['file'] in files: |
| continue |
| fpath = os.path.realpath(os.path.join(self.sourcedir, i['file'])) |
| fdata = '' |
| # Create an empty file if it does not exist |
| if not os.path.exists(fpath): |
| with open(fpath, 'w', encoding='utf-8'): |
| pass |
| with open(fpath, encoding='utf-8') as fp: |
| fdata = fp.read() |
| |
| # Generate line offsets numbers |
| m_lines = fdata.splitlines(True) |
| offset = 0 |
| line_offsets = [] |
| for j in m_lines: |
| line_offsets += [offset] |
| offset += len(j) |
| |
| files[i['file']] = { |
| 'path': fpath, |
| 'raw': fdata, |
| 'offsets': line_offsets |
| } |
| |
| # Replace in source code |
| def remove_node(i): |
| offsets = files[i['file']]['offsets'] |
| raw = files[i['file']]['raw'] |
| node = i['node'] |
| line = node.lineno - 1 |
| col = node.colno |
| start = offsets[line] + col |
| end = start |
| if isinstance(node, (ArrayNode, FunctionNode)): |
| end = offsets[node.end_lineno - 1] + node.end_colno |
| |
| # Only removal is supported for assignments |
| elif isinstance(node, AssignmentNode) and i['action'] == 'rm': |
| if isinstance(node.value, (ArrayNode, FunctionNode)): |
| remove_node({'file': i['file'], 'str': '', 'node': node.value, 'action': 'rm'}) |
| raw = files[i['file']]['raw'] |
| while raw[end] != '=': |
| end += 1 |
| end += 1 # Handle the '=' |
| while raw[end] in {' ', '\n', '\t'}: |
| end += 1 |
| |
| files[i['file']]['raw'] = raw[:start] + i['str'] + raw[end:] |
| |
| for i in str_list: |
| if i['action'] in {'modify', 'rm'}: |
| remove_node(i) |
| elif i['action'] == 'add': |
| files[i['file']]['raw'] += i['str'] + '\n' |
| |
| # Write the files back |
| for key, val in files.items(): |
| mlog.log('Rewriting', mlog.yellow(key)) |
| with open(val['path'], 'w', encoding='utf-8') as fp: |
| fp.write(val['raw']) |
| |
| target_operation_map = { |
| 'add': 'src_add', |
| 'rm': 'src_rm', |
| 'add_target': 'target_add', |
| 'rm_target': 'target_rm', |
| 'add_extra_files': 'extra_files_add', |
| 'rm_extra_files': 'extra_files_rm', |
| 'info': 'info', |
| } |
| |
| def list_to_dict(in_list: T.List[str]) -> T.Dict[str, str]: |
| result = {} |
| it = iter(in_list) |
| try: |
| for i in it: |
| # calling next(it) is not a mistake, we're taking the next element from |
| # the iterator, avoiding the need to preprocess it into a sequence of |
| # key value pairs. |
| result[i] = next(it) |
| except StopIteration: |
| raise TypeError('in_list parameter of list_to_dict must have an even length.') |
| return result |
| |
| def generate_target(options) -> T.List[dict]: |
| return [{ |
| 'type': 'target', |
| 'target': options.target, |
| 'operation': target_operation_map[options.operation], |
| 'sources': options.sources, |
| 'subdir': options.subdir, |
| 'target_type': options.tgt_type, |
| }] |
| |
| def generate_kwargs(options) -> T.List[dict]: |
| return [{ |
| 'type': 'kwargs', |
| 'function': options.function, |
| 'id': options.id, |
| 'operation': options.operation, |
| 'kwargs': list_to_dict(options.kwargs), |
| }] |
| |
| def generate_def_opts(options) -> T.List[dict]: |
| return [{ |
| 'type': 'default_options', |
| 'operation': options.operation, |
| 'options': list_to_dict(options.options), |
| }] |
| |
| def generate_cmd(options) -> T.List[dict]: |
| if os.path.exists(options.json): |
| with open(options.json, encoding='utf-8') as fp: |
| return json.load(fp) |
| else: |
| return json.loads(options.json) |
| |
| # Map options.type to the actual type name |
| cli_type_map = { |
| 'target': generate_target, |
| 'tgt': generate_target, |
| 'kwargs': generate_kwargs, |
| 'default-options': generate_def_opts, |
| 'def': generate_def_opts, |
| 'command': generate_cmd, |
| 'cmd': generate_cmd, |
| } |
| |
| def run(options): |
| if not options.verbose: |
| mlog.set_quiet() |
| |
| try: |
| setup_vsenv() |
| rewriter = Rewriter(options.sourcedir, skip_errors=options.skip) |
| rewriter.analyze_meson() |
| |
| if options.type is None: |
| mlog.error('No command specified') |
| return 1 |
| |
| commands = cli_type_map[options.type](options) |
| |
| if not isinstance(commands, list): |
| raise TypeError('Command is not a list') |
| |
| for i in commands: |
| if not isinstance(i, object): |
| raise TypeError('Command is not an object') |
| rewriter.process(i) |
| |
| rewriter.apply_changes() |
| rewriter.print_info() |
| return 0 |
| except Exception as e: |
| raise e |
| finally: |
| mlog.set_verbose() |