| #!/usr/bin/env python3 |
| # |
| # Doing anything in Alibaba Cloud is unnecessarily difficult and |
| # tedious due to a combination of poor and inconsistent API design, |
| # high API call failure rates, and Chinese state censorship laws. |
| # |
| # We resort to a mixture of strategies to get images imported to all |
| # regions: |
| # |
| # - For regions with working OSS that are not blocked by Chinese |
| # state censorship laws, upload the image files to an OSS bucket |
| # and then import the images. |
| # |
| # - For regions with working OSS that are blocked by Chinese state |
| # censorship laws but that have working FC, use a temporary FC |
| # function to copy the image files from the uncensored OSS buckets |
| # and then import the images. Attempt downloads from a variety of |
| # uncensored buckets, since cross-region OSS traffic tends to |
| # experience a failure rate of around 10% of requests. |
| # |
| # - For regions that have working OSS but are blocked by Chinese |
| # state censorship laws and do not have working FC, or for regions |
| # that don't even have working OSS, resort to using CopyImage to |
| # copy the previously imported images from another region. Spread |
| # the imports across as many source regions as possible to |
| # minimise the effect of the CopyImage rate limiting. |
| |
| import argparse |
| import base64 |
| from collections import namedtuple |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| import datetime |
| import http |
| import io |
| from itertools import cycle, groupby |
| import json |
| import logging |
| from operator import itemgetter |
| from pathlib import Path |
| import random |
| import subprocess |
| import tempfile |
| import time |
| from uuid import uuid4 |
| import zipfile |
| |
| import alibabacloud_credentials as credentials |
| import alibabacloud_credentials.client |
| import alibabacloud_credentials.models |
| import alibabacloud_ecs20140526 as ecs |
| import alibabacloud_ecs20140526.client |
| import alibabacloud_ecs20140526.models |
| import alibabacloud_fc20230330 as fc |
| import alibabacloud_fc20230330.client |
| import alibabacloud_fc20230330.models |
| import alibabacloud_oss_v2 as oss |
| import alibabacloud_ram20150501 as ram |
| import alibabacloud_ram20150501.client |
| import alibabacloud_ram20150501.models |
| import alibabacloud_sts20150401 as sts |
| import alibabacloud_sts20150401.client |
| import alibabacloud_sts20150401.models |
| import alibabacloud_tea_openapi as openapi |
| import alibabacloud_tea_openapi.client |
| import alibabacloud_tea_openapi.models |
| import alibabacloud_tea_util as util |
| import alibabacloud_tea_util.client |
| import alibabacloud_tea_util.models |
| |
# Logger shared by all helper functions in this script
logger = logging.getLogger('ali-import')

# Endpoints used for the lookups that are not tied to a single region
# (region list, role resource name, and account ID respectively)
ECS_ENDPOINT = 'ecs.aliyuncs.com'
RAM_ENDPOINT = 'ram.aliyuncs.com'
STS_ENDPOINT = 'sts.aliyuncs.com'

# Settings for the temporary Function Compute function.
# FC_SOURCE_COUNT is the number of randomly chosen source URLs that
# are offered to the function for each object to be copied.
FC_NODE_RUNTIME = 'nodejs20'
FC_TIMEOUT_SEC = 120
FC_MEMORY_SIZE_MB = 128
FC_SOURCE_COUNT = 10

# OSS error code that is treated as "OSS is not usable in this region",
# and the length to which generated bucket names are truncated
OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket'
OSS_BUCKET_NAME_LEN = 63

# Name prefix shared by all temporary functions, buckets, and objects,
# and the tag (used as both key and value) that marks snapshots of
# deleted images for later removal
IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-'
IPXE_SNAPSHOT_DELETE_TAG = 'ipxe-snapshot-delete'

