| #!/usr/bin/python |
| # |
| # Low-level QEMU shell on top of QMP. |
| # |
| # Copyright (C) 2009, 2010 Red Hat Inc. |
| # |
| # Authors: |
| # Luiz Capitulino <lcapitulino@redhat.com> |
| # |
| # This work is licensed under the terms of the GNU GPL, version 2. See |
| # the COPYING file in the top-level directory. |
| # |
| # Usage: |
| # |
| # Start QEMU with: |
| # |
| # # qemu [...] -qmp unix:./qmp-sock,server |
| # |
| # Run the shell: |
| # |
| # $ qmp-shell ./qmp-sock |
| # |
| # Commands have the following format: |
| # |
| # < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] |
| # |
| # For example: |
| # |
| # (QEMU) device_add driver=e1000 id=net1 |
| # {u'return': {}} |
| # (QEMU) |
| # |
| # key=value pairs also support Python or JSON object literal subset notations, |
| # without spaces. Dictionaries/objects {} are supported as are arrays []. |
| # |
| # example-command arg-name1={'key':'value','obj'={'prop':"value"}} |
| # |
| # Both JSON and Python formatting should work, including both styles of |
| # string literal quotes. Both paradigms of literal values should work, |
| # including null/true/false for JSON and None/True/False for Python. |
| # |
| # |
| # Transactions have the following multi-line format: |
| # |
| # transaction( |
| # action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] |
| # ... |
| # action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] |
| # ) |
| # |
| # One line transactions are also supported: |
| # |
| # transaction( action-name1 ... ) |
| # |
| # For example: |
| # |
| # (QEMU) transaction( |
| # TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 |
| # TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 |
| # TRANS> ) |
| # {"return": {}} |
| # (QEMU) |
| # |
| # Use the -v and -p options to activate the verbose and pretty-print options, |
| # which will echo back the properly formatted JSON-compliant QMP that is being |
| # sent to QEMU, which is useful for debugging and documentation generation. |
| |
| from __future__ import print_function |
| import json |
| import ast |
| import readline |
| import sys |
| import os |
| import errno |
| import atexit |
| import re |
| |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) |
| from qemu import qmp |
| |
| class QMPCompleter(list): |
| def complete(self, text, state): |
| for cmd in self: |
| if cmd.startswith(text): |
| if not state: |
| return cmd |
| else: |
| state -= 1 |
| |
| class QMPShellError(Exception): |
| pass |
| |
| class QMPShellBadPort(QMPShellError): |
| pass |
| |
| class FuzzyJSON(ast.NodeTransformer): |
| '''This extension of ast.NodeTransformer filters literal "true/false/null" |
| values in an AST and replaces them by proper "True/False/None" values that |
| Python can properly evaluate.''' |
| def visit_Name(self, node): |
| if node.id == 'true': |
| node.id = 'True' |
| if node.id == 'false': |
| node.id = 'False' |
| if node.id == 'null': |
| node.id = 'None' |
| return node |
| |
| # TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and |
| # _execute_cmd()). Let's design a better one. |
| class QMPShell(qmp.QEMUMonitorProtocol): |
| def __init__(self, address, pretty=False): |
| super(QMPShell, self).__init__(self.__get_address(address)) |
| self._greeting = None |
| self._completer = None |
| self._pretty = pretty |
| self._transmode = False |
| self._actions = list() |
| self._histfile = os.path.join(os.path.expanduser('~'), |
| '.qmp-shell_history') |
| |
| def __get_address(self, arg): |
| """ |
| Figure out if the argument is in the port:host form, if it's not it's |
| probably a file path. |
| """ |
| addr = arg.split(':') |
| if len(addr) == 2: |
| try: |
| port = int(addr[1]) |
| except ValueError: |
| raise QMPShellBadPort |
| return ( addr[0], port ) |
| # socket path |
| return arg |
| |
| def _fill_completion(self): |
| cmds = self.cmd('query-commands') |
| if 'error' in cmds: |
| return |
| for cmd in cmds['return']: |
| self._completer.append(cmd['name']) |
| |
| def __completer_setup(self): |
| self._completer = QMPCompleter() |
| self._fill_completion() |
| readline.set_history_length(1024) |
| readline.set_completer(self._completer.complete) |
| readline.parse_and_bind("tab: complete") |
| # XXX: default delimiters conflict with some command names (eg. query-), |
| # clearing everything as it doesn't seem to matter |
| readline.set_completer_delims('') |
| try: |
| readline.read_history_file(self._histfile) |
| except Exception as e: |
| if isinstance(e, IOError) and e.errno == errno.ENOENT: |
| # File not found. No problem. |
| pass |
| else: |
| print("Failed to read history '%s'; %s" % (self._histfile, e)) |
| atexit.register(self.__save_history) |
| |
| def __save_history(self): |
| try: |
| readline.write_history_file(self._histfile) |
| except Exception as e: |
| print("Failed to save history file '%s'; %s" % (self._histfile, e)) |
| |
| def __parse_value(self, val): |
| try: |
| return int(val) |
| except ValueError: |
| pass |
| |
| if val.lower() == 'true': |
| return True |
| if val.lower() == 'false': |
| return False |
| if val.startswith(('{', '[')): |
| # Try first as pure JSON: |
| try: |
| return json.loads(val) |
| except ValueError: |
| pass |
| # Try once again as FuzzyJSON: |
| try: |
| st = ast.parse(val, mode='eval') |
| return ast.literal_eval(FuzzyJSON().visit(st)) |
| except SyntaxError: |
| pass |
| except ValueError: |
| pass |
| return val |
| |
| def __cli_expr(self, tokens, parent): |
| for arg in tokens: |
| (key, sep, val) = arg.partition('=') |
| if sep != '=': |
| raise QMPShellError("Expected a key=value pair, got '%s'" % arg) |
| |
| value = self.__parse_value(val) |
| optpath = key.split('.') |
| curpath = [] |
| for p in optpath[:-1]: |
| curpath.append(p) |
| d = parent.get(p, {}) |
| if type(d) is not dict: |
| raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath)) |
| parent[p] = d |
| parent = d |
| if optpath[-1] in parent: |
| if type(parent[optpath[-1]]) is dict: |
| raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath)) |
| else: |
| raise QMPShellError('Cannot set "%s" multiple times' % key) |
| parent[optpath[-1]] = value |
| |
| def __build_cmd(self, cmdline): |
| """ |
| Build a QMP input object from a user provided command-line in the |
| following format: |
| |
| < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] |
| """ |
| cmdargs = re.findall(r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''', cmdline) |
| |
| # Transactional CLI entry/exit: |
| if cmdargs[0] == 'transaction(': |
| self._transmode = True |
| cmdargs.pop(0) |
| elif cmdargs[0] == ')' and self._transmode: |
| self._transmode = False |
| if len(cmdargs) > 1: |
| raise QMPShellError("Unexpected input after close of Transaction sub-shell") |
| qmpcmd = { 'execute': 'transaction', |
| 'arguments': { 'actions': self._actions } } |
| self._actions = list() |
| return qmpcmd |
| |
| # Nothing to process? |
| if not cmdargs: |
| return None |
| |
| # Parse and then cache this Transactional Action |
| if self._transmode: |
| finalize = False |
| action = { 'type': cmdargs[0], 'data': {} } |
| if cmdargs[-1] == ')': |
| cmdargs.pop(-1) |
| finalize = True |
| self.__cli_expr(cmdargs[1:], action['data']) |
| self._actions.append(action) |
| return self.__build_cmd(')') if finalize else None |
| |
| # Standard command: parse and return it to be executed. |
| qmpcmd = { 'execute': cmdargs[0], 'arguments': {} } |
| self.__cli_expr(cmdargs[1:], qmpcmd['arguments']) |
| return qmpcmd |
| |
| def _print(self, qmp): |
| indent = None |
| if self._pretty: |
| indent = 4 |
| jsobj = json.dumps(qmp, indent=indent) |
| print(str(jsobj)) |
| |
| def _execute_cmd(self, cmdline): |
| try: |
| qmpcmd = self.__build_cmd(cmdline) |
| except Exception as e: |
| print('Error while parsing command line: %s' % e) |
| print('command format: <command-name> ', end=' ') |
| print('[arg-name1=arg1] ... [arg-nameN=argN]') |
| return True |
| # For transaction mode, we may have just cached the action: |
| if qmpcmd is None: |
| return True |
| if self._verbose: |
| self._print(qmpcmd) |
| resp = self.cmd_obj(qmpcmd) |
| if resp is None: |
| print('Disconnected') |
| return False |
| self._print(resp) |
| return True |
| |
| def connect(self, negotiate): |
| self._greeting = super(QMPShell, self).connect(negotiate) |
| self.__completer_setup() |
| |
| def show_banner(self, msg='Welcome to the QMP low-level shell!'): |
| print(msg) |
| if not self._greeting: |
| print('Connected') |
| return |
| version = self._greeting['QMP']['version']['qemu'] |
| print('Connected to QEMU %d.%d.%d\n' % (version['major'],version['minor'],version['micro'])) |
| |
| def get_prompt(self): |
| if self._transmode: |
| return "TRANS> " |
| return "(QEMU) " |
| |
| def read_exec_command(self, prompt): |
| """ |
| Read and execute a command. |
| |
| @return True if execution was ok, return False if disconnected. |
| """ |
| try: |
| cmdline = raw_input(prompt) |
| except EOFError: |
| print() |
| return False |
| if cmdline == '': |
| for ev in self.get_events(): |
| print(ev) |
| self.clear_events() |
| return True |
| else: |
| return self._execute_cmd(cmdline) |
| |
| def set_verbosity(self, verbose): |
| self._verbose = verbose |
| |
| class HMPShell(QMPShell): |
| def __init__(self, address): |
| QMPShell.__init__(self, address) |
| self.__cpu_index = 0 |
| |
| def __cmd_completion(self): |
| for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'): |
| if cmd and cmd[0] != '[' and cmd[0] != '\t': |
| name = cmd.split()[0] # drop help text |
| if name == 'info': |
| continue |
| if name.find('|') != -1: |
| # Command in the form 'foobar|f' or 'f|foobar', take the |
| # full name |
| opt = name.split('|') |
| if len(opt[0]) == 1: |
| name = opt[1] |
| else: |
| name = opt[0] |
| self._completer.append(name) |
| self._completer.append('help ' + name) # help completion |
| |
| def __info_completion(self): |
| for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'): |
| if cmd: |
| self._completer.append('info ' + cmd.split()[1]) |
| |
| def __other_completion(self): |
| # special cases |
| self._completer.append('help info') |
| |
| def _fill_completion(self): |
| self.__cmd_completion() |
| self.__info_completion() |
| self.__other_completion() |
| |
| def __cmd_passthrough(self, cmdline, cpu_index = 0): |
| return self.cmd_obj({ 'execute': 'human-monitor-command', 'arguments': |
| { 'command-line': cmdline, |
| 'cpu-index': cpu_index } }) |
| |
| def _execute_cmd(self, cmdline): |
| if cmdline.split()[0] == "cpu": |
| # trap the cpu command, it requires special setting |
| try: |
| idx = int(cmdline.split()[1]) |
| if not 'return' in self.__cmd_passthrough('info version', idx): |
| print('bad CPU index') |
| return True |
| self.__cpu_index = idx |
| except ValueError: |
| print('cpu command takes an integer argument') |
| return True |
| resp = self.__cmd_passthrough(cmdline, self.__cpu_index) |
| if resp is None: |
| print('Disconnected') |
| return False |
| assert 'return' in resp or 'error' in resp |
| if 'return' in resp: |
| # Success |
| if len(resp['return']) > 0: |
| print(resp['return'], end=' ') |
| else: |
| # Error |
| print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) |
| return True |
| |
| def show_banner(self): |
| QMPShell.show_banner(self, msg='Welcome to the HMP shell!') |
| |
| def die(msg): |
| sys.stderr.write('ERROR: %s\n' % msg) |
| sys.exit(1) |
| |
| def fail_cmdline(option=None): |
| if option: |
| sys.stderr.write('ERROR: bad command-line option \'%s\'\n' % option) |
| sys.stderr.write('qmp-shell [ -v ] [ -p ] [ -H ] [ -N ] < UNIX socket path> | < TCP address:port >\n') |
| sys.stderr.write(' -v Verbose (echo command sent and received)\n') |
| sys.stderr.write(' -p Pretty-print JSON\n') |
| sys.stderr.write(' -H Use HMP interface\n') |
| sys.stderr.write(' -N Skip negotiate (for qemu-ga)\n') |
| sys.exit(1) |
| |
| def main(): |
| addr = '' |
| qemu = None |
| hmp = False |
| pretty = False |
| verbose = False |
| negotiate = True |
| |
| try: |
| for arg in sys.argv[1:]: |
| if arg == "-H": |
| if qemu is not None: |
| fail_cmdline(arg) |
| hmp = True |
| elif arg == "-p": |
| pretty = True |
| elif arg == "-N": |
| negotiate = False |
| elif arg == "-v": |
| verbose = True |
| else: |
| if qemu is not None: |
| fail_cmdline(arg) |
| if hmp: |
| qemu = HMPShell(arg) |
| else: |
| qemu = QMPShell(arg, pretty) |
| addr = arg |
| |
| if qemu is None: |
| fail_cmdline() |
| except QMPShellBadPort: |
| die('bad port number in command-line') |
| |
| try: |
| qemu.connect(negotiate) |
| except qmp.QMPConnectError: |
| die('Didn\'t get QMP greeting message') |
| except qmp.QMPCapabilitiesError: |
| die('Could not negotiate capabilities') |
| except qemu.error: |
| die('Could not connect to %s' % addr) |
| |
| qemu.show_banner() |
| qemu.set_verbosity(verbose) |
| while qemu.read_exec_command(qemu.get_prompt()): |
| pass |
| qemu.close() |
| |
| if __name__ == '__main__': |
| main() |