#!/usr/bin/env python
# @ CommonUtility.py
# Common utility script
#
# Copyright (c) 2016 - 2021, Intel Corporation. All rights reserved.<BR>
# SPDX-License-Identifier: BSD-2-Clause-Patent
#
##
import os | |
import sys | |
import shutil | |
import subprocess | |
import string | |
from ctypes import ARRAY, c_char, c_uint16, c_uint32, \ | |
c_uint8, Structure, sizeof | |
from importlib.machinery import SourceFileLoader | |
from SingleSign import single_sign_gen_pub_key | |
# NOTE: the numeric ids in PUB_KEY_TYPE, SIGN_TYPE_SCHEME and
# HASH_TYPE_VALUE should match the definitions in cryptolib.h.

# Public key type name -> numeric id.
PUB_KEY_TYPE = {
    "RSA": 1,
    "ECC": 2,
    "DSA": 3,
}

# Signing scheme name -> numeric id.
SIGN_TYPE_SCHEME = {
    "RSA_PKCS1": 1,
    "RSA_PSS": 2,
    "ECC": 3,
    "DSA": 4,
}

# Hash algorithm name -> numeric id.
HASH_TYPE_VALUE = {
    "SHA2_256": 1,
    "SHA2_384": 2,
    "SHA2_512": 3,
    "SM3_256": 4,
}

# Reverse lookup of HASH_TYPE_VALUE: numeric id -> hash algorithm name.
HASH_VAL_STRING = {
    hash_id: hash_name for hash_name, hash_id in HASH_TYPE_VALUE.items()
}

# Same ids as HASH_TYPE_VALUE, plus two combined key/hash names that map
# onto the SHA2_256 and SHA2_384 ids respectively.
AUTH_TYPE_HASH_VALUE = dict(
    HASH_TYPE_VALUE,
    RSA2048SHA256=1,
    RSA3072SHA384=2,
)

# Hash algorithm name -> digest size in bytes.
HASH_DIGEST_SIZE = {
    "SHA2_256": 32,
    "SHA2_384": 48,
    "SHA2_512": 64,
    "SM3_256": 32,
}
class PUB_KEY_HDR(Structure):
    """Byte-packed header placed in front of public key data.

    A new instance always starts with the ``b'PUBK'`` identifier; the
    remaining fields start out zeroed and are filled in by the caller.
    """

    _pack_ = 1
    _fields_ = [
        ('Identifier', c_char * 4),    # signature ('P', 'U', 'B', 'K')
        ('KeySize', c_uint16),         # Length of Public Key
        ('KeyType', c_uint8),          # RSA or ECC
        ('Reserved', c_uint8 * 1),
        # Zero-length array: adds nothing to sizeof(), it only names the
        # offset right after the fixed header.
        ('KeyData', c_uint8 * 0),
    ]

    def __init__(self):
        self.Identifier = b'PUBK'
class SIGNATURE_HDR(Structure):
    """Byte-packed header placed in front of signature data.

    A new instance always starts with the ``b'SIGN'`` identifier; the
    remaining fields start out zeroed and are filled in by the caller.
    """

    _pack_ = 1
    _fields_ = [
        ('Identifier', c_char * 4),    # signature ('S', 'I', 'G', 'N')
        ('SigSize', c_uint16),         # length of the signature data
        ('SigType', c_uint8),          # a SIGN_TYPE_SCHEME id
        ('HashAlg', c_uint8),          # a HASH_TYPE_VALUE id
        # Zero-length array: adds nothing to sizeof(), it only names the
        # offset right after the fixed header.
        ('Signature', c_uint8 * 0),
    ]

    def __init__(self):
        self.Identifier = b'SIGN'
class LZ_HEADER(Structure):
    """Byte-packed header stored in front of a (compressed) payload."""

    _pack_ = 1
    _fields_ = [
        ('signature', c_char * 4),       # 4-byte algorithm tag
        ('compressed_len', c_uint32),    # size of the payload after the header
        ('length', c_uint32),            # size of the original input
        ('version', c_uint16),
        ('svn', c_uint8),
        ('attribute', c_uint8),
    ]

    # Header signature -> compression algorithm name.
    _compress_alg = {
        b'LZDM': 'Dummy',
        b'LZ4 ': 'Lz4',
        b'LZMA': 'Lzma',
    }