# Delay before each poll, and maximum number of polls, when waiting
# for a task or an image
POLL_INTERVAL_SEC = 5
POLL_MAX_RETRIES = 100
| |
# Experimentation suggests Alibaba Cloud API calls are extremely
# unreliable, with a failure rate around 1%. It is therefore
# necessary to allow for retrying basically every API call.
#
# Some API calls (e.g. DescribeImages or ModifyImageAttribute) are
# naturally idempotent and so safe to retry. Some non-idempotent API
# calls (e.g. CopyImage) support explicit idempotence tokens. The
# remaining API calls may simply fail on a retry, if the original
# request happened to succeed but failed to return a response.
#
# We could write convoluted retry logic around the non-idempotent
# calls, but this would substantially increase the complexity of the
# already unnecessarily complex code. For now, we assume that
# retrying non-idempotent requests is probably more likely to fix
# transient failures than to cause additional problems.
#
# These options are passed to every ECS and FC "*_with_options" call
# made by this script.
#
# NOTE(review): the timeout values are presumably in milliseconds
# (10s to connect, 120s to read) - confirm against the SDK.
#
RUNTIME_OPTS = util.models.RuntimeOptions(
    autoretry=True,
    max_attempts=5,
    connect_timeout=10000,
    read_timeout=120000,
)
| |
# For regions in mainland China, the Chinese state censorship laws
# prohibit direct access to OSS bucket contents.
#
# We work around this restriction by creating a temporary Function
# Compute function in each region to access OSS via the internal OSS
# endpoints, which are not subject to these restrictions. Yes, this
# is somewhat absurd.
#
# The function is invoked with a JSON payload of the form
#
#   {"bucket": <bucket name>, "sources": {<object key>: [<url>, ...]}}
#
# and brings the listed contents of the bucket into line with the
# requested objects: each requested key that is not already listed is
# downloaded (trying each of its URLs in turn until one succeeds) and
# uploaded to the bucket, and each listed object that was not
# requested is deleted. A payload with no "sources" therefore deletes
# every listed object. Only objects under the temporary name prefix
# are ever listed, created, or deleted.
#
IPXE_CENSORSHIP_BYPASS_FUNCTION = f'''
const prefix = "{IPXE_STORAGE_PREFIX}";
''' + '''
const assert = require("node:assert");
const OSS = require("ali-oss");
exports.handler = async (event, context) => {
const payload = JSON.parse(event.toString());
console.log(JSON.stringify(payload));
const sources = payload.sources || {};
const dest = new OSS({
region: "oss-" + context.region,
internal: true,
bucket: payload.bucket,
accessKeyId: context.credentials.accessKeyId,
accessKeySecret: context.credentials.accessKeySecret,
stsToken: context.credentials.securityToken,
});
const current = ((await dest.listV2({prefix: prefix})).objects || [])
.map(x => x.name);
const wanted = Object.keys(sources);
const add = wanted.filter(x => ! current.includes(x));
const del = current.filter(x => ! wanted.includes(x));
assert(add.every(x => x.startsWith(prefix)));
assert(del.every(x => x.startsWith(prefix)));
if (add.length)
console.log("Creating: " + add.sort().join(", "));
if (del.length)
console.log("Deleting: " + del.sort().join(", "));
const copy = async (key) => {
for (const url of sources[key]) {
console.log("Downloading " + key + " from " + url);
try {
const download = await fetch(url, {signal: AbortSignal.timeout(15000)});
if (! download.ok)
throw new Error(download.status);
const content = await download.arrayBuffer();
console.log("Downloaded " + key);
console.log("Uploading " + key);
await dest.put(key, Buffer.from(content));
console.log("Uploaded " + key);
return;
} catch (err) {
console.error("Download failed", err);
}
}
throw new Error("All downloads failed for " + key);
};
await Promise.all([
...add.map(copy),
...(del.length ? [dest.deleteMulti(del)] : []),
]);
console.log("Finished");
};
'''
| |
# Per-region set of API clients:
#   region       - region ID
#   censored     - True for regions whose ID starts with 'cn-'
#   ecs, fc, oss - ECS, Function Compute, and OSS clients for the region
Clients = namedtuple('Clients', ['region', 'censored', 'ecs', 'fc', 'oss'])
# Description of one disk image to be imported:
#   path   - path to the local disk image file
#   family - image family name (base family plus any UEFI suffix)
#   name   - image name (base name plus any UEFI suffix)
#   arch   - 'x86_64' or 'arm64', or None for a multi-architecture image
#   mode   - boot mode, 'UEFI' or 'BIOS'
#   key    - object key under which the image file is stored in OSS
#   public - whether the imported image is to be made public
Image = namedtuple('Image',
                   ['path', 'family', 'name', 'arch', 'mode', 'key', 'public'])
