| #!/usr/bin/env python |
| # |
| # Test case for the QMP 'change' command and all other associated |
| # commands |
| # |
| # Copyright (C) 2015 Red Hat, Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import os |
| import stat |
| import time |
| import iotests |
| from iotests import qemu_img |
| |
# Scratch image files inside the iotests test directory; they are created
# and removed by the setUp()/tearDown() methods of the test cases below.
old_img, new_img = (os.path.join(iotests.test_dir, leaf)
                    for leaf in ('test0.img', 'test1.img'))
| |
def interface_to_device_name(interface):
    """Return the device model name to plug in for a drive interface.

    'ide' maps to 'ide-cd', 'floppy' to 'floppy' and 'scsi' to 'scsi-cd'.
    Any other value yields None.
    """
    known_devices = (('ide', 'ide-cd'),
                     ('floppy', 'floppy'),
                     ('scsi', 'scsi-cd'))
    for iface, device in known_devices:
        if interface == iface:
            return device
    return None
| |
class ChangeBaseClass(iotests.QMPTestCase):
    """Tray-event bookkeeping shared by the medium-change test cases.

    The methods talk to the guest through self.vm, which subclasses set up.
    wait_for_open() and wait_for_close() also read self.has_real_tray,
    which is not defined here and must be supplied by the subclass.
    """

    # Set to True by process_events() once a matching DEVICE_TRAY_MOVED
    # event has reported the tray as open / closed.
    has_opened = False
    has_closed = False

    device_name = 'qdev0'
    use_drive = False

    def process_events(self):
        """Consume the pending QMP events and record tray movements."""
        for ev in self.vm.get_qmp_events(wait=False):
            if ev['event'] != 'DEVICE_TRAY_MOVED':
                continue

            info = ev['data']
            # An event is ours if its 'device' field names drive0 or its
            # 'id' field names the device under test.
            if (info['device'] != 'drive0' and
                    info['id'] != self.device_name):
                continue

            if info['tray-open']:
                self.has_opened = True
            else:
                self.has_closed = True

    def _await_flag(self, flag, timeout_message):
        """Poll process_events() until the attribute named flag is set.

        Returns immediately for devices without a real tray.  The wait is
        wrapped in a 3 second iotests.Timeout carrying timeout_message.
        """
        if not self.has_real_tray:
            return

        with iotests.Timeout(3, timeout_message):
            while not getattr(self, flag):
                self.process_events()

    def wait_for_open(self):
        """Block until a tray-open event has been seen (real trays only)."""
        self._await_flag('has_opened',
                         'Timeout while waiting for the tray to open')

    def wait_for_close(self):
        """Block until a tray-close event has been seen (real trays only)."""
        self._await_flag('has_closed',
                         'Timeout while waiting for the tray to close')
