blob: 1792208a30c33acdc3e4b1d385f08a3c596a9117 [file]
#!/usr/bin/env python3
import argparse
import base64
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date
import http
import io
import json
from pathlib import Path
import subprocess
import tempfile
import time
from uuid import uuid4
import zipfile
import alibabacloud_credentials as credentials
import alibabacloud_credentials.client
import alibabacloud_credentials.models
import alibabacloud_ecs20140526 as ecs
import alibabacloud_ecs20140526.client
import alibabacloud_ecs20140526.models
import alibabacloud_fc20230330 as fc
import alibabacloud_fc20230330.client
import alibabacloud_fc20230330.models
import alibabacloud_oss_v2 as oss
import alibabacloud_ram20150501 as ram
import alibabacloud_ram20150501.client
import alibabacloud_ram20150501.models
import alibabacloud_sts20150401 as sts
import alibabacloud_sts20150401.client
import alibabacloud_sts20150401.models
import alibabacloud_tea_openapi as openapi
import alibabacloud_tea_openapi.client
import alibabacloud_tea_openapi.models
import alibabacloud_tea_util as util
import alibabacloud_tea_util.client
import alibabacloud_tea_util.models
# Endpoints used by the lookups that are not tied to one of the
# selected regions (region list, role ARN and account ID respectively)
ECS_ENDPOINT = 'ecs.aliyuncs.com'
RAM_ENDPOINT = 'ram.aliyuncs.com'
STS_ENDPOINT = 'sts.aliyuncs.com'

# Runtime, timeout and memory size passed when creating the temporary
# Function Compute function, and the retry/timeout options passed when
# invoking it
FC_NODE_RUNTIME = 'nodejs20'
FC_MAX_ATTEMPTS = 5
FC_CONNECT_TIMEOUT_MS = 10000
FC_READ_TIMEOUT_MS = 60000
FC_TIMEOUT_SEC = 60
FC_MEMORY_SIZE_MB = 128

# Error code that create_temp_bucket() treats as "this region cannot
# be used for object storage" rather than as a failure
OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket'

# Temporary bucket names are truncated to this many characters
OSS_BUCKET_NAME_LEN = 63

# Common prefix of every temporary function name, bucket name and
# object key created by this script (also used to find stale ones)
IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-'
# For regions in mainland China, the Chinese state censorship laws
# prohibit direct access to OSS bucket contents.
#
# We work around this restriction by creating a temporary Function
# Compute function in each region to access OSS via the internal OSS
# endpoints, which are not subject to these restrictions. Yes, this
# is somewhat absurd.
#
# The function (Node.js source below) is invoked with a JSON payload
# of the form
#
#   {"bucket": ..., "source": {"region": ..., "bucket": ...}, "keys": [...]}
#
# in which "source" and "keys" are optional.  It copies each listed
# key from the source bucket into the destination bucket, which it
# accesses via the internal endpoint of the function's own region.
# It also deletes every object returned by listing the destination
# bucket with the storage prefix whose name is not among the keys.
# All object names involved must begin with the storage prefix.
#
IPXE_CENSORSHIP_BYPASS_FUNCTION = f'''
const prefix = "{IPXE_STORAGE_PREFIX}";
''' + '''
const assert = require("node:assert");
const OSS = require("ali-oss");
exports.handler = async (event, context) => {
const payload = JSON.parse(event.toString());
console.log(JSON.stringify(payload));
const src = payload.source && new OSS({
region: "oss-" + payload.source.region,
bucket: payload.source.bucket,
accessKeyId: context.credentials.accessKeyId,
accessKeySecret: context.credentials.accessKeySecret,
stsToken: context.credentials.securityToken,
});
const dst = new OSS({
region: "oss-" + context.region,
internal: true,
bucket: payload.bucket,
accessKeyId: context.credentials.accessKeyId,
accessKeySecret: context.credentials.accessKeySecret,
stsToken: context.credentials.securityToken,
});
const add = payload.keys || [];
const del = ((await dst.listV2({prefix: prefix})).objects || [])
.map(x => x.name).filter(x => ! add.includes(x));
assert(add.every(x => x.startsWith(prefix)));
assert(del.every(x => x.startsWith(prefix)));
if (add.length)
console.log("Creating: " + add.sort().join(", "));
if (del.length)
console.log("Deleting: " + del.sort().join(", "));
await Promise.all([
...add.map(async (x) => dst.putStream(x, (await src.getStream(x)).stream)),
...(del.length ? [dst.deleteMulti(del)] : []),
]);
};
'''
# Per-region API clients: the region name plus the ECS, Function
# Compute and OSS clients constructed for that region
Clients = namedtuple('Clients', 'region ecs fc oss')