| |
def image(filename, basefamily, basename, public):
    """Construct image description

    The disk image is probed with the mtools "mdir" command for UEFI
    boot loaders in EFI/BOOT, looking both at the image file as a
    whole and at its fourth partition.  The boot loaders found
    determine the suffix appended to the family and image names, the
    architecture, and the boot mode:

    - exactly one loader: "-uefi-<arch>" suffix, that architecture, UEFI
    - several loaders: "-uefi-multi" suffix, no architecture, UEFI
    - no loaders: no suffix, 'x86_64', BIOS

    Parameters:
        filename: path to the disk image file
        basefamily: base image family name
        basename: base image name
        public: whether the imported image is to be made public

    Returns an Image tuple carrying a freshly generated object key.
    """
    config = [
        'drive D:', f'file="{filename}"',
        'drive P:', f'file="{filename}"', 'partition=4',
    ]
    with tempfile.NamedTemporaryFile(mode='w+t') as mtoolsrc:
        # writelines() adds no separators of its own, so terminate
        # each configuration line explicitly rather than relying on
        # mtools to parse everything run together on a single line
        mtoolsrc.writelines('%s\n' % line for line in config)
        mtoolsrc.flush()
        # A failing mdir is deliberately not treated as an error: it
        # simply produces no output naming a boot loader
        mdir = subprocess.run(['mdir', '-b', 'D:/EFI/BOOT', 'P:/EFI/BOOT'],
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              check=False, env={'MTOOLSRC': mtoolsrc.name})
    mapping = {
        b'BOOTX64.EFI': 'x86_64',
        b'BOOTAA64.EFI': 'arm64',
    }
    uefi = [v for k, v in mapping.items() if k in mdir.stdout]
    suffix = ('-uefi-%s' % uefi[0].replace('_', '-') if len(uefi) == 1 else
              '-uefi-multi' if uefi else '')
    path = Path(filename)
    family = '%s%s' % (basefamily, suffix)
    name = '%s%s' % (basename, suffix)
    arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64'
    mode = 'UEFI' if uefi else 'BIOS'
    key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
    return Image(path, family, name, arch, mode, key, public)
| |
def all_regions():
    """Return the sorted IDs of all regions reported by DescribeRegions"""
    config = openapi.models.Config(
        credential=credentials.client.Client(),
        endpoint=ECS_ENDPOINT,
    )
    client = ecs.client.Client(config)
    response = client.describe_regions(ecs.models.DescribeRegionsRequest())
    region_ids = [entry.region_id for entry in response.body.regions.region]
    region_ids.sort()
    return region_ids
| |
def account_id():
    """Return the account ID reported by STS for the caller's identity"""
    config = openapi.models.Config(
        credential=credentials.client.Client(),
        endpoint=STS_ENDPOINT,
    )
    identity = sts.client.Client(config).get_caller_identity()
    return identity.body.account_id
| |
def role_arn(name):
    """Return the resource name (ARN) of the named RAM role"""
    config = openapi.models.Config(
        credential=credentials.client.Client(),
        endpoint=RAM_ENDPOINT,
    )
    client = ram.client.Client(config)
    lookup = ram.models.GetRoleRequest(role_name=name)
    return client.get_role(lookup).body.role.arn
| |
def all_clients(region, account):
    """Build the ECS, FC, and OSS clients for a single region

    The FC endpoint is specific to both the account and the region.
    Regions whose ID starts with 'cn-' are flagged as censored.
    """
    shared = credentials.client.Client()
    ecs_config = openapi.models.Config(credential=shared, region_id=region)
    fc_endpoint = '%s.%s.fc.aliyuncs.com' % (account, region)
    fc_config = openapi.models.Config(credential=shared, endpoint=fc_endpoint)
    # OSS takes its credentials from the environment rather than from
    # the credentials client shared by the other services
    oss_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    oss_config = oss.config.Config(
        credentials_provider=oss_provider,
        region=region,
    )
    ecs_client = ecs.client.Client(ecs_config)
    fc_client = fc.client.Client(fc_config)
    oss_client = oss.client.Client(oss_config)
    return Clients(
        region=region,
        censored=region.startswith('cn-'),
        ecs=ecs_client,
        fc=fc_client,
        oss=oss_client,
    )
