"""Téléchargement des ressources (sets, minifigs, têtes) et enrichissement des URLs."""
|
|
|
|
import os
|
|
import time
|
|
import re
|
|
import csv
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Callable, Dict, Iterable, List, Sequence
|
|
|
|
import requests
|
|
|
|
from lib.filesystem import ensure_parent_dir
|
|
from lib.rebrickable.stats import read_rows
|
|
|
|
|
|
def load_sets_enriched(path: Path) -> List[dict]:
    """Load the enriched sets to access the set image URLs."""
    return read_rows(path)


def load_minifigs_by_set(path: Path) -> List[dict]:
    """Load minifigs_by_set.csv into memory."""
    return read_rows(path)


def load_minifigs_catalog(path: Path) -> Dict[str, dict]:
    """Index the minifigs catalog by fig_num."""
    catalog: Dict[str, dict] = {}
    with path.open(newline="") as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            catalog[row["fig_num"]] = row
    return catalog


def fetch_part_img_url(part_num: str, token: str, session: requests.Session) -> str:
    """Fetch a part's image URL from the Rebrickable API, retrying on rate limits."""
    retries = 0
    backoff = 2.0
    while True:
        response = session.get(
            f"https://rebrickable.com/api/v3/lego/parts/{part_num}/",
            headers={"Authorization": f"key {token}"},
            timeout=30,  # requests has no default timeout; cap stalled connections.
        )
        if response.status_code == 429:
            # Rate limited: sleep, then retry with a slowly increasing backoff.
            time.sleep(backoff)
            retries += 1
            backoff = min(backoff * 1.5, 10.0)
            if retries > 8:
                # Too many retries: surface the 429 as an HTTPError.
                response.raise_for_status()
            continue
        response.raise_for_status()
        payload = response.json()
        return payload["part_img_url"]


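# With the defaults above, a persistently rate-limited part waits 2.0, 3.0,
# 4.5, 6.75, then 10.0 seconds (capped) between attempts -- roughly a minute
# of cumulative waiting before the HTTPError is finally raised.

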
def load_part_img_cache(cache_path: Path) -> Dict[str, str]:
    """Load the cache of head image URLs if it exists."""
    if not cache_path.exists():
        return {}
    cache: Dict[str, str] = {}
    with cache_path.open(newline="") as cache_file:
        reader = csv.DictReader(cache_file)
        for row in reader:
            cache[row["part_num"]] = row["part_img_url"]
    return cache


def persist_part_img_cache(cache_path: Path, cache: Dict[str, str]) -> None:
    """Persist the URL cache so a run can resume after an interruption."""
    ensure_parent_dir(cache_path)
    with cache_path.open("w", newline="") as cache_file:
        writer = csv.DictWriter(cache_file, fieldnames=["part_num", "part_img_url"])
        writer.writeheader()
        for part_num, url in sorted(cache.items()):
            writer.writerow({"part_num": part_num, "part_img_url": url})


def build_part_img_lookup(
    part_numbers: Iterable[str],
    fetcher: Callable[[str], str],
    cache_path: Path | None = None,
    existing_cache: Dict[str, str] | None = None,
    delay_seconds: float = 1.6,
) -> Dict[str, str]:
    """Build a part_num -> image URL index, spacing out the API requests."""
    cache = dict(existing_cache or {})
    unique_parts = sorted(set(part_numbers))
    for part_num in unique_parts:
        if part_num in cache:
            continue
        cache[part_num] = fetcher(part_num)
        # Persist after every fetch so an interruption loses at most one lookup.
        if cache_path is not None:
            persist_part_img_cache(cache_path, cache)
        time.sleep(delay_seconds)
    return cache


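# A minimal usage sketch (the REBRICKABLE_TOKEN variable and the paths below
# are illustrative assumptions, not fixed by this module). Resuming from the
# on-disk cache means only the missing part numbers hit the API:
#
#     import os
#     session = requests.Session()
#     token = os.environ["REBRICKABLE_TOKEN"]
#     cache_path = Path("data/part_img_cache.csv")
#     lookup = build_part_img_lookup(
#         (row["part_num"] for row in load_minifigs_by_set(Path("data/minifigs_by_set.csv"))),
#         fetcher=lambda part_num: fetch_part_img_url(part_num, token, session),
#         cache_path=cache_path,
#         existing_cache=load_part_img_cache(cache_path),
#     )

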
def add_part_img_urls(minifigs_rows: Iterable[dict], part_img_lookup: Dict[str, str]) -> List[dict]:
    """Add part_img_url to the minifigs_by_set rows."""
    enriched: List[dict] = []
    for row in minifigs_rows:
        existing = row.get("part_img_url", "").strip()
        part_img_url = existing if existing else part_img_lookup[row["part_num"]]
        enriched.append(
            {
                "set_num": row["set_num"],
                "part_num": row["part_num"],
                "known_character": row["known_character"],
                "fig_num": row["fig_num"],
                "gender": row["gender"],
                "part_img_url": part_img_url,
            }
        )
    return enriched


def write_minifigs_by_set_with_images(destination_path: Path, rows: Sequence[dict]) -> None:
    """Rewrite the minifigs_by_set CSV with the part_img_url column."""
    ensure_parent_dir(destination_path)
    fieldnames = ["set_num", "part_num", "known_character", "fig_num", "gender", "part_img_url"]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


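# Typical flow around the two functions above (paths are illustrative):
#
#     rows = add_part_img_urls(load_minifigs_by_set(source_csv), lookup)
#     write_minifigs_by_set_with_images(destination_csv, rows)
#
# Rows that already carry a non-empty part_img_url pass through unchanged,
# so re-running against the rewritten CSV never consults the lookup for them.

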
def sanitize_name(raw_name: str) -> str:
    """Clean a name so it can be used as a safe path component."""
    cleaned = re.sub(r"[^A-Za-z0-9]+", "_", raw_name).strip("_")
    if cleaned == "":
        return "Unknown"
    return cleaned


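# Examples of the sanitisation above:
#
#     sanitize_name("Luke Skywalker")   -> "Luke_Skywalker"
#     sanitize_name("Général Grievous") -> "G_n_ral_Grievous"  (accents not kept)
#     sanitize_name("!!!")              -> "Unknown"

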
def build_download_plan(
    sets_rows: Iterable[dict],
    minifigs_rows: Iterable[dict],
    minifigs_catalog: Dict[str, dict],
    base_dir: Path,
) -> List[dict]:
    """Build the list of files to download (sets, minifigs, heads)."""
    plan: List[dict] = []
    sets_list = list(sets_rows)
    set_ids: Dict[str, str] = {row["set_num"]: row["set_id"] for row in sets_list}
    for set_row in sets_list:
        set_dir = base_dir / set_row["set_id"]
        plan.append({"url": set_row["img_url"], "path": set_dir / "set.jpg"})
    for row in minifigs_rows:
        # "figurant" marks background characters; they get no dedicated folder.
        if (row.get("known_character") or "").strip().lower() == "figurant":
            continue
        set_dir = base_dir / set_ids[row["set_num"]]
        character_dir = set_dir / sanitize_name(row["known_character"] or "Unknown")
        minifig = minifigs_catalog[row["fig_num"]]
        plan.append({"url": minifig["img_url"], "path": character_dir / "minifig.jpg"})
        plan.append({"url": row["part_img_url"], "path": character_dir / "head.jpg"})
    return plan


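# The resulting on-disk layout, with <set_id> and <Character> filled in per row:
#
#     base_dir/<set_id>/set.jpg
#     base_dir/<set_id>/<Character>/minifig.jpg
#     base_dir/<set_id>/<Character>/head.jpg

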
def download_binary(url: str, destination_path: Path, session: requests.Session) -> bool:
    """Download a binary file to the given path. Return False on a 404."""
    ensure_parent_dir(destination_path)
    response = session.get(url, stream=True, timeout=30)
    if response.status_code == 404:
        return False
    response.raise_for_status()
    with destination_path.open("wb") as target_file:
        # Stream in 8 KiB chunks so large images never sit fully in memory.
        for chunk in response.iter_content(chunk_size=8192):
            target_file.write(chunk)
    return True


def download_resources(
    plan: Iterable[dict],
    downloader: Callable[[str, Path], bool],
    delay_seconds: float = 0.35,
    log_path: Path | None = None,
) -> None:
    """Run the downloads with spaced-out requests and log missing resources."""
    cache: Dict[str, Path] = {}
    log_rows: List[dict] = []
    for item in plan:
        if item["path"].exists():
            # Already on disk (e.g. a resumed run): no request, so no throttling.
            continue
        if item["url"] in cache and cache[item["url"]].exists():
            # Same URL already fetched during this run: copy the local file
            # instead of downloading it again.
            ensure_parent_dir(item["path"])
            shutil.copy2(cache[item["url"]], item["path"])
            continue
        if downloader(item["url"], item["path"]):
            cache[item["url"]] = item["path"]
        else:
            log_rows.append({"url": item["url"], "path": str(item["path"]), "status": "missing"})
        # Throttle only after an actual network request.
        time.sleep(delay_seconds)
    if log_path is not None:
        ensure_parent_dir(log_path)
        with log_path.open("w", newline="") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=["url", "path", "status"])
            writer.writeheader()
            for row in log_rows:
                writer.writerow(row)


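if __name__ == "__main__":
    # End-to-end sketch, not the project's real entry point: every file name
    # and the data/ layout below are illustrative assumptions.
    session = requests.Session()
    sets = load_sets_enriched(Path("data/sets_enriched.csv"))
    minifigs = load_minifigs_by_set(Path("data/minifigs_by_set_with_images.csv"))
    catalog = load_minifigs_catalog(Path("data/minifigs.csv"))
    plan = build_download_plan(sets, minifigs, catalog, Path("data/images"))
    download_resources(
        plan,
        downloader=lambda url, path: download_binary(url, path, session),
        log_path=Path("data/missing_downloads.csv"),
    )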