# Description of one disk image: file path, image family, image name,
# architecture (None when several UEFI architectures are present) and
# boot mode
Image = namedtuple('Image', 'path family name arch mode')
def image(filename, basefamily, basename):
    """Describe a disk image, detecting any UEFI boot loaders it holds

    The EFI/BOOT directory of the image is listed with "mdir", both
    for the whole disk and for partition 4.  The boot loader names
    found in that listing decide the architecture, the boot mode, and
    the suffix appended to the base family and base image names.
    """
    # Recognised UEFI boot loader filenames and their architectures
    loaders = (
        (b'BOOTX64.EFI', 'x86_64'),
        (b'BOOTAA64.EFI', 'arm64'),
    )

    # Point mtools at the image via a temporary configuration file
    with tempfile.NamedTemporaryFile(mode='w+t') as config:
        config.writelines([
            'drive D:', f'file="{filename}"',
            'drive P:', f'file="{filename}"', 'partition=4',
        ])
        config.flush()
        # Failures are tolerated (check=False): only stdout is inspected
        listing = subprocess.run(
            ['mdir', '-b', 'D:/EFI/BOOT', 'P:/EFI/BOOT'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            check=False, env={'MTOOLSRC': config.name},
        ).stdout

    found = [arch for loader, arch in loaders if loader in listing]
    if len(found) == 1:
        # Exactly one UEFI architecture
        suffix = '-uefi-%s' % found[0].replace('_', '-')
        arch = found[0]
    elif found:
        # Several UEFI architectures in one image
        suffix = '-uefi-multi'
        arch = None
    else:
        # No UEFI boot loader: treat as a BIOS image
        suffix = ''
        arch = 'x86_64'
    mode = 'UEFI' if found else 'BIOS'

    return Image(
        Path(filename),
        '%s%s' % (basefamily, suffix),
        '%s%s' % (basename, suffix),
        arch,
        mode,
    )
def all_regions():
    """Return the sorted list of region IDs reported by ECS"""
    config = openapi.models.Config(
        credential=credentials.client.Client(),
        endpoint=ECS_ENDPOINT,
    )
    response = ecs.client.Client(config).describe_regions(
        ecs.models.DescribeRegionsRequest()
    )
    return sorted(info.region_id for info in response.body.regions.region)
def account_id():
    """Return the account ID of the caller, as reported by STS"""
    config = openapi.models.Config(
        credential=credentials.client.Client(),
        endpoint=STS_ENDPOINT,
    )
    identity = sts.client.Client(config).get_caller_identity()
    return identity.body.account_id
def role_arn(name):
    """Return the resource name (ARN) of the RAM role called "name\""""
    config = openapi.models.Config(
        credential=credentials.client.Client(),
        endpoint=RAM_ENDPOINT,
    )
    response = ram.client.Client(config).get_role(
        ram.models.GetRoleRequest(role_name=name)
    )
    return response.body.role.arn
def all_clients(region, account):
    """Build the ECS, Function Compute and OSS clients for one region

    The Function Compute endpoint is derived from the account ID and
    the region name.  The OSS client uses the environment-variable
    credentials provider.
    """
    credential = credentials.client.Client()
    ecs_config = openapi.models.Config(credential=credential,
                                       region_id=region)
    fc_config = openapi.models.Config(
        credential=credential,
        endpoint='%s.%s.fc.aliyuncs.com' % (account, region),
    )
    oss_config = oss.config.Config(
        credentials_provider=(
            oss.credentials.EnvironmentVariableCredentialsProvider()
        ),
        region=region,
    )
    return Clients(
        region,
        ecs.client.Client(ecs_config),
        fc.client.Client(fc_config),
        oss.client.Client(oss_config),
    )
def delete_temp_function(clients, func):
    """Delete the temporary function named "func" in the clients' region"""
    # Only ever delete functions carrying our own temporary prefix
    is_temporary = func.startswith(IPXE_STORAGE_PREFIX)
    assert is_temporary
    clients.fc.delete_function(func)
def create_temp_function(clients, role):
    """Create the temporary function in one region

    Any temporary functions already present in the region are deleted
    first.  Returns the new function name, or None if the functions in
    the region cannot be listed (which is treated as the region being
    non-functional).
    """
    listing = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX)
    try:
        existing = clients.fc.list_functions(listing)
    except openapi.client.UnretryableException:
        # AliCloud provides no other way to detect non-functional regions
        return None

    # Remove stale temporary functions
    stale = [entry.function_name for entry in existing.body.functions or ()]
    for name in stale:
        delete_temp_function(clients, name)

    # Package the function source as a base64-encoded zip archive
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, 'w') as bundle:
        bundle.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION)
    encoded = base64.b64encode(archive.getvalue()).decode()
    code = fc.models.InputCodeLocation(zip_file=encoded)

    # Create the function under a fresh unique name
    func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
    definition = fc.models.CreateFunctionInput(
        code=code,
        function_name=func,
        handler='index.handler',
        memory_size=FC_MEMORY_SIZE_MB,
        role=role,
        runtime=FC_NODE_RUNTIME,
        timeout=FC_TIMEOUT_SEC,
    )
    clients.fc.create_function(
        fc.models.CreateFunctionRequest(body=definition)
    )
    return func