| |
def delete_temp_function(clients, func):
    """Delete a temporary function from the client's region"""
    logger.info("delete function %s %s", clients.region, func)
    # Never delete anything that does not carry our temporary prefix
    assert func.startswith(IPXE_STORAGE_PREFIX)
    clients.fc.delete_function_with_options(func, {}, RUNTIME_OPTS)
| |
def create_temp_function(clients, role):
    """Create temporary function (after removing any stale ones)

    Returns the new function name, or None if listing functions
    failed with an unretryable error or if the region is uncensored
    (in which case no function is needed).
    """
    listing = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX)
    try:
        listed = clients.fc.list_functions_with_options(
            listing, {}, RUNTIME_OPTS
        )
    except openapi.client.UnretryableException:
        # AliCloud provides no other way to detect non-working regions
        return None
    for stale in [x.function_name for x in listed.body.functions or ()]:
        delete_temp_function(clients, stale)
    if not clients.censored:
        # Functions are not required in uncensored regions
        return None
    # Package the function source as a base64-encoded zip archive
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, 'w') as bundle:
        bundle.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION)
    encoded = base64.b64encode(archive.getvalue()).decode()
    source = fc.models.InputCodeLocation(zip_file=encoded)
    name = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
    definition = fc.models.CreateFunctionInput(
        code=source,
        function_name=name,
        handler='index.handler',
        memory_size=FC_MEMORY_SIZE_MB,
        role=role,
        runtime=FC_NODE_RUNTIME,
        timeout=FC_TIMEOUT_SEC,
    )
    creation = fc.models.CreateFunctionRequest(body=definition)
    clients.fc.create_function_with_options(creation, {}, RUNTIME_OPTS)
    logger.info("create function %s %s", clients.region, name)
    return name
| |
def call_temp_function(clients, func, payload):
    """Invoke the temporary function synchronously with a JSON payload

    Raises RuntimeError if the invocation does not return HTTP 200
    (carrying the response) or if the function reports an error
    (carrying the function's log output).
    """
    headers = fc.models.InvokeFunctionHeaders(
        x_fc_invocation_type='Sync',
        x_fc_log_type='Tail',
    )
    request = fc.models.InvokeFunctionRequest(body=json.dumps(payload))
    response = clients.fc.invoke_function_with_options(
        func, request, headers, RUNTIME_OPTS
    )
    # The tail of the function log is returned base64-encoded in a header
    encoded_log = response.headers.get('x-fc-log-result', b'')
    log = base64.b64decode(encoded_log).decode()
    if response.status_code != http.HTTPStatus.OK:
        raise RuntimeError(response)
    if 'x-fc-error-type' in response.headers:
        raise RuntimeError(log)
| |
def delete_temp_bucket(clients, func, bucket):
    """Empty and then delete a temporary bucket"""
    logger.info("delete bucket %s %s", clients.region, bucket)
    # Never delete anything that does not carry our temporary prefix
    assert bucket.startswith(IPXE_STORAGE_PREFIX)
    if clients.censored:
        if func:
            # Censored region with FC: a payload naming no sources
            # makes the function delete the bucket contents
            call_temp_function(clients, func, {'bucket': bucket})
        # Censored region without FC: assume bucket must be empty,
        # since we could not have uploaded to it in the first place
    else:
        # Uncensored region: delete the contents via the OSS API
        listing = clients.oss.list_objects_v2(
            oss.models.ListObjectsV2Request(
                bucket=bucket,
                prefix=IPXE_STORAGE_PREFIX,
            )
        )
        keys = [entry.key for entry in listing.contents or ()]
        if keys:
            removal = oss.models.DeleteMultipleObjectsRequest(
                bucket=bucket,
                objects=[oss.models.DeleteObject(key) for key in keys],
            )
            clients.oss.delete_multiple_objects(removal)
    # The bucket itself can be deleted now that it is empty
    clients.oss.delete_bucket(oss.models.DeleteBucketRequest(bucket=bucket))
