"""Téléchargement des ressources (sets, minifigs, têtes) et enrichissement des URLs."""
import os
import time
import re
import csv
import shutil
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Sequence
import requests
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows


def load_sets_enriched(path: Path) -> List[dict]:
    """Load the enriched sets to access the set image URLs."""
    return read_rows(path)


def load_minifigs_by_set(path: Path) -> List[dict]:
    """Load minifigs_by_set.csv into memory."""
    return read_rows(path)


def load_minifigs_catalog(path: Path) -> Dict[str, dict]:
    """Index the minifigs by identifier."""
    catalog: Dict[str, dict] = {}
    with path.open() as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            catalog[row["fig_num"]] = row
    return catalog


def fetch_part_img_url(part_num: str, token: str, session: requests.Session) -> str:
    """Fetch the image URL of a part from the Rebrickable API."""
    retries = 0
    backoff = 2.0
    while True:
        response = session.get(
            f"https://rebrickable.com/api/v3/lego/parts/{part_num}/",
            headers={"Authorization": f"key {token}"},
        )
        if response.status_code == 429:
            # Rate limited: wait, then retry with a growing (capped) backoff.
            time.sleep(backoff)
            retries += 1
            backoff = min(backoff * 1.5, 10.0)
            if retries > 8:
                response.raise_for_status()
            continue
        response.raise_for_status()
        payload = response.json()
        return payload["part_img_url"]


def load_part_img_cache(cache_path: Path) -> Dict[str, str]:
    """Load the cache of head image URLs if it exists."""
    if not cache_path.exists():
        return {}
    cache: Dict[str, str] = {}
    with cache_path.open() as cache_file:
        reader = csv.DictReader(cache_file)
        for row in reader:
            cache[row["part_num"]] = row["part_img_url"]
    return cache


def persist_part_img_cache(cache_path: Path, cache: Dict[str, str]) -> None:
    """Persist the URL cache so a run can resume after an interruption."""
    ensure_parent_dir(cache_path)
    with cache_path.open("w", newline="") as cache_file:
        writer = csv.DictWriter(cache_file, fieldnames=["part_num", "part_img_url"])
        writer.writeheader()
        for part_num, url in sorted(cache.items()):
            writer.writerow({"part_num": part_num, "part_img_url": url})


def build_part_img_lookup(
    part_numbers: Iterable[str],
    fetcher: Callable[[str], str],
    cache_path: Path | None = None,
    existing_cache: Dict[str, str] | None = None,
    delay_seconds: float = 1.6,
) -> Dict[str, str]:
    """Build a part_num -> image URL index, spacing out the requests."""
    cache = dict(existing_cache or {})
    unique_parts = sorted(set(part_numbers))
    for part_num in unique_parts:
        if part_num in cache:
            continue
        cache[part_num] = fetcher(part_num)
        if cache_path is not None:
            # Persist after every fetch so an interrupted run can resume from the cache.
            persist_part_img_cache(cache_path, cache)
        time.sleep(delay_seconds)
    return cache
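

# A minimal sketch (not part of the original module): build_part_img_lookup() expects a
# fetcher of shape Callable[[str], str], so the API token and a shared requests.Session
# are typically bound onto fetch_part_img_url first. The helper name
# _make_part_img_fetcher is hypothetical and added only for illustration.
def _make_part_img_fetcher(token: str, session: requests.Session) -> Callable[[str], str]:
    """Return a single-argument fetcher bound to an API token and session (illustrative sketch)."""

    def fetcher(part_num: str) -> str:
        return fetch_part_img_url(part_num, token, session)

    return fetcher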


def add_part_img_urls(minifigs_rows: Iterable[dict], part_img_lookup: Dict[str, str]) -> List[dict]:
    """Add part_img_url to the minifigs_by_set rows."""
    enriched: List[dict] = []
    for row in minifigs_rows:
        # Keep an already-present URL; otherwise resolve it through the lookup.
        existing = row.get("part_img_url", "").strip()
        part_img_url = existing if existing != "" else part_img_lookup[row["part_num"]]
        enriched.append(
            {
                "set_num": row["set_num"],
                "part_num": row["part_num"],
                "known_character": row["known_character"],
                "fig_num": row["fig_num"],
                "gender": row["gender"],
                "part_img_url": part_img_url,
            }
        )
    return enriched


def write_minifigs_by_set_with_images(destination_path: Path, rows: Sequence[dict]) -> None:
    """Rewrite the minifigs_by_set CSV with the part_img_url column."""
    ensure_parent_dir(destination_path)
    fieldnames = ["set_num", "part_num", "known_character", "fig_num", "gender", "part_img_url"]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


def sanitize_name(raw_name: str) -> str:
    """Clean a name so it can be used as a safe file path component."""
    cleaned = re.sub(r"[^A-Za-z0-9]+", "_", raw_name).strip("_")
    if cleaned == "":
        return "Unknown"
    return cleaned


def build_download_plan(
    sets_rows: Iterable[dict],
    minifigs_rows: Iterable[dict],
    minifigs_catalog: Dict[str, dict],
    base_dir: Path,
) -> List[dict]:
    """Build the list of files to download (sets, minifigs, heads)."""
    plan: List[dict] = []
    sets_list = list(sets_rows)
    set_ids: Dict[str, str] = {row["set_num"]: row["set_id"] for row in sets_list}
    for set_row in sets_list:
        set_dir = base_dir / set_row["set_id"]
        plan.append({"url": set_row["img_url"], "path": set_dir / "set.jpg"})
    for row in minifigs_rows:
        # Skip rows marked "figurant" (background characters): only named characters get images.
        if (row.get("known_character") or "").strip().lower() == "figurant":
            continue
        set_dir = base_dir / set_ids[row["set_num"]]
        character_dir = set_dir / sanitize_name(row["known_character"] or "Unknown")
        minifig = minifigs_catalog[row["fig_num"]]
        plan.append({"url": minifig["img_url"], "path": character_dir / "minifig.jpg"})
        plan.append({"url": row["part_img_url"], "path": character_dir / "head.jpg"})
    return plan


def download_binary(url: str, destination_path: Path, session: requests.Session) -> bool:
    """Download a binary file to the given path. Return False on a 404."""
    ensure_parent_dir(destination_path)
    response = session.get(url, stream=True)
    if response.status_code == 404:
        return False
    response.raise_for_status()
    with destination_path.open("wb") as target_file:
        for chunk in response.iter_content(chunk_size=8192):
            target_file.write(chunk)
    return True
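

# A minimal sketch (not part of the original module): download_resources() below expects a
# downloader of shape Callable[[str, Path], bool]; binding a shared requests.Session onto
# download_binary produces one. The helper name _make_downloader is hypothetical and added
# only for illustration.
def _make_downloader(session: requests.Session) -> Callable[[str, Path], bool]:
    """Return a (url, path) downloader bound to a session (illustrative sketch)."""

    def downloader(url: str, destination_path: Path) -> bool:
        return download_binary(url, destination_path, session)

    return downloader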


def download_resources(
    plan: Iterable[dict],
    downloader: Callable[[str, Path], bool],
    delay_seconds: float = 0.35,
    log_path: Path | None = None,
) -> None:
    """Run the downloads, spacing out the requests, and log the statuses."""
    cache: Dict[str, Path] = {}
    log_rows: List[dict] = []
    for item in plan:
        if item["path"].exists():
            time.sleep(delay_seconds)
            continue
        if item["url"] in cache and cache[item["url"]].exists():
            # The same URL was already downloaded for another entry: copy the local file.
            ensure_parent_dir(item["path"])
            shutil.copy2(cache[item["url"]], item["path"])
        else:
            success = downloader(item["url"], item["path"])
            if success:
                cache[item["url"]] = item["path"]
            else:
                log_rows.append({"url": item["url"], "path": str(item["path"]), "status": "missing"})
        time.sleep(delay_seconds)
    if log_path is not None:
        ensure_parent_dir(log_path)
        with log_path.open("w", newline="") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=["url", "path", "status"])
            writer.writeheader()
            for row in log_rows:
                writer.writerow(row)
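

# A minimal end-to-end sketch (not part of the original module). Every file path below and
# the REBRICKABLE_API_KEY environment variable are assumptions chosen for illustration, as
# are the _make_part_img_fetcher / _make_downloader helpers sketched above; adapt them to
# the actual project layout before running.
if __name__ == "__main__":
    token = os.environ["REBRICKABLE_API_KEY"]  # assumed env var, not defined by this module
    data_dir = Path("data")  # assumed layout
    with requests.Session() as session:
        sets_rows = load_sets_enriched(data_dir / "sets_enriched.csv")
        minifigs_rows = load_minifigs_by_set(data_dir / "minifigs_by_set.csv")
        minifigs_catalog = load_minifigs_catalog(data_dir / "minifigs.csv")

        # Resolve head image URLs, reusing any cache left over from a previous run.
        cache_path = data_dir / "part_img_cache.csv"
        lookup = build_part_img_lookup(
            (row["part_num"] for row in minifigs_rows),
            _make_part_img_fetcher(token, session),
            cache_path=cache_path,
            existing_cache=load_part_img_cache(cache_path),
        )
        enriched_rows = add_part_img_urls(minifigs_rows, lookup)
        write_minifigs_by_set_with_images(data_dir / "minifigs_by_set_with_images.csv", enriched_rows)

        # Download set, minifig and head images, logging entries whose files are missing (404).
        plan = build_download_plan(sets_rows, enriched_rows, minifigs_catalog, data_dir / "images")
        download_resources(plan, _make_downloader(session), log_path=data_dir / "download_log.csv")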