|
from __future__ import annotations |
|
|
|
import os |
|
from io import BytesIO |
|
from typing import IO |
|
|
|
from . import ExifTags, Image, ImageFile |
|
|
|
try: |
|
from . import _avif |
|
|
|
SUPPORTED = True |
|
except ImportError: |
|
SUPPORTED = False |
|
|
|
|
|
|
|
DECODE_CODEC_CHOICE = "auto" |
|
|
|
DEFAULT_MAX_THREADS = 0 |
|
|
|
|
|
def get_codec_version(codec_name: str) -> str | None: |
|
versions = _avif.codec_versions() |
|
for version in versions.split(", "): |
|
if version.split(" [")[0] == codec_name: |
|
return version.split(":")[-1].split(" ")[0] |
|
return None |
|
|
|
|
|
def _accept(prefix: bytes) -> bool | str: |
|
if prefix[4:8] != b"ftyp": |
|
return False |
|
major_brand = prefix[8:12] |
|
if major_brand in ( |
|
|
|
b"avif", |
|
b"avis", |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
b"mif1", |
|
b"msf1", |
|
): |
|
if not SUPPORTED: |
|
return ( |
|
"image file could not be identified because AVIF support not installed" |
|
) |
|
return True |
|
return False |
|
|
|
|
|
def _get_default_max_threads() -> int: |
|
if DEFAULT_MAX_THREADS: |
|
return DEFAULT_MAX_THREADS |
|
if hasattr(os, "sched_getaffinity"): |
|
return len(os.sched_getaffinity(0)) |
|
else: |
|
return os.cpu_count() or 1 |
|
|
|
|
|
class AvifImageFile(ImageFile.ImageFile): |
|
format = "AVIF" |
|
format_description = "AVIF image" |
|
__frame = -1 |
|
|
|
def _open(self) -> None: |
|
if not SUPPORTED: |
|
msg = "image file could not be opened because AVIF support not installed" |
|
raise SyntaxError(msg) |
|
|
|
if DECODE_CODEC_CHOICE != "auto" and not _avif.decoder_codec_available( |
|
DECODE_CODEC_CHOICE |
|
): |
|
msg = "Invalid opening codec" |
|
raise ValueError(msg) |
|
self._decoder = _avif.AvifDecoder( |
|
self.fp.read(), |
|
DECODE_CODEC_CHOICE, |
|
_get_default_max_threads(), |
|
) |
|
|
|
|
|
self._size, self.n_frames, self._mode, icc, exif, exif_orientation, xmp = ( |
|
self._decoder.get_info() |
|
) |
|
self.is_animated = self.n_frames > 1 |
|
|
|
if icc: |
|
self.info["icc_profile"] = icc |
|
if xmp: |
|
self.info["xmp"] = xmp |
|
|
|
if exif_orientation != 1 or exif: |
|
exif_data = Image.Exif() |
|
if exif: |
|
exif_data.load(exif) |
|
original_orientation = exif_data.get(ExifTags.Base.Orientation, 1) |
|
else: |
|
original_orientation = 1 |
|
if exif_orientation != original_orientation: |
|
exif_data[ExifTags.Base.Orientation] = exif_orientation |
|
exif = exif_data.tobytes() |
|
if exif: |
|
self.info["exif"] = exif |
|
self.seek(0) |
|
|
|
def seek(self, frame: int) -> None: |
|
if not self._seek_check(frame): |
|
return |
|
|
|
|
|
self.__frame = frame |
|
self.tile = [ImageFile._Tile("raw", (0, 0) + self.size, 0, self.mode)] |
|
|
|
def load(self) -> Image.core.PixelAccess | None: |
|
if self.tile: |
|
|
|
data, timescale, pts_in_timescales, duration_in_timescales = ( |
|
self._decoder.get_frame(self.__frame) |
|
) |
|
self.info["timestamp"] = round(1000 * (pts_in_timescales / timescale)) |
|
self.info["duration"] = round(1000 * (duration_in_timescales / timescale)) |
|
|
|
if self.fp and self._exclusive_fp: |
|
self.fp.close() |
|
self.fp = BytesIO(data) |
|
|
|
return super().load() |
|
|
|
def load_seek(self, pos: int) -> None: |
|
pass |
|
|
|
def tell(self) -> int: |
|
return self.__frame |
|
|
|
|
|
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: |
|
_save(im, fp, filename, save_all=True) |
|
|
|
|
|
def _save( |
|
im: Image.Image, fp: IO[bytes], filename: str | bytes, save_all: bool = False |
|
) -> None: |
|
info = im.encoderinfo.copy() |
|
if save_all: |
|
append_images = list(info.get("append_images", [])) |
|
else: |
|
append_images = [] |
|
|
|
total = 0 |
|
for ims in [im] + append_images: |
|
total += getattr(ims, "n_frames", 1) |
|
|
|
quality = info.get("quality", 75) |
|
if not isinstance(quality, int) or quality < 0 or quality > 100: |
|
msg = "Invalid quality setting" |
|
raise ValueError(msg) |
|
|
|
duration = info.get("duration", 0) |
|
subsampling = info.get("subsampling", "4:2:0") |
|
speed = info.get("speed", 6) |
|
max_threads = info.get("max_threads", _get_default_max_threads()) |
|
codec = info.get("codec", "auto") |
|
if codec != "auto" and not _avif.encoder_codec_available(codec): |
|
msg = "Invalid saving codec" |
|
raise ValueError(msg) |
|
range_ = info.get("range", "full") |
|
tile_rows_log2 = info.get("tile_rows", 0) |
|
tile_cols_log2 = info.get("tile_cols", 0) |
|
alpha_premultiplied = bool(info.get("alpha_premultiplied", False)) |
|
autotiling = bool(info.get("autotiling", tile_rows_log2 == tile_cols_log2 == 0)) |
|
|
|
icc_profile = info.get("icc_profile", im.info.get("icc_profile")) |
|
exif_orientation = 1 |
|
if exif := info.get("exif"): |
|
if isinstance(exif, Image.Exif): |
|
exif_data = exif |
|
else: |
|
exif_data = Image.Exif() |
|
exif_data.load(exif) |
|
if ExifTags.Base.Orientation in exif_data: |
|
exif_orientation = exif_data.pop(ExifTags.Base.Orientation) |
|
exif = exif_data.tobytes() if exif_data else b"" |
|
elif isinstance(exif, Image.Exif): |
|
exif = exif_data.tobytes() |
|
|
|
xmp = info.get("xmp") |
|
|
|
if isinstance(xmp, str): |
|
xmp = xmp.encode("utf-8") |
|
|
|
advanced = info.get("advanced") |
|
if advanced is not None: |
|
if isinstance(advanced, dict): |
|
advanced = advanced.items() |
|
try: |
|
advanced = tuple(advanced) |
|
except TypeError: |
|
invalid = True |
|
else: |
|
invalid = any(not isinstance(v, tuple) or len(v) != 2 for v in advanced) |
|
if invalid: |
|
msg = ( |
|
"advanced codec options must be a dict of key-value string " |
|
"pairs or a series of key-value two-tuples" |
|
) |
|
raise ValueError(msg) |
|
|
|
|
|
enc = _avif.AvifEncoder( |
|
im.size, |
|
subsampling, |
|
quality, |
|
speed, |
|
max_threads, |
|
codec, |
|
range_, |
|
tile_rows_log2, |
|
tile_cols_log2, |
|
alpha_premultiplied, |
|
autotiling, |
|
icc_profile or b"", |
|
exif or b"", |
|
exif_orientation, |
|
xmp or b"", |
|
advanced, |
|
) |
|
|
|
|
|
frame_idx = 0 |
|
frame_duration = 0 |
|
cur_idx = im.tell() |
|
is_single_frame = total == 1 |
|
try: |
|
for ims in [im] + append_images: |
|
|
|
nfr = getattr(ims, "n_frames", 1) |
|
|
|
for idx in range(nfr): |
|
ims.seek(idx) |
|
|
|
|
|
frame = ims |
|
rawmode = ims.mode |
|
if ims.mode not in {"RGB", "RGBA"}: |
|
rawmode = "RGBA" if ims.has_transparency_data else "RGB" |
|
frame = ims.convert(rawmode) |
|
|
|
|
|
if isinstance(duration, (list, tuple)): |
|
frame_duration = duration[frame_idx] |
|
else: |
|
frame_duration = duration |
|
|
|
|
|
enc.add( |
|
frame.tobytes("raw", rawmode), |
|
frame_duration, |
|
frame.size, |
|
rawmode, |
|
is_single_frame, |
|
) |
|
|
|
|
|
frame_idx += 1 |
|
|
|
if not save_all: |
|
break |
|
|
|
finally: |
|
im.seek(cur_idx) |
|
|
|
|
|
data = enc.finish() |
|
if data is None: |
|
msg = "cannot write file as AVIF (encoder returned None)" |
|
raise OSError(msg) |
|
|
|
fp.write(data) |
|
|
|
|
|
Image.register_open(AvifImageFile.format, AvifImageFile, _accept) |
|
if SUPPORTED: |
|
Image.register_save(AvifImageFile.format, _save) |
|
Image.register_save_all(AvifImageFile.format, _save_all) |
|
Image.register_extensions(AvifImageFile.format, [".avif", ".avifs"]) |
|
Image.register_mime(AvifImageFile.format, "image/avif") |
|
|