| |
class GeneralChangeTestsBaseClass(ChangeBaseClass):
    """Medium-change scenarios shared by all concrete device variants.

    On top of what ChangeBaseClass provides, the tests read self.was_empty,
    self.has_real_tray and self.use_drive and drive the guest through
    self.vm; concrete subclasses have to supply all of these.
    """

    def _qmp_ok(self, command, **arguments):
        """Send a QMP command and require the empty success reply."""
        reply = self.vm.qmp(command, **arguments)
        self.assert_qmp(reply, 'return', {})

    def _qmp_generic_error(self, command, **arguments):
        """Send a QMP command and require it to fail with GenericError."""
        reply = self.vm.qmp(command, **arguments)
        self.assert_qmp(reply, 'error/class', 'GenericError')

    def _verify_drive(self, tray_open, medium):
        """Run query-block and check the first reported device.

        tray_open is compared only when the device has a real tray.
        medium is the expected image file name, or None when no medium
        may be reported as inserted.
        """
        reply = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(reply, 'return[0]/tray_open', tray_open)
        if medium is None:
            self.assert_qmp_absent(reply, 'return[0]/inserted')
        else:
            self.assert_qmp(reply, 'return[0]/inserted/image/filename',
                            medium)

    def _initial_medium(self):
        """Return the medium present at startup (None for empty drives)."""
        return None if self.was_empty else old_img

    def test_change(self):
        # 'change' requires a drive name, so skip the test for blockdev
        if not self.use_drive:
            return

        self._qmp_ok('change', device='drive0', target=new_img,
                     arg=iotests.imgfmt)

        self.wait_for_open()
        self.wait_for_close()

        self._verify_drive(False, new_img)

    def test_blockdev_change_medium(self):
        self._qmp_ok('blockdev-change-medium',
                     id=self.device_name, filename=new_img,
                     format=iotests.imgfmt)

        self.wait_for_open()
        self.wait_for_close()

        self._verify_drive(False, new_img)

    def test_eject(self):
        self._qmp_ok('eject', id=self.device_name, force=True)

        self.wait_for_open()

        self._verify_drive(True, None)

    def test_tray_eject_change(self):
        self._qmp_ok('eject', id=self.device_name, force=True)

        self.wait_for_open()

        self._verify_drive(True, None)

        self._qmp_ok('blockdev-change-medium', id=self.device_name,
                     filename=new_img, format=iotests.imgfmt)

        self.wait_for_close()

        self._verify_drive(False, new_img)

    def test_tray_open_close(self):
        self._qmp_ok('blockdev-open-tray',
                     id=self.device_name, force=True)

        self.wait_for_open()

        # Merely opening the tray must not change the inserted medium
        self._verify_drive(True, self._initial_medium())

        self._qmp_ok('blockdev-close-tray', id=self.device_name)

        if self.has_real_tray or not self.was_empty:
            self.wait_for_close()

        self._verify_drive(False, self._initial_medium())

    def test_tray_eject_close(self):
        self._qmp_ok('eject', id=self.device_name, force=True)

        self.wait_for_open()

        self._verify_drive(True, None)

        self._qmp_ok('blockdev-close-tray', id=self.device_name)

        self.wait_for_close()

        self._verify_drive(False, None)

    def test_tray_open_change(self):
        self._qmp_ok('blockdev-open-tray', id=self.device_name,
                     force=True)

        self.wait_for_open()

        self._verify_drive(True, self._initial_medium())

        self._qmp_ok('blockdev-change-medium', id=self.device_name,
                     filename=new_img,
                     format=iotests.imgfmt)

        self.wait_for_close()

        self._verify_drive(False, new_img)

    def test_cycle(self, read_only_node=False):
        """Open tray, remove medium, insert node 'new', close tray.

        read_only_node is passed as the read-only option of the node that
        is created for new_img.
        """
        self._qmp_ok('blockdev-add',
                     node_name='new',
                     driver=iotests.imgfmt,
                     read_only=read_only_node,
                     file={'filename': new_img,
                           'driver': 'file'})

        self._qmp_ok('blockdev-open-tray',
                     id=self.device_name, force=True)

        self.wait_for_open()

        self._verify_drive(True, self._initial_medium())

        self._qmp_ok('blockdev-remove-medium', id=self.device_name)

        self._verify_drive(True, None)

        self._qmp_ok('blockdev-insert-medium',
                     id=self.device_name, node_name='new')

        self._verify_drive(True, new_img)

        self._qmp_ok('blockdev-close-tray', id=self.device_name)

        self.wait_for_close()

        self._verify_drive(False, new_img)

    def test_cycle_read_only_media(self):
        self.test_cycle(True)

    def test_close_on_closed(self):
        # Should be a no-op
        self._qmp_ok('blockdev-close-tray', id=self.device_name)
        self.assertEqual(self.vm.get_qmp_events(wait=False), [])

    def test_remove_on_closed(self):
        if not self.has_real_tray:
            return

        self._qmp_generic_error('blockdev-remove-medium',
                                id=self.device_name)

    def test_insert_on_closed(self):
        if not self.has_real_tray:
            return

        self._qmp_ok('blockdev-add',
                     node_name='new',
                     driver=iotests.imgfmt,
                     file={'filename': new_img,
                           'driver': 'file'})

        self._qmp_generic_error('blockdev-insert-medium',
                                id=self.device_name, node_name='new')
