"""Télécharge les images des planches d'autocollants des sets filtrés.""" import csv import os from pathlib import Path import requests from dotenv import load_dotenv from lib.filesystem import ensure_parent_dir from lib.rebrickable.resources import ( build_part_img_lookup, download_binary, download_resources, fetch_part_img_url, load_part_img_cache, persist_part_img_cache, ) from lib.rebrickable.stats import read_rows STICKER_PARTS_PATH = Path("data/intermediate/sticker_parts.csv") RESOURCES_DIR = Path("figures/rebrickable") PART_IMG_CACHE_PATH = Path("data/intermediate/part_img_cache.csv") DOWNLOAD_LOG_PATH = Path("data/intermediate/sticker_download_log.csv") REQUEST_DELAY_SECONDS_IMAGES = 0.35 REQUEST_DELAY_SECONDS_LOOKUP = 0.6 def main() -> None: """Construit les URLs manquantes et télécharge les planches d'autocollants.""" load_dotenv() token = os.environ["REBRICKABLE_TOKEN"] session = requests.Session() stickers = read_rows(STICKER_PARTS_PATH) cache = load_part_img_cache(PART_IMG_CACHE_PATH) part_img_lookup = build_part_img_lookup( {row["part_num"] for row in stickers}, fetcher=lambda part_num: fetch_part_img_url(part_num, token, session), cache_path=PART_IMG_CACHE_PATH, existing_cache=cache, delay_seconds=REQUEST_DELAY_SECONDS_LOOKUP, ) if cache: part_img_lookup.update(cache) persist_part_img_cache(PART_IMG_CACHE_PATH, part_img_lookup) plan = [] missing_log = [] for row in stickers: url = part_img_lookup.get(row["part_num"]) path = RESOURCES_DIR / row["set_id"] / "stickers" / f"{row['part_num']}.jpg" if not url or not str(url).startswith("http"): missing_log.append({"url": url or "", "path": str(path), "status": "missing_url"}) continue plan.append({"url": url, "path": path}) download_resources( plan, downloader=lambda url, path: download_binary(url, path, session), delay_seconds=REQUEST_DELAY_SECONDS_IMAGES, log_path=DOWNLOAD_LOG_PATH if not missing_log else None, ) if missing_log: ensure_parent_dir(DOWNLOAD_LOG_PATH) with DOWNLOAD_LOG_PATH.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=["url", "path", "status"]) writer.writeheader() for row in missing_log: writer.writerow(row) if __name__ == "__main__": main()