| #!/usr/bin/env python |
| # |
| # Test case for NBD's blockdev-add interface |
| # |
| # Copyright (C) 2016 Red Hat, Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import os |
| import random |
| import socket |
| import stat |
| import time |
| import iotests |
| from iotests import cachemode, imgfmt, qemu_img, qemu_nbd, qemu_nbd_pipe |
| |
# Half-open port ranges from which the tests draw a random listening port.
# The IPv6 range begins exactly where the first range ends, so the two
# ranges never overlap.
NBD_PORT_START = 32768
NBD_PORT_END = NBD_PORT_START + 1024

NBD_IPV6_PORT_START = NBD_PORT_END
NBD_IPV6_PORT_END = NBD_IPV6_PORT_START + 1024
| |
# Scratch files used by every test; both live in the iotests test directory.
test_img = os.path.join(iotests.test_dir, 'test.img')        # image to export
unix_socket = os.path.join(iotests.test_dir, 'nbd.socket')   # UNIX listen path
| |
| |
def flatten_sock_addr(crumpled_address):
    """Flatten a ``{'type': ..., 'data': {...}}`` socket address.

    Returns a new dict that holds the 'type' entry followed by every
    key of the nested 'data' dict at the top level.  The argument itself
    is left untouched.
    """
    flat = dict(type=crumpled_address['type'])
    flat.update(crumpled_address['data'])
    return flat
| |
| |
class NBDBlockdevAddBase(iotests.QMPTestCase):
    """Client-side helpers shared by the NBD blockdev-add test classes.

    Subclasses are expected to provide the client VM as ``self.vm``.
    """

    def blockdev_add_options(self, address, export, node_name):
        """Build the blockdev-add arguments for a raw node on top of NBD.

        address   -- flattened socket address used as the NBD 'server'
        export    -- export name, or None to leave the 'export' key out
        node_name -- node name given to the new (raw) node
        """
        nbd_layer = {'driver': 'nbd',
                     'read-only': True,
                     'server': address}
        if export is not None:
            nbd_layer['export'] = export

        return {'node-name': node_name,
                'driver': 'raw',
                'file': nbd_layer}

    def client_test(self, filename, address, export=None,
                    node_name='nbd-blockdev', delete=True):
        """Add an NBD-backed node, verify its filename, optionally delete it.

        filename may be a plain string, which is compared against the
        node's 'image/filename', or any other object, which is handed to
        assert_json_filename_equal() instead.
        """
        add_args = self.blockdev_add_options(address, export, node_name)
        reply = self.vm.qmp('blockdev-add', **add_args)
        self.assert_qmp(reply, 'return', {})

        reply = self.vm.qmp('query-named-block-nodes')
        # Lazy search: stops at the first node carrying the requested name.
        node = next((candidate for candidate in reply['return']
                     if candidate['node-name'] == node_name), None)
        if node is not None:
            if isinstance(filename, str):
                self.assert_qmp(node, 'image/filename', filename)
            else:
                self.assert_json_filename_equal(node['image']['filename'],
                                                filename)
        self.assertTrue(node is not None)

        if not delete:
            return
        reply = self.vm.qmp('blockdev-del', node_name=node_name)
        self.assert_qmp(reply, 'return', {})