| |
class TestInitiallyFilled(GeneralChangeTestsBaseClass):
    """Run the general tests on a drive that starts with old_img inserted.

    self.media and self.interface are read in setUp(); they are expected
    to be provided by a concrete subclass.
    """
    was_empty = False

    def setUp(self):
        for image in (old_img, new_img):
            qemu_img('create', '-f', iotests.imgfmt, image, '1440k')

        vm = iotests.VM()
        self.vm = vm
        if self.use_drive:
            vm.add_drive(old_img, 'media=%s' % self.media, 'none')
        else:
            vm.add_blockdev(['node-name=drive0',
                             'driver=%s' % iotests.imgfmt,
                             'file.driver=file',
                             'file.filename=%s' % old_img])

        # SCSI devices get a virtio-scsi-pci device added before them
        if self.interface == 'scsi':
            vm.add_device('virtio-scsi-pci')

        model = interface_to_device_name(self.interface)
        vm.add_device('%s,drive=drive0,id=%s' % (model, self.device_name))
        vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        for image in (old_img, new_img):
            os.remove(image)

    def test_insert_on_filled(self):
        """Inserting a second medium into an occupied drive must fail."""
        reply = self.vm.qmp('blockdev-add',
                            node_name='new',
                            driver=iotests.imgfmt,
                            file={'filename': new_img,
                                  'driver': 'file'})
        self.assert_qmp(reply, 'return', {})

        reply = self.vm.qmp('blockdev-open-tray', id=self.device_name)
        self.assert_qmp(reply, 'return', {})

        self.wait_for_open()

        reply = self.vm.qmp('blockdev-insert-medium', id=self.device_name,
                            node_name='new')
        self.assert_qmp(reply, 'error/class', 'GenericError')
| |
class TestInitiallyEmpty(GeneralChangeTestsBaseClass):
    """Run the general tests on a drive that starts without a medium.

    self.media and self.interface are read in setUp(); they are expected
    to be provided by a concrete subclass.
    """
    was_empty = True

    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, new_img, '1440k')

        vm = iotests.VM()
        self.vm = vm
        if self.use_drive:
            vm.add_drive(None, 'media=%s' % self.media, 'none')
            drive_opt = 'drive=drive0,'
        else:
            # Without -drive there is nothing to attach to the device
            drive_opt = ''

        if self.interface == 'scsi':
            vm.add_device('virtio-scsi-pci')

        model = interface_to_device_name(self.interface)
        vm.add_device('%s,%sid=%s' % (model, drive_opt, self.device_name))
        vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        os.remove(new_img)

    def test_remove_on_empty(self):
        """Removing the medium of an opened, empty drive succeeds."""
        reply = self.vm.qmp('blockdev-open-tray', id=self.device_name)
        self.assert_qmp(reply, 'return', {})

        self.wait_for_open()

        # Should be a no-op
        reply = self.vm.qmp('blockdev-remove-medium', id=self.device_name)
        self.assert_qmp(reply, 'return', {})
| |
# Build the concrete classes inside a function so that loop variables such
# as 'case' never become module globals (otherwise tests would be run for
# the abstract base classes)
def create_basic_test_classes():
    """Publish one concrete test class per configuration as a module global.

    For every (media, interface, has_real_tray) combination, each of
    TestInitiallyFilled and TestInitiallyEmpty is subclassed twice, once
    with use_drive=True and once with use_drive=False.  The generated
    classes are named '<case>_<media>_<interface>_<drive|blockdev>'.
    """
    combinations = (('cdrom', 'ide', True),
                    ('cdrom', 'scsi', True),
                    ('disk', 'floppy', False))
    module_namespace = globals()

    for media, interface, has_real_tray in combinations:
        for case in (TestInitiallyFilled, TestInitiallyEmpty):
            for use_drive in (True, False):
                mode = 'drive' if use_drive else 'blockdev'
                name = '_'.join((case.__name__, media, interface, mode))
                attributes = dict(media=media,
                                  interface=interface,
                                  has_real_tray=has_real_tray,
                                  use_drive=use_drive)
                module_namespace[name] = type(name, (case,), attributes)