def print_bytes(data, indent=0, offset=0, show_ascii=False):
    """Print ``data`` as a hex dump, 16 bytes per row.

    Args:
        data: Any object accepted by ``bytearray()``.
        indent: Number of spaces printed in front of every row.
        offset: Value added to the row address shown at the start of a row.
        show_ascii: When true, append an ASCII column in which characters
            outside space/letters/digits/punctuation are shown as ``'.'``.
    """
    bytes_per_line = 16
    printable = ' ' + string.ascii_letters + string.digits + string.punctuation
    # The hex column is padded to three characters per byte so that the
    # text following it lines up on a short final row.
    row_fmt = '{:s}{:04x}: {:%ds} {:s}' % (bytes_per_line * 3)
    raw = bytearray(data)
    for row_start in range(0, len(raw), bytes_per_line):
        row = raw[row_start:row_start + bytes_per_line]
        hex_col = ' '.join(['%02X' % byte for byte in row])
        asc_col = ''.join(
            [chr(byte) if chr(byte) in printable else '.' for byte in row])
        if show_ascii:
            tail = ' ' + asc_col
        else:
            tail = ''
        print(row_fmt.format(indent * ' ', offset + row_start, hex_col, tail))
def get_bits_from_bytes(bytes, start, length):
    """Read a bit field out of a byte sequence.

    Bits are numbered from the least significant bit of ``bytes[0]``
    upwards; the bytes covering the field are combined via
    ``bytes_to_value`` before shifting and masking.

    Args:
        bytes: Sliceable byte sequence to read from.
        start: Index of the first (lowest) bit of the field.
        length: Width of the field in bits.

    Returns:
        The field value as an integer; 0 when ``length`` is 0.
    """
    if length == 0:
        return 0
    first_byte = start // 8
    last_byte = (start + length - 1) // 8
    # Only the bytes that overlap the field are converted.
    window = bytes_to_value(bytes[first_byte:last_byte + 1])
    return (window >> (start & 7)) & ((1 << length) - 1)
def set_bits_to_bytes(bytes, start, length, bvalue):
    """Write a bit field into a mutable byte sequence, in place.

    Bits are numbered from the least significant bit of ``bytes[0]``
    upwards.  Bits of ``bvalue`` above ``length`` are discarded, and bits
    of ``bytes`` outside the field are left untouched.

    Args:
        bytes: Mutable byte sequence (it is updated by slice assignment).
        start: Index of the first (lowest) bit of the field.
        length: Width of the field in bits; 0 makes the call a no-op.
        bvalue: Integer value to store in the field.
    """
    if length == 0:
        return
    first_byte = start // 8
    last_byte = (start + length - 1) // 8
    shift = start & 7
    field_mask = (1 << length) - 1
    # Read-modify-write on just the bytes that overlap the field.
    window = bytes_to_value(bytes[first_byte:last_byte + 1])
    window &= ~(field_mask << shift)
    window |= (bvalue & field_mask) << shift
    span = last_byte + 1 - first_byte
    bytes[first_byte:last_byte + 1] = value_to_bytearray(window, span)
def value_to_bytes(value, length):
    """Return ``value`` encoded as ``length`` little-endian bytes.

    Args:
        value: Integer to encode.
        length: Number of bytes to produce.

    Returns:
        A ``bytes`` object of exactly ``length`` bytes.
    """
    return value.to_bytes(length, byteorder='little')
def bytes_to_value(bytes):
    """Return the integer encoded by ``bytes`` in little-endian order.

    Args:
        bytes: Bytes-like object (or iterable of byte values) to decode.

    Returns:
        The decoded non-negative integer; 0 for empty input.
    """
    return int.from_bytes(bytes, byteorder='little')
