blob: 1229279116fd3623aa9b1123b48d62291938e7ad [file] [log] [blame]
#!/usr/bin/env python
# @ CommonUtility.py
# Common utility script
#
# Copyright (c) 2016 - 2021, Intel Corporation. All rights reserved.<BR>
# SPDX-License-Identifier: BSD-2-Clause-Patent
#
##
import os
import sys
import shutil
import subprocess
import string
from ctypes import ARRAY, c_char, c_uint16, c_uint32, \
c_uint8, Structure, sizeof
from importlib.machinery import SourceFileLoader
from SingleSign import single_sign_gen_pub_key
# Key types defined should match with cryptolib.h
PUB_KEY_TYPE = {
"RSA": 1,
"ECC": 2,
"DSA": 3,
}
# Signing type schemes defined should match with cryptolib.h
SIGN_TYPE_SCHEME = {
"RSA_PKCS1": 1,
"RSA_PSS": 2,
"ECC": 3,
"DSA": 4,
}
# Hash values defined should match with cryptolib.h
HASH_TYPE_VALUE = {
"SHA2_256": 1,
"SHA2_384": 2,
"SHA2_512": 3,
"SM3_256": 4,
}
# Hash values defined should match with cryptolib.h
HASH_VAL_STRING = dict(map(reversed, HASH_TYPE_VALUE.items()))
AUTH_TYPE_HASH_VALUE = {
"SHA2_256": 1,
"SHA2_384": 2,
"SHA2_512": 3,
"SM3_256": 4,
"RSA2048SHA256": 1,
"RSA3072SHA384": 2,
}
HASH_DIGEST_SIZE = {
"SHA2_256": 32,
"SHA2_384": 48,
"SHA2_512": 64,
"SM3_256": 32,
}
class PUB_KEY_HDR (Structure):
_pack_ = 1
_fields_ = [
('Identifier', ARRAY(c_char, 4)), # signature ('P', 'U', 'B', 'K')
('KeySize', c_uint16), # Length of Public Key
('KeyType', c_uint8), # RSA or ECC
('Reserved', ARRAY(c_uint8, 1)),
('KeyData', ARRAY(c_uint8, 0)),
]
def __init__(self):
self.Identifier = b'PUBK'
class SIGNATURE_HDR (Structure):
_pack_ = 1
_fields_ = [
('Identifier', ARRAY(c_char, 4)),
('SigSize', c_uint16),
('SigType', c_uint8),
('HashAlg', c_uint8),
('Signature', ARRAY(c_uint8, 0)),
]
def __init__(self):
self.Identifier = b'SIGN'
class LZ_HEADER(Structure):
_pack_ = 1
_fields_ = [
('signature', ARRAY(c_char, 4)),
('compressed_len', c_uint32),
('length', c_uint32),
('version', c_uint16),
('svn', c_uint8),
('attribute', c_uint8)
]
_compress_alg = {
b'LZDM': 'Dummy',
b'LZ4 ': 'Lz4',
b'LZMA': 'Lzma',
}
def print_bytes(data, indent=0, offset=0, show_ascii=False):
bytes_per_line = 16
printable = ' ' + string.ascii_letters + string.digits + string.punctuation
str_fmt = '{:s}{:04x}: {:%ds} {:s}' % (bytes_per_line * 3)
bytes_per_line
data_array = bytearray(data)
for idx in range(0, len(data_array), bytes_per_line):
hex_str = ' '.join(
'%02X' % val for val in data_array[idx:idx + bytes_per_line])
asc_str = ''.join('%c' % (val if (chr(val) in printable) else '.')
for val in data_array[idx:idx + bytes_per_line])
print(str_fmt.format(
indent * ' ',
offset + idx, hex_str,
' ' + asc_str if show_ascii else ''))
def get_bits_from_bytes(bytes, start, length):
if length == 0:
return 0
byte_start = (start) // 8
byte_end = (start + length - 1) // 8
bit_start = start & 7
mask = (1 << length) - 1
val = bytes_to_value(bytes[byte_start:byte_end + 1])
val = (val >> bit_start) & mask
return val
def set_bits_to_bytes(bytes, start, length, bvalue):
if length == 0:
return
byte_start = (start) // 8
byte_end = (start + length - 1) // 8
bit_start = start & 7
mask = (1 << length) - 1
val = bytes_to_value(bytes[byte_start:byte_end + 1])
val &= ~(mask << bit_start)
val |= ((bvalue & mask) << bit_start)
bytes[byte_start:byte_end+1] = value_to_bytearray(
val,
byte_end + 1 - byte_start)
def value_to_bytes(value, length):
return value.to_bytes(length, 'little')
def bytes_to_value(bytes):
return int.from_bytes(bytes, 'little')
def value_to_bytearray(value, length):
return bytearray(value_to_bytes(value, length))
# def value_to_bytearray (value, length):
return bytearray(value_to_bytes(value, length))
def get_aligned_value(value, alignment=4):
if alignment != (1 << (alignment.bit_length() - 1)):
raise Exception(
'Alignment (0x%x) should to be power of 2 !' % alignment)
value = (value + (alignment - 1)) & ~(alignment - 1)
return value
def get_padding_length(data_len, alignment=4):
new_data_len = get_aligned_value(data_len, alignment)
return new_data_len - data_len
def get_file_data(file, mode='rb'):
return open(file, mode).read()
def gen_file_from_object(file, object):
open(file, 'wb').write(object)
def gen_file_with_size(file, size):
open(file, 'wb').write(b'\xFF' * size)
def check_files_exist(base_name_list, dir='', ext=''):
for each in base_name_list:
if not os.path.exists(os.path.join(dir, each + ext)):
return False
return True
def load_source(name, filepath):
mod = SourceFileLoader(name, filepath).load_module()
return mod
def get_openssl_path():
if os.name == 'nt':
if 'OPENSSL_PATH' not in os.environ:
openssl_dir = "C:\\Openssl\\bin\\"
if os.path.exists(openssl_dir):
os.environ['OPENSSL_PATH'] = openssl_dir
else:
os.environ['OPENSSL_PATH'] = "C:\\Openssl\\"
if 'OPENSSL_CONF' not in os.environ:
openssl_cfg = "C:\\Openssl\\openssl.cfg"
if os.path.exists(openssl_cfg):
os.environ['OPENSSL_CONF'] = openssl_cfg
openssl = os.path.join(
os.environ.get('OPENSSL_PATH', ''),
'openssl.exe')
else:
# Get openssl path for Linux cases
openssl = shutil.which('openssl')
return openssl
def run_process(arg_list, print_cmd=False, capture_out=False):
sys.stdout.flush()
if os.name == 'nt' and os.path.splitext(arg_list[0])[1] == '' and \
os.path.exists(arg_list[0] + '.exe'):
arg_list[0] += '.exe'
if print_cmd:
print(' '.join(arg_list))
exc = None
result = 0
output = ''
try:
if capture_out:
output = subprocess.check_output(arg_list).decode()
else:
result = subprocess.call(arg_list)
except Exception as ex:
result = 1
exc = ex
if result:
if not print_cmd:
print('Error in running process:\n %s' % ' '.join(arg_list))
if exc is None:
sys.exit(1)
else:
raise exc
return output
# Adjust hash type algorithm based on Public key file
def adjust_hash_type(pub_key_file):
key_type = get_key_type(pub_key_file)
if key_type == 'RSA2048':
hash_type = 'SHA2_256'
elif key_type == 'RSA3072':
hash_type = 'SHA2_384'
else:
hash_type = None
return hash_type
def rsa_sign_file(
priv_key, pub_key, hash_type, sign_scheme,
in_file, out_file, inc_dat=False, inc_key=False):
bins = bytearray()
if inc_dat:
bins.extend(get_file_data(in_file))
# def single_sign_file(priv_key, hash_type, sign_scheme, in_file, out_file):
out_data = get_file_data(out_file)
sign = SIGNATURE_HDR()
sign.SigSize = len(out_data)
sign.SigType = SIGN_TYPE_SCHEME[sign_scheme]
sign.HashAlg = HASH_TYPE_VALUE[hash_type]
bins.extend(bytearray(sign) + out_data)
if inc_key:
key = gen_pub_key(priv_key, pub_key)
bins.extend(key)
if len(bins) != len(out_data):
gen_file_from_object(out_file, bins)
def get_key_type(in_key):
# Check in_key is file or key Id
if not os.path.exists(in_key):
key = bytearray(gen_pub_key(in_key))
else:
# Check for public key in binary format.
key = bytearray(get_file_data(in_key))
pub_key_hdr = PUB_KEY_HDR.from_buffer(key)
if pub_key_hdr.Identifier != b'PUBK':
pub_key = gen_pub_key(in_key)
pub_key_hdr = PUB_KEY_HDR.from_buffer(pub_key)
key_type = next(
(key for key,
value in PUB_KEY_TYPE.items() if value == pub_key_hdr.KeyType))
return '%s%d' % (key_type, (pub_key_hdr.KeySize - 4) * 8)
def get_auth_hash_type(key_type, sign_scheme):
if key_type == "RSA2048" and sign_scheme == "RSA_PKCS1":
hash_type = 'SHA2_256'
auth_type = 'RSA2048_PKCS1_SHA2_256'
elif key_type == "RSA3072" and sign_scheme == "RSA_PKCS1":
hash_type = 'SHA2_384'
auth_type = 'RSA3072_PKCS1_SHA2_384'
elif key_type == "RSA2048" and sign_scheme == "RSA_PSS":
hash_type = 'SHA2_256'
auth_type = 'RSA2048_PSS_SHA2_256'
elif key_type == "RSA3072" and sign_scheme == "RSA_PSS":
hash_type = 'SHA2_384'
auth_type = 'RSA3072_PSS_SHA2_384'
else:
hash_type = ''
auth_type = ''
return auth_type, hash_type
# def single_sign_gen_pub_key(in_key, pub_key_file=None):
def gen_pub_key(in_key, pub_key=None):
keydata = single_sign_gen_pub_key(in_key, pub_key)
publickey = PUB_KEY_HDR()
publickey.KeySize = len(keydata)
publickey.KeyType = PUB_KEY_TYPE['RSA']
key = bytearray(publickey) + keydata
if pub_key:
gen_file_from_object(pub_key, key)
return key
def decompress(in_file, out_file, tool_dir=''):
if not os.path.isfile(in_file):
raise Exception("Invalid input file '%s' !" % in_file)
# Remove the Lz Header
fi = open(in_file, 'rb')
di = bytearray(fi.read())
fi.close()
lz_hdr = LZ_HEADER.from_buffer(di)
offset = sizeof(lz_hdr)
if lz_hdr.signature == b"LZDM" or lz_hdr.compressed_len == 0:
fo = open(out_file, 'wb')
fo.write(di[offset:offset + lz_hdr.compressed_len])
fo.close()
return
temp = os.path.splitext(out_file)[0] + '.tmp'
if lz_hdr.signature == b"LZMA":
alg = "Lzma"
elif lz_hdr.signature == b"LZ4 ":
alg = "Lz4"
else:
raise Exception("Unsupported compression '%s' !" % lz_hdr.signature)
fo = open(temp, 'wb')
fo.write(di[offset:offset + lz_hdr.compressed_len])
fo.close()
compress_tool = "%sCompress" % alg
if alg == "Lz4":
try:
cmdline = [
os.path.join(tool_dir, compress_tool),
"-d",
"-o", out_file,
temp]
run_process(cmdline, False, True)
except Exception:
msg_string = "Could not find/use CompressLz4 tool, " \
"trying with python lz4..."
print(msg_string)
try:
import lz4.block
if lz4.VERSION != '3.1.1':
msg_string = "Recommended lz4 module version " \
"is '3.1.1'," + lz4.VERSION \
+ " is currently installed."
print(msg_string)
except ImportError:
msg_string = "Could not import lz4, use " \
"'python -m pip install lz4==3.1.1' " \
"to install it."
print(msg_string)
exit(1)
decompress_data = lz4.block.decompress(get_file_data(temp))
with open(out_file, "wb") as lz4bin:
lz4bin.write(decompress_data)
else:
cmdline = [
os.path.join(tool_dir, compress_tool),
"-d",
"-o", out_file,
temp]
run_process(cmdline, False, True)
os.remove(temp)
def compress(in_file, alg, svn=0, out_path='', tool_dir=''):
if not os.path.isfile(in_file):
raise Exception("Invalid input file '%s' !" % in_file)
basename, ext = os.path.splitext(os.path.basename(in_file))
if out_path:
if os.path.isdir(out_path):
out_file = os.path.join(out_path, basename + '.lz')
else:
out_file = os.path.join(out_path)
else:
out_file = os.path.splitext(in_file)[0] + '.lz'
if alg == "Lzma":
sig = "LZMA"
elif alg == "Tiano":
sig = "LZUF"
elif alg == "Lz4":
sig = "LZ4 "
elif alg == "Dummy":
sig = "LZDM"
else:
raise Exception("Unsupported compression '%s' !" % alg)
in_len = os.path.getsize(in_file)
if in_len > 0:
compress_tool = "%sCompress" % alg
if sig == "LZDM":
shutil.copy(in_file, out_file)
compress_data = get_file_data(out_file)
elif sig == "LZ4 ":
try:
cmdline = [
os.path.join(tool_dir, compress_tool),
"-e",
"-o", out_file,
in_file]
run_process(cmdline, False, True)
compress_data = get_file_data(out_file)
except Exception:
msg_string = "Could not find/use CompressLz4 tool, " \
"trying with python lz4..."
print(msg_string)
try:
import lz4.block
if lz4.VERSION != '3.1.1':
msg_string = "Recommended lz4 module version " \
"is '3.1.1', " + lz4.VERSION \
+ " is currently installed."
print(msg_string)
except ImportError:
msg_string = "Could not import lz4, use " \
"'python -m pip install lz4==3.1.1' " \
"to install it."
print(msg_string)
exit(1)
compress_data = lz4.block.compress(
get_file_data(in_file),
mode='high_compression')
elif sig == "LZMA":
cmdline = [
os.path.join(tool_dir, compress_tool),
"-e",
"-o", out_file,
in_file]
run_process(cmdline, False, True)
compress_data = get_file_data(out_file)
else:
compress_data = bytearray()
lz_hdr = LZ_HEADER()
lz_hdr.signature = sig.encode()
lz_hdr.svn = svn
lz_hdr.compressed_len = len(compress_data)
lz_hdr.length = os.path.getsize(in_file)
data = bytearray()
data.extend(lz_hdr)
data.extend(compress_data)
gen_file_from_object(out_file, data)
return out_file