| |
def create_temp_bucket(clients, func):
    """Create temporary bucket (after removing any stale ones)

    Returns the new bucket name, or None if the region is censored
    and has no temporary function, or if the region refuses to create
    new buckets.
    """
    prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region)
    listing = clients.oss.list_buckets(
        oss.models.ListBucketsRequest(prefix=prefix)
    )
    for stale in [entry.name for entry in listing.buckets or ()]:
        delete_temp_bucket(clients, func, stale)
    if clients.censored and not func:
        # We cannot use OSS in censored regions with no Function Compute
        return None
    name = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN]
    creation = oss.models.PutBucketRequest(bucket=name)
    try:
        clients.oss.put_bucket(creation)
    except oss.exceptions.OperationError as exc:
        # AliCloud provides no other way to detect non-working regions
        if exc.unwrap().code != OSS_FORBIDDEN_REGION_CODE:
            raise
        return None
    logger.info("create bucket %s %s", clients.region, name)
    return name
| |
def upload_object(clients, bucket, image):
    """Upload a disk image file to an uncensored bucket

    Returns a presigned URL from which the uploaded object can be
    downloaded.
    """
    logger.info("upload %s %s", clients.region, image.name)
    target = {'bucket': bucket, 'key': image.key}
    clients.oss.put_object_from_file(
        oss.models.PutObjectRequest(**target), image.path
    )
    presigned = clients.oss.presign(oss.models.GetObjectRequest(**target))
    return presigned.url
| |
def copy_objects(clients, bucket, func, uploads):
    """Copy disk image objects into a censored bucket

    The temporary function in the censored region is given, for each
    object key, a random selection (with repetition) of the presigned
    URLs at which that object was uploaded to the uncensored buckets.
    """
    logger.info("upload %s (censored)", clients.region)
    # Collect the URLs available for each object key
    urls_by_key = {}
    for (_, uploaded), url in uploads.items():
        urls_by_key.setdefault(uploaded.key, []).append(url)
    # Choose the candidate sources for each key, in sorted key order
    sources = {}
    for key in sorted(urls_by_key):
        candidates = sorted(urls_by_key[key])
        sources[key] = random.choices(candidates, k=FC_SOURCE_COUNT)
    payload = {
        'bucket': bucket,
        'sources': sources,
    }
    call_temp_function(clients, func, payload)
| |
def delete_image(clients, name):
    """Remove existing image (if applicable)

    Each self-owned image with the given name in the client's region
    is unpublished (if public), has its snapshots tagged for later
    deletion, and is then deleted.
    """
    region = clients.region
    lookup = ecs.models.DescribeImagesRequest(
        region_id=region,
        image_name=name,
        image_owner_alias='self',
    )
    listing = clients.ecs.describe_images_with_options(lookup, RUNTIME_OPTS)
    for found in listing.body.images.image or ():
        logger.info("delete image %s %s (%s)",
                    region, found.image_name, found.image_id)
        # A public image must be unpublished first
        if found.is_public:
            unpublish = ecs.models.ModifyImageSharePermissionRequest(
                region_id=region,
                image_id=found.image_id,
                is_public=False,
            )
            clients.ecs.modify_image_share_permission_with_options(
                unpublish, RUNTIME_OPTS
            )
        # Mark the image's snapshots so that they can be found and
        # deleted once the image itself has gone
        for disk in found.disk_device_mappings.disk_device_mapping or ():
            marker = ecs.models.TagResourcesRequestTag(
                key=IPXE_SNAPSHOT_DELETE_TAG,
                value=IPXE_SNAPSHOT_DELETE_TAG,
            )
            tagging = ecs.models.TagResourcesRequest(
                region_id=region,
                resource_type='snapshot',
                resource_id=[disk.snapshot_id],
                tag=[marker],
            )
            clients.ecs.tag_resources_with_options(tagging, RUNTIME_OPTS)
        # Finally delete the image itself
        removal = ecs.models.DeleteImageRequest(
            region_id=region,
            image_id=found.image_id
        )
        clients.ecs.delete_image_with_options(removal, RUNTIME_OPTS)
