1

117 lines
3.9 KiB
Python

"""Extraction des couleurs de têtes de minifigs."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple
from lib.rebrickable.colors_by_set import build_colors_lookup
from lib.rebrickable.color_ignores import is_ignored_color_rgb
from lib.rebrickable.stats import read_rows
HEAD_CATEGORIES = {"59"}
def load_parts_filtered(path: Path) -> List[dict]:
"""Charge parts_filtered.csv en mémoire."""
return read_rows(path)
def build_head_part_set(parts_catalog_path: Path) -> Set[str]:
"""Sélectionne les références de têtes via leur catégorie."""
head_parts: Set[str] = set()
with parts_catalog_path.open() as parts_file:
reader = csv.DictReader(parts_file)
for row in reader:
if row["part_cat_id"] in HEAD_CATEGORIES:
head_parts.add(row["part_num"])
return head_parts
def aggregate_head_colors_by_set(
parts_rows: Iterable[dict],
head_parts: Set[str],
colors_lookup: Dict[Tuple[str, str], str],
) -> List[dict]:
"""Agrège les quantités de têtes par set et par couleur (hors rechanges)."""
aggregates: Dict[Tuple[str, str, str, str], dict] = {}
for row in parts_rows:
if row["part_num"] not in head_parts:
continue
if row["is_spare"] == "true":
continue
if is_ignored_color_rgb(row["color_rgb"]):
continue
key = (row["set_num"], row["set_id"], row["year"], row["color_rgb"])
existing = aggregates.get(key)
if existing is None:
aggregates[key] = {
"set_num": row["set_num"],
"set_id": row["set_id"],
"year": row["year"],
"color_rgb": row["color_rgb"],
"is_translucent": row["is_translucent"],
"color_name": colors_lookup[(row["color_rgb"], row["is_translucent"])],
"quantity": 0,
}
existing = aggregates[key]
existing["quantity"] += int(row["quantity_in_set"])
results = list(aggregates.values())
results.sort(key=lambda r: (r["set_num"], r["color_name"], r["is_translucent"]))
return results
def aggregate_head_colors_by_year(rows: Iterable[dict]) -> List[dict]:
"""Regroupe les têtes par année et par couleur."""
aggregates: Dict[Tuple[str, str, str], dict] = {}
for row in rows:
key = (row["year"], row["color_rgb"], row["is_translucent"])
existing = aggregates.get(key)
if existing is None:
aggregates[key] = {
"year": row["year"],
"color_rgb": row["color_rgb"],
"is_translucent": row["is_translucent"],
"color_name": row["color_name"],
"quantity": 0,
}
existing = aggregates[key]
existing["quantity"] += int(row["quantity"])
results = list(aggregates.values())
results.sort(key=lambda r: (int(r["year"]), r["color_name"], r["is_translucent"]))
return results
def write_head_colors_by_set(path: Path, rows: Iterable[dict]) -> None:
"""Écrit l'agrégat par set."""
fieldnames = [
"set_num",
"set_id",
"year",
"color_rgb",
"is_translucent",
"color_name",
"quantity",
]
with path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)
def write_head_colors_by_year(path: Path, rows: Iterable[dict]) -> None:
"""Écrit l'agrégat par année."""
fieldnames = [
"year",
"color_rgb",
"is_translucent",
"color_name",
"quantity",
]
with path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)