|  | #!/usr/bin/env python3 | 
|  | # group: rw | 
|  | # | 
|  | # Tests for internal snapshot. | 
|  | # | 
|  | # Copyright (C) 2013 IBM, Inc. | 
|  | # | 
|  | # Based on 055. | 
|  | # | 
|  | # This program is free software; you can redistribute it and/or modify | 
|  | # it under the terms of the GNU General Public License as published by | 
|  | # the Free Software Foundation; either version 2 of the License, or | 
|  | # (at your option) any later version. | 
|  | # | 
|  | # This program is distributed in the hope that it will be useful, | 
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | 
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
|  | # GNU General Public License for more details. | 
|  | # | 
|  | # You should have received a copy of the GNU General Public License | 
|  | # along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
|  | # | 
|  |  | 
|  | import time | 
|  | import os | 
|  | import iotests | 
|  | from iotests import qemu_img, qemu_io | 
|  |  | 
|  | test_drv_base_name = 'drive' | 
|  |  | 
|  | class ImageSnapshotTestCase(iotests.QMPTestCase): | 
|  | image_len = 120 * 1024 * 1024 # MB | 
|  |  | 
|  | def __init__(self, *args): | 
|  | self.expect = [] | 
|  | super(ImageSnapshotTestCase, self).__init__(*args) | 
|  |  | 
|  | def _setUp(self, test_img_base_name, image_num): | 
|  | self.vm = iotests.VM() | 
|  | for i in range(0, image_num): | 
|  | filename = '%s%d' % (test_img_base_name, i) | 
|  | img = os.path.join(iotests.test_dir, filename) | 
|  | device = '%s%d' % (test_drv_base_name, i) | 
|  | qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len)) | 
|  | self.vm.add_drive(img) | 
|  | self.expect.append({'image': img, 'device': device, | 
|  | 'snapshots': [], | 
|  | 'snapshots_name_counter': 0}) | 
|  | self.vm.launch() | 
|  |  | 
|  | def tearDown(self): | 
|  | self.vm.shutdown() | 
|  | for dev_expect in self.expect: | 
|  | os.remove(dev_expect['image']) | 
|  |  | 
|  | def createSnapshotInTransaction(self, snapshot_num, abort = False): | 
|  | actions = [] | 
|  | for dev_expect in self.expect: | 
|  | num = dev_expect['snapshots_name_counter'] | 
|  | for j in range(0, snapshot_num): | 
|  | name = '%s_sn%d' % (dev_expect['device'], num) | 
|  | num = num + 1 | 
|  | if abort == False: | 
|  | dev_expect['snapshots'].append({'name': name}) | 
|  | dev_expect['snapshots_name_counter'] = num | 
|  | actions.append({ | 
|  | 'type': 'blockdev-snapshot-internal-sync', | 
|  | 'data': { 'device': dev_expect['device'], | 
|  | 'name': name }, | 
|  | }) | 
|  |  | 
|  | if abort == True: | 
|  | actions.append({ | 
|  | 'type': 'abort', | 
|  | 'data': {}, | 
|  | }) | 
|  |  | 
|  | result = self.vm.qmp('transaction', actions = actions) | 
|  |  | 
|  | if abort == True: | 
|  | self.assert_qmp(result, 'error/class', 'GenericError') | 
|  | else: | 
|  | self.assert_qmp(result, 'return', {}) | 
|  |  | 
|  | def verifySnapshotInfo(self): | 
|  | result = self.vm.qmp('query-block') | 
|  |  | 
|  | # Verify each expected result | 
|  | for dev_expect in self.expect: | 
|  | # 1. Find the returned image value and snapshot info | 
|  | image_result = None | 
|  | for device in result['return']: | 
|  | if device['device'] == dev_expect['device']: | 
|  | image_result = device['inserted']['image'] | 
|  | break | 
|  | self.assertTrue(image_result != None) | 
|  | # Do not consider zero snapshot case now | 
|  | sn_list_result = image_result['snapshots'] | 
|  | sn_list_expect = dev_expect['snapshots'] | 
|  |  | 
|  | # 2. Verify it with expect | 
|  | self.assertTrue(len(sn_list_result) == len(sn_list_expect)) | 
|  |  | 
|  | for sn_expect in sn_list_expect: | 
|  | sn_result = None | 
|  | for sn in sn_list_result: | 
|  | if sn_expect['name'] == sn['name']: | 
|  | sn_result = sn | 
|  | break | 
|  | self.assertTrue(sn_result != None) | 
|  | # Fill in the detail info | 
|  | sn_expect.update(sn_result) | 
|  |  | 
|  | def deleteSnapshot(self, device, id = None, name = None): | 
|  | sn_list_expect = None | 
|  | sn_expect = None | 
|  |  | 
|  | self.assertTrue(id != None or name != None) | 
|  |  | 
|  | # Fill in the detail info include ID | 
|  | self.verifySnapshotInfo() | 
|  |  | 
|  | #find the expected snapshot list | 
|  | for dev_expect in self.expect: | 
|  | if dev_expect['device'] == device: | 
|  | sn_list_expect = dev_expect['snapshots'] | 
|  | break | 
|  | self.assertTrue(sn_list_expect != None) | 
|  |  | 
|  | if id != None and name != None: | 
|  | for sn in sn_list_expect: | 
|  | if sn['id'] == id and sn['name'] == name: | 
|  | sn_expect = sn | 
|  | result = \ | 
|  | self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
|  | device = device, | 
|  | id = id, | 
|  | name = name) | 
|  | break | 
|  | elif id != None: | 
|  | for sn in sn_list_expect: | 
|  | if sn['id'] == id: | 
|  | sn_expect = sn | 
|  | result = \ | 
|  | self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
|  | device = device, | 
|  | id = id) | 
|  | break | 
|  | else: | 
|  | for sn in sn_list_expect: | 
|  | if sn['name'] == name: | 
|  | sn_expect = sn | 
|  | result = \ | 
|  | self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
|  | device = device, | 
|  | name = name) | 
|  | break | 
|  |  | 
|  | self.assertTrue(sn_expect != None) | 
|  |  | 
|  | self.assert_qmp(result, 'return', sn_expect) | 
|  | sn_list_expect.remove(sn_expect) | 
|  |  | 
|  | class TestSingleTransaction(ImageSnapshotTestCase): | 
|  | def setUp(self): | 
|  | self._setUp('test_a.img', 1) | 
|  |  | 
|  | def test_create(self): | 
|  | self.createSnapshotInTransaction(1) | 
|  | self.verifySnapshotInfo() | 
|  |  | 
|  | def test_error_name_empty(self): | 
|  | actions = [{'type': 'blockdev-snapshot-internal-sync', | 
|  | 'data': { 'device': self.expect[0]['device'], | 
|  | 'name': '' }, | 
|  | }] | 
|  | result = self.vm.qmp('transaction', actions = actions) | 
|  | self.assert_qmp(result, 'error/class', 'GenericError') | 
|  |  | 
|  | def test_error_device(self): | 
|  | actions = [{'type': 'blockdev-snapshot-internal-sync', | 
|  | 'data': { 'device': 'drive_error', | 
|  | 'name': 'a' }, | 
|  | }] | 
|  | result = self.vm.qmp('transaction', actions = actions) | 
|  | self.assert_qmp(result, 'error/class', 'GenericError') | 
|  |  | 
|  | def test_error_exist(self): | 
|  | self.createSnapshotInTransaction(1) | 
|  | self.verifySnapshotInfo() | 
|  | actions = [{'type': 'blockdev-snapshot-internal-sync', | 
|  | 'data': { 'device': self.expect[0]['device'], | 
|  | 'name': self.expect[0]['snapshots'][0] }, | 
|  | }] | 
|  | result = self.vm.qmp('transaction', actions = actions) | 
|  | self.assert_qmp(result, 'error/class', 'GenericError') | 
|  |  | 
|  | class TestMultipleTransaction(ImageSnapshotTestCase): | 
|  | def setUp(self): | 
|  | self._setUp('test_b.img', 2) | 
|  |  | 
|  | def test_create(self): | 
|  | self.createSnapshotInTransaction(3) | 
|  | self.verifySnapshotInfo() | 
|  |  | 
|  | def test_abort(self): | 
|  | self.createSnapshotInTransaction(2) | 
|  | self.verifySnapshotInfo() | 
|  | self.createSnapshotInTransaction(3, abort = True) | 
|  | self.verifySnapshotInfo() | 
|  |  | 
|  | class TestSnapshotDelete(ImageSnapshotTestCase): | 
|  | def setUp(self): | 
|  | self._setUp('test_c.img', 1) | 
|  |  | 
|  | def test_delete_with_id(self): | 
|  | self.createSnapshotInTransaction(2) | 
|  | self.verifySnapshotInfo() | 
|  | self.deleteSnapshot(self.expect[0]['device'], | 
|  | id = self.expect[0]['snapshots'][0]['id']) | 
|  | self.verifySnapshotInfo() | 
|  |  | 
|  | def test_delete_with_name(self): | 
|  | self.createSnapshotInTransaction(3) | 
|  | self.verifySnapshotInfo() | 
|  | self.deleteSnapshot(self.expect[0]['device'], | 
|  | name = self.expect[0]['snapshots'][1]['name']) | 
|  | self.verifySnapshotInfo() | 
|  |  | 
|  | def test_delete_with_id_and_name(self): | 
|  | self.createSnapshotInTransaction(4) | 
|  | self.verifySnapshotInfo() | 
|  | self.deleteSnapshot(self.expect[0]['device'], | 
|  | id = self.expect[0]['snapshots'][2]['id'], | 
|  | name = self.expect[0]['snapshots'][2]['name']) | 
|  | self.verifySnapshotInfo() | 
|  |  | 
|  |  | 
|  | def test_error_device(self): | 
|  | result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
|  | device = 'drive_error', | 
|  | id = '0') | 
|  | self.assert_qmp(result, 'error/class', 'GenericError') | 
|  |  | 
|  | def test_error_no_id_and_name(self): | 
|  | result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
|  | device = self.expect[0]['device']) | 
|  | self.assert_qmp(result, 'error/class', 'GenericError') | 
|  |  | 
|  | def test_error_snapshot_not_exist(self): | 
|  | self.createSnapshotInTransaction(2) | 
|  | self.verifySnapshotInfo() | 
|  | result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
|  | device = self.expect[0]['device'], | 
|  | id = self.expect[0]['snapshots'][0]['id'], | 
|  | name = self.expect[0]['snapshots'][1]['name']) | 
|  | self.assert_qmp(result, 'error/class', 'GenericError') | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | iotests.main(supported_fmts=['qcow2'], | 
|  | supported_protocols=['file']) |