| |
def delete_snapshots(clients):
    """Delete all snapshots that have been tagged for deletion"""
    marker = ecs.models.ListTagResourcesRequestTag(
        key=IPXE_SNAPSHOT_DELETE_TAG,
        value=IPXE_SNAPSHOT_DELETE_TAG,
    )
    lookup = ecs.models.ListTagResourcesRequest(
        region_id=clients.region,
        resource_type='snapshot',
        tag=[marker],
    )
    listing = clients.ecs.list_tag_resources_with_options(lookup, RUNTIME_OPTS)
    for tagged in listing.body.tag_resources.tag_resource or ():
        snapshot_id = tagged.resource_id
        logger.info("delete snapshot %s %s", clients.region, snapshot_id)
        removal = ecs.models.DeleteSnapshotRequest(
            snapshot_id=snapshot_id,
            force=True,
        )
        clients.ecs.delete_snapshot_with_options(removal, RUNTIME_OPTS)
| |
def wait_for_task(clients, task_id):
    """Poll a task until it is no longer waiting or processing

    Raises RuntimeError carrying the last observed status unless the
    task ended up as 'Finished'.
    """
    status = 'Unknowable'
    for _ in range(POLL_MAX_RETRIES):
        time.sleep(POLL_INTERVAL_SEC)
        query = ecs.models.DescribeTasksRequest(
            region_id=clients.region,
            task_ids=task_id,
        )
        try:
            reply = clients.ecs.describe_tasks_with_options(
                query, RUNTIME_OPTS
            )
        except openapi.client.UnretryableException:
            # A failed poll is not fatal: simply try again
            continue
        tasks = reply.body.task_set.task
        assert len(tasks) == 1
        assert tasks[0].task_id == task_id
        status = tasks[0].task_status
        if status != 'Waiting' and status != 'Processing':
            break
    if status != 'Finished':
        raise RuntimeError(status)
| |
def wait_for_image(clients, image_id):
    """Poll an image until it is reported and no longer being created

    Raises RuntimeError carrying the last observed status unless the
    image ended up as 'Available'.
    """
    status = 'Unknowable'
    for _ in range(POLL_MAX_RETRIES):
        time.sleep(POLL_INTERVAL_SEC)
        query = ecs.models.DescribeImagesRequest(
            region_id=clients.region,
            image_id=image_id,
        )
        try:
            reply = clients.ecs.describe_images_with_options(
                query, RUNTIME_OPTS
            )
        except openapi.client.UnretryableException:
            # A failed poll is not fatal: simply try again
            continue
        found = reply.body.images.image
        # An empty result means that the image is not being reported
        # yet, in which case we just keep polling
        if len(found):
            assert len(found) == 1
            assert found[0].image_id == image_id
            status = found[0].status
            if status != 'Creating':
                break
    if status != 'Available':
        raise RuntimeError(status)
| |
def import_image(clients, image, bucket):
    """Import an image from its object in the region's bucket

    Waits for the import task to finish and for the image to become
    available, then returns the new image ID.
    """
    logger.info("import %s %s", clients.region, image.name)
    source_disk = ecs.models.ImportImageRequestDiskDeviceMapping(
        disk_image_size=1,
        format='RAW',
        ossbucket=bucket,
        ossobject=image.key,
    )
    capabilities = ecs.models.ImportImageRequestFeatures(
        imds_support='v1',
        nvme_support='supported',
    )
    request = ecs.models.ImportImageRequest(
        region_id=clients.region,
        image_name=image.name,
        architecture=image.arch,
        boot_mode=image.mode,
        disk_device_mapping=[source_disk],
        features=capabilities,
        client_token=str(uuid4()),
    )
    started = clients.ecs.import_image_with_options(request, RUNTIME_OPTS)
    image_id = started.body.image_id
    wait_for_task(clients, started.body.task_id)
    wait_for_image(clients, image_id)
    logger.info("image %s %s (%s)", clients.region, image.name, image_id)
    return image_id