def call_temp_function(clients, func, payload):
    """Invoke the temporary function synchronously with a JSON payload

    Raises RuntimeError carrying the response if the HTTP status is
    not OK, or carrying the decoded function log if the response
    headers report a function error.
    """
    headers = fc.models.InvokeFunctionHeaders(
        x_fc_invocation_type='Sync',
        x_fc_log_type='Tail',
    )
    request = fc.models.InvokeFunctionRequest(body=json.dumps(payload))
    options = util.models.RuntimeOptions(
        autoretry=True,
        max_attempts=FC_MAX_ATTEMPTS,
        connect_timeout=FC_CONNECT_TIMEOUT_MS,
        read_timeout=FC_READ_TIMEOUT_MS,
    )
    response = clients.fc.invoke_function_with_options(
        func, request, headers, options
    )

    # The tail of the function log arrives base64-encoded in a header
    encoded_log = response.headers.get('x-fc-log-result', b'')
    log = base64.b64decode(encoded_log).decode()

    if response.status_code != http.HTTPStatus.OK:
        raise RuntimeError(response)
    if 'x-fc-error-type' in response.headers:
        raise RuntimeError(log)
def delete_temp_bucket(clients, func, bucket):
    """Delete a temporary bucket

    The temporary function is first invoked with no keys, so that it
    removes the temporary objects held in the bucket, and the bucket
    itself is then deleted.
    """
    # Only ever delete buckets carrying our own temporary prefix
    is_temporary = bucket.startswith(IPXE_STORAGE_PREFIX)
    assert is_temporary
    call_temp_function(clients, func, {'bucket': bucket})
    clients.oss.delete_bucket(oss.models.DeleteBucketRequest(bucket=bucket))
def create_temp_bucket(clients, func):
    """Create the temporary bucket in one region

    Any temporary buckets already present for the region are deleted
    first.  Returns the new bucket name, or None if the region refuses
    to create new buckets.
    """
    region_prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region)

    # Remove stale temporary buckets
    listing = clients.oss.list_buckets(
        oss.models.ListBucketsRequest(prefix=region_prefix)
    )
    stale = [entry.name for entry in listing.buckets or ()]
    for name in stale:
        delete_temp_bucket(clients, func, name)

    # Create a bucket under a fresh unique (length-limited) name
    unique = '%s%s' % (region_prefix, uuid4())
    bucket = unique[:OSS_BUCKET_NAME_LEN]
    try:
        clients.oss.put_bucket(oss.models.PutBucketRequest(bucket=bucket))
    except oss.exceptions.OperationError as exc:
        # AliCloud provides no other way to detect non-functional regions
        if exc.unwrap().code != OSS_FORBIDDEN_REGION_CODE:
            raise exc
        return None
    return bucket
