| #!/usr/bin/env python3 |
| # |
| # Test case QMP's encrypted key management |
| # |
| # Copyright (C) 2019 Red Hat, Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import iotests |
| import os |
| import time |
| import json |
| |
| test_img = os.path.join(iotests.test_dir, 'test.img') |
| |
| class Secret: |
| def __init__(self, index): |
| self._id = "keysec" + str(index) |
| # you are not supposed to see the password... |
| self._secret = "hunter" + str(index) |
| |
| def id(self): |
| return self._id |
| |
| def secret(self): |
| return self._secret |
| |
| def to_cmdline_object(self): |
| return [ "secret,id=" + self._id + ",data=" + self._secret] |
| |
| def to_qmp_object(self): |
| return { "qom_type" : "secret", "id": self.id(), |
| "props": { "data": self.secret() } } |
| |
| ################################################################################ |
| class EncryptionSetupTestCase(iotests.QMPTestCase): |
| |
| # test case startup |
| def setUp(self): |
| # start the VM |
| self.vm = iotests.VM() |
| self.vm.launch() |
| |
| # create the secrets and load 'em into the VM |
| self.secrets = [ Secret(i) for i in range(0, 6) ] |
| for secret in self.secrets: |
| result = self.vm.qmp("object-add", **secret.to_qmp_object()) |
| self.assert_qmp(result, 'return', {}) |
| |
| if iotests.imgfmt == "qcow2": |
| self.pfx = "encrypt." |
| self.img_opts = [ '-o', "encrypt.format=luks" ] |
| else: |
| self.pfx = "" |
| self.img_opts = [] |
| |
| # test case shutdown |
| def tearDown(self): |
| # stop the VM |
| self.vm.shutdown() |
| |
| ########################################################################### |
| # create the encrypted block device |
| def createImg(self, file, secret): |
| |
| iotests.qemu_img( |
| 'create', |
| '--object', *secret.to_cmdline_object(), |
| '-f', iotests.imgfmt, |
| '-o', self.pfx + 'key-secret=' + secret.id(), |
| '-o', self.pfx + 'iter-time=10', |
| *self.img_opts, |
| file, |
| '1M') |
| |
| ########################################################################### |
| # open an encrypted block device |
| def openImageQmp(self, id, file, secret, read_only = False): |
| |
| encrypt_options = { |
| 'key-secret' : secret.id() |
| } |
| |
| if iotests.imgfmt == "qcow2": |
| encrypt_options = { |
| 'encrypt': { |
| 'format':'luks', |
| **encrypt_options |
| } |
| } |
| |
| result = self.vm.qmp('blockdev-add', ** |
| { |
| 'driver': iotests.imgfmt, |
| 'node-name': id, |
| 'read-only': read_only, |
| |
| **encrypt_options, |
| |
| 'file': { |
| 'driver': 'file', |
| 'filename': test_img, |
| } |
| } |
| ) |
| self.assert_qmp(result, 'return', {}) |
| |
| # close the encrypted block device |
| def closeImageQmp(self, id): |
| result = self.vm.qmp('blockdev-del', **{ 'node-name': id }) |
| self.assert_qmp(result, 'return', {}) |
| |
| ########################################################################### |
| # add a key to an encrypted block device |
| def addKeyQmp(self, id, new_secret, secret = None, |
| slot = None, force = False): |
| |
| crypt_options = { |
| 'state' : 'active', |
| 'new-secret' : new_secret.id(), |
| 'iter-time' : 10 |
| } |
| |
| if slot != None: |
| crypt_options['keyslot'] = slot |
| |
| |
| if secret != None: |
| crypt_options['secret'] = secret.id() |
| |
| if iotests.imgfmt == "qcow2": |
| crypt_options['format'] = 'luks' |
| crypt_options = { |
| 'encrypt': crypt_options |
| } |
| |
| args = { |
| 'node-name': id, |
| 'job-id' : 'job_add_key', |
| 'options' : { |
| 'driver' : iotests.imgfmt, |
| **crypt_options |
| }, |
| } |
| |
| if force == True: |
| args['force'] = True |
| |
| #TODO: check what jobs return |
| result = self.vm.qmp('x-blockdev-amend', **args) |
| assert result['return'] == {} |
| self.vm.run_job('job_add_key') |
| |
| # erase a key from an encrypted block device |
| def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False): |
| |
| crypt_options = { |
| 'state' : 'inactive', |
| } |
| |
| if slot != None: |
| crypt_options['keyslot'] = slot |
| if old_secret != None: |
| crypt_options['old-secret'] = old_secret.id() |
| |
| if iotests.imgfmt == "qcow2": |
| crypt_options['format'] = 'luks' |
| crypt_options = { |
| 'encrypt': crypt_options |
| } |
| |
| args = { |
| 'node-name': id, |
| 'job-id' : 'job_erase_key', |
| 'options' : { |
| 'driver' : iotests.imgfmt, |
| **crypt_options |
| }, |
| } |
| |
| if force == True: |
| args['force'] = True |
| |
| result = self.vm.qmp('x-blockdev-amend', **args) |
| assert result['return'] == {} |
| self.vm.run_job('job_erase_key') |
| |
| ########################################################################### |
| # create image, and change its key |
| def testChangeKey(self): |
| |
| # create the image with secret0 and open it |
| self.createImg(test_img, self.secrets[0]); |
| self.openImageQmp("testdev", test_img, self.secrets[0]) |
| |
| # add key to slot 1 |
| self.addKeyQmp("testdev", new_secret = self.secrets[1]) |
| |
| # add key to slot 5 |
| self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5) |
| |
| # erase key from slot 0 |
| self.eraseKeyQmp("testdev", old_secret = self.secrets[0]) |
| |
| #reopen the image with secret1 |
| self.closeImageQmp("testdev") |
| self.openImageQmp("testdev", test_img, self.secrets[1]) |
| |
| # close and erase the image for good |
| self.closeImageQmp("testdev") |
| os.remove(test_img) |
| |
| # test that if we erase the old password, |
| # we can still change the encryption keys using 'old-secret' |
| def testOldPassword(self): |
| |
| # create the image with secret0 and open it |
| self.createImg(test_img, self.secrets[0]); |
| self.openImageQmp("testdev", test_img, self.secrets[0]) |
| |
| # add key to slot 1 |
| self.addKeyQmp("testdev", new_secret = self.secrets[1]) |
| |
| # erase key from slot 0 |
| self.eraseKeyQmp("testdev", old_secret = self.secrets[0]) |
| |
| # this will fail as the old password is no longer valid |
| self.addKeyQmp("testdev", new_secret = self.secrets[2]) |
| |
| # this will work |
| self.addKeyQmp("testdev", new_secret = self.secrets[2], secret = self.secrets[1]) |
| |
| # close and erase the image for good |
| self.closeImageQmp("testdev") |
| os.remove(test_img) |
| |
| def testUseForceLuke(self): |
| |
| self.createImg(test_img, self.secrets[0]); |
| self.openImageQmp("testdev", test_img, self.secrets[0]) |
| |
| # Add bunch of secrets |
| self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4) |
| self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2) |
| |
| # overwrite an active secret |
| self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2) |
| self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2, force=True) |
| |
| self.addKeyQmp("testdev", new_secret = self.secrets[0]) |
| |
| # Now erase all the secrets |
| self.eraseKeyQmp("testdev", old_secret = self.secrets[5]) |
| self.eraseKeyQmp("testdev", slot=4) |
| |
| # erase last keyslot |
| self.eraseKeyQmp("testdev", old_secret = self.secrets[0]) |
| self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True) |
| |
| self.closeImageQmp("testdev") |
| os.remove(test_img) |
| |
| |
| if __name__ == '__main__': |
| iotests.verify_working_luks() |
| # Encrypted formats support |
| iotests.activate_logging() |
| iotests.main(supported_fmts = ['qcow2', 'luks']) |