| # SPDX-License-Identifier: GPL-2.0-or-later |
| # |
| # Utilities for python-based QEMU tests |
| # |
| # Copyright 2024 Red Hat, Inc. |
| # |
| # Authors: |
| # Thomas Huth <thuth@redhat.com> |
| |
| import os |
| from subprocess import check_call, run, DEVNULL |
| import tarfile |
| from urllib.parse import urlparse |
| import zipfile |
| |
| from .asset import Asset |
| |
| |
| def tar_extract(archive, dest_dir, member=None): |
| with tarfile.open(archive) as tf: |
| if hasattr(tarfile, 'data_filter'): |
| tf.extraction_filter = getattr(tarfile, 'data_filter', |
| (lambda member, path: member)) |
| if member: |
| tf.extract(member=member, path=dest_dir) |
| else: |
| tf.extractall(path=dest_dir) |
| |
| def cpio_extract(archive, output_path): |
| cwd = os.getcwd() |
| os.chdir(output_path) |
| # Not passing 'check=True' as cpio exits with non-zero |
| # status if the archive contains any device nodes :-( |
| if type(archive) == str: |
| run(['cpio', '-i', '-F', archive], |
| stdout=DEVNULL, stderr=DEVNULL) |
| else: |
| run(['cpio', '-i'], |
| input=archive.read(), |
| stdout=DEVNULL, stderr=DEVNULL) |
| os.chdir(cwd) |
| |
| def zip_extract(archive, dest_dir, member=None): |
| with zipfile.ZipFile(archive, 'r') as zf: |
| if member: |
| zf.extract(member=member, path=dest_dir) |
| else: |
| zf.extractall(path=dest_dir) |
| |
| def deb_extract(archive, dest_dir, member=None): |
| cwd = os.getcwd() |
| os.chdir(dest_dir) |
| try: |
| proc = run(['ar', 't', archive], |
| check=True, capture_output=True, encoding='utf8') |
| file_path = proc.stdout.split()[2] |
| check_call(['ar', 'x', archive, file_path], |
| stdout=DEVNULL, stderr=DEVNULL) |
| tar_extract(file_path, dest_dir, member) |
| finally: |
| os.chdir(cwd) |
| |
| ''' |
| @params archive: filename, Asset, or file-like object to extract |
| @params dest_dir: target directory to extract into |
| @params member: optional member file to limit extraction to |
| |
| Extracts @archive into @dest_dir. All files are extracted |
| unless @member specifies a limit. |
| |
| If @format is None, heuristics will be applied to guess the format |
| from the filename or Asset URL. @format must be non-None if @archive |
| is a file-like object. |
| ''' |
| def archive_extract(archive, dest_dir, format=None, member=None): |
| if format is None: |
| format = guess_archive_format(archive) |
| if type(archive) == Asset: |
| archive = str(archive) |
| |
| if format == "tar": |
| tar_extract(archive, dest_dir, member) |
| elif format == "zip": |
| zip_extract(archive, dest_dir, member) |
| elif format == "cpio": |
| if member is not None: |
| raise Exception("Unable to filter cpio extraction") |
| cpio_extract(archive, dest_dir) |
| elif format == "deb": |
| if type(archive) != str: |
| raise Exception("Unable to use file-like object with deb archives") |
| deb_extract(archive, dest_dir, "./" + member) |
| else: |
| raise Exception(f"Unknown archive format {format}") |
| |
| ''' |
| @params archive: filename, or Asset to guess |
| |
| Guess the format of @compressed, raising an exception if |
| no format can be determined |
| ''' |
| def guess_archive_format(archive): |
| if type(archive) == Asset: |
| archive = urlparse(archive.url).path |
| elif type(archive) != str: |
| raise Exception(f"Unable to guess archive format for {archive}") |
| |
| if ".tar." in archive or archive.endswith("tgz"): |
| return "tar" |
| elif archive.endswith(".zip"): |
| return "zip" |
| elif archive.endswith(".cpio"): |
| return "cpio" |
| elif archive.endswith(".deb") or archive.endswith(".udeb"): |
| return "deb" |
| else: |
| raise Exception(f"Unknown archive format for {archive}") |