| #!/usr/bin/env python3 |
| # group: rw |
| # |
| # Test case for encryption key management versus image sharing |
| # |
| # Copyright (C) 2019 Red Hat, Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import iotests |
| import os |
| import time |
| import json |
| |
| test_img = os.path.join(iotests.test_dir, 'test.img') |
| |
| class Secret: |
| def __init__(self, index): |
| self._id = "keysec" + str(index) |
| # you are not supposed to see the password... |
| self._secret = "hunter" + str(index) |
| |
| def id(self): |
| return self._id |
| |
| def secret(self): |
| return self._secret |
| |
| def to_cmdline_object(self): |
| return [ "secret,id=" + self._id + ",data=" + self._secret] |
| |
| def to_qmp_object(self): |
| return { "qom_type" : "secret", "id": self.id(), |
| "data": self.secret() } |
| |
| ################################################################################ |
| |
| class EncryptionSetupTestCase(iotests.QMPTestCase): |
| |
| # test case startup |
| def setUp(self): |
| |
| # start the VMs |
| self.vm1 = iotests.VM(path_suffix = 'VM1') |
| self.vm2 = iotests.VM(path_suffix = 'VM2') |
| self.vm1.launch() |
| self.vm2.launch() |
| |
| # create the secrets and load 'em into the VMs |
| self.secrets = [ Secret(i) for i in range(0, 4) ] |
| for secret in self.secrets: |
| result = self.vm1.qmp("object-add", **secret.to_qmp_object()) |
| self.assert_qmp(result, 'return', {}) |
| result = self.vm2.qmp("object-add", **secret.to_qmp_object()) |
| self.assert_qmp(result, 'return', {}) |
| |
| # test case shutdown |
| def tearDown(self): |
| # stop the VM |
| self.vm1.shutdown() |
| self.vm2.shutdown() |
| |
| ########################################################################### |
| # create the encrypted block device using qemu-img |
| def createImg(self, file, secret): |
| |
| output = iotests.qemu_img_pipe( |
| 'create', |
| '--object', *secret.to_cmdline_object(), |
| '-f', iotests.imgfmt, |
| '-o', 'key-secret=' + secret.id(), |
| '-o', 'iter-time=10', |
| file, |
| '1M') |
| |
| iotests.log(output, filters=[iotests.filter_test_dir]) |
| |
| # attempts to add a key using qemu-img |
| def addKey(self, file, secret, new_secret): |
| |
| image_options = { |
| 'key-secret' : secret.id(), |
| 'driver' : iotests.imgfmt, |
| 'file' : { |
| 'driver':'file', |
| 'filename': file, |
| } |
| } |
| |
| output = iotests.qemu_img_pipe( |
| 'amend', |
| '--object', *secret.to_cmdline_object(), |
| '--object', *new_secret.to_cmdline_object(), |
| |
| '-o', 'state=active', |
| '-o', 'new-secret=' + new_secret.id(), |
| '-o', 'iter-time=10', |
| |
| "json:" + json.dumps(image_options) |
| ) |
| |
| iotests.log(output, filters=[iotests.filter_test_dir]) |
| |
| ########################################################################### |
| # open an encrypted block device |
| def openImageQmp(self, vm, id, file, secret, |
| readOnly = False, reOpen = False): |
| |
| command = 'blockdev-reopen' if reOpen else 'blockdev-add' |
| |
| opts = { |
| 'driver': iotests.imgfmt, |
| 'node-name': id, |
| 'read-only': readOnly, |
| 'key-secret' : secret.id(), |
| 'file': { |
| 'driver': 'file', |
| 'filename': test_img, |
| } |
| } |
| |
| if reOpen: |
| result = vm.qmp(command, options=[opts]) |
| else: |
| result = vm.qmp(command, **opts) |
| self.assert_qmp(result, 'return', {}) |
| |
| |
| ########################################################################### |
| # add virtio-blk consumer for a block device |
| def addImageUser(self, vm, id, disk_id, share_rw=False): |
| result = vm.qmp('device_add', ** |
| { |
| 'driver': 'virtio-blk', |
| 'id': id, |
| 'drive': disk_id, |
| 'share-rw' : share_rw |
| } |
| ) |
| |
| iotests.log(result) |
| |
| # close the encrypted block device |
| def closeImageQmp(self, vm, id): |
| result = vm.qmp('blockdev-del', **{ 'node-name': id }) |
| self.assert_qmp(result, 'return', {}) |
| |
| ########################################################################### |
| |
| # add a key to an encrypted block device |
| def addKeyQmp(self, vm, id, new_secret): |
| |
| args = { |
| 'node-name': id, |
| 'job-id' : 'job0', |
| 'options' : { |
| 'state' : 'active', |
| 'driver' : iotests.imgfmt, |
| 'new-secret': new_secret.id(), |
| 'iter-time' : 10 |
| }, |
| } |
| |
| result = vm.qmp('x-blockdev-amend', **args) |
| iotests.log(result) |
| # Run the job only if it was created |
| event = ('JOB_STATUS_CHANGE', |
| {'data': {'id': 'job0', 'status': 'created'}}) |
| if vm.events_wait([event], timeout=0.0) is not None: |
| vm.run_job('job0') |
| |
| # test that when the image opened by two qemu processes, |
| # neither of them can update the encryption keys |
| def test1(self): |
| self.createImg(test_img, self.secrets[0]); |
| |
| # VM1 opens the image and adds a key |
| self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0]) |
| self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[1]) |
| |
| |
| # VM2 opens the image |
| self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0]) |
| |
| |
| # neither VMs now should be able to add a key |
| self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2]) |
| self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2]) |
| |
| |
| # VM 1 closes the image |
| self.closeImageQmp(self.vm1, "testdev") |
| |
| |
| # now VM2 can add the key |
| self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2]) |
| |
| |
| # qemu-img should also not be able to add a key |
| self.addKey(test_img, self.secrets[0], self.secrets[2]) |
| |
| # cleanup |
| self.closeImageQmp(self.vm2, "testdev") |
| os.remove(test_img) |
| |
| |
| # test that when the image opened by two qemu processes, |
| # even if first VM opens it read-only, the second can't update encryption |
| # keys |
| def test2(self): |
| self.createImg(test_img, self.secrets[0]); |
| |
| # VM1 opens the image readonly |
| self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0], |
| readOnly = True) |
| |
| # VM2 opens the image |
| self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0]) |
| |
| # VM1 can't add a key since image is readonly |
| self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2]) |
| |
| # VM2 can't add a key since VM is has the image opened |
| self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2]) |
| |
| |
| #VM1 reopens the image read-write |
| self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0], |
| reOpen = True, readOnly = False) |
| |
| # VM1 still can't add the key |
| self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2]) |
| |
| # VM2 gets away |
| self.closeImageQmp(self.vm2, "testdev") |
| |
| # VM1 now can add the key |
| self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2]) |
| |
| self.closeImageQmp(self.vm1, "testdev") |
| os.remove(test_img) |
| |
| # test that two VMs can't open the same luks image by default |
| # and attach it to a guest device |
| def test3(self): |
| self.createImg(test_img, self.secrets[0]); |
| |
| self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0]) |
| self.addImageUser(self.vm1, "testctrl", "testdev") |
| |
| self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0]) |
| self.addImageUser(self.vm2, "testctrl", "testdev") |
| |
| |
| # test that two VMs can attach the same luks image to a guest device, |
| # if both use share-rw=on |
| def test4(self): |
| self.createImg(test_img, self.secrets[0]); |
| |
| self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0]) |
| self.addImageUser(self.vm1, "testctrl", "testdev", share_rw=True) |
| |
| self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0]) |
| self.addImageUser(self.vm2, "testctrl", "testdev", share_rw=True) |
| |
| |
| |
| if __name__ == '__main__': |
| # support only raw luks since luks encrypted qcow2 is a proper |
| # format driver which doesn't allow any sharing |
| iotests.activate_logging() |
| iotests.main(supported_fmts = ['luks']) |