#!/usr/bin/env python3
# group: rw
#
# Tests for internal snapshot.
#
# Copyright (C) 2013 IBM, Inc.
#
# Based on 055.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
| |
import os
import time

import iotests
from iotests import qemu_img, qemu_io
| |
# Prefix of the QMP device names the tests address; the drive index is
# appended to it (e.g. 'drive0').
test_drv_base_name = 'drive'
| |
class ImageSnapshotTestCase(iotests.QMPTestCase):
    """Shared fixture and helpers for the internal-snapshot QMP tests.

    ``self.expect`` holds one bookkeeping dict per drive with these keys:

    * ``image``: path of the image file created for the drive
    * ``device``: device name used in QMP commands
    * ``snapshots``: list of dicts describing snapshots expected to exist
    * ``snapshots_name_counter``: next numeric suffix for snapshot names
    """

    image_len = 120 * 1024 * 1024  # 120 MiB, expressed in bytes

    def __init__(self, *args):
        # Initialised here (not in setUp) so tearDown can always iterate it.
        self.expect = []
        super().__init__(*args)

    def _setUp(self, test_img_base_name, image_num):
        """Create ``image_num`` images, attach them to a VM and launch it.

        Image files are named ``<test_img_base_name><index>`` inside
        ``iotests.test_dir``.
        """
        self.vm = iotests.VM()
        for i in range(image_num):
            filename = '%s%d' % (test_img_base_name, i)
            img = os.path.join(iotests.test_dir, filename)
            # NOTE(review): presumably matches the id add_drive() assigns to
            # the i-th drive -- confirm against iotests.VM.add_drive.
            device = '%s%d' % (test_drv_base_name, i)
            qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len))
            self.vm.add_drive(img)
            self.expect.append({'image': img, 'device': device,
                                'snapshots': [],
                                'snapshots_name_counter': 0})
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and delete every image file that was created."""
        self.vm.shutdown()
        for dev_expect in self.expect:
            os.remove(dev_expect['image'])

    def createSnapshotInTransaction(self, snapshot_num, abort = False):
        """Create ``snapshot_num`` snapshots on every drive in one transaction.

        With ``abort`` set, an 'abort' action is appended so the transaction
        must fail with GenericError; the expectations in ``self.expect`` are
        then left untouched. Otherwise the new snapshots are recorded in
        ``self.expect`` and the transaction must return ``{}``.
        """
        actions = []
        for dev_expect in self.expect:
            num = dev_expect['snapshots_name_counter']
            for _ in range(snapshot_num):
                name = '%s_sn%d' % (dev_expect['device'], num)
                num += 1
                if not abort:
                    # Only a committed transaction changes what we expect.
                    dev_expect['snapshots'].append({'name': name})
                    dev_expect['snapshots_name_counter'] = num
                actions.append({
                    'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': dev_expect['device'],
                             'name': name},
                })

        if abort:
            actions.append({
                'type': 'abort',
                'data': {},
            })

        result = self.vm.qmp('transaction', actions=actions)

        if abort:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})

    def verifySnapshotInfo(self):
        """Check 'query-block' output against ``self.expect``.

        For every expected device the reported snapshot list must have the
        expected length and contain each expected name. Each expected
        snapshot dict is then updated with the full details reported by the
        VM (this is how the snapshot 'id' becomes known to the tests).
        """
        result = self.vm.qmp('query-block')

        for dev_expect in self.expect:
            # 1. Find the image info reported for this device.
            image_result = None
            for device in result['return']:
                if device['device'] == dev_expect['device']:
                    image_result = device['inserted']['image']
                    break
            self.assertIsNotNone(
                image_result,
                'device %s missing from query-block' % dev_expect['device'])

            # 'snapshots' is an optional member of the image info, so an
            # image without snapshots is treated as an empty list.
            sn_list_result = image_result.get('snapshots', [])
            sn_list_expect = dev_expect['snapshots']

            # 2. Compare with the expectation.
            self.assertEqual(len(sn_list_result), len(sn_list_expect))

            for sn_expect in sn_list_expect:
                sn_result = None
                for sn in sn_list_result:
                    if sn_expect['name'] == sn['name']:
                        sn_result = sn
                        break
                self.assertIsNotNone(
                    sn_result,
                    'snapshot %s not reported' % sn_expect['name'])
                # Fill in the detail info.
                sn_expect.update(sn_result)

    def deleteSnapshot(self, device, id = None, name = None):
        """Delete one snapshot of ``device`` selected by ``id`` and/or ``name``.

        At least one of ``id`` and ``name`` must be given. The snapshot must
        be present in the expectations for ``device``; the command's return
        value must equal the expected snapshot info, which is then removed
        from the expectations.

        ``id`` shadows the builtin, but the parameter name is part of the
        method's keyword interface and is therefore kept.
        """
        self.assertTrue(id is not None or name is not None)

        # Fill in the detail info, including the snapshot IDs.
        self.verifySnapshotInfo()

        # Find the expected snapshot list of the device.
        sn_list_expect = None
        for dev_expect in self.expect:
            if dev_expect['device'] == device:
                sn_list_expect = dev_expect['snapshots']
                break
        self.assertIsNotNone(sn_list_expect)

        # The selection criteria double as the optional QMP arguments, so
        # only the keys the caller supplied are matched and sent.
        criteria = {}
        if id is not None:
            criteria['id'] = id
        if name is not None:
            criteria['name'] = name

        sn_expect = None
        for sn in sn_list_expect:
            if all(sn[key] == value for key, value in criteria.items()):
                sn_expect = sn
                break
        self.assertIsNotNone(sn_expect)

        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                             device=device, **criteria)

        self.assert_qmp(result, 'return', sn_expect)
        sn_list_expect.remove(sn_expect)
| |
class TestSingleTransaction(ImageSnapshotTestCase):
    """Snapshot creation through a transaction on a single drive."""

    def setUp(self):
        self._setUp('test_a.img', 1)

    def test_create(self):
        """One snapshot can be created and is reported by query-block."""
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()

    def test_error_name_empty(self):
        """An empty snapshot name is rejected."""
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': self.expect[0]['device'],
                             'name': ''},
                    }]
        result = self.vm.qmp('transaction', actions=actions)
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_device(self):
        """An unknown device name is rejected."""
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': 'drive_error',
                             'name': 'a'},
                    }]
        result = self.vm.qmp('transaction', actions=actions)
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_exist(self):
        """Creating a snapshot whose name already exists is rejected."""
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()
        # Pass the existing snapshot's *name*. Passing the whole snapshot
        # dict would be rejected as a malformed argument, and the
        # "already exists" path would never be exercised.
        existing_name = self.expect[0]['snapshots'][0]['name']
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': self.expect[0]['device'],
                             'name': existing_name},
                    }]
        result = self.vm.qmp('transaction', actions=actions)
        self.assert_qmp(result, 'error/class', 'GenericError')
| |
class TestMultipleTransaction(ImageSnapshotTestCase):
    """Snapshot creation through a transaction spanning two drives."""

    def setUp(self):
        self._setUp('test_b.img', 2)

    def test_create(self):
        """Three snapshots per drive can be created in one transaction."""
        self.createSnapshotInTransaction(3)
        self.verifySnapshotInfo()

    def test_abort(self):
        """An aborted transaction leaves the existing snapshots unchanged."""
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        # The aborted batch is not recorded in self.expect, so the second
        # verification checks that only the first two snapshots exist.
        self.createSnapshotInTransaction(3, abort=True)
        self.verifySnapshotInfo()
| |
class TestSnapshotDelete(ImageSnapshotTestCase):
    """Tests for 'blockdev-snapshot-delete-internal-sync' on one drive."""

    def setUp(self):
        self._setUp('test_c.img', 1)

    def test_delete_with_id(self):
        """A snapshot can be deleted by ID alone."""
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        self.deleteSnapshot(self.expect[0]['device'],
                            id=self.expect[0]['snapshots'][0]['id'])
        self.verifySnapshotInfo()

    def test_delete_with_name(self):
        """A snapshot can be deleted by name alone."""
        self.createSnapshotInTransaction(3)
        self.verifySnapshotInfo()
        self.deleteSnapshot(self.expect[0]['device'],
                            name=self.expect[0]['snapshots'][1]['name'])
        self.verifySnapshotInfo()

    def test_delete_with_id_and_name(self):
        """A snapshot can be deleted when both ID and name are given."""
        self.createSnapshotInTransaction(4)
        self.verifySnapshotInfo()
        self.deleteSnapshot(self.expect[0]['device'],
                            id=self.expect[0]['snapshots'][2]['id'],
                            name=self.expect[0]['snapshots'][2]['name'])
        self.verifySnapshotInfo()

    def test_error_device(self):
        """Deleting from an unknown device is rejected."""
        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                             device='drive_error',
                             id='0')
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_no_id_and_name(self):
        """Omitting both ID and name is rejected."""
        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                             device=self.expect[0]['device'])
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_snapshot_not_exist(self):
        """An ID and a name taken from different snapshots match nothing."""
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                             device=self.expect[0]['device'],
                             id=self.expect[0]['snapshots'][0]['id'],
                             name=self.expect[0]['snapshots'][1]['name'])
        self.assert_qmp(result, 'error/class', 'GenericError')
| |
# Script entry point: run the tests through the iotests harness, declaring
# qcow2 as the only supported format and 'file' as the only protocol.
if __name__ == '__main__':
    iotests.main(supported_fmts=['qcow2'],
                 supported_protocols=['file'])