Make resource downloads more patient and add caching
lib/rebrickable/resources.py · 201 lines · Normal file
@@ -0,0 +1,201 @@
"""Download resources (sets, minifigs, heads) and enrich their URLs."""

import csv
import re
import shutil
import time
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Sequence

import requests

from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows


def load_sets_enriched(path: Path) -> List[dict]:
    """Load the enriched sets to access the set image URLs."""
    return read_rows(path)


def load_minifigs_by_set(path: Path) -> List[dict]:
    """Load minifigs_by_set.csv into memory."""
    return read_rows(path)


def load_minifigs_catalog(path: Path) -> Dict[str, dict]:
    """Index the minifigs by identifier."""
    catalog: Dict[str, dict] = {}
    with path.open() as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            catalog[row["fig_num"]] = row
    return catalog


def fetch_part_img_url(part_num: str, token: str, session: requests.Session) -> str:
    """Fetch a part's image URL via the Rebrickable API."""
    retries = 0
    backoff = 2.0
    while True:
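        # Each attempt hits the part endpoint; a 429 triggers an
        # exponential backoff (capped at 10 s) instead of failing outright.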
        response = session.get(
            f"https://rebrickable.com/api/v3/lego/parts/{part_num}/",
            headers={"Authorization": f"key {token}"},
        )
        if response.status_code == 429:
            time.sleep(backoff)
            retries += 1
            backoff = min(backoff * 1.5, 10.0)
            if retries > 8:
                response.raise_for_status()
            continue
        response.raise_for_status()
        payload = response.json()
        return payload["part_img_url"]


def load_part_img_cache(cache_path: Path) -> Dict[str, str]:
    """Load the head-URL cache if it exists."""
    if not cache_path.exists():
        return {}
    cache: Dict[str, str] = {}
    with cache_path.open() as cache_file:
        reader = csv.DictReader(cache_file)
        for row in reader:
            cache[row["part_num"]] = row["part_img_url"]
    return cache


def persist_part_img_cache(cache_path: Path, cache: Dict[str, str]) -> None:
    """Persist the URL cache so a run can resume after an interruption."""
    ensure_parent_dir(cache_path)
    with cache_path.open("w", newline="") as cache_file:
        writer = csv.DictWriter(cache_file, fieldnames=["part_num", "part_img_url"])
        writer.writeheader()
        for part_num, url in sorted(cache.items()):
            writer.writerow({"part_num": part_num, "part_img_url": url})


def build_part_img_lookup(
    part_numbers: Iterable[str],
    fetcher: Callable[[str], str],
    cache_path: Path | None = None,
    existing_cache: Dict[str, str] | None = None,
    delay_seconds: float = 1.6,
) -> Dict[str, str]:
    """Build a part_num -> image URL index, spacing the requests apart."""
    cache = dict(existing_cache or {})
    unique_parts = sorted(set(part_numbers))
    for part_num in unique_parts:
        if part_num in cache:
            continue
        cache[part_num] = fetcher(part_num)
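        # Checkpoint the cache after every fetch so an interrupted run
        # resumes without re-querying the API.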
        if cache_path is not None:
            persist_part_img_cache(cache_path, cache)
        time.sleep(delay_seconds)
    return cache


def add_part_img_urls(minifigs_rows: Iterable[dict], part_img_lookup: Dict[str, str]) -> List[dict]:
    """Add part_img_url to the minifigs_by_set rows."""
    enriched: List[dict] = []
    for row in minifigs_rows:
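        # Keep a URL already present on the row; only fall back to the
        # fetched lookup when the field is empty.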
        existing = row.get("part_img_url", "").strip()
        part_img_url = existing if existing != "" else part_img_lookup[row["part_num"]]
        enriched.append(
            {
                "set_num": row["set_num"],
                "part_num": row["part_num"],
                "known_character": row["known_character"],
                "fig_num": row["fig_num"],
                "gender": row["gender"],
                "part_img_url": part_img_url,
            }
        )
    return enriched


def write_minifigs_by_set_with_images(destination_path: Path, rows: Sequence[dict]) -> None:
    """Rewrite the minifigs_by_set CSV with the part_img_url column."""
    ensure_parent_dir(destination_path)
    fieldnames = ["set_num", "part_num", "known_character", "fig_num", "gender", "part_img_url"]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


def sanitize_name(raw_name: str) -> str:
    """Clean a name to build a safe file path."""
    cleaned = re.sub(r"[^A-Za-z0-9]+", "_", raw_name).strip("_")
    if cleaned == "":
        return "Unknown"
    return cleaned


def build_download_plan(
    sets_rows: Iterable[dict],
    minifigs_rows: Iterable[dict],
    minifigs_catalog: Dict[str, dict],
    base_dir: Path,
) -> List[dict]:
    """Build the list of files to download (sets, minifigs, heads)."""
    plan: List[dict] = []
    sets_list = list(sets_rows)
    set_ids: Dict[str, str] = {row["set_num"]: row["set_id"] for row in sets_list}
    for set_row in sets_list:
        set_dir = base_dir / set_row["set_id"]
        plan.append({"url": set_row["img_url"], "path": set_dir / "set.jpg"})
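    # Background characters ("figurant") are skipped; each named character
    # gets both a minifig image and a head image under its set directory.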
    for row in minifigs_rows:
        if (row.get("known_character") or "").strip().lower() == "figurant":
            continue
        set_dir = base_dir / set_ids[row["set_num"]]
        character_dir = set_dir / sanitize_name(row["known_character"] or "Unknown")
        minifig = minifigs_catalog[row["fig_num"]]
        plan.append({"url": minifig["img_url"], "path": character_dir / "minifig.jpg"})
        plan.append({"url": row["part_img_url"], "path": character_dir / "head.jpg"})
    return plan


def download_binary(url: str, destination_path: Path, session: requests.Session) -> bool:
    """Download a binary to the given path. Return False on 404."""
    ensure_parent_dir(destination_path)
    response = session.get(url, stream=True)
    if response.status_code == 404:
        return False
    response.raise_for_status()
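    # Stream the payload to disk in 8 KiB chunks to keep memory usage flat.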
    with destination_path.open("wb") as target_file:
        for chunk in response.iter_content(chunk_size=8192):
            target_file.write(chunk)
    return True


def download_resources(
    plan: Iterable[dict],
    downloader: Callable[[str, Path], bool],
    delay_seconds: float = 0.35,
    log_path: Path | None = None,
) -> None:
    """Run the downloads, spacing requests apart, and log the statuses."""
    cache: Dict[str, Path] = {}
    log_rows: List[dict] = []
    for item in plan:
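        # Skip anything already on disk so interrupted runs can be restarted.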
if item["path"].exists():
|
||||
time.sleep(delay_seconds)
|
||||
continue
|
||||
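        # A URL fetched earlier in this run is copied from the local file
        # instead of being downloaded again.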
if item["url"] in cache and cache[item["url"]].exists():
|
||||
ensure_parent_dir(item["path"])
|
||||
shutil.copy2(cache[item["url"]], item["path"])
|
||||
else:
|
||||
success = downloader(item["url"], item["path"])
|
||||
if success:
|
||||
cache[item["url"]] = item["path"]
|
||||
else:
|
||||
log_rows.append({"url": item["url"], "path": str(item["path"]), "status": "missing"})
|
||||
time.sleep(delay_seconds)
|
||||
if log_path is not None:
|
||||
ensure_parent_dir(log_path)
|
||||
with log_path.open("w", newline="") as csv_file:
|
||||
writer = csv.DictWriter(csv_file, fieldnames=["url", "path", "status"])
|
||||
writer.writeheader()
|
||||
for row in log_rows:
|
||||
writer.writerow(row)
|
||||