You've already forked etude_lego_jurassic_world
Ajoute les visualisations des couleurs de têtes de minifigs et jalons
This commit is contained in:
163
lib/plots/minifig_heads.py
Normal file
163
lib/plots/minifig_heads.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""Visualisations des couleurs de têtes de minifigs."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Tuple
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
from lib.milestones import load_milestones
|
||||
from lib.rebrickable.stats import read_rows
|
||||
|
||||
|
||||
def load_heads_by_year(path: Path) -> List[dict]:
|
||||
"""Charge l'agrégat des têtes par année."""
|
||||
return read_rows(path)
|
||||
|
||||
|
||||
def select_top_colors(rows: Iterable[dict], limit: int = 10) -> List[Tuple[str, str, str]]:
|
||||
"""Retourne les couleurs les plus fréquentes (color_name, color_rgb, is_translucent)."""
|
||||
totals: Dict[Tuple[str, str, str], int] = {}
|
||||
for row in rows:
|
||||
key = (row["color_name"], row["color_rgb"], row["is_translucent"])
|
||||
totals[key] = totals.get(key, 0) + int(row["quantity"])
|
||||
sorted_colors = sorted(totals.items(), key=lambda item: (-item[1], item[0][0], item[0][1]))
|
||||
return [color for color, _ in sorted_colors[:limit]]
|
||||
|
||||
|
||||
def build_share_matrix(rows: Iterable[dict], top_colors: List[Tuple[str, str, str]]) -> Tuple[List[int], List[Tuple[str, str, str]], List[Dict[str, float]]]:
|
||||
"""Construit les parts par année, en agrégeant les couleurs hors top dans 'Autres'."""
|
||||
years = sorted({int(row["year"]) for row in rows})
|
||||
colors = top_colors + [("Autres", "444444", "false")]
|
||||
shares_by_year: List[Dict[str, float]] = []
|
||||
rows_by_year: Dict[int, List[dict]] = {year: [] for year in years}
|
||||
for row in rows:
|
||||
rows_by_year[int(row["year"])].append(row)
|
||||
for year in years:
|
||||
year_rows = rows_by_year[year]
|
||||
total = sum(int(r["quantity"]) for r in year_rows)
|
||||
shares: Dict[str, float] = {color[0]: 0.0 for color in colors}
|
||||
for r in year_rows:
|
||||
key = (r["color_name"], r["color_rgb"], r["is_translucent"])
|
||||
quantity = int(r["quantity"])
|
||||
target = "Autres" if key not in top_colors else r["color_name"]
|
||||
shares[target] = shares.get(target, 0.0) + quantity / total
|
||||
shares_by_year.append(shares)
|
||||
return years, colors, shares_by_year
|
||||
|
||||
|
||||
def plot_shares_by_year(
|
||||
heads_path: Path,
|
||||
destination_path: Path,
|
||||
top_limit: int = 10,
|
||||
milestones_path: Path | None = None,
|
||||
) -> None:
|
||||
"""Trace les parts des couleurs de têtes par année (stacked) avec jalons optionnels."""
|
||||
rows = load_heads_by_year(heads_path)
|
||||
top_colors = select_top_colors(rows, limit=top_limit)
|
||||
years, colors, shares_by_year = build_share_matrix(rows, top_colors)
|
||||
milestones = load_milestones(milestones_path) if milestones_path else []
|
||||
|
||||
fig, ax = plt.subplots(figsize=(14, 6))
|
||||
bottoms = [0.0] * len(years)
|
||||
for name, color_rgb, is_trans in colors:
|
||||
values = [shares[name] for shares in shares_by_year]
|
||||
edge = "#f2f2f2" if is_trans == "true" else "#0d0d0d"
|
||||
ax.bar(
|
||||
years,
|
||||
values,
|
||||
bottom=bottoms,
|
||||
color=f"#{color_rgb}",
|
||||
edgecolor=edge,
|
||||
label=name,
|
||||
linewidth=0.7,
|
||||
)
|
||||
bottoms = [b + v for b, v in zip(bottoms, values)]
|
||||
ax.set_ylim(0, 1.05)
|
||||
ax.set_ylabel("Part des couleurs (têtes de minifigs)")
|
||||
ax.set_xlabel("Année")
|
||||
ax.set_xticks(years)
|
||||
ax.set_title("Répartition des couleurs de peau (têtes de minifigs) par année")
|
||||
ax.legend(loc="upper left", bbox_to_anchor=(1.02, 1), frameon=False)
|
||||
ax.grid(True, axis="y", linestyle="--", alpha=0.25)
|
||||
if milestones:
|
||||
min_year = min(years)
|
||||
max_year = max(years)
|
||||
milestones_in_range = sorted(
|
||||
[m for m in milestones if min_year <= m["year"] <= max_year],
|
||||
key=lambda m: (m["year"], m["description"]),
|
||||
)
|
||||
offset_step = 0.3
|
||||
offset_map: Dict[int, int] = {}
|
||||
top_limit = ax.get_ylim()[1] * 2
|
||||
for milestone in milestones_in_range:
|
||||
year = milestone["year"]
|
||||
count_for_year = offset_map.get(year, 0)
|
||||
offset_map[year] = count_for_year + 1
|
||||
horizontal_offset = offset_step * (count_for_year // 2 + 1)
|
||||
if count_for_year % 2 == 1:
|
||||
horizontal_offset *= -1
|
||||
text_x = year + horizontal_offset
|
||||
ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65, zorder=1)
|
||||
ax.text(
|
||||
text_x,
|
||||
top_limit,
|
||||
milestone["description"],
|
||||
rotation=90,
|
||||
verticalalignment="top",
|
||||
horizontalalignment="center",
|
||||
fontsize=8,
|
||||
color="#d62728",
|
||||
)
|
||||
ax.set_ylim(ax.get_ylim()[0], top_limit * (1 + max(offset_map.values(), default=0) * 0.02))
|
||||
ensure_parent_dir(destination_path)
|
||||
fig.tight_layout()
|
||||
fig.savefig(destination_path, dpi=170)
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def plot_global_shares(heads_path: Path, destination_path: Path, top_limit: int = 10) -> None:
|
||||
"""Trace une vue globale des parts de couleurs de têtes (donut)."""
|
||||
rows = load_heads_by_year(heads_path)
|
||||
top_colors = select_top_colors(rows, limit=top_limit)
|
||||
totals: Dict[Tuple[str, str, str], int] = {}
|
||||
for row in rows:
|
||||
key = (row["color_name"], row["color_rgb"], row["is_translucent"])
|
||||
totals[key] = totals.get(key, 0) + int(row["quantity"])
|
||||
other_total = sum(value for color, value in totals.items() if color not in top_colors)
|
||||
labels: List[str] = []
|
||||
colors_hex: List[str] = []
|
||||
edgecolors: List[str] = []
|
||||
sizes: List[int] = []
|
||||
for name, color_rgb, is_trans in top_colors:
|
||||
labels.append(name)
|
||||
colors_hex.append(f"#{color_rgb}")
|
||||
edgecolors.append("#f2f2f2" if is_trans == "true" else "#0d0d0d")
|
||||
sizes.append(totals[(name, color_rgb, is_trans)])
|
||||
if other_total > 0:
|
||||
labels.append("Autres")
|
||||
colors_hex.append("#444444")
|
||||
edgecolors.append("#0d0d0d")
|
||||
sizes.append(other_total)
|
||||
|
||||
fig, ax = plt.subplots(figsize=(7, 7))
|
||||
wedges, _ = ax.pie(
|
||||
sizes,
|
||||
labels=[""] * len(labels),
|
||||
colors=colors_hex,
|
||||
startangle=90,
|
||||
counterclock=False,
|
||||
wedgeprops={"linewidth": 1.0, "edgecolor": "#0d0d0d"},
|
||||
)
|
||||
for wedge, edge in zip(wedges, edgecolors):
|
||||
wedge.set_edgecolor(edge)
|
||||
total = sum(sizes)
|
||||
legend_labels = [f"{name} ({size/total:.1%})" for name, size in zip(labels, sizes)]
|
||||
ax.legend(wedges, legend_labels, loc="center left", bbox_to_anchor=(1.02, 0.5), frameon=False)
|
||||
centre_circle = plt.Circle((0, 0), 0.55, fc="white")
|
||||
ax.add_artist(centre_circle)
|
||||
ax.set_title("Répartition globale des couleurs de têtes de minifigs")
|
||||
ensure_parent_dir(destination_path)
|
||||
fig.tight_layout()
|
||||
fig.savefig(destination_path, dpi=170)
|
||||
plt.close(fig)
|
||||
113
lib/rebrickable/minifig_heads.py
Normal file
113
lib/rebrickable/minifig_heads.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""Extraction des couleurs de têtes de minifigs."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Set, Tuple
|
||||
|
||||
from lib.rebrickable.colors_by_set import build_colors_lookup
|
||||
from lib.rebrickable.stats import read_rows
|
||||
|
||||
|
||||
HEAD_CATEGORIES = {"59"}
|
||||
|
||||
|
||||
def load_parts_filtered(path: Path) -> List[dict]:
|
||||
"""Charge parts_filtered.csv en mémoire."""
|
||||
return read_rows(path)
|
||||
|
||||
|
||||
def build_head_part_set(parts_catalog_path: Path) -> Set[str]:
|
||||
"""Sélectionne les références de têtes via leur catégorie."""
|
||||
head_parts: Set[str] = set()
|
||||
with parts_catalog_path.open() as parts_file:
|
||||
reader = csv.DictReader(parts_file)
|
||||
for row in reader:
|
||||
if row["part_cat_id"] in HEAD_CATEGORIES:
|
||||
head_parts.add(row["part_num"])
|
||||
return head_parts
|
||||
|
||||
|
||||
def aggregate_head_colors_by_set(
|
||||
parts_rows: Iterable[dict],
|
||||
head_parts: Set[str],
|
||||
colors_lookup: Dict[Tuple[str, str], str],
|
||||
) -> List[dict]:
|
||||
"""Agrège les quantités de têtes par set et par couleur (hors rechanges)."""
|
||||
aggregates: Dict[Tuple[str, str, str, str], dict] = {}
|
||||
for row in parts_rows:
|
||||
if row["part_num"] not in head_parts:
|
||||
continue
|
||||
if row["is_spare"] == "true":
|
||||
continue
|
||||
key = (row["set_num"], row["set_id"], row["year"], row["color_rgb"])
|
||||
existing = aggregates.get(key)
|
||||
if existing is None:
|
||||
aggregates[key] = {
|
||||
"set_num": row["set_num"],
|
||||
"set_id": row["set_id"],
|
||||
"year": row["year"],
|
||||
"color_rgb": row["color_rgb"],
|
||||
"is_translucent": row["is_translucent"],
|
||||
"color_name": colors_lookup[(row["color_rgb"], row["is_translucent"])],
|
||||
"quantity": 0,
|
||||
}
|
||||
existing = aggregates[key]
|
||||
existing["quantity"] += int(row["quantity_in_set"])
|
||||
results = list(aggregates.values())
|
||||
results.sort(key=lambda r: (r["set_num"], r["color_name"], r["is_translucent"]))
|
||||
return results
|
||||
|
||||
|
||||
def aggregate_head_colors_by_year(rows: Iterable[dict]) -> List[dict]:
|
||||
"""Regroupe les têtes par année et par couleur."""
|
||||
aggregates: Dict[Tuple[str, str, str], dict] = {}
|
||||
for row in rows:
|
||||
key = (row["year"], row["color_rgb"], row["is_translucent"])
|
||||
existing = aggregates.get(key)
|
||||
if existing is None:
|
||||
aggregates[key] = {
|
||||
"year": row["year"],
|
||||
"color_rgb": row["color_rgb"],
|
||||
"is_translucent": row["is_translucent"],
|
||||
"color_name": row["color_name"],
|
||||
"quantity": 0,
|
||||
}
|
||||
existing = aggregates[key]
|
||||
existing["quantity"] += int(row["quantity"])
|
||||
results = list(aggregates.values())
|
||||
results.sort(key=lambda r: (int(r["year"]), r["color_name"], r["is_translucent"]))
|
||||
return results
|
||||
|
||||
|
||||
def write_head_colors_by_set(path: Path, rows: Iterable[dict]) -> None:
|
||||
"""Écrit l'agrégat par set."""
|
||||
fieldnames = [
|
||||
"set_num",
|
||||
"set_id",
|
||||
"year",
|
||||
"color_rgb",
|
||||
"is_translucent",
|
||||
"color_name",
|
||||
"quantity",
|
||||
]
|
||||
with path.open("w", newline="") as csv_file:
|
||||
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
|
||||
|
||||
def write_head_colors_by_year(path: Path, rows: Iterable[dict]) -> None:
|
||||
"""Écrit l'agrégat par année."""
|
||||
fieldnames = [
|
||||
"year",
|
||||
"color_rgb",
|
||||
"is_translucent",
|
||||
"color_name",
|
||||
"quantity",
|
||||
]
|
||||
with path.open("w", newline="") as csv_file:
|
||||
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
Reference in New Issue
Block a user