| # Test class and utilities for functional tests |
| # |
| # Copyright (c) 2018 Red Hat, Inc. |
| # |
| # Author: |
| # Cleber Rosa <crosa@redhat.com> |
| # |
| # This work is licensed under the terms of the GNU GPL, version 2 or |
| # later. See the COPYING file in the top-level directory. |
| |
| import logging |
| import os |
| import sys |
| import uuid |
| import tempfile |
| |
| import avocado |
| |
| #: The QEMU build root directory. It may also be the source directory |
| #: if building from the source dir, but it's safer to use BUILD_DIR for |
| #: that purpose. Be aware that if this code is moved outside of a source |
| #: and build tree, it will not be accurate. |
| BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) |
| |
| if os.path.islink(os.path.dirname(os.path.dirname(__file__))): |
| # The link to the acceptance tests dir in the source code directory |
| lnk = os.path.dirname(os.path.dirname(__file__)) |
| #: The QEMU root source directory |
| SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk))) |
| else: |
| SOURCE_DIR = BUILD_DIR |
| |
| sys.path.append(os.path.join(SOURCE_DIR, 'python')) |
| |
| from qemu.machine import QEMUMachine |
| |
| def is_readable_executable_file(path): |
| return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) |
| |
| |
| def pick_default_qemu_bin(arch=None): |
| """ |
| Picks the path of a QEMU binary, starting either in the current working |
| directory or in the source tree root directory. |
| |
| :param arch: the arch to use when looking for a QEMU binary (the target |
| will match the arch given). If None (the default), arch |
| will be the current host system arch (as given by |
| :func:`os.uname`). |
| :type arch: str |
| :returns: the path to the default QEMU binary or None if one could not |
| be found |
| :rtype: str or None |
| """ |
| if arch is None: |
| arch = os.uname()[4] |
| # qemu binary path does not match arch for powerpc, handle it |
| if 'ppc64le' in arch: |
| arch = 'ppc64' |
| qemu_bin_relative_path = os.path.join("%s-softmmu" % arch, |
| "qemu-system-%s" % arch) |
| if is_readable_executable_file(qemu_bin_relative_path): |
| return qemu_bin_relative_path |
| |
| qemu_bin_from_bld_dir_path = os.path.join(BUILD_DIR, |
| qemu_bin_relative_path) |
| if is_readable_executable_file(qemu_bin_from_bld_dir_path): |
| return qemu_bin_from_bld_dir_path |
| |
| |
| def _console_interaction(test, success_message, failure_message, |
| send_string, keep_sending=False): |
| assert not keep_sending or send_string |
| console = test.vm.console_socket.makefile() |
| console_logger = logging.getLogger('console') |
| while True: |
| if send_string: |
| test.vm.console_socket.sendall(send_string.encode()) |
| if not keep_sending: |
| send_string = None # send only once |
| msg = console.readline().strip() |
| if not msg: |
| continue |
| console_logger.debug(msg) |
| if success_message in msg: |
| break |
| if failure_message and failure_message in msg: |
| console.close() |
| fail = 'Failure message found in console: %s' % failure_message |
| test.fail(fail) |
| |
| def interrupt_interactive_console_until_pattern(test, success_message, |
| failure_message=None, |
| interrupt_string='\r'): |
| """ |
| Keep sending a string to interrupt a console prompt, while logging the |
| console output. Typical use case is to break a boot loader prompt, such: |
| |
| Press a key within 5 seconds to interrupt boot process. |
| 5 |
| 4 |
| 3 |
| 2 |
| 1 |
| Booting default image... |
| |
| :param test: an Avocado test containing a VM that will have its console |
| read and probed for a success or failure message |
| :type test: :class:`avocado_qemu.Test` |
| :param success_message: if this message appears, test succeeds |
| :param failure_message: if this message appears, test fails |
| :param interrupt_string: a string to send to the console before trying |
| to read a new line |
| """ |
| _console_interaction(test, success_message, failure_message, |
| interrupt_string, True) |
| |
| def wait_for_console_pattern(test, success_message, failure_message=None): |
| """ |
| Waits for messages to appear on the console, while logging the content |
| |
| :param test: an Avocado test containing a VM that will have its console |
| read and probed for a success or failure message |
| :type test: :class:`avocado_qemu.Test` |
| :param success_message: if this message appears, test succeeds |
| :param failure_message: if this message appears, test fails |
| """ |
| _console_interaction(test, success_message, failure_message, None) |
| |
| def exec_command_and_wait_for_pattern(test, command, |
| success_message, failure_message=None): |
| """ |
| Send a command to a console (appending CRLF characters), then wait |
| for success_message to appear on the console, while logging the. |
| content. Mark the test as failed if failure_message is found instead. |
| |
| :param test: an Avocado test containing a VM that will have its console |
| read and probed for a success or failure message |
| :type test: :class:`avocado_qemu.Test` |
| :param command: the command to send |
| :param success_message: if this message appears, test succeeds |
| :param failure_message: if this message appears, test fails |
| """ |
| _console_interaction(test, success_message, failure_message, command + '\r') |
| |
| class Test(avocado.Test): |
| def _get_unique_tag_val(self, tag_name): |
| """ |
| Gets a tag value, if unique for a key |
| """ |
| vals = self.tags.get(tag_name, []) |
| if len(vals) == 1: |
| return vals.pop() |
| return None |
| |
| def setUp(self): |
| self._vms = {} |
| |
| self.arch = self.params.get('arch', |
| default=self._get_unique_tag_val('arch')) |
| |
| self.machine = self.params.get('machine', |
| default=self._get_unique_tag_val('machine')) |
| |
| default_qemu_bin = pick_default_qemu_bin(arch=self.arch) |
| self.qemu_bin = self.params.get('qemu_bin', |
| default=default_qemu_bin) |
| if self.qemu_bin is None: |
| self.cancel("No QEMU binary defined or found in the build tree") |
| |
| def _new_vm(self, *args): |
| vm = QEMUMachine(self.qemu_bin, sock_dir=tempfile.mkdtemp()) |
| if args: |
| vm.add_args(*args) |
| return vm |
| |
| @property |
| def vm(self): |
| return self.get_vm(name='default') |
| |
| def get_vm(self, *args, name=None): |
| if not name: |
| name = str(uuid.uuid4()) |
| if self._vms.get(name) is None: |
| self._vms[name] = self._new_vm(*args) |
| if self.machine is not None: |
| self._vms[name].set_machine(self.machine) |
| return self._vms[name] |
| |
| def tearDown(self): |
| for vm in self._vms.values(): |
| vm.shutdown() |