| |
| create_basic_test_classes() |
| |
class TestChangeReadOnly(ChangeBaseClass):
    """Check the read-only state of a drive across medium changes.

    Each test starts a VM with old_img in a floppy drive, replaces it with
    new_img (mostly through blockdev-change-medium and its read-only-mode
    argument) and compares the 'ro' flag and file name that query-block
    reports for the first device.
    """
    device_name = 'qdev0'

    def setUp(self):
        for image in (old_img, new_img):
            qemu_img('create', '-f', iotests.imgfmt, image, '1440k')
        # The VM is configured and launched by the individual tests
        self.vm = iotests.VM()

    def tearDown(self):
        self.vm.shutdown()
        # Tests may have made the images read-only; restore the mode first
        for image in (old_img, new_img):
            os.chmod(image, 0o666)
        for image in (old_img, new_img):
            os.remove(image)

    def _expect_medium(self, read_only, filename):
        """Run query-block and check the 'ro' flag and image file name."""
        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', read_only)
        self.assert_qmp(result, 'return[0]/inserted/image/filename',
                        filename)

    def _launch_with_floppy(self, read_only):
        """Start the VM with old_img in a floppy drive and verify it.

        read_only selects whether the drive is added with read-only=on.
        """
        drive_opts = 'media=disk,read-only=on' if read_only else 'media=disk'
        self.vm.add_drive(old_img, drive_opts, 'none')
        self.vm.add_device('floppy,drive=drive0,id=%s' % self.device_name)
        self.vm.launch()

        self._expect_medium(read_only, old_img)

    def _change_medium(self, read_only_mode):
        """Issue blockdev-change-medium to new_img and return the reply."""
        return self.vm.qmp('blockdev-change-medium', id=self.device_name,
                           filename=new_img,
                           format=iotests.imgfmt,
                           read_only_mode=read_only_mode)

    def _change_ok(self, read_only_mode):
        """Change to new_img and require success."""
        self.assert_qmp(self._change_medium(read_only_mode), 'return', {})

    def _change_fails(self, read_only_mode):
        """Change to new_img and require a GenericError."""
        self.assert_qmp(self._change_medium(read_only_mode),
                        'error/class', 'GenericError')

    def test_ro_ro_retain(self):
        os.chmod(old_img, 0o444)
        os.chmod(new_img, 0o444)
        self._launch_with_floppy(read_only=True)

        self._change_ok('retain')
        self._expect_medium(True, new_img)

    def test_ro_rw_retain(self):
        os.chmod(old_img, 0o444)
        self._launch_with_floppy(read_only=True)

        self._change_ok('retain')
        self._expect_medium(True, new_img)

    @iotests.skip_if_user_is_root
    def test_rw_ro_retain(self):
        os.chmod(new_img, 0o444)
        self._launch_with_floppy(read_only=False)

        self._change_fails('retain')

        # The failed change must not have produced any event
        self.assertEqual(self.vm.get_qmp_events(wait=False), [])

        self._expect_medium(False, old_img)

    def test_ro_rw(self):
        os.chmod(old_img, 0o444)
        self._launch_with_floppy(read_only=True)

        self._change_ok('read-write')
        self._expect_medium(False, new_img)

    def test_rw_ro(self):
        os.chmod(new_img, 0o444)
        self._launch_with_floppy(read_only=False)

        self._change_ok('read-only')
        self._expect_medium(True, new_img)

    def test_make_rw_ro(self):
        self._launch_with_floppy(read_only=False)

        self._change_ok('read-only')
        self._expect_medium(True, new_img)

    @iotests.skip_if_user_is_root
    def test_make_ro_rw(self):
        os.chmod(new_img, 0o444)
        self._launch_with_floppy(read_only=False)

        self._change_fails('read-write')
        self._expect_medium(False, old_img)

    def test_make_rw_ro_by_retain(self):
        os.chmod(old_img, 0o444)
        self._launch_with_floppy(read_only=True)

        self._change_ok('retain')
        self._expect_medium(True, new_img)

    @iotests.skip_if_user_is_root
    def test_make_ro_rw_by_retain(self):
        os.chmod(new_img, 0o444)
        self._launch_with_floppy(read_only=False)

        self._change_fails('retain')
        self._expect_medium(False, old_img)

    def test_rw_ro_cycle(self):
        os.chmod(new_img, 0o444)
        self._launch_with_floppy(read_only=False)

        result = self.vm.qmp('blockdev-add',
                             node_name='new',
                             driver=iotests.imgfmt,
                             read_only=True,
                             file={'filename': new_img,
                                   'driver': 'file'})
        self.assert_qmp(result, 'return', {})

        # Adding the node alone must not affect the drive
        self._expect_medium(False, old_img)

        result = self.vm.qmp('blockdev-remove-medium', id=self.device_name)
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp_absent(result, 'return[0]/inserted')

        result = self.vm.qmp('blockdev-insert-medium', id=self.device_name,
                             node_name='new')
        self.assert_qmp(result, 'return', {})

        self._expect_medium(True, new_img)