def value_to_bytearray(value, length):
    """Return ``value`` as a mutable little-endian byte buffer.

    Same encoding as ``value_to_bytes``, wrapped in a ``bytearray`` so the
    result can be modified or used in slice assignment.

    Args:
        value: Integer to encode.
        length: Number of bytes to produce.

    Returns:
        A ``bytearray`` of exactly ``length`` bytes.
    """
    # The commented-out duplicate "def" and its dangling second "return"
    # that used to follow this function were dead code and are removed.
    return bytearray(value_to_bytes(value, length))
def get_aligned_value(value, alignment=4):
    """Round ``value`` up to the next multiple of ``alignment``.

    Args:
        value: Integer to align.
        alignment: Alignment boundary; must be a positive power of two.

    Returns:
        The smallest multiple of ``alignment`` that is >= ``value``.

    Raises:
        Exception: If ``alignment`` is not a positive power of two.
    """
    # A power of two has exactly one bit set, so n & (n - 1) is zero.
    # Zero and negative values are rejected explicitly; the previous
    # bit_length() based check let 0 escape as
    # ValueError('negative shift count') instead of this error.
    if alignment <= 0 or alignment & (alignment - 1):
        raise Exception(
            'Alignment (0x%x) should to be power of 2 !' % alignment)
    return (value + (alignment - 1)) & ~(alignment - 1)
def get_padding_length(data_len, alignment=4):
    """Return how many bytes must be added to reach the next boundary.

    Args:
        data_len: Current length.
        alignment: Alignment boundary, validated by ``get_aligned_value``.

    Returns:
        The difference between ``data_len`` rounded up to ``alignment``
        and ``data_len`` itself (0 when already aligned).
    """
    return get_aligned_value(data_len, alignment) - data_len
def get_file_data(file, mode='rb'):
    """Return the entire content of ``file``.

    Args:
        file: Path of the file to read.
        mode: Mode passed to ``open``; the default ``'rb'`` yields
            ``bytes``, a text mode yields ``str``.

    Returns:
        Whatever ``read()`` returns for the chosen mode.
    """
    # The context manager closes the handle deterministically instead of
    # leaving it open until the file object is garbage collected.
    with open(file, mode) as handle:
        return handle.read()
def gen_file_from_object(file, object):
    """Write ``object`` to ``file`` in binary mode, replacing any content.

    Args:
        file: Path of the file to create or overwrite.
        object: Bytes-like data to write.
    """
    # Closing through the context manager guarantees the data is flushed
    # to disk before returning, rather than whenever the file object is
    # eventually finalized.
    with open(file, 'wb') as handle:
        handle.write(object)
def gen_file_with_size(file, size):
    """Create ``file`` containing ``size`` bytes, each set to 0xFF.

    Args:
        file: Path of the file to create or overwrite.
        size: Number of 0xFF bytes to write.
    """
    # Closing through the context manager guarantees the data is flushed
    # to disk before returning, rather than whenever the file object is
    # eventually finalized.
    with open(file, 'wb') as handle:
        handle.write(b'\xFF' * size)
def check_files_exist(base_name_list, dir='', ext=''):
    """Tell whether every listed file exists.

    Each candidate path is built as ``os.path.join(dir, name + ext)``.

    Args:
        base_name_list: Iterable of file base names.
        dir: Directory the names are joined onto.
        ext: Suffix appended to each base name.

    Returns:
        ``True`` if all paths exist (also for an empty list), ``False`` as
        soon as one is missing.
    """
    return all(
        os.path.exists(os.path.join(dir, base_name + ext))
        for base_name in base_name_list)
