"""Téléchargement des ressources (sets, minifigs, têtes) et enrichissement des URLs.""" import os import time import re import csv import shutil from pathlib import Path from typing import Callable, Dict, Iterable, List, Sequence import requests from lib.filesystem import ensure_parent_dir from lib.rebrickable.stats import read_rows def load_sets_enriched(path: Path) -> List[dict]: """Charge les sets enrichis pour accéder aux URLs d'images de set.""" return read_rows(path) def load_minifigs_by_set(path: Path) -> List[dict]: """Charge minifigs_by_set.csv en mémoire.""" return read_rows(path) def load_minifigs_catalog(path: Path) -> Dict[str, dict]: """Indexe les minifigs par identifiant.""" catalog: Dict[str, dict] = {} with path.open() as csv_file: reader = csv.DictReader(csv_file) for row in reader: catalog[row["fig_num"]] = row return catalog def fetch_part_img_url(part_num: str, token: str, session: requests.Session) -> str: """Récupère l'URL d'image d'une pièce via l'API Rebrickable.""" retries = 0 backoff = 2.0 while True: response = session.get(f"https://rebrickable.com/api/v3/lego/parts/{part_num}/", headers={"Authorization": f"key {token}"}) if response.status_code == 429: time.sleep(backoff) retries += 1 backoff = min(backoff * 1.5, 10.0) if retries > 8: response.raise_for_status() continue response.raise_for_status() payload = response.json() return payload["part_img_url"] def load_part_img_cache(cache_path: Path) -> Dict[str, str]: """Charge le cache des URLs de têtes s'il existe.""" if not cache_path.exists(): return {} cache: Dict[str, str] = {} with cache_path.open() as cache_file: reader = csv.DictReader(cache_file) for row in reader: cache[row["part_num"]] = row["part_img_url"] return cache def persist_part_img_cache(cache_path: Path, cache: Dict[str, str]) -> None: """Persist le cache des URLs pour reprise après interruption.""" ensure_parent_dir(cache_path) with cache_path.open("w", newline="") as cache_file: writer = csv.DictWriter(cache_file, fieldnames=["part_num", "part_img_url"]) writer.writeheader() for part_num, url in sorted(cache.items()): writer.writerow({"part_num": part_num, "part_img_url": url}) def build_part_img_lookup( part_numbers: Iterable[str], fetcher: Callable[[str], str], cache_path: Path | None = None, existing_cache: Dict[str, str] | None = None, delay_seconds: float = 1.6, ) -> Dict[str, str]: """Construit un index part_num -> URL d'image en espaçant les requêtes.""" cache = dict(existing_cache or {}) unique_parts = sorted(set(part_numbers)) for part_num in unique_parts: if part_num in cache: continue cache[part_num] = fetcher(part_num) if cache_path is not None: persist_part_img_cache(cache_path, cache) time.sleep(delay_seconds) return cache def add_part_img_urls(minifigs_rows: Iterable[dict], part_img_lookup: Dict[str, str]) -> List[dict]: """Ajoute part_img_url aux lignes minifigs_by_set.""" enriched: List[dict] = [] for row in minifigs_rows: existing = row.get("part_img_url", "").strip() part_img_url = existing if existing != "" else part_img_lookup[row["part_num"]] enriched.append( { "set_num": row["set_num"], "part_num": row["part_num"], "known_character": row["known_character"], "fig_num": row["fig_num"], "gender": row["gender"], "part_img_url": part_img_url, } ) return enriched def write_minifigs_by_set_with_images(destination_path: Path, rows: Sequence[dict]) -> None: """Réécrit le CSV minifigs_by_set avec la colonne part_img_url.""" ensure_parent_dir(destination_path) fieldnames = ["set_num", "part_num", "known_character", "fig_num", "gender", "part_img_url"] with destination_path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row) def sanitize_name(raw_name: str) -> str: """Nettoie un nom pour construire un chemin de fichier sûr.""" cleaned = re.sub(r"[^A-Za-z0-9]+", "_", raw_name).strip("_") if cleaned == "": return "Unknown" return cleaned def build_download_plan( sets_rows: Iterable[dict], minifigs_rows: Iterable[dict], minifigs_catalog: Dict[str, dict], base_dir: Path, ) -> List[dict]: """Construit la liste des fichiers à télécharger (sets, minifigs, têtes).""" plan: List[dict] = [] sets_list = list(sets_rows) set_ids: Dict[str, str] = {row["set_num"]: row["set_id"] for row in sets_list} for set_row in sets_list: set_dir = base_dir / set_row["set_id"] plan.append({"url": set_row["img_url"], "path": set_dir / "set.jpg"}) for row in minifigs_rows: if (row.get("known_character") or "").strip().lower() == "figurant": continue set_dir = base_dir / set_ids[row["set_num"]] character_dir = set_dir / sanitize_name(row["known_character"] or "Unknown") minifig = minifigs_catalog[row["fig_num"]] plan.append({"url": minifig["img_url"], "path": character_dir / "minifig.jpg"}) plan.append({"url": row["part_img_url"], "path": character_dir / "head.jpg"}) return plan def download_binary(url: str, destination_path: Path, session: requests.Session) -> bool: """Télécharge un binaire vers un chemin donné. Retourne False si 404.""" ensure_parent_dir(destination_path) response = session.get(url, stream=True) if response.status_code == 404: return False response.raise_for_status() with destination_path.open("wb") as target_file: for chunk in response.iter_content(chunk_size=8192): target_file.write(chunk) return True def download_resources( plan: Iterable[dict], downloader: Callable[[str, Path], bool], delay_seconds: float = 0.35, log_path: Path | None = None, ) -> None: """Exécute les téléchargements en espaçant les requêtes et journalise les statuts.""" cache: Dict[str, Path] = {} log_rows: List[dict] = [] for item in plan: if item["path"].exists(): time.sleep(delay_seconds) continue if item["url"] in cache and cache[item["url"]].exists(): ensure_parent_dir(item["path"]) shutil.copy2(cache[item["url"]], item["path"]) else: success = downloader(item["url"], item["path"]) if success: cache[item["url"]] = item["path"] else: log_rows.append({"url": item["url"], "path": str(item["path"]), "status": "missing"}) time.sleep(delay_seconds) if log_path is not None: ensure_parent_dir(log_path) with log_path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=["url", "path", "status"]) writer.writeheader() for row in log_rows: writer.writerow(row)