| |
def copy_image(clients, image, image_id, censored):
    """Copy an already imported image into another region

    The copy is requested from the source region ("clients") and then
    awaited in the destination region ("censored").  Returns the ID
    of the copied image.
    """
    source, destination = clients.region, censored.region
    logger.info("import %s %s via %s", destination, image.name, source)
    request = ecs.models.CopyImageRequest(
        region_id=source,
        image_id=image_id,
        destination_region_id=destination,
        destination_image_name=image.name,
        client_token=str(uuid4()),
    )
    started = clients.ecs.copy_image_with_options(request, RUNTIME_OPTS)
    copy_id = started.body.image_id
    wait_for_image(censored, copy_id)
    logger.info("image %s %s (%s)", destination, image.name, copy_id)
    return copy_id
| |
def finalise_image(clients, image, image_id):
    """Set the image family and, if requested, make the image public"""
    region = clients.region
    logger.info("finalise %s %s (%s)", region, image.name, image_id)
    set_family = ecs.models.ModifyImageAttributeRequest(
        region_id=region,
        image_id=image_id,
        image_family=image.family,
    )
    clients.ecs.modify_image_attribute_with_options(set_family, RUNTIME_OPTS)
    if not image.public:
        return
    publish = ecs.models.ModifyImageSharePermissionRequest(
        region_id=region,
        image_id=image_id,
        is_public=True,
    )
    clients.ecs.modify_image_share_permission_with_options(
        publish, RUNTIME_OPTS
    )
| |
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Import Alibaba Cloud image")
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--name', '-n',
                    help="Base image name")
parser.add_argument('--family', '-f', default='ipxe',
                    help="Base family name")
parser.add_argument('--public', '-p', action='store_true',
                    help="Make image(s) public")
parser.add_argument('--overwrite', action='store_true',
                    help="Overwrite any existing image with same name")
parser.add_argument('--region', '-r', action='append',
                    help="AliCloud region(s)")
parser.add_argument('--role', '-R', default="AliyunFcDefaultRole",
                    help="AliCloud role for censorship bypass function")
parser.add_argument('image', nargs='+', help="iPXE disk image")
args = parser.parse_args()

# Configure logging
#
# Each "-v" raises the verbosity by one step (WARNING, INFO, DEBUG);
# any further "-v" options have no additional effect.  The
# 'apscheduler' logger is kept at WARNING regardless.
#
loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
verbosity = min(args.verbose, (len(loglevels) - 1))
logging.basicConfig(level=loglevels[verbosity])
logging.getLogger('apscheduler').setLevel(logging.WARNING)

# Use default name if none specified
#
# The default is the family name followed by today's date (YYYYMMDD).
#
if not args.name:
    args.name = '%s-%s' % (args.family,
                           datetime.date.today().strftime('%Y%m%d'))

# Use all regions if none specified
regions = args.region or all_regions()

# Construct image list
#
# Every image is imported into every region: "imports" holds all of
# the (region, image) pairs, and each thread pool used by this script
# is sized to allow one worker per pair.
#
images = [image(x, args.family, args.name, args.public) for x in args.image]
imports = [(region, image) for region in regions for image in images]
workers = len(imports)

# Look up resource names
fcrole = role_arn(args.role)

# Construct per-region clients
account = account_id()
clients = {region: all_clients(region, account) for region in regions}
| |
# Delete existing images from all regions, if applicable
#
# Collecting each future's result() re-raises any exception that
# occurred in the worker thread, so a failed deletion aborts the
# script.
#
if args.overwrite:
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(delete_image,
                                   clients=clients[region],
                                   name=image.name): (region, image)
                   for region, image in imports}
        done = {futures[x]: x.result() for x in as_completed(futures)}

# Delete any stale snapshots from all regions
#
# This removes every snapshot carrying the deletion tag, which
# includes those tagged by the image deletion step above.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(delete_snapshots,
                               clients=clients[region]): region
               for region in regions}
    done = {futures[x]: x.result() for x in as_completed(futures)}