def load_source(name, filepath):
    """Load the Python source file ``filepath`` as a module named ``name``.

    The module is executed and registered in ``sys.modules`` under
    ``name``.  This uses the spec/``exec_module`` API rather than the
    deprecated ``SourceFileLoader.load_module()``.

    Args:
        name: Module name to assign.
        filepath: Path of the Python source file.

    Returns:
        The loaded module object.
    """
    # Imported locally so this function does not rely on importlib.util
    # having been loaded as a side effect of some other import.
    import importlib.util

    loader = SourceFileLoader(name, filepath)
    spec = importlib.util.spec_from_loader(name, loader)
    mod = importlib.util.module_from_spec(spec)
    # Register before executing, and unregister if execution fails so a
    # half-initialized module is not left behind.
    sys.modules[name] = mod
    try:
        loader.exec_module(mod)
    except BaseException:
        sys.modules.pop(name, None)
        raise
    return mod
def get_openssl_path():
    """Return the path of the openssl executable for this platform.

    On Windows (``os.name == 'nt'``) the result is ``openssl.exe`` joined
    onto the ``OPENSSL_PATH`` environment variable; when that variable is
    not set it is first filled with a default under ``C:\\Openssl``.  This
    modifies ``os.environ``.  On other platforms the result comes from
    ``shutil.which('openssl')``, which is ``None`` if nothing is found.
    """
    if os.name == 'nt':
        if 'OPENSSL_PATH' not in os.environ:
            # Prefer the bin sub-directory when it exists.
            openssl_dir = "C:\\Openssl\\bin\\"
            if os.path.exists(openssl_dir):
                os.environ['OPENSSL_PATH'] = openssl_dir
            else:
                os.environ['OPENSSL_PATH'] = "C:\\Openssl\\"
                # NOTE(review): the original indentation of this file was
                # lost; this OPENSSL_CONF default is assumed to belong to
                # the fallback branch only - confirm against the upstream
                # file.
                if 'OPENSSL_CONF' not in os.environ:
                    openssl_cfg = "C:\\Openssl\\openssl.cfg"
                    if os.path.exists(openssl_cfg):
                        os.environ['OPENSSL_CONF'] = openssl_cfg
        openssl = os.path.join(
            os.environ.get('OPENSSL_PATH', ''),
            'openssl.exe')
    else:
        # Get openssl path for Linux cases
        openssl = shutil.which('openssl')
    return openssl
def run_process(arg_list, print_cmd=False, capture_out=False):
    """Run an external command and optionally return its output.

    Args:
        arg_list: Command and arguments as a list of strings.  On Windows
            the first entry is rewritten in place to end in ``.exe`` when
            it has no extension and that file exists.
        print_cmd: Print the command line before running it.
        capture_out: Capture and return the command's standard output
            instead of letting it go to the console.

    Returns:
        The decoded standard output when ``capture_out`` is set, otherwise
        an empty string.

    Raises:
        Exception: Whatever exception the subprocess call raised (this
            includes a non-zero exit status when ``capture_out`` is set).
        SystemExit: With code 1 when the command returned a non-zero
            status without raising.
    """
    sys.stdout.flush()

    if os.name == 'nt':
        extension = os.path.splitext(arg_list[0])[1]
        if extension == '' and os.path.exists(arg_list[0] + '.exe'):
            arg_list[0] += '.exe'

    if print_cmd:
        print(' '.join(arg_list))

    failure = None
    status = 0
    output = ''
    try:
        if capture_out:
            output = subprocess.check_output(arg_list).decode()
        else:
            status = subprocess.call(arg_list)
    except Exception as err:
        failure = err
        status = 1

    if not status:
        return output

    # The command line is shown on failure unless it was already printed.
    if not print_cmd:
        print('Error in running process:\n %s' % ' '.join(arg_list))
    if failure is not None:
        raise failure
    sys.exit(1)
# Adjust hash type algorithm based on Public key file
def adjust_hash_type(pub_key_file):
    """Pick the hash algorithm that goes with a public key.

    Args:
        pub_key_file: Key reference handed to ``get_key_type``.

    Returns:
        ``'SHA2_256'`` for an ``'RSA2048'`` key, ``'SHA2_384'`` for an
        ``'RSA3072'`` key, and ``None`` for any other key type.
    """
    hash_for_key = {
        'RSA2048': 'SHA2_256',
        'RSA3072': 'SHA2_384',
    }
    return hash_for_key.get(get_key_type(pub_key_file))