def upload_image(clients, bucket, image):
    """Upload a disk image file to a bucket and return its object key

    The object is stored under a fresh unique key carrying the
    temporary storage prefix.
    """
    key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
    clients.oss.put_object_from_file(
        oss.models.PutObjectRequest(bucket=bucket, key=key),
        image.path,
    )
    return key
def copy_images(clients, func, bucket, source, keys):
    """Copy the objects "keys" from the source bucket into "bucket"

    The copy is carried out by the temporary function running in the
    clients' region; "source" names the region and bucket to copy from.
    """
    request = {'bucket': bucket, 'source': source, 'keys': keys}
    call_temp_function(clients, func, request)
def delete_images(clients, name):
    """Delete every self-owned image called "name" in the clients' region"""
    region = clients.region
    matches = clients.ecs.describe_images(
        ecs.models.DescribeImagesRequest(
            region_id=region,
            image_name=name,
            image_owner_alias='self',
        )
    )
    for existing in matches.body.images.image or ():
        clients.ecs.delete_image(
            ecs.models.DeleteImageRequest(
                region_id=region,
                image_id=existing.image_id,
            )
        )
def import_image(clients, image, bucket, key, public, overwrite):
    """Import one uploaded disk image into ECS and return its image ID

    If "overwrite" is set, existing images with the same name are
    deleted first.  The import task is polled every five seconds until
    it leaves the "Waiting"/"Processing" states, and RuntimeError
    (carrying the final status) is raised unless it ends as "Finished".
    The image family is then set and, if "public" is set, the image is
    made public.
    """
    region = clients.region
    api = clients.ecs

    if overwrite:
        delete_images(clients, image.name)

    # Start the import of the raw disk image held in object storage
    mapping = ecs.models.ImportImageRequestDiskDeviceMapping(
        disk_image_size=1,
        format='RAW',
        ossbucket=bucket,
        ossobject=key,
    )
    started = api.import_image(
        ecs.models.ImportImageRequest(
            region_id=region,
            image_name=image.name,
            architecture=image.arch,
            boot_mode=image.mode,
            disk_device_mapping=[mapping],
        )
    ).body
    image_id = started.image_id
    task_id = started.task_id

    # Wait for the import task to reach a final state
    status = 'Waiting'
    while status in ('Waiting', 'Processing'):
        time.sleep(5)
        polled = api.describe_tasks(
            ecs.models.DescribeTasksRequest(
                region_id=region,
                task_ids=task_id,
            )
        )
        status = polled.body.task_set.task[0].task_status
    if status != 'Finished':
        raise RuntimeError(status)

    # Record the image family
    api.modify_image_attribute(
        ecs.models.ModifyImageAttributeRequest(
            region_id=region,
            image_id=image_id,
            image_family=image.family,
        )
    )

    # Publish the image if requested
    if public:
        api.modify_image_share_permission(
            ecs.models.ModifyImageSharePermissionRequest(
                region_id=region,
                image_id=image_id,
                is_public=True,
            )
        )
    return image_id
# Command-line interface: options first, then one or more image files
parser = argparse.ArgumentParser(description="Import Alibaba Cloud image")
parser.add_argument(
    '--name', '-n',
    help="Base image name",
)
parser.add_argument(
    '--family', '-f',
    default='ipxe',
    help="Base family name",
)
parser.add_argument(
    '--public', '-p',
    action='store_true',
    help="Make image public",
)
parser.add_argument(
    '--overwrite',
    action='store_true',
    help="Overwrite any existing image with same name",
)
parser.add_argument(
    '--region', '-r',
    action='append',
    help="AliCloud region(s)",
)
parser.add_argument(
    '--fc-role', '-F',
    default="AliyunFcDefaultRole",
    help="AliCloud role for censorship bypass function",
)
parser.add_argument(
    'image',
    nargs='+',
    help="iPXE disk image",
)
args = parser.parse_args()
# Use default name if none specified (base family plus today's date)
if not args.name:
    args.name = '%s-%s' % (args.family, date.today().strftime('%Y%m%d'))

# Construct image list
images = [image(x, args.family, args.name) for x in args.image]