| |
# Create temporary function in each censored region with usable FC
#
# Any stale temporary functions are removed along the way.  The
# entry for a region is None if the region is uncensored, or if
# listing its functions failed with an unretryable error.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(create_temp_function,
                               clients=clients[region],
                               role=fcrole): region
               for region in regions}
    funcs = {futures[x]: x.result() for x in as_completed(futures)}

# Create temporary bucket in each region with usable OSS
#
# Any stale temporary buckets are removed along the way.  The entry
# for a region is None if the region is censored and has no temporary
# function, or if the region refuses to create new buckets.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(create_temp_bucket,
                               clients=clients[region],
                               func=funcs[region]): region
               for region in regions}
    buckets = {futures[x]: x.result() for x in as_completed(futures)}
| |
# Upload image objects directly to each uncensored region with usable OSS
#
# Temporary functions are created only in censored regions, so a
# region that has a bucket but no function is an uncensored region.
# The result maps each (region, image) pair to a presigned download
# URL for the uploaded object.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(upload_object,
                               clients=clients[region],
                               bucket=buckets[region],
                               image=image): (region, image)
               for region, image in imports
               if buckets[region] and not funcs[region]}
    uploads = {futures[x]: x.result() for x in as_completed(futures)}
# Everything that follows needs at least one directly uploaded copy
# of each image: report the problem and exit if there are none
if not uploads:
    parser.error("At least one working non-Chinese region is required")

# Copy image objects to each censored region with usable OSS and usable FC
#
# Each censored region is handed the complete set of presigned URLs
# and picks its own download sources from among them.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(copy_objects,
                               clients=clients[region],
                               bucket=buckets[region],
                               func=funcs[region],
                               uploads=uploads): region
               for region in regions
               if buckets[region] and funcs[region]}
    done = {futures[x]: x.result() for x in as_completed(futures)}
| |
# Import images in each region with usable OSS
#
# The result maps each (region, image) pair to the imported image ID.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(import_image,
                               clients=clients[region],
                               image=image,
                               bucket=buckets[region]): (region, image)
               for region, image in imports if buckets[region]}
    results = {futures[x]: x.result() for x in as_completed(futures)}

# Copies are rate-limited by source region, so spread the copies
# across all available regions with imported images.
#
# Each (destination region, image) pair that has no bucket is paired
# with a source region taken in rotation from the regions that do
# have a bucket.
#
copies = [(region, censored, image) for region, (censored, image) in zip(
    cycle(region for region in regions if buckets[region]),
    ((region, image) for region, image in imports if not buckets[region]),
)]
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(copy_image,
                               clients=clients[region],
                               censored=clients[censored],
                               image=image,
                               image_id=results[(region, image)]):
               (censored, image)
               for region, censored, image in copies}
    copied = {futures[x]: x.result() for x in as_completed(futures)}
# Merge the copied image IDs so that "results" covers every import
results.update(copied)
| |
# Finalise images
#
# This covers every (region, image) pair, whether the image was
# imported directly or copied from another region.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(finalise_image,
                               clients=clients[region],
                               image=image,
                               image_id=results[(region, image)]):
               (region, image)
               for region, image in imports}
    done = {futures[x]: x.result() for x in as_completed(futures)}

# Remove temporary buckets
#
# This must happen before the temporary functions are removed, since
# the buckets in censored regions are emptied via those functions.
#
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(delete_temp_bucket,
                               clients=clients[region],
                               func=funcs[region],
                               bucket=buckets[region]): region
               for region in regions if buckets[region]}
    done = {futures[x]: x.result() for x in as_completed(futures)}

# Remove temporary functions
with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = {executor.submit(delete_temp_function,
                               clients=clients[region],
                               func=funcs[region]): region
               for region in regions if funcs[region]}
    done = {futures[x]: x.result() for x in as_completed(futures)}

# Show created images
for region, image in imports:
    image_id = results[(region, image)]
    print("%s %s (%s) %s" % (region, image.name, image.family, image_id))