# etude_lego_jurassic_world/scripts/download_sticker_resources.py
"""Télécharge les images des planches d'autocollants des sets filtrés."""
import csv
import os
from pathlib import Path

import requests
from dotenv import load_dotenv

from lib.filesystem import ensure_parent_dir
from lib.rebrickable.resources import (
    build_part_img_lookup,
    download_binary,
    download_resources,
    fetch_part_img_url,
    load_part_img_cache,
    persist_part_img_cache,
)
from lib.rebrickable.stats import read_rows
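
# The helpers imported from lib.rebrickable are project-specific and not shown
# here. As a rough reference only, minimal sketches of the two network helpers,
# assuming the public Rebrickable v3 API (whose /api/v3/lego/parts/{part_num}/
# endpoint returns a JSON body with a part_img_url field):
#
#     def fetch_part_img_url(part_num: str, token: str, session) -> str | None:
#         response = session.get(
#             f"https://rebrickable.com/api/v3/lego/parts/{part_num}/",
#             headers={"Authorization": f"key {token}"},
#             timeout=30,
#         )
#         response.raise_for_status()
#         return response.json().get("part_img_url")
#
#     def download_binary(url: str, path: Path, session) -> None:
#         ensure_parent_dir(path)
#         response = session.get(url, timeout=30)
#         response.raise_for_status()
#         path.write_bytes(response.content)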

STICKER_PARTS_PATH = Path("data/intermediate/sticker_parts.csv")
RESOURCES_DIR = Path("figures/rebrickable")
# Cache of part_num -> image URL lookups, reused across runs.
PART_IMG_CACHE_PATH = Path("data/intermediate/part_img_cache.csv")
DOWNLOAD_LOG_PATH = Path("data/intermediate/sticker_download_log.csv")
# Pauses between consecutive requests, to stay gentle on the Rebrickable API.
REQUEST_DELAY_SECONDS_IMAGES = 0.35
REQUEST_DELAY_SECONDS_LOOKUP = 0.6


def main() -> None:
    """Build the missing image URLs and download the sticker sheets."""
    load_dotenv()
    token = os.environ["REBRICKABLE_TOKEN"]
    session = requests.Session()

    stickers = read_rows(STICKER_PARTS_PATH)
    cache = load_part_img_cache(PART_IMG_CACHE_PATH)
    # Resolve each part number to an image URL, querying the API only for
    # parts missing from the cache and throttling the lookups.
    part_img_lookup = build_part_img_lookup(
        {row["part_num"] for row in stickers},
        fetcher=lambda part_num: fetch_part_img_url(part_num, token, session),
        cache_path=PART_IMG_CACHE_PATH,
        existing_cache=cache,
        delay_seconds=REQUEST_DELAY_SECONDS_LOOKUP,
    )
    if cache:
        # Prefer previously cached URLs over freshly fetched ones, then
        # persist the merged lookup for future runs.
        part_img_lookup.update(cache)
        persist_part_img_cache(PART_IMG_CACHE_PATH, part_img_lookup)

    plan = []
    missing_log = []
    for row in stickers:
        url = part_img_lookup.get(row["part_num"])
        path = RESOURCES_DIR / row["set_id"] / "stickers" / f"{row['part_num']}.jpg"
        if not url or not str(url).startswith("http"):
            missing_log.append({"url": url or "", "path": str(path), "status": "missing_url"})
            continue
        plan.append({"url": url, "path": path})

    download_resources(
        plan,
        downloader=lambda url, path: download_binary(url, path, session),
        delay_seconds=REQUEST_DELAY_SECONDS_IMAGES,
        # When some URLs are missing, skip the built-in log and write it
        # manually below so the missing_url rows are recorded.
        log_path=DOWNLOAD_LOG_PATH if not missing_log else None,
    )
    if missing_log:
        ensure_parent_dir(DOWNLOAD_LOG_PATH)
        with DOWNLOAD_LOG_PATH.open("w", newline="") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=["url", "path", "status"])
            writer.writeheader()
            for row in missing_log:
                writer.writerow(row)


if __name__ == "__main__":
    main()
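
# Example invocation, assuming it is run from the project root with a .env
# file providing REBRICKABLE_TOKEN (the data/ and figures/ paths above are
# resolved relative to the working directory):
#
#     python scripts/download_sticker_resources.py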