From 14a7dc8561c9232e247d87c360c2be2e5c181811 Mon Sep 17 00:00:00 2001
From: Richard Dern
Date: Tue, 2 Dec 2025 22:33:13 +0100
Subject: [PATCH] =?UTF-8?q?Analyse=20la=20r=C3=A9utilisation=20des=20t?=
 =?UTF-8?q?=C3=AAtes=20de=20minifigs?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md                     |  11 ++++
 lib/plots/head_reuse.py       |  53 +++++++++++++++++
 lib/rebrickable/head_reuse.py | 109 ++++++++++++++++++++++++++++++++++
 scripts/compute_head_reuse.py |  27 +++++++++
 scripts/plot_head_reuse.py    |  18 ++++++
 tests/test_head_reuse.py      |  96 ++++++++++++++++++++++++++++++
 6 files changed, 314 insertions(+)
 create mode 100644 lib/plots/head_reuse.py
 create mode 100644 lib/rebrickable/head_reuse.py
 create mode 100644 scripts/compute_head_reuse.py
 create mode 100644 scripts/plot_head_reuse.py
 create mode 100644 tests/test_head_reuse.py

diff --git a/README.md b/README.md
index 89a4e05..92154a7 100644
--- a/README.md
+++ b/README.md
@@ -353,4 +353,15 @@ Le script lit `data/intermediate/minifigs_by_set.csv`, `data/intermediate/sets_e
 - `data/intermediate/minifig_character_sets.csv` : apparitions des personnages avec set, identifiant de set, année, possession et fig_num.
 - `figures/step32/minifig_characters/{personnage}.png` : frise horizontale par personnage, composée des visuels de minifigs dans l’ordre chronologique, annotés avec l’année, le numéro de set (avec `*` si possédé) et l’identifiant de minifig. Les minifigs dont l’image n’est pas disponible sont remplacées par un rectangle neutre pour matérialiser le manque.
 - `figures/step32/minifig_heads/{personnage}.png` : même principe mais en utilisant les visuels de têtes (`head.jpg`) pour chaque apparition, annotés avec l’année, le set (avec `*` si possédé) et le `part_num` de la tête.
+
+### Étape 33 : réutilisation des têtes de minifigs dans le catalogue
+
+1. `source .venv/bin/activate`
+2. `python -m scripts.compute_head_reuse`
+3. `python -m scripts.plot_head_reuse`
+
+Le calcul lit `data/intermediate/minifigs_by_set.csv`, `data/raw/parts.csv`, `data/raw/inventories.csv` et `data/raw/inventory_parts.csv` pour recenser les têtes présentes dans les sets filtrés, puis compter combien de sets du catalogue les contiennent (rechanges exclues). Il produit :
+
+- `data/intermediate/head_reuse.csv` : pour chaque tête observée dans les sets filtrés, le nombre de sets filtrés qui la contiennent, le nombre de sets du reste du catalogue et le total.
+- `figures/step33/head_reuse.png` : bar chart horizontal montrant, par tête, la part filtrée vs le reste du catalogue (têtes exclusives en haut).
 - Les étiquettes affichent aussi l’identifiant de la minifig (`fig-*`) et un astérisque à côté du set (`set_num*`) lorsqu’il est présent dans la collection.
diff --git a/lib/plots/head_reuse.py b/lib/plots/head_reuse.py
new file mode 100644
index 0000000..8eb9aad
--- /dev/null
+++ b/lib/plots/head_reuse.py
@@ -0,0 +1,53 @@
+"""Visualization of minifig head reuse."""
+
+import csv
+from pathlib import Path
+from typing import List
+
+import matplotlib.pyplot as plt
+
+from lib.filesystem import ensure_parent_dir
+
+
+def load_head_reuse(path: Path) -> List[dict]:
+    """Load the head_reuse CSV as a list of row dicts."""
+    rows: List[dict] = []
+    with path.open() as csv_file:
+        reader = csv.DictReader(csv_file)
+        for row in reader:
+            rows.append(row)
+    return rows
+
+
+def format_label(row: dict) -> str:
+    """Format the label shown on the vertical axis (part_num, plus the character when set)."""
+    character = row["known_character"]
+    if character != "":
+        return f"{row['part_num']} — {character}"
+    return row["part_num"]
+
+
+def plot_head_reuse(path: Path, destination_path: Path, top: int = 30) -> None:
+    """Draw a stacked horizontal bar chart of the `top` heads with the fewest other sets first."""
+    rows = load_head_reuse(path)
+    rows.sort(key=lambda r: (int(r["other_sets"]), -int(r["filtered_sets"]), r["part_num"]))
+    selected = rows[:top]
+    labels = [format_label(r) for r in selected]
+    filtered_counts = [int(r["filtered_sets"]) for r in selected]
+    other_counts = [int(r["other_sets"]) for r in selected]
+    positions = list(range(len(selected)))  # row i sits at y=i; invert_yaxis() below puts row 0 on top
+
+    fig, ax = plt.subplots(figsize=(12, 0.5 * len(selected) + 1.5))
+    ax.barh(positions, filtered_counts, color="#1f78b4", label="Sets filtrés")
+    ax.barh(positions, other_counts, left=filtered_counts, color="#b2df8a", label="Autres sets")
+    ax.set_yticks(positions)
+    ax.set_yticklabels(labels)  # same order as positions so each label names its own bar
+    ax.set_xlabel("Nombre de sets contenant la tête")
+    ax.invert_yaxis()
+    ax.grid(axis="x", linestyle="--", alpha=0.4)
+    ax.legend()
+    fig.tight_layout()
+
+    ensure_parent_dir(destination_path)
+    fig.savefig(destination_path, dpi=150)
+    plt.close(fig)
diff --git a/lib/rebrickable/head_reuse.py b/lib/rebrickable/head_reuse.py
new file mode 100644
index 0000000..7495c78
--- /dev/null
+++ b/lib/rebrickable/head_reuse.py
@@ -0,0 +1,109 @@
+"""Measure minifig head reuse across the LEGO catalog."""
+
+import csv
+from collections import defaultdict
+from pathlib import Path
+from typing import Dict, Iterable, List, Sequence, Set
+
+from lib.filesystem import ensure_parent_dir
+from lib.rebrickable.minifigs_by_set import load_parts_catalog, select_head_parts
+from lib.rebrickable.parts_inventory import (
+    index_inventory_parts_by_inventory,
+    normalize_boolean,
+    select_latest_inventories,
+)
+from lib.rebrickable.stats import read_rows
+
+
+def load_minifigs_by_set(path: Path) -> List[dict]:
+    """Load the minifigs_by_set CSV."""
+    return read_rows(path)
+
+
+def build_head_presence(
+    inventories_path: Path,
+    inventory_parts_path: Path,
+    head_parts: Set[str],
+) -> Dict[str, Set[str]]:
+    """Index the sets containing each head part (spare parts excluded)."""
+    inventories = select_latest_inventories(inventories_path)
+    parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path)
+    presence: Dict[str, Set[str]] = {}
+    for set_num, inventory in inventories.items():
+        parts = parts_by_inventory.get(inventory["id"], [])
+        for part_row in parts:
+            if part_row["part_num"] not in head_parts:
+                continue
+            if normalize_boolean(part_row["is_spare"]) == "true":
+                continue
+            existing = presence.get(part_row["part_num"])
+            if existing is None:
+                existing = set()
+                presence[part_row["part_num"]] = existing
+            existing.add(set_num)
+    return presence
+
+
+def build_filtered_presence(minifigs_rows: Iterable[dict]) -> Dict[str, Set[str]]:
+    """Index the filtered sets containing each head, skipping rows whose character is "Figurant"."""
+    presence: Dict[str, Set[str]] = {}
+    for row in minifigs_rows:
+        if row["known_character"] == "Figurant":
+            continue
+        bucket = presence.get(row["part_num"])
+        if bucket is None:
+            bucket = set()
+            presence[row["part_num"]] = bucket
+        bucket.add(row["set_num"])
+    return presence
+
+
+def build_character_labels(minifigs_rows: Iterable[dict]) -> Dict[str, str]:
+    """Map each head to one representative character: the alphabetically first, "Figurant" rows skipped."""
+    labels: Dict[str, Set[str]] = defaultdict(set)
+    for row in minifigs_rows:
+        character = row["known_character"]
+        if character == "Figurant":
+            continue
+        labels[row["part_num"]].add(character)
+    representative: Dict[str, str] = {}
+    for part_num, characters in labels.items():
+        representative[part_num] = sorted(characters)[0]
+    return representative
+
+
+def aggregate_head_reuse(
+    minifigs_rows: Iterable[dict],
+    parts_catalog: Dict[str, dict],
+    head_presence: Dict[str, Set[str]],
+) -> List[dict]:
+    """Build the table of heads present in the filtered sets with their catalog-wide reuse counts."""
+    filtered_presence = build_filtered_presence(minifigs_rows)
+    labels = build_character_labels(minifigs_rows)
+    aggregates: List[dict] = []
+    for part_num, filtered_sets in filtered_presence.items():
+        all_sets = head_presence.get(part_num, set())
+        other_sets = all_sets - filtered_sets
+        aggregates.append(
+            {
+                "part_num": part_num,
+                "part_name": parts_catalog[part_num]["name"],
+                "known_character": labels.get(part_num, ""),
+                "filtered_sets": str(len(filtered_sets)),
+                "other_sets": str(len(other_sets)),
+                "total_sets": str(len(all_sets)),
+            }
+        )
+    aggregates.sort(key=lambda row: (int(row["other_sets"]), -int(row["filtered_sets"]), row["part_num"]))
+    return aggregates
+
+
+def write_head_reuse(destination_path: Path, rows: Sequence[dict]) -> None:
+    """Write the CSV of head usage in filtered sets vs. the rest of the catalog."""
+    ensure_parent_dir(destination_path)
+    fieldnames = ["part_num", "part_name", "known_character", "filtered_sets", "other_sets", "total_sets"]
+    with destination_path.open("w", newline="") as csv_file:
+        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
+        writer.writeheader()
+        for row in rows:
+            writer.writerow(row)
diff --git a/scripts/compute_head_reuse.py b/scripts/compute_head_reuse.py
new file mode 100644
index 0000000..592057e
--- /dev/null
+++ b/scripts/compute_head_reuse.py
@@ -0,0 +1,27 @@
+"""Compute minifig head reuse across the LEGO catalog."""
+
+from pathlib import Path
+
+from lib.rebrickable.head_reuse import aggregate_head_reuse, build_head_presence, load_minifigs_by_set, write_head_reuse
+from lib.rebrickable.minifigs_by_set import load_parts_catalog, select_head_parts
+
+
+MINIFIGS_BY_SET_PATH = Path("data/intermediate/minifigs_by_set.csv")
+PARTS_CATALOG_PATH = Path("data/raw/parts.csv")
+INVENTORIES_PATH = Path("data/raw/inventories.csv")
+INVENTORY_PARTS_PATH = Path("data/raw/inventory_parts.csv")
+DESTINATION_PATH = Path("data/intermediate/head_reuse.csv")
+
+
+def main() -> None:
+    """Build the CSV of filtered heads with their usage across the full catalog."""
+    minifigs = load_minifigs_by_set(MINIFIGS_BY_SET_PATH)
+    parts_catalog = load_parts_catalog(PARTS_CATALOG_PATH)
+    head_parts = select_head_parts(parts_catalog)
+    presence = build_head_presence(INVENTORIES_PATH, INVENTORY_PARTS_PATH, head_parts)
+    reuse = aggregate_head_reuse(minifigs, parts_catalog, presence)
+    write_head_reuse(DESTINATION_PATH, reuse)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/plot_head_reuse.py b/scripts/plot_head_reuse.py
new file mode 100644
index 0000000..62f1f4e
--- /dev/null
+++ b/scripts/plot_head_reuse.py
@@ -0,0 +1,18 @@
+"""Plot minifig head reuse across the LEGO catalog."""
+
+from pathlib import Path
+
+from lib.plots.head_reuse import plot_head_reuse
+
+
+HEAD_REUSE_PATH = Path("data/intermediate/head_reuse.csv")
+DESTINATION_PATH = Path("figures/step33/head_reuse.png")
+
+
+def main() -> None:
+    """Load the head usage data and produce the associated chart."""
+    plot_head_reuse(HEAD_REUSE_PATH, DESTINATION_PATH)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_head_reuse.py b/tests/test_head_reuse.py
new file mode 100644
index 0000000..c883803
--- /dev/null
+++ b/tests/test_head_reuse.py
@@ -0,0 +1,96 @@
+"""Tests for the minifig head reuse computation."""
+
+import csv
+from pathlib import Path
+
+from lib.rebrickable.head_reuse import aggregate_head_reuse, build_head_presence
+from lib.rebrickable.minifigs_by_set import load_parts_catalog, select_head_parts
+
+
+def write_csv(path: Path, headers: list[str], rows: list[list[str]]) -> None:
+    """Write a small CSV for the tests."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open("w", newline="") as csv_file:
+        writer = csv.writer(csv_file)
+        writer.writerow(headers)
+        writer.writerows(rows)
+
+
+def test_head_reuse_counts_sets_and_catalog(tmp_path: Path) -> None:
+    """Count head usage in the filtered sets versus the rest of the catalog."""
+    minifigs_by_set = tmp_path / "minifigs_by_set.csv"
+    write_csv(
+        minifigs_by_set,
+        ["set_num", "part_num", "known_character", "fig_num", "gender"],
+        [
+            ["s1-1", "p1", "Alice", "fig-1", "female"],
+            ["s2-1", "p2", "Bob", "fig-2", "male"],
+            ["s1-1", "p2", "Bob", "fig-2", "male"],
+        ],
+    )
+
+    parts_catalog = tmp_path / "parts.csv"
+    write_csv(
+        parts_catalog,
+        ["part_num", "name", "part_cat_id"],
+        [
+            ["p1", "Head 1", "59"],
+            ["p2", "Head 2", "59"],
+            ["x1", "Other", "1"],
+        ],
+    )
+
+    inventories = tmp_path / "inventories.csv"
+    write_csv(
+        inventories,
+        ["id", "version", "set_num"],
+        [
+            ["i1", "1", "s1-1"],
+            ["i2", "1", "s2-1"],
+            ["i3", "1", "s3-1"],
+            ["i4", "1", "s4-1"],
+        ],
+    )
+
+    inventory_parts = tmp_path / "inventory_parts.csv"
+    write_csv(
+        inventory_parts,
+        ["inventory_id", "part_num", "color_id", "quantity", "is_spare"],
+        [
+            ["i1", "p1", "1", "1", "false"],
+            ["i2", "p2", "1", "1", "false"],
+            ["i3", "p2", "1", "1", "false"],
+            ["i4", "p2", "1", "1", "true"],
+            ["i4", "x1", "1", "1", "false"],
+        ],
+    )
+
+    head_parts = select_head_parts(load_parts_catalog(parts_catalog))
+    presence = build_head_presence(inventories, inventory_parts, head_parts)
+    reuse = aggregate_head_reuse(
+        [
+            {"set_num": "s1-1", "part_num": "p1", "known_character": "Alice", "fig_num": "fig-1"},
+            {"set_num": "s2-1", "part_num": "p2", "known_character": "Bob", "fig_num": "fig-2"},
+            {"set_num": "s1-1", "part_num": "p2", "known_character": "Bob", "fig_num": "fig-2"},
+        ],
+        load_parts_catalog(parts_catalog),
+        presence,
+    )
+    assert reuse == [
+        {
+            "part_num": "p1",
+            "part_name": "Head 1",
+            "known_character": "Alice",
+            "filtered_sets": "1",
+            "other_sets": "0",
+            "total_sets": "1",
+        },
+        {
+            "part_num": "p2",
+            "part_name": "Head 2",
+            "known_character": "Bob",
+            "filtered_sets": "2",
+            "other_sets": "1",
+            "total_sets": "2",
+        },
+    ]