# Source file: etude_lego_jurassic_world/lib/rebrickable/global_minifig_heads.py
# (repository viewer metadata: 108 lines, 4.1 KiB, Python)

"""Extraction des couleurs de têtes de minifigs sur le catalogue complet."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple
from lib.rebrickable.parts_inventory import normalize_boolean, select_latest_inventories
# Default part-category ids used to recognise minifig heads. Values are kept as
# strings because they are compared directly with the raw ``part_cat_id`` CSV field.
# NOTE(review): "59" is presumably the Rebrickable "Minifig Heads" category — confirm
# against the part categories export.
HEAD_CATEGORIES = {"59"}
def load_head_parts(parts_path: Path, head_categories: Set[str] | None = None, require_printed: bool = False) -> Dict[str, str]:
"""Construit l'ensemble des références de têtes via leur catégorie."""
categories = head_categories or HEAD_CATEGORIES
head_parts: Dict[str, str] = {}
with parts_path.open() as parts_file:
reader = csv.DictReader(parts_file)
for row in reader:
if row["part_cat_id"] not in categories:
continue
if require_printed and "print" not in row["name"].lower():
continue
head_parts[row["part_num"]] = row["name"]
return head_parts
def build_sets_year_lookup(sets_path: Path) -> Dict[str, str]:
    """Index release years by set number.

    Args:
        sets_path: CSV file providing at least the ``set_num`` and ``year``
            columns.

    Returns:
        A ``set_num -> year`` dictionary. Years are kept as the raw CSV
        strings; if a set number is repeated, the last row wins.

    Raises:
        KeyError: If a row lacks one of the required columns.
    """
    # newline="" hands newline handling to the csv module, as its documentation
    # requires, and matches how this module opens its CSV output.
    with sets_path.open(newline="") as sets_file:
        return {row["set_num"]: row["year"] for row in csv.DictReader(sets_file)}
def build_color_lookup(colors_path: Path) -> Dict[str, dict]:
    """Index colour attributes by colour id.

    Args:
        colors_path: CSV file providing at least the ``id``, ``name``, ``rgb``
            and ``is_trans`` columns.

    Returns:
        A dictionary keyed by the raw ``id`` string. Each value holds ``rgb``,
        ``is_translucent`` (the ``is_trans`` field lower-cased, otherwise
        unchanged) and ``name``. If an id is repeated, the last row wins.

    Raises:
        KeyError: If a row lacks one of the required columns.
    """
    # newline="" hands newline handling to the csv module, as its documentation
    # requires; otherwise line breaks embedded in quoted names get rewritten.
    with colors_path.open(newline="") as colors_file:
        return {
            row["id"]: {
                "rgb": row["rgb"],
                "is_translucent": row["is_trans"].lower(),
                "name": row["name"],
            }
            for row in csv.DictReader(colors_file)
        }
def aggregate_global_heads_by_year(
    inventories_path: Path,
    inventory_parts_path: Path,
    parts_path: Path,
    colors_path: Path,
    sets_path: Path,
    head_categories: Set[str] | None = None,
    require_printed: bool = False,
) -> List[dict]:
    """Aggregate minifig head quantities per year and colour over the whole catalogue.

    An inventory-parts row is counted only when all of the following hold:
    its inventory is one of those returned by ``select_latest_inventories``,
    its part is a head according to ``load_head_parts``, it is not flagged as
    a spare, and the owning set has a known year.

    Args:
        inventories_path: Inventories CSV, passed to ``select_latest_inventories``.
        inventory_parts_path: CSV providing ``inventory_id``, ``part_num``,
            ``color_id``, ``is_spare`` and ``quantity`` columns.
        parts_path: Parts CSV, passed to ``load_head_parts``.
        colors_path: Colours CSV, passed to ``build_color_lookup``.
        sets_path: Sets CSV, passed to ``build_sets_year_lookup``.
        head_categories: Optional override of the head category ids.
        require_printed: Forwarded to ``load_head_parts``.

    Returns:
        One dictionary per ``(year, rgb, is_translucent)`` combination with the
        keys ``year``, ``color_rgb``, ``is_translucent``, ``color_name`` and
        ``quantity``, sorted by numeric year, then colour name, then
        translucency. ``color_name`` is the name of the first colour seen for
        the combination.

    Raises:
        KeyError: If a counted row references a colour id missing from the
            colours file, or a required column is absent.
        ValueError: If a counted ``quantity`` or an aggregated ``year`` is not
            an integer literal.
    """
    head_parts = load_head_parts(parts_path, head_categories, require_printed=require_printed)
    # NOTE(review): select_latest_inventories is used here as a mapping of
    # set_num -> record exposing an "id" key; confirm against its definition.
    latest_inventories = select_latest_inventories(inventories_path)
    # Reverse index so each inventory-parts row can be tied back to its set.
    set_num_by_inventory_id = {data["id"]: set_num for set_num, data in latest_inventories.items()}
    colors_lookup = build_color_lookup(colors_path)
    sets_year = build_sets_year_lookup(sets_path)
    aggregates: Dict[Tuple[str, str, str], dict] = {}
    # newline="" hands newline handling to the csv module, as its documentation
    # requires, and matches how this module opens its CSV output.
    with inventory_parts_path.open(newline="") as parts_file:
        for row in csv.DictReader(parts_file):
            inventory_id = row["inventory_id"]
            if inventory_id not in set_num_by_inventory_id:
                continue
            if row["part_num"] not in head_parts:
                continue
            if normalize_boolean(row["is_spare"]) == "true":
                continue
            year = sets_year.get(set_num_by_inventory_id[inventory_id])
            if year is None:
                # Set absent from the sets file: no year to attribute the part to.
                continue
            color = colors_lookup[row["color_id"]]
            key = (year, color["rgb"], color["is_translucent"])
            if key not in aggregates:
                aggregates[key] = {
                    "year": year,
                    "color_rgb": color["rgb"],
                    "is_translucent": color["is_translucent"],
                    "color_name": color["name"],
                    "quantity": 0,
                }
            aggregates[key]["quantity"] += int(row["quantity"])
    return sorted(
        aggregates.values(),
        key=lambda entry: (int(entry["year"]), entry["color_name"], entry["is_translucent"]),
    )
def write_global_heads_by_year(destination_path: Path, rows: Iterable[dict]) -> None:
    """Write the per-year head colour aggregate to a CSV file.

    The file is created or overwritten and starts with a header line, followed
    by one line per item of ``rows`` in iteration order.

    Args:
        destination_path: Path of the CSV file to write.
        rows: Dictionaries keyed by ``year``, ``color_rgb``,
            ``is_translucent``, ``color_name`` and ``quantity``.
    """
    columns = ["year", "color_rgb", "is_translucent", "color_name", "quantity"]
    with destination_path.open("w", newline="") as output_file:
        csv_writer = csv.DictWriter(output_file, fieldnames=columns)
        csv_writer.writeheader()
        csv_writer.writerows(rows)