| |
| |
class QemuNBD(NBDBlockdevAddBase):
    """Tests whose NBD server is started through qemu_nbd_pipe()."""

    def setUp(self):
        """Create the test image and launch the client VM."""
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and remove the image and any leftover socket."""
        self.vm.shutdown()
        os.remove(test_img)
        try:
            os.remove(unix_socket)
        except OSError:
            # Best effort: the socket file may not exist for this test.
            pass

    def _try_server_up(self, *args):
        """Try to start the server with the given extra arguments.

        Returns True on success and False if the address is already in
        use; any other failure fails the test with the server's message.
        """
        status, msg = qemu_nbd_pipe('-f', imgfmt, test_img, *args)
        started = (status == 0)
        if not started and 'Address already in use' not in msg:
            self.fail(msg)
        return started

    def _server_up(self, *args):
        """Start the server; an address conflict counts as a failure."""
        self.assertTrue(self._try_server_up(*args))

    def test_inet(self):
        """Connect over TCP on a randomly chosen localhost port."""
        nbd_port = None
        # Keep drawing ports until the server manages to bind one.
        while nbd_port is None:
            candidate = random.randrange(NBD_PORT_START, NBD_PORT_END)
            if self._try_server_up('-b', 'localhost', '-p', str(candidate)):
                nbd_port = candidate

        inet_data = {'host': 'localhost',
                     'port': str(nbd_port)}
        address = {'type': 'inet', 'data': inet_data}
        expected_filename = 'nbd://localhost:%i' % nbd_port
        self.client_test(expected_filename, flatten_sock_addr(address))

    def test_unix(self):
        """Connect through the UNIX socket."""
        self._server_up('-k', unix_socket)
        address = {'type': 'unix',
                   'data': {'path': unix_socket}}
        expected_filename = 'nbd+unix://?socket=' + unix_socket
        self.client_test(expected_filename, flatten_sock_addr(address))
| |
| |
class BuiltinNBD(NBDBlockdevAddBase):
    """Tests whose NBD server is a second VM driven through QMP.

    self.vm is the client VM used by client_test(); self.server is the
    VM that exports the 'nbd-export' drive via nbd-server-start and
    nbd-server-add.
    """

    def setUp(self):
        """Create the test image and launch the client and server VMs."""
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()
        self.server = iotests.VM('.server')
        self.server.add_drive_raw('if=none,id=nbd-export,' +
                                  'file=%s,' % test_img +
                                  'format=%s,' % imgfmt +
                                  'cache=%s' % cachemode)
        self.server.launch()

    def tearDown(self):
        """Shut both VMs down and remove the image and any leftover socket."""
        self.vm.shutdown()
        self.server.shutdown()
        os.remove(test_img)
        try:
            os.remove(unix_socket)
        except OSError:
            # Best effort: the socket file may not exist for this test.
            pass

    def _try_server_up(self, address, export_name=None, export_name2=None):
        """Start the NBD server on address and add the export(s).

        Returns False on EADDRINUSE; fails an assertion on other errors.
        Returns True on success.

        'nbd-export' is always exported once, under export_name if given;
        when export_name2 is not None the same device is exported a
        second time under that name.
        """
        result = self.server.qmp('nbd-server-start', addr=address)
        if 'error' in result and \
           'Address already in use' in result['error']['desc']:
            return False
        self.assert_qmp(result, 'return', {})

        if export_name is None:
            result = self.server.qmp('nbd-server-add', device='nbd-export')
        else:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name)
        self.assert_qmp(result, 'return', {})

        if export_name2 is not None:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name2)
            self.assert_qmp(result, 'return', {})

        return True

    def _server_up(self, address, export_name=None, export_name2=None):
        """Like _try_server_up(), but an address conflict fails the test."""
        self.assertTrue(self._try_server_up(address, export_name, export_name2))

    def _server_down(self):
        """Stop the NBD server of the server VM."""
        result = self.server.qmp('nbd-server-stop')
        self.assert_qmp(result, 'return', {})

    def do_test_inet(self, export_name=None):
        """Run a TCP client test on a random localhost port.

        With export_name None the server adds the export without a name
        and the client connects to 'nbd-export'.
        """
        # Keep drawing ports until the server manages to bind one.
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, export_name):
                break

        export_name = export_name or 'nbd-export'
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, export_name),
                         flatten_sock_addr(address), export_name)
        self._server_down()

    def test_inet_default_export_name(self):
        self.do_test_inet()

    def test_inet_same_export_name(self):
        self.do_test_inet('nbd-export')

    def test_inet_different_export_name(self):
        self.do_test_inet('shadow')

    def test_inet_two_exports(self):
        """Attach to two exports of the same server simultaneously."""
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, 'exp1', 'exp2'):
                break

        # delete=False keeps both nodes attached at the same time; they
        # are removed explicitly below.
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp1'),
                         flatten_sock_addr(address), 'exp1', 'node1', False)
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp2'),
                         flatten_sock_addr(address), 'exp2', 'node2', False)
        result = self.vm.qmp('blockdev-del', node_name='node1')
        self.assert_qmp(result, 'return', {})
        result = self.vm.qmp('blockdev-del', node_name='node2')
        self.assert_qmp(result, 'return', {})
        self._server_down()

    def test_inet6(self):
        """Run a client test against '::1'; a no-op without IPv6."""
        try:
            socket.getaddrinfo("::0", "0", socket.AF_INET6,
                               socket.SOCK_STREAM, socket.IPPROTO_TCP,
                               socket.AI_ADDRCONFIG | socket.AI_CANONNAME)
        except socket.gaierror:
            # IPv6 not available, skip
            return

        while True:
            nbd_port = random.randrange(NBD_IPV6_PORT_START, NBD_IPV6_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': '::1',
                            'port': str(nbd_port),
                            'ipv4': False,
                            'ipv6': True
                        } }
            if self._try_server_up(address):
                break

        # The expected filename is given in JSON (dict) form here, so
        # client_test() compares it with assert_json_filename_equal().
        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_unix(self):
        """Run a client test through the UNIX socket."""
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self._server_up(address)
        self.client_test('nbd+unix:///nbd-export?socket=' + unix_socket,
                         flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_fd(self):
        """Run a client test over an already-connected socket passed as fd."""
        self._server_up({ 'type': 'unix',
                          'data': { 'path': unix_socket } })

        sockfd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sockfd.connect(unix_socket)

            # NOTE(review): send_fd_scm() presumably hands the client VM
            # its own copy of the descriptor -- confirm against iotests.
            result = self.vm.send_fd_scm(fd=sockfd.fileno())
            self.assertEqual(result, 0, 'Failed to send socket FD')

            result = self.vm.qmp('getfd', fdname='nbd-fifo')
            self.assert_qmp(result, 'return', {})

            address = { 'type': 'fd',
                        'data': { 'str': 'nbd-fifo' } }
            filename = { 'driver': 'raw',
                         'file': {
                             'driver': 'nbd',
                             'export': 'nbd-export',
                             'server': flatten_sock_addr(address)
                         } }
            self.client_test(filename, flatten_sock_addr(address),
                             'nbd-export')
        finally:
            # Release this process's descriptor even when an assertion
            # above fails; previously the socket was never closed.
            sockfd.close()

        self._server_down()
| |
| |
if __name__ == '__main__':
    # Only formats that support image creation can run this test.
    iotests.main(supported_fmts=[
        'vpc',
        'parallels',
        'qcow',
        'vdi',
        'qcow2',
        'vmdk',
        'raw',
        'vhdx',
        'qed',
    ])