# Use all regions if none specified
if not args.region:
    args.region = all_regions()

# Ignore repeated regions (keeping the first occurrence of each).
# Every later stage submits one job per entry in args.region, and
# create_temp_function() deletes all existing temporary functions in
# its region.  Two concurrent jobs for the same region could therefore
# remove each other's function, and each image would be imported twice.
args.region = list(dict.fromkeys(args.region))

# Look up resource names
fcrole = role_arn(args.fc_role)

# Construct per-region clients
account = account_id()
clients = {region: all_clients(region, account) for region in args.region}
# Create one temporary function per region, in parallel.  A value of
# None records that no function could be created in that region.
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
    futures = dict(
        (executor.submit(create_temp_function,
                         clients=clients[name],
                         role=fcrole), name)
        for name in args.region
    )
    funcs = dict((futures[job], job.result())
                 for job in as_completed(futures))

# Create one temporary bucket per region that has a function (the
# function is needed to clear out stale buckets).  A value of None
# records that the region refused to create a bucket.
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
    futures = dict(
        (executor.submit(create_temp_bucket,
                         clients=clients[name],
                         func=funcs[name]), name)
        for name in args.region
        if funcs[name] is not None
    )
    buckets = dict((futures[job], job.result())
                   for job in as_completed(futures))
# Choose a region outside the "cn-" namespace that has a working bucket
uncensored = next(
    (name for name, bucket in buckets.items()
     if not (bucket is None or name.startswith('cn-'))),
    None,
)
if uncensored is None:
    parser.error("At least one available uncensored region is required")

# Upload every image straight to the chosen region's bucket, in
# parallel, recording the object key assigned to each image
with ThreadPoolExecutor(max_workers=len(images)) as executor:
    futures = dict(
        (executor.submit(upload_image,
                         clients=clients[uncensored],
                         bucket=buckets[uncensored],
                         image=item), item)
        for item in images
    )
    keys = dict((futures[job], job.result())
                for job in as_completed(futures))
# Have each other usable region's temporary function pull the uploaded
# images from the uncensored bucket into its own bucket
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
    source = {'region': uncensored, 'bucket': buckets[uncensored]}
    futures = dict(
        (executor.submit(copy_images,
                         clients=clients[name],
                         func=funcs[name],
                         bucket=buckets[name],
                         source=source,
                         keys=list(keys.values())), name)
        for name in args.region
        if funcs[name] is not None
        if buckets[name] is not None
        if name != uncensored
    )
    done = dict((futures[job], job.result())
                for job in as_completed(futures))
# Import every image into every region that has both a function and a
# bucket, in parallel, recording the resulting image IDs
imports = [(name, item) for name in args.region for item in images]
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
    futures = dict(
        (executor.submit(import_image,
                         clients=clients[name],
                         image=item,
                         bucket=buckets[name],
                         key=keys[item],
                         public=args.public,
                         overwrite=args.overwrite), (name, item))
        for name, item in imports
        if funcs[name] is not None
        if buckets[name] is not None
    )
    results = dict((futures[job], job.result())
                   for job in as_completed(futures))
# Tear down the temporary buckets first: emptying a bucket relies on
# the region's temporary function still being present
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
    futures = dict(
        (executor.submit(delete_temp_bucket,
                         clients=clients[name],
                         func=funcs[name],
                         bucket=buckets[name]), name)
        for name in args.region
        if funcs[name] is not None
        if buckets[name] is not None
    )
    done = dict((futures[job], job.result())
                for job in as_completed(futures))

# Then tear down the temporary functions themselves
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
    futures = dict(
        (executor.submit(delete_temp_function,
                         clients=clients[name],
                         func=funcs[name]), name)
        for name in args.region
        if funcs[name] is not None
    )
    done = dict((futures[job], job.result())
                for job in as_completed(futures))
# Report the outcome for every (region, image) pair: either the new
# image ID or the reason the region was skipped.  The region used for
# the direct upload is marked with "(*)".
for region, image in imports:
    if funcs[region] is None:
        result = "[no FC]"
    elif buckets[region] is None:
        result = "[no OSS]"
    else:
        result = results[(region, image)]
    mark = "(*)" if region == uncensored else ""
    print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result))