Source code for nrk_psapi.utils

from __future__ import annotations

import asyncio
from fractions import Fraction
from http import HTTPStatus
from io import BytesIO
import math
import re
from typing import TYPE_CHECKING

from aiohttp import ClientError, ClientResponseError, ClientSession
from PIL import Image as PILImage

from nrk_psapi.const import LOGGER as _LOGGER

if TYPE_CHECKING:
    from yarl import URL

    from nrk_psapi.models import FetchedFileInfo, Image


[docs] def get_nested_items(data: dict[str, any], items_key: str) -> list[dict[str, any]]: """Get nested items from a dictionary based on the provided items_key.""" items = data for key in items_key.split("."): items = items.get(key, {}) if not isinstance(items, list): # pragma: no cover raise TypeError(f"Expected a list at '{items_key}', but got {type(items)}") return items
[docs] def get_image(images: list[Image], min_size: int | None = None) -> Image | None: candidates = [img for img in images if img.width is not None] if min_size is None: candidates.sort(key=lambda img: img.width, reverse=True) return candidates[0] if candidates else None return next((img for img in candidates if img.width >= min_size), None)
[docs] def sanitize_string(s: str, delimiter: str = "_"): """Sanitize a string to be used as a URL parameter.""" s = s.lower().replace(" ", delimiter) s = s.replace("æ", "ae").replace("ø", "oe").replace("å", "aa") return re.sub(rf"^[0-9{delimiter}]+", "", re.sub(rf"[^a-z0-9{delimiter}]", "", s))[:50].rstrip(delimiter)
[docs] async def fetch_file_info(url: URL | str, session: ClientSession | None = None) -> FetchedFileInfo: """Retrieve content-length and content-type for the given URL.""" close_session = False if session is None: session = ClientSession() close_session = True _LOGGER.debug("Fetching file info from %s", url) response = await session.head(url, allow_redirects=True) content_length = response.headers.get("Content-Length") mime_type = response.headers.get("Content-Type") if close_session: await session.close() return {"content_length": int(content_length), "content_type": mime_type}
[docs] def parse_aspect_ratio(ar: str) -> Fraction: width, height = map(int, ar.split(":")) if width <= 0 or height <= 0: raise ValueError("Invalid aspect ratio format. Aspect ratio values must be positive integers") if width > 100 or height > 100: # noqa: PLR2004 raise ValueError("Invalid aspect ratio format. Aspect ratio values should be reasonable (<=100)") return Fraction(width, height)
[docs] async def tiled_images( image_urls: list[str], tile_size: int = 100, columns: int = 3, aspect_ratio: str | None = None, *, session: ClientSession | None = None, ) -> bytes: """Generate a tiled image from a list of image URLs. Args: image_urls: List of image URLs to tile. tile_size: Size of each tile in pixels. columns: Number of columns in the tiled image. aspect_ratio: Desired aspect ratio of the resulting image, e.g. "16:9". If not provided, the aspect ratio will be a result of the number of image_urls divided by columns. session: Optional aiohttp session to use. If not provided, a new session will be created. """ async def process_image(sess, url): try: async with sess.get(url) as response: if response.status == HTTPStatus.OK: data = await response.read() content_type = response.headers.get("Content-Type", "").lower() if "jpeg" in content_type or "jpg" in content_type: return data _LOGGER.warning(f"Unsupported image type from {url}: {content_type}") return None _LOGGER.warning(f"Failed to fetch image from {url}. Status code: {response.status}") return None except (ClientError, ClientResponseError) as e: _LOGGER.warning(f"Error processing image from {url}: {e!s}") return None aspect = None if aspect_ratio is not None: aspect = parse_aspect_ratio(aspect_ratio) close_session = False if session is None: session = ClientSession() close_session = True tasks = [process_image(session, url) for url in image_urls] images = await asyncio.gather(*tasks) if close_session: await session.close() # Filter out None values (failed image downloads) images = [img for img in images if img is not None] if aspect_ratio is None: rows = math.ceil(len(images) / columns) total_tiles = len(images) else: # Calculate the number of rows needed to match the aspect ratio total_tiles = columns * math.ceil(columns * aspect.denominator / aspect.numerator) rows = math.ceil(total_tiles / columns) # Create a new image to hold the tiles result = PILImage.new("RGB", (tile_size * columns, tile_size * rows)) # Place each image in the grid for index, image_data in enumerate(images): if aspect_ratio is not None and index >= total_tiles: break row = index // columns col = index % columns with PILImage.open(BytesIO(image_data)) as img: img.thumbnail((tile_size, tile_size)) x = col * tile_size y = row * tile_size result.paste(img, (x, y)) if aspect_ratio is not None: final_height = int(result.width * aspect.denominator / aspect.numerator) result = result.crop((0, 0, result.width, final_height)) # Convert the result to bytes output = BytesIO() result.save(output, format="JPEG") return output.getvalue()