| #!/usr/bin/env python |
| # |
| # Test cases for the QMP 'x-blockdev-del' command |
| # |
| # Copyright (C) 2015 Igalia, S.L. |
| # Author: Alberto Garcia <berto@igalia.com> |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import os |
| import iotests |
| import time |
| |
# Image files shared by all the test cases; both are placed directly inside
# iotests.test_dir.
base_img = os.path.join(iotests.test_dir, 'base.img')
new_img = os.path.join(iotests.test_dir, 'new.img')
| |
class TestBlockdevDel(iotests.QMPTestCase):
    """Test cases for the QMP 'x-blockdev-del' command.

    Every test starts from a freshly created base image and a VM that has
    a single virtio-scsi controller attached.  The helper methods below
    wrap one QMP command each and verify, before and after the command,
    which named block nodes exist.
    """

    def setUp(self):
        """Create the 1M base image and launch the VM."""
        iotests.qemu_img('create', '-f', iotests.imgfmt, base_img, '1M')
        self.vm = iotests.VM()
        self.vm.add_device("virtio-scsi-pci,id=virtio-scsi")
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and remove the images used by the test."""
        self.vm.shutdown()
        os.remove(base_img)
        # new_img is only created by some of the helpers
        if os.path.isfile(new_img):
            os.remove(new_img)

    def checkBlockDriverState(self, node, must_exist=True):
        """Check whether a BlockDriverState exists.

        Queries 'query-named-block-nodes' and asserts that at most one
        returned entry has node-name == node, and that such an entry is
        present if and only if must_exist is true.
        """
        result = self.vm.qmp('query-named-block-nodes')
        # Build a real list: on Python 3 filter() returns a lazy iterator
        # and calling len() on it raises TypeError.
        nodes = [x for x in result['return'] if x['node-name'] == node]
        self.assertLessEqual(len(nodes), 1)
        self.assertEqual(must_exist, len(nodes) == 1)

    def _blockdevAdd(self, opts):
        """Run 'blockdev-add' with the given options and expect success."""
        # conv_keys=False: the option names already contain dashes
        # (presumably this disables the key-name conversion that the
        # iotests QMP wrapper otherwise applies -- confirm in iotests.py)
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})

    def _imageOpts(self, node, filename):
        """Return blockdev-add options for an image node.

        The node is named 'node', uses the iotests.imgfmt driver and sits
        on top of an (unnamed) 'file' protocol node opened on filename.
        """
        return {'driver': iotests.imgfmt,
                'node-name': node,
                'file': {'driver': 'file',
                         'filename': filename}}

    def addBlockDriverState(self, node):
        """Add a BlockDriverState without a BlockBackend.

        The image is base_img.  Its file-level node is named
        '<node>_file'.  Neither node may exist beforehand and both must
        exist afterwards.
        """
        file_node = '%s_file' % node
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(file_node, False)
        opts = self._imageOpts(node, base_img)
        opts['file']['node-name'] = file_node
        self._blockdevAdd(opts)
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(file_node)

    def addBlockDriverStateOverlay(self, node):
        """Add a BlockDriverState to be used as overlay for the base_img BDS.

        new_img is created with base_img as its backing file, but the
        node is added with an empty 'backing' option.
        """
        self.checkBlockDriverState(node, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt,
                         '-b', base_img, new_img, '1M')
        opts = self._imageOpts(node, new_img)
        opts['backing'] = ''
        self._blockdevAdd(opts)
        self.checkBlockDriverState(node)

    def delBlockDriverState(self, node, expect_error=False):
        """Delete a BlockDriverState with 'x-blockdev-del'.

        The node must exist beforehand.  With expect_error the command
        must fail with a GenericError and the node must still exist
        afterwards; otherwise the command must succeed and the node must
        be gone.
        """
        self.checkBlockDriverState(node)
        result = self.vm.qmp('x-blockdev-del', node_name=node)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})
        # The node survives exactly when the deletion was expected to fail
        self.checkBlockDriverState(node, expect_error)

    def addDeviceModel(self, device, backend, driver='virtio-blk-pci'):
        """Add a device model named 'device' that uses the node 'backend'."""
        result = self.vm.qmp('device_add', id=device,
                             driver=driver, drive=backend)
        self.assert_qmp(result, 'return', {})

    def delDeviceModel(self, device, is_virtio_blk=True):
        """Delete a device model and wait for its DEVICE_DELETED event(s).

        When is_virtio_blk is true, an additional DEVICE_DELETED event
        for the device's 'virtio-backend' child is awaited first.
        """
        result = self.vm.qmp('device_del', id=device)
        self.assert_qmp(result, 'return', {})

        # NOTE(review): the reset presumably lets the unplug complete
        # without cooperation from a guest OS -- confirm
        result = self.vm.qmp('system_reset')
        self.assert_qmp(result, 'return', {})

        if is_virtio_blk:
            device_path = '/machine/peripheral/%s/virtio-backend' % device
            event = self.vm.event_wait(name="DEVICE_DELETED",
                                       match={'data': {'path': device_path}})
            self.assertIsNotNone(event)

        event = self.vm.event_wait(name="DEVICE_DELETED",
                                   match={'data': {'device': device}})
        self.assertIsNotNone(event)

    def ejectDrive(self, device, node, expect_error=False,
                   destroys_media=True):
        """Eject the medium of 'device' and check what happens to 'node'.

        With expect_error the command must fail with a GenericError and
        the node must still exist.  Otherwise the command must succeed
        and the node must be gone if destroys_media is true, or still be
        present if it is false.
        """
        self.checkBlockDriverState(node)
        result = self.vm.qmp('eject', id=device)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
            self.checkBlockDriverState(node)
        else:
            self.assert_qmp(result, 'return', {})
            self.checkBlockDriverState(node, not destroys_media)

    def insertDrive(self, device, node):
        """Insert a BlockDriverState into 'device' as its medium."""
        self.checkBlockDriverState(node)
        result = self.vm.qmp('x-blockdev-insert-medium',
                             id=device, node_name=node)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)

    def createSnapshotSync(self, node, overlay):
        """Create a snapshot using 'blockdev-snapshot-sync'.

        The overlay node must not exist beforehand; the command is asked
        to create it on top of 'node', using new_img as snapshot file.
        """
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay, False)
        opts = {'node-name': node,
                'snapshot-file': new_img,
                'snapshot-node-name': overlay,
                'format': iotests.imgfmt}
        result = self.vm.qmp('blockdev-snapshot-sync', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    def createSnapshot(self, node, overlay):
        """Create a snapshot using 'blockdev-snapshot'.

        Unlike createSnapshotSync(), both nodes must already exist.
        """
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)
        result = self.vm.qmp('blockdev-snapshot',
                             node=node, overlay=overlay)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    def createMirror(self, node, new_node):
        """Create a mirror of 'node' with 'drive-mirror'.

        The target is new_img, exposed as the node 'new_node', which must
        not exist beforehand.  The job id is the same string as 'node'.
        """
        self.checkBlockDriverState(new_node, False)
        opts = {'device': node,
                'job-id': node,
                'target': new_img,
                'node-name': new_node,
                'sync': 'top',
                'format': iotests.imgfmt}
        result = self.vm.qmp('drive-mirror', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(new_node)

    def completeBlockJob(self, id, node_before, node_after):
        """Complete an existing block job and wait until it has finished.

        node_before and node_after are accepted but currently unused.
        """
        result = self.vm.qmp('block-job-complete', device=id)
        self.assert_qmp(result, 'return', {})
        self.wait_until_completed(id)

    def addBlkDebug(self, debug, node):
        """Add a BlkDebug node named 'debug' on top of the image 'node'.

        Note that the purpose of this is to test the x-blockdev-del
        sanity checks, not to create a usable blkdebug drive.
        """
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(debug, False)
        opts = {'driver': 'blkdebug',
                'node-name': debug,
                'image': self._imageOpts(node, base_img)}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(debug)

    def addBlkVerify(self, blkverify, test, raw):
        """Add a BlkVerify node with the children 'test' and 'raw'.

        'test' is opened on base_img and 'raw' on a newly created
        new_img.  Note that the purpose of this is to test the
        x-blockdev-del sanity checks, not to create a usable blkverify
        drive.
        """
        self.checkBlockDriverState(test, False)
        self.checkBlockDriverState(raw, False)
        self.checkBlockDriverState(blkverify, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        opts = {'driver': 'blkverify',
                'node-name': blkverify,
                'test': self._imageOpts(test, base_img),
                'raw': self._imageOpts(raw, new_img)}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(test)
        self.checkBlockDriverState(raw)
        self.checkBlockDriverState(blkverify)

    def addQuorum(self, quorum, child0, child1):
        """Add a Quorum node with two children and a vote threshold of 1.

        'child0' is opened on base_img and 'child1' on a newly created
        new_img.
        """
        self.checkBlockDriverState(child0, False)
        self.checkBlockDriverState(child1, False)
        self.checkBlockDriverState(quorum, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        opts = {'driver': 'quorum',
                'node-name': quorum,
                'vote-threshold': 1,
                'children': [self._imageOpts(child0, base_img),
                             self._imageOpts(child1, new_img)]}
        self._blockdevAdd(opts)
        self.checkBlockDriverState(child0)
        self.checkBlockDriverState(child1)
        self.checkBlockDriverState(quorum)

    ########################
    # The tests start here #
    ########################

    # NOTE: the test methods are documented with comments rather than
    # docstrings so that the descriptions reported by unittest stay the same.

    # A format node can be deleted, its file node cannot
    def testBlockDriverState(self):
        self.addBlockDriverState('node0')
        # You cannot delete a file BDS directly
        self.delBlockDriverState('node0_file', expect_error=True)
        self.delBlockDriverState('node0')

    # A node attached to a device model can neither be ejected nor deleted
    # until the device model has been removed
    def testDeviceModel(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        self.ejectDrive('device0', 'node0', expect_error=True)
        self.delBlockDriverState('node0', expect_error=True)
        self.delDeviceModel('device0')
        self.delBlockDriverState('node0')

    # Ejecting and inserting media of a scsi-cd device
    def testAttachMedia(self):
        # This creates a BlockBackend and removes its media
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.ejectDrive('device0', 'node0', destroys_media=False)
        self.delBlockDriverState('node0')

        # This creates a new BlockDriverState and inserts it into the device
        self.addBlockDriverState('node1')
        self.insertDrive('device0', 'node1')
        # The node can't be removed: the new device has an extra reference
        self.delBlockDriverState('node1', expect_error=True)
        # The BDS still exists after being ejected, but now it can be removed
        self.ejectDrive('device0', 'node1', destroys_media=False)
        self.delBlockDriverState('node1')
        self.delDeviceModel('device0', False)

    # Deleting nodes that take part in a 'blockdev-snapshot-sync' snapshot
    def testSnapshotSync(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        self.createSnapshotSync('node0', 'overlay0')
        # This fails because node0 is now being used as a backing image
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        # This succeeds because device0 only has the backend reference
        self.delDeviceModel('device0')
        # FIXME Would still be there if blockdev-snapshot-sync took a ref
        self.checkBlockDriverState('overlay0', False)
        self.delBlockDriverState('node0')

    # Deleting nodes that take part in a 'blockdev-snapshot' snapshot
    def testSnapshot(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.addBlockDriverStateOverlay('overlay0')
        self.createSnapshot('node0', 'overlay0')
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        self.ejectDrive('device0', 'overlay0', destroys_media=False)
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0')
        self.delBlockDriverState('node0')

    # Deleting the source and the target of a 'drive-mirror' job
    def testMirror(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.createMirror('node0', 'mirror0')
        # The block job prevents removing the device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('mirror0', expect_error=True)
        self.wait_ready('node0')
        self.completeBlockJob('node0', 'node0', 'mirror0')
        self.assert_no_active_block_jobs()
        # This succeeds because the device now points to mirror0
        self.delBlockDriverState('node0')
        self.delBlockDriverState('mirror0', expect_error=True)
        self.delDeviceModel('device0', False)
        # FIXME mirror0 disappears, drive-mirror doesn't take a reference
        #self.delBlockDriverState('mirror0')

    # Deleting a blkdebug node and the image below it
    def testBlkDebug(self):
        self.addBlkDebug('debug0', 'node0')
        # 'node0' is used by the blkdebug node
        self.delBlockDriverState('node0', expect_error=True)
        # But we can remove the blkdebug node directly
        self.delBlockDriverState('debug0')
        self.checkBlockDriverState('node0', False)

    # Deleting a blkverify node and its children
    def testBlkVerify(self):
        self.addBlkVerify('verify0', 'node0', 'node1')
        # We cannot remove the children of a blkverify device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the blkverify node directly
        self.delBlockDriverState('verify0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)

    # Deleting a quorum node and its children
    def testQuorum(self):
        # Nothing to test (the test simply passes) without quorum support
        if not iotests.supports_quorum():
            return

        self.addQuorum('quorum0', 'node0', 'node1')
        # We cannot remove the children of a Quorum device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the Quorum node directly
        self.delBlockDriverState('quorum0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)
| |
| |
# Script entry point: hand control to the iotests runner, declaring qcow2 as
# the only supported image format.
if __name__ == '__main__':
    iotests.main(supported_fmts=["qcow2"])