def rsa_sign_file(
        priv_key, pub_key, hash_type, sign_scheme,
        in_file, out_file, inc_dat=False, inc_key=False):
    """Wrap the signature stored in ``out_file`` with a SIGNATURE_HDR.

    This function does not invoke a signing tool itself: it reads the
    current content of ``out_file`` and treats it as the signature.  The
    file is then rewritten as [input data] + header + signature +
    [public key blob], where the bracketed parts are optional.

    Args:
        priv_key: Key passed to ``gen_pub_key`` when ``inc_key`` is set.
        pub_key: Public key file passed to ``gen_pub_key`` when
            ``inc_key`` is set.
        hash_type: A key of ``HASH_TYPE_VALUE``.
        sign_scheme: A key of ``SIGN_TYPE_SCHEME``.
        in_file: File whose content is prepended when ``inc_dat`` is set.
        out_file: File holding the signature; overwritten with the result.
        inc_dat: Prepend the content of ``in_file``.
        inc_key: Append the public key blob.
    """
    blob = bytearray()
    if inc_dat:
        blob += get_file_data(in_file)

    signature = get_file_data(out_file)

    header = SIGNATURE_HDR()
    header.SigSize = len(signature)
    header.SigType = SIGN_TYPE_SCHEME[sign_scheme]
    header.HashAlg = HASH_TYPE_VALUE[hash_type]
    blob.extend(bytearray(header) + signature)

    if inc_key:
        blob.extend(gen_pub_key(priv_key, pub_key))

    if len(blob) != len(signature):
        gen_file_from_object(out_file, blob)
def get_key_type(in_key):
    """Return a key type string such as ``'RSA2048'`` for ``in_key``.

    Args:
        in_key: Either the path of a key file or a key id.

    Returns:
        The ``PUB_KEY_TYPE`` name matching the header's ``KeyType``
        followed by ``(KeySize - 4) * 8``.
    """
    if os.path.exists(in_key):
        # Existing file: first try it as a public key in binary format.
        raw_key = bytearray(get_file_data(in_key))
    else:
        # Not a file: treat in_key as a key id.
        raw_key = bytearray(gen_pub_key(in_key))

    header = PUB_KEY_HDR.from_buffer(raw_key)
    if header.Identifier != b'PUBK':
        # The file does not start with a PUBK header, so derive the
        # public key blob from it instead.
        derived_key = gen_pub_key(in_key)
        header = PUB_KEY_HDR.from_buffer(derived_key)

    type_name = next(
        name for name, type_id in PUB_KEY_TYPE.items()
        if type_id == header.KeyType)
    # NOTE(review): the 4 bytes subtracted here are presumably the part of
    # the key data that is not modulus - confirm.
    key_bits = (header.KeySize - 4) * 8
    return '%s%d' % (type_name, key_bits)
def get_auth_hash_type(key_type, sign_scheme):
    """Map a key type and signing scheme to auth type and hash type names.

    Args:
        key_type: ``"RSA2048"`` or ``"RSA3072"``.
        sign_scheme: ``"RSA_PKCS1"`` or ``"RSA_PSS"``.

    Returns:
        A tuple ``(auth_type, hash_type)``; both are empty strings when
        the combination is not one of the four supported ones.
    """
    supported = {
        ("RSA2048", "RSA_PKCS1"): ('RSA2048_PKCS1_SHA2_256', 'SHA2_256'),
        ("RSA3072", "RSA_PKCS1"): ('RSA3072_PKCS1_SHA2_384', 'SHA2_384'),
        ("RSA2048", "RSA_PSS"): ('RSA2048_PSS_SHA2_256', 'SHA2_256'),
        ("RSA3072", "RSA_PSS"): ('RSA3072_PSS_SHA2_384', 'SHA2_384'),
    }
    auth_type, hash_type = supported.get((key_type, sign_scheme), ('', ''))
    return auth_type, hash_type
