1

Ajoute les agrégats et visualisations globales des couleurs de têtes

This commit is contained in:
2025-12-01 23:56:03 +01:00
parent d7b4ad8031
commit 47ee76cacf
10 changed files with 502 additions and 0 deletions

View File

@@ -0,0 +1,90 @@
"""Visualisation des couleurs de têtes de minifigs sur le catalogue complet."""
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
import matplotlib.pyplot as plt
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows
def load_global_heads(heads_path: Path) -> List[dict]:
"""Charge l'agrégat global des têtes par année."""
return read_rows(heads_path)
def select_top_colors(rows: Iterable[dict], limit: int = 12) -> List[Tuple[str, str, str]]:
"""Retourne les couleurs les plus fréquentes globalement (nom, rgb, is_translucent)."""
totals: Dict[Tuple[str, str, str], int] = {}
for row in rows:
key = (row["color_name"], row["color_rgb"], row["is_translucent"])
totals[key] = totals.get(key, 0) + int(row["quantity"])
sorted_colors = sorted(totals.items(), key=lambda item: (-item[1], item[0][0], item[0][1]))
return [color for color, _ in sorted_colors[:limit]]
def build_share_matrix(
rows: Iterable[dict], top_colors: List[Tuple[str, str, str]]
) -> Tuple[List[int], List[Tuple[str, str, str]], List[Dict[str, float]]]:
"""Construit les parts par année en regroupant les couleurs hors top dans 'Autres'."""
years = sorted({int(row["year"]) for row in rows})
colors = top_colors + [("Autres", "444444", "false")]
shares_by_year: List[Dict[str, float]] = []
rows_by_year: Dict[int, List[dict]] = {year: [] for year in years}
for row in rows:
rows_by_year[int(row["year"])].append(row)
for year in years:
year_rows = rows_by_year[year]
total = sum(int(r["quantity"]) for r in year_rows)
shares: Dict[str, float] = {color[0]: 0.0 for color in colors}
for r in year_rows:
key = (r["color_name"], r["color_rgb"], r["is_translucent"])
quantity = int(r["quantity"])
target = "Autres" if key not in top_colors else r["color_name"]
shares[target] = shares.get(target, 0.0) + quantity / total if total > 0 else 0.0
shares_by_year.append(shares)
return years, colors, shares_by_year
def plot_global_head_shares(
heads_path: Path,
destination_path: Path,
top_limit: int = 12,
) -> None:
"""Trace les parts des couleurs de têtes de minifigs par année (catalogue complet)."""
rows = load_global_heads(heads_path)
top_colors = select_top_colors(rows, limit=top_limit)
years, colors, shares_by_year = build_share_matrix(rows, top_colors)
fig, ax = plt.subplots(figsize=(14, 6))
bottoms = [0.0] * len(years)
y_positions = list(range(len(years)))
for name, color_rgb, is_trans in colors:
values = [shares[name] for shares in shares_by_year]
edge = "#f2f2f2" if is_trans == "true" else "#0d0d0d"
ax.bar(
years,
values,
bottom=bottoms,
color=f"#{color_rgb}",
edgecolor=edge,
label=name,
linewidth=0.7,
)
bottoms = [b + v for b, v in zip(bottoms, values)]
ax.set_ylim(0, 1.05)
ax.set_ylabel("Part des couleurs (têtes de minifigs, catalogue complet)")
ax.set_xlabel("Année")
if len(years) > 15:
step = max(1, len(years) // 10)
ax.set_xticks(years[::step])
else:
ax.set_xticks(years)
ax.set_title("Répartition des couleurs de têtes de minifigs par année (catalogue complet)")
ax.legend(loc="upper left", bbox_to_anchor=(1.02, 1), frameon=False)
ax.grid(True, axis="y", linestyle="--", alpha=0.25)
ensure_parent_dir(destination_path)
fig.tight_layout()
fig.savefig(destination_path, dpi=170)
plt.close(fig)

View File

@@ -0,0 +1,86 @@
"""Visualisation de la part des têtes jaunes sur le catalogue global."""
from pathlib import Path
from typing import Dict, List
import matplotlib.pyplot as plt
from lib.filesystem import ensure_parent_dir
from lib.milestones import load_milestones
from lib.rebrickable.stats import read_rows
def compute_yellow_share(rows: List[dict]) -> List[dict]:
"""Calcule la part de la couleur Yellow par année."""
aggregated: Dict[str, Dict[str, int]] = {}
for row in rows:
year = row["year"]
if year not in aggregated:
aggregated[year] = {"yellow": 0, "total": 0}
aggregated[year]["total"] += int(row["quantity"])
if row["color_name"].lower() == "yellow" or row["color_rgb"].upper() == "FFFF00":
aggregated[year]["yellow"] += int(row["quantity"])
results = []
for year in sorted(aggregated.keys(), key=int):
total = aggregated[year]["total"]
yellow = aggregated[year]["yellow"]
share = yellow / total if total > 0 else 0
results.append({"year": int(year), "yellow_share": share, "total": total})
return results
def plot_yellow_share(heads_path: Path, milestones_path: Path, destination_path: Path) -> None:
"""Trace l'évolution de la part de têtes jaunes dans le catalogue complet."""
rows = read_rows(heads_path)
milestones = load_milestones(milestones_path)
series = compute_yellow_share(rows)
years = [item["year"] for item in series]
shares = [item["yellow_share"] for item in series]
fig, ax = plt.subplots(figsize=(13, 5.5))
ax.plot(years, shares, color="#f2c300", marker="o", linewidth=2.4, label="Part Yellow")
ax.fill_between(years, shares, color="#f2c300", alpha=0.18)
ax.set_ylim(0, min(1.0, max(shares + [0.01]) * 1.1))
ax.set_ylabel("Part de têtes Yellow")
ax.set_xlabel("Année")
if len(years) > 15:
step = max(1, len(years) // 10)
ax.set_xticks(years[::step])
else:
ax.set_xticks(years)
ax.set_title("Evolution de l'usage des têtes Yellow (catalogue complet)")
ax.grid(True, linestyle="--", alpha=0.3)
if milestones:
min_year = min(years)
max_year = max(years)
milestones_in_range = sorted(
[m for m in milestones if min_year <= m["year"] <= max_year],
key=lambda m: (m["year"], m["description"]),
)
offset_map: Dict[int, int] = {}
offset_step = 0.35
top_limit = ax.get_ylim()[1] * 1.05
for milestone in milestones_in_range:
year = milestone["year"]
count_for_year = offset_map.get(year, 0)
offset_map[year] = count_for_year + 1
horizontal_offset = offset_step * (count_for_year // 2 + 1)
if count_for_year % 2 == 1:
horizontal_offset *= -1
text_x = year + horizontal_offset
ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65, zorder=1)
ax.text(
text_x,
top_limit,
milestone["description"],
rotation=90,
verticalalignment="top",
horizontalalignment="center",
fontsize=8,
color="#d62728",
)
ax.set_ylim(ax.get_ylim()[0], top_limit * (1 + max(offset_map.values(), default=0) * 0.02))
ensure_parent_dir(destination_path)
fig.tight_layout()
fig.savefig(destination_path, dpi=170)
plt.close(fig)

View File

@@ -0,0 +1,103 @@
"""Extraction des couleurs de têtes de minifigs sur le catalogue complet."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple
from lib.rebrickable.parts_inventory import normalize_boolean, select_latest_inventories
HEAD_CATEGORIES = {"59"}
def load_head_parts(parts_path: Path, head_categories: Set[str] | None = None) -> Set[str]:
"""Construit l'ensemble des références de têtes via leur catégorie."""
categories = head_categories or HEAD_CATEGORIES
head_parts: Set[str] = set()
with parts_path.open() as parts_file:
reader = csv.DictReader(parts_file)
for row in reader:
if row["part_cat_id"] in categories:
head_parts.add(row["part_num"])
return head_parts
def build_sets_year_lookup(sets_path: Path) -> Dict[str, str]:
"""Indexe les années par set_num."""
lookup: Dict[str, str] = {}
with sets_path.open() as sets_file:
reader = csv.DictReader(sets_file)
for row in reader:
lookup[row["set_num"]] = row["year"]
return lookup
def build_color_lookup(colors_path: Path) -> Dict[str, dict]:
"""Construit un index des couleurs par identifiant."""
lookup: Dict[str, dict] = {}
with colors_path.open() as colors_file:
reader = csv.DictReader(colors_file)
for row in reader:
lookup[row["id"]] = {
"rgb": row["rgb"],
"is_translucent": row["is_trans"].lower(),
"name": row["name"],
}
return lookup
def aggregate_global_heads_by_year(
inventories_path: Path,
inventory_parts_path: Path,
parts_path: Path,
colors_path: Path,
sets_path: Path,
head_categories: Set[str] | None = None,
) -> List[dict]:
"""Agrège les couleurs de têtes par année sur le catalogue complet."""
head_parts = load_head_parts(parts_path, head_categories)
latest_inventories = select_latest_inventories(inventories_path)
latest_inventory_ids = {data["id"]: set_num for set_num, data in latest_inventories.items()}
colors_lookup = build_color_lookup(colors_path)
sets_year = build_sets_year_lookup(sets_path)
aggregates: Dict[Tuple[str, str, str], dict] = {}
with inventory_parts_path.open() as parts_file:
reader = csv.DictReader(parts_file)
for row in reader:
inventory_id = row["inventory_id"]
if inventory_id not in latest_inventory_ids:
continue
if row["part_num"] not in head_parts:
continue
if normalize_boolean(row["is_spare"]) == "true":
continue
set_num = latest_inventory_ids[inventory_id]
year = sets_year.get(set_num)
if year is None:
continue
color = colors_lookup[row["color_id"]]
key = (year, color["rgb"], color["is_translucent"])
existing = aggregates.get(key)
if existing is None:
aggregates[key] = {
"year": year,
"color_rgb": color["rgb"],
"is_translucent": color["is_translucent"],
"color_name": color["name"],
"quantity": 0,
}
existing = aggregates[key]
existing["quantity"] += int(row["quantity"])
results = list(aggregates.values())
results.sort(key=lambda r: (int(r["year"]), r["color_name"], r["is_translucent"]))
return results
def write_global_heads_by_year(destination_path: Path, rows: Iterable[dict]) -> None:
"""Sérialise l'agrégat global par année."""
fieldnames = ["year", "color_rgb", "is_translucent", "color_name", "quantity"]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)