| |
# Unbind the abstract intermediate classes from the module namespace so
# that tests are not run for them directly; classes already derived from
# them keep their own references.
GeneralChangeTestsBaseClass = TestInitiallyFilled = TestInitiallyEmpty = None
| |
| |
class TestBlockJobsAfterCycle(ChangeBaseClass):
    """Run block jobs on a drive whose medium was swapped during setUp()."""
    device_name = 'qdev0'

    def _qmp_ok(self, command, **arguments):
        """Send a QMP command and require the empty success reply."""
        reply = self.vm.qmp(command, **arguments)
        self.assert_qmp(reply, 'return', {})

    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, old_img, '1440K')

        vm = iotests.VM()
        self.vm = vm
        vm.add_drive_raw("id=drive0,driver=null-co,if=none")
        vm.add_device('floppy,drive=drive0,id=%s' % self.device_name)
        vm.launch()

        # The drive starts out backed by the null-co driver
        reply = vm.qmp('query-block')
        self.assert_qmp(reply, 'return[0]/inserted/image/format', 'null-co')

        # For device-less BBs, calling blockdev-open-tray or
        # blockdev-close-tray is not necessary
        self._qmp_ok('blockdev-remove-medium', id=self.device_name)

        reply = vm.qmp('query-block')
        self.assert_qmp_absent(reply, 'return[0]/inserted')

        self._qmp_ok('blockdev-add',
                     node_name='node0',
                     driver=iotests.imgfmt,
                     file={'filename': old_img,
                           'driver': 'file'})

        self._qmp_ok('blockdev-insert-medium', id=self.device_name,
                     node_name='node0')

        reply = vm.qmp('query-block')
        self.assert_qmp(reply, 'return[0]/inserted/image/filename', old_img)

    def tearDown(self):
        self.vm.shutdown()
        os.remove(old_img)
        # new_img only exists if the snapshot test actually created it
        try:
            os.remove(new_img)
        except OSError:
            pass

    def test_snapshot_and_commit(self):
        """Snapshot the cycled drive into new_img, then commit it back."""
        # We need backing file support
        if iotests.imgfmt not in ('qcow2', 'qed'):
            return

        self._qmp_ok('blockdev-snapshot-sync', device='drive0',
                     snapshot_file=new_img,
                     format=iotests.imgfmt)

        reply = self.vm.qmp('query-block')
        self.assert_qmp(reply, 'return[0]/inserted/image/filename', new_img)
        self.assert_qmp(reply,
                        'return[0]/inserted/image/backing-image/filename',
                        old_img)

        self._qmp_ok('block-commit', device='drive0')

        self.vm.event_wait(name='BLOCK_JOB_READY')

        reply = self.vm.qmp('query-block-jobs')
        self.assert_qmp(reply, 'return[0]/device', 'drive0')

        self._qmp_ok('block-job-complete', device='drive0')

        self.vm.event_wait(name='BLOCK_JOB_COMPLETED')
| |
| |
if __name__ == '__main__':
    machine = iotests.qemu_default_machine
    # We need floppy and IDE CD-ROM
    if machine != 'pc':
        iotests.notrun('not suitable for this machine type: %s' % machine)

    # Need to support image creation
    creatable_formats = ['vpc', 'parallels', 'qcow', 'vdi', 'qcow2',
                         'vmdk', 'raw', 'vhdx', 'qed']
    iotests.main(supported_fmts=creatable_formats,
                 supported_protocols=['file'])