def gen_pub_key(in_key, pub_key=None):
    """Build a PUB_KEY_HDR-prefixed public key blob for ``in_key``.

    The raw key data comes from ``single_sign_gen_pub_key(in_key,
    pub_key)``; the header's key type is always set to RSA.

    Args:
        in_key: Key reference handed to ``single_sign_gen_pub_key``.
        pub_key: Optional output path.  It is passed on to
            ``single_sign_gen_pub_key`` and, when truthy, the finished
            blob (header + key data) is also written to it.

    Returns:
        A ``bytearray`` holding the header followed by the key data.
    """
    key_body = single_sign_gen_pub_key(in_key, pub_key)

    header = PUB_KEY_HDR()
    header.KeySize = len(key_body)
    header.KeyType = PUB_KEY_TYPE['RSA']

    blob = bytearray(header) + key_body
    if pub_key:
        gen_file_from_object(pub_key, blob)
    return blob
def decompress(in_file, out_file, tool_dir=''):
    """Strip the LZ_HEADER from ``in_file`` and write the decoded payload.

    A ``LZDM`` signature or a zero ``compressed_len`` means the payload is
    copied to ``out_file`` unchanged.  ``LZMA`` payloads are decoded with
    the external ``LzmaCompress`` tool.  ``LZ4 `` payloads are decoded
    with ``Lz4Compress``, falling back to the python ``lz4`` module when
    running the tool fails.

    Args:
        in_file: Path of the file that starts with an LZ_HEADER.
        out_file: Path the decoded data is written to.
        tool_dir: Directory containing the ``*Compress`` tools.

    Raises:
        Exception: If ``in_file`` is not a file or its signature is not
            supported.
        SystemExit: With code 1 when the LZ4 tool fails and the ``lz4``
            module cannot be imported.
    """
    if not os.path.isfile(in_file):
        raise Exception("Invalid input file '%s' !" % in_file)

    # Remove the Lz Header
    with open(in_file, 'rb') as fi:
        di = bytearray(fi.read())

    lz_hdr = LZ_HEADER.from_buffer(di)
    offset = sizeof(lz_hdr)
    payload = di[offset:offset + lz_hdr.compressed_len]

    if lz_hdr.signature == b"LZDM" or lz_hdr.compressed_len == 0:
        with open(out_file, 'wb') as fo:
            fo.write(payload)
        return

    if lz_hdr.signature == b"LZMA":
        alg = "Lzma"
    elif lz_hdr.signature == b"LZ4 ":
        alg = "Lz4"
    else:
        raise Exception("Unsupported compression '%s' !" % lz_hdr.signature)

    # The external tools work on files, so stage the bare payload in a
    # scratch file next to the output.
    temp = os.path.splitext(out_file)[0] + '.tmp'
    with open(temp, 'wb') as fo:
        fo.write(payload)

    compress_tool = "%sCompress" % alg
    cmdline = [
        os.path.join(tool_dir, compress_tool),
        "-d",
        "-o", out_file,
        temp]
    try:
        if alg == "Lz4":
            try:
                run_process(cmdline, False, True)
            except Exception:
                msg_string = "Could not find/use CompressLz4 tool, " \
                             "trying with python lz4..."
                print(msg_string)
                try:
                    import lz4.block
                    if lz4.VERSION != '3.1.1':
                        msg_string = "Recommended lz4 module version " \
                                     "is '3.1.1'," + lz4.VERSION \
                                     + " is currently installed."
                        print(msg_string)
                except ImportError:
                    msg_string = "Could not import lz4, use " \
                                 "'python -m pip install lz4==3.1.1' " \
                                 "to install it."
                    print(msg_string)
                    # sys.exit instead of the site-provided exit(), which
                    # is not guaranteed to exist.
                    sys.exit(1)
                decompress_data = lz4.block.decompress(get_file_data(temp))
                with open(out_file, "wb") as lz4bin:
                    lz4bin.write(decompress_data)
        else:
            run_process(cmdline, False, True)
    finally:
        # Always drop the scratch file, including when decoding fails.
        if os.path.exists(temp):
            os.remove(temp)
def compress(in_file, alg, svn=0, out_path='', tool_dir=''):
    """Compress ``in_file`` and write it out prefixed with an LZ_HEADER.

    ``Dummy`` stores the data unchanged.  ``Lzma`` and ``Tiano`` run the
    external ``<alg>Compress`` tool from ``tool_dir``.  ``Lz4`` runs
    ``Lz4Compress`` and falls back to the python ``lz4`` module when
    running the tool fails.  An empty input produces a header only.

    Args:
        in_file: Path of the file to compress.
        alg: One of ``"Lzma"``, ``"Tiano"``, ``"Lz4"`` or ``"Dummy"``.
        svn: Value stored in the header's ``svn`` field.
        out_path: Output directory or output file path.  When empty the
            output is placed next to ``in_file`` with a ``.lz`` extension.
        tool_dir: Directory containing the ``*Compress`` tools.

    Returns:
        The path of the file that was written.

    Raises:
        Exception: If ``in_file`` is not a file or ``alg`` is not
            supported.
        SystemExit: With code 1 when the LZ4 tool fails and the ``lz4``
            module cannot be imported.
    """
    if not os.path.isfile(in_file):
        raise Exception("Invalid input file '%s' !" % in_file)

    if out_path:
        if os.path.isdir(out_path):
            basename = os.path.splitext(os.path.basename(in_file))[0]
            out_file = os.path.join(out_path, basename + '.lz')
        else:
            out_file = os.path.join(out_path)
    else:
        out_file = os.path.splitext(in_file)[0] + '.lz'

    # Algorithm name -> 4-character header signature.
    signatures = {
        "Lzma": "LZMA",
        "Tiano": "LZUF",
        "Lz4": "LZ4 ",
        "Dummy": "LZDM",
    }
    sig = signatures.get(alg)
    if sig is None:
        raise Exception("Unsupported compression '%s' !" % alg)

    in_len = os.path.getsize(in_file)
    if in_len > 0:
        compress_tool = "%sCompress" % alg
        cmdline = [
            os.path.join(tool_dir, compress_tool),
            "-e",
            "-o", out_file,
            in_file]
        if sig == "LZDM":
            shutil.copy(in_file, out_file)
            compress_data = get_file_data(out_file)
        elif sig == "LZ4 ":
            try:
                run_process(cmdline, False, True)
                compress_data = get_file_data(out_file)
            except Exception:
                msg_string = "Could not find/use CompressLz4 tool, " \
                             "trying with python lz4..."
                print(msg_string)
                try:
                    import lz4.block
                    if lz4.VERSION != '3.1.1':
                        msg_string = "Recommended lz4 module version " \
                                     "is '3.1.1', " + lz4.VERSION \
                                     + " is currently installed."
                        print(msg_string)
                except ImportError:
                    msg_string = "Could not import lz4, use " \
                                 "'python -m pip install lz4==3.1.1' " \
                                 "to install it."
                    print(msg_string)
                    # sys.exit instead of the site-provided exit(), which
                    # is not guaranteed to exist.
                    sys.exit(1)
                compress_data = lz4.block.compress(
                    get_file_data(in_file),
                    mode='high_compression')
        else:
            # LZMA and Tiano (LZUF) both go through "<alg>Compress".
            # Tiano previously matched no branch, which left compress_data
            # unbound and raised UnboundLocalError further down.
            # NOTE(review): assumes TianoCompress accepts the same
            # "-e -o <out> <in>" arguments as LzmaCompress - confirm.
            # decompress() in this file does not recognize LZUF.
            run_process(cmdline, False, True)
            compress_data = get_file_data(out_file)
    else:
        compress_data = bytearray()

    lz_hdr = LZ_HEADER()
    lz_hdr.signature = sig.encode()
    lz_hdr.svn = svn
    lz_hdr.compressed_len = len(compress_data)
    # Size measured before any tool ran.
    lz_hdr.length = in_len

    data = bytearray(lz_hdr)
    data.extend(compress_data)
    gen_file_from_object(out_file, data)
    return out_file