You've already forked etude_lego_jurassic_world
Premiers éléments de l'étude
This commit is contained in:
1
lib/__init__.py
Normal file
1
lib/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Fonctions de support pour l'étude des sets LEGO."""
|
||||
59
lib/color_sort.py
Normal file
59
lib/color_sort.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Outils de tri de couleurs dans un espace perceptuel."""
|
||||
|
||||
import math
|
||||
from typing import Iterable, List, Tuple
|
||||
|
||||
import numpy as np
|
||||
from colorspacious import cspace_convert
|
||||
|
||||
|
||||
def hex_to_rgb_unit(hex_value: str) -> np.ndarray:
|
||||
"""Convertit un code hexadécimal en tableau RGB normalisé (0-1)."""
|
||||
return np.array([int(hex_value[index : index + 2], 16) / 255 for index in (0, 2, 4)], dtype=float)
|
||||
|
||||
|
||||
def lab_components(hex_value: str) -> Tuple[float, float, float, float, float]:
|
||||
"""Retourne (hue_angle, chroma, lightness, a*, b*) pour une couleur."""
|
||||
l_component, a_component, b_component = cspace_convert(hex_to_rgb_unit(hex_value), "sRGB1", "CIELab")
|
||||
hue_angle = math.atan2(b_component, a_component)
|
||||
chroma = math.hypot(a_component, b_component)
|
||||
return hue_angle, chroma, l_component, a_component, b_component
|
||||
|
||||
|
||||
def sort_hex_colors_lab(
|
||||
hex_values: Iterable[str],
|
||||
hue_offset_degrees: float = 60.0,
|
||||
neutral_threshold: float = 3.0,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Trie des couleurs par teinte perceptuelle, puis chroma et luminosité.
|
||||
|
||||
- Les couleurs quasi neutres (chroma < seuil) sont déplacées en fin de liste, triées par luminosité.
|
||||
- Le cercle chromatique peut être décalé via hue_offset_degrees (par défaut 60° pour démarrer vers le jaune).
|
||||
"""
|
||||
offset_radians = math.radians(hue_offset_degrees)
|
||||
chromatic: List[Tuple[float, float, float, str]] = []
|
||||
neutrals: List[Tuple[float, str]] = []
|
||||
for hex_value in hex_values:
|
||||
hue_angle, chroma, lightness, _, _ = lab_components(hex_value)
|
||||
if chroma < neutral_threshold:
|
||||
neutrals.append((lightness, hex_value))
|
||||
continue
|
||||
hue = hue_angle + offset_radians
|
||||
if hue < 0:
|
||||
hue += 2 * math.pi
|
||||
chromatic.append((hue, -chroma, lightness, hex_value))
|
||||
chromatic.sort()
|
||||
neutrals.sort()
|
||||
return [item[3] for item in chromatic] + [item[1] for item in neutrals]
|
||||
|
||||
|
||||
def lab_sort_key(hex_value: str, hue_offset_degrees: float = 60.0, neutral_threshold: float = 3.0) -> Tuple[int, float, float, float]:
|
||||
"""Clé de tri unique (bucket chromatique/neutre) pour un usage ponctuel."""
|
||||
hue_angle, chroma, lightness, _, _ = lab_components(hex_value)
|
||||
if chroma < neutral_threshold:
|
||||
return (1, 0.0, lightness, chroma)
|
||||
hue = hue_angle + math.radians(hue_offset_degrees)
|
||||
if hue < 0:
|
||||
hue += 2 * math.pi
|
||||
return (0, hue, -chroma, lightness)
|
||||
8
lib/filesystem.py
Normal file
8
lib/filesystem.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Fonctions utilitaires pour manipuler le système de fichiers."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def ensure_parent_dir(target_path: Path) -> None:
|
||||
"""Crée le répertoire parent d'un chemin de fichier s'il est absent."""
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
15
lib/milestones.py
Normal file
15
lib/milestones.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Chargement des jalons (milestones) thématiques configurables."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
|
||||
def load_milestones(path: Path) -> List[dict]:
|
||||
"""Charge la liste des jalons depuis un fichier CSV à deux colonnes (year, description)."""
|
||||
milestones = []
|
||||
with path.open() as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
for row in reader:
|
||||
milestones.append({"year": int(row["year"]), "description": row["description"]})
|
||||
return milestones
|
||||
1
lib/plots/__init__.py
Normal file
1
lib/plots/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Utilitaires de visualisation des données LEGO."""
|
||||
174
lib/plots/colors_grid.py
Normal file
174
lib/plots/colors_grid.py
Normal file
@@ -0,0 +1,174 @@
|
||||
"""Visualisation des couleurs utilisées dans l'inventaire filtré."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Tuple
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from matplotlib.lines import Line2D
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
from lib.color_sort import lab_sort_key, sort_hex_colors_lab
|
||||
from lib.rebrickable.parts_inventory import normalize_boolean
|
||||
from lib.rebrickable.stats import read_rows
|
||||
|
||||
|
||||
def sort_colors_perceptually(colors: Iterable[dict]) -> List[dict]:
|
||||
"""Trie les couleurs via l'espace Lab (teinte perçue, chroma, luminosité)."""
|
||||
ordered_hex = sort_hex_colors_lab(color["color_rgb"] for color in colors)
|
||||
index_map = {hex_value: index for index, hex_value in enumerate(ordered_hex)}
|
||||
return sorted(colors, key=lambda color: index_map[color["color_rgb"]])
|
||||
|
||||
|
||||
def load_used_colors(parts_path: Path, colors_path: Path, minifig_only: bool = False) -> List[dict]:
|
||||
"""Charge les couleurs utilisées (hors rechanges) et leurs quantités totales.
|
||||
|
||||
Si minifig_only est vrai, ne conserve que les pièces marquées is_minifig_part=true.
|
||||
Sinon, exclut les pièces de minifig.
|
||||
"""
|
||||
rows = read_rows(parts_path)
|
||||
colors_lookup = {(row["rgb"], normalize_boolean(row["is_trans"])): row["name"] for row in read_rows(colors_path)}
|
||||
totals: Dict[Tuple[str, str], int] = {}
|
||||
for row in rows:
|
||||
if minifig_only and row.get("is_minifig_part") != "true":
|
||||
continue
|
||||
if not minifig_only and row.get("is_minifig_part") == "true":
|
||||
continue
|
||||
key = (row["color_rgb"], row["is_translucent"])
|
||||
totals[key] = totals.get(key, 0) + int(row["quantity_in_set"])
|
||||
used_colors = []
|
||||
for (color_rgb, is_translucent), quantity in totals.items():
|
||||
used_colors.append(
|
||||
{
|
||||
"color_rgb": color_rgb,
|
||||
"is_translucent": is_translucent,
|
||||
"name": colors_lookup.get((color_rgb, is_translucent), color_rgb),
|
||||
"quantity": quantity,
|
||||
}
|
||||
)
|
||||
return sort_colors_perceptually(used_colors)
|
||||
|
||||
|
||||
def build_hex_positions(count: int, columns: int = 9, spacing: float = 1.1) -> List[Tuple[float, float]]:
|
||||
"""Construit des positions hexagonales pour une mise en page aérée."""
|
||||
positions: List[Tuple[float, float]] = []
|
||||
rows = (count + columns - 1) // columns
|
||||
vertical_gap = spacing * 0.85
|
||||
for row in range(rows):
|
||||
offset = 0.0 if row % 2 == 0 else spacing / 2
|
||||
for col in range(columns):
|
||||
index = row * columns + col
|
||||
if index >= count:
|
||||
return positions
|
||||
x = col * spacing + offset
|
||||
y = -row * vertical_gap
|
||||
positions.append((x, y))
|
||||
return positions
|
||||
|
||||
|
||||
def build_background(width: float, height: float, resolution: int = 600) -> np.ndarray:
|
||||
"""Génère un fond dégradé pour mettre en valeur les couleurs translucides."""
|
||||
x = np.linspace(-1.0, 1.0, resolution)
|
||||
y = np.linspace(-1.0, 1.0, resolution)
|
||||
xv, yv = np.meshgrid(x, y)
|
||||
radial = np.sqrt(xv**2 + yv**2)
|
||||
diagonal = (xv + yv) / 2
|
||||
layer = 0.35 + 0.35 * (1 - radial) + 0.2 * diagonal
|
||||
layer = np.clip(layer, 0.05, 0.95)
|
||||
background = np.dstack((layer * 0.9, layer * 0.92, layer))
|
||||
return background
|
||||
|
||||
|
||||
def plot_colors_grid(
|
||||
parts_path: Path,
|
||||
colors_path: Path,
|
||||
destination_path: Path,
|
||||
minifig_only: bool = False,
|
||||
) -> None:
|
||||
"""Dessine une grille artistique des couleurs utilisées."""
|
||||
colors = load_used_colors(parts_path, colors_path, minifig_only=minifig_only)
|
||||
positions = build_hex_positions(len(colors))
|
||||
x_values = [x for x, _ in positions]
|
||||
y_values = [y for _, y in positions]
|
||||
width = max(x_values) - min(x_values) + 1.5
|
||||
height = max(y_values) - min(y_values) + 1.5
|
||||
|
||||
fig, ax = plt.subplots(figsize=(10, 10), facecolor="#0b0c10")
|
||||
background = build_background(width, height)
|
||||
ax.imshow(
|
||||
background,
|
||||
extent=[min(x_values) - 0.75, min(x_values) - 0.75 + width, min(y_values) - 0.75, min(y_values) - 0.75 + height],
|
||||
origin="lower",
|
||||
zorder=0,
|
||||
)
|
||||
|
||||
max_quantity = max(color["quantity"] for color in colors)
|
||||
min_marker = 720
|
||||
max_marker = 1600
|
||||
|
||||
for (x, y), color in zip(positions, colors):
|
||||
is_translucent = color["is_translucent"] == "true"
|
||||
alpha = 0.65 if is_translucent else 1.0
|
||||
edge = "#f7f7f7" if is_translucent else "#0d0d0d"
|
||||
size = min_marker + (max_marker - min_marker) * (color["quantity"] / max_quantity)
|
||||
if is_translucent:
|
||||
ax.scatter(
|
||||
x,
|
||||
y,
|
||||
s=size * 1.25,
|
||||
c="#ffffff",
|
||||
alpha=0.18,
|
||||
edgecolors="none",
|
||||
linewidths=0,
|
||||
zorder=2,
|
||||
)
|
||||
ax.scatter(
|
||||
x,
|
||||
y,
|
||||
s=size,
|
||||
c=f"#{color['color_rgb']}",
|
||||
alpha=alpha,
|
||||
edgecolors=edge,
|
||||
linewidths=1.1,
|
||||
zorder=3,
|
||||
)
|
||||
|
||||
legend_handles = [
|
||||
Line2D([0], [0], marker="o", color="none", markerfacecolor="#cccccc", markeredgecolor="#0d0d0d", markersize=10, label="Opaque"),
|
||||
Line2D(
|
||||
[0],
|
||||
[0],
|
||||
marker="o",
|
||||
color="none",
|
||||
markerfacecolor="#cccccc",
|
||||
markeredgecolor="#f7f7f7",
|
||||
markersize=10,
|
||||
alpha=0.65,
|
||||
label="Translucide",
|
||||
),
|
||||
]
|
||||
legend_y = 1.06 if not minifig_only else 1.08
|
||||
ax.legend(
|
||||
handles=legend_handles,
|
||||
loc="upper center",
|
||||
bbox_to_anchor=(0.5, legend_y),
|
||||
ncol=2,
|
||||
frameon=False,
|
||||
labelcolor="#f0f0f0",
|
||||
)
|
||||
|
||||
title_prefix = "Palette des couleurs utilisées (rechanges incluses)"
|
||||
if minifig_only:
|
||||
title_prefix = "Palette des couleurs de minifigs (rechanges incluses)"
|
||||
ax.set_title(title_prefix, fontsize=14, color="#f0f0f0", pad=28)
|
||||
ax.set_xticks([])
|
||||
ax.set_yticks([])
|
||||
ax.set_xlim(min(x_values) - 1.0, max(x_values) + 1.0)
|
||||
ax.set_ylim(min(y_values) - 1.0, max(y_values) + 1.0)
|
||||
for spine in ax.spines.values():
|
||||
spine.set_visible(False)
|
||||
|
||||
ensure_parent_dir(destination_path)
|
||||
fig.tight_layout()
|
||||
fig.savefig(destination_path, dpi=200)
|
||||
plt.close(fig)
|
||||
110
lib/plots/parts_per_set.py
Normal file
110
lib/plots/parts_per_set.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""Graphiques sur la taille moyenne des sets (pièces par set)."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Tuple
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
from lib.milestones import load_milestones
|
||||
from lib.rebrickable.stats import read_rows
|
||||
|
||||
|
||||
def compute_average_parts_per_set(rows: Iterable[dict]) -> List[Tuple[int, float]]:
|
||||
"""Calcule la moyenne annuelle de pièces par set."""
|
||||
per_year: Dict[int, Dict[str, int]] = {}
|
||||
for row in rows:
|
||||
year = int(row["year"])
|
||||
per_year[year] = per_year.get(year, {"parts": 0, "sets": 0})
|
||||
per_year[year]["parts"] += int(row["num_parts"])
|
||||
per_year[year]["sets"] += 1
|
||||
results: List[Tuple[int, float]] = []
|
||||
for year in sorted(per_year):
|
||||
totals = per_year[year]
|
||||
results.append((year, totals["parts"] / totals["sets"]))
|
||||
return results
|
||||
|
||||
|
||||
def compute_rolling_mean(series: List[Tuple[int, float]], window: int) -> List[Tuple[int, float]]:
|
||||
"""Calcule la moyenne glissante sur une fenêtre donnée."""
|
||||
values = [value for _, value in series]
|
||||
years = [year for year, _ in series]
|
||||
rolling: List[Tuple[int, float]] = []
|
||||
for index in range(len(values)):
|
||||
if index + 1 < window:
|
||||
rolling.append((years[index], 0.0))
|
||||
else:
|
||||
window_values = values[index - window + 1 : index + 1]
|
||||
rolling.append((years[index], sum(window_values) / window))
|
||||
return rolling
|
||||
|
||||
|
||||
def plot_parts_per_set(
|
||||
enriched_sets_path: Path,
|
||||
milestones_path: Path,
|
||||
destination_path: Path,
|
||||
rolling_window: int = 3,
|
||||
) -> None:
|
||||
"""Génère un graphique de la moyenne annuelle et glissante des pièces par set."""
|
||||
sets_rows = read_rows(enriched_sets_path)
|
||||
milestones = load_milestones(milestones_path)
|
||||
annual_series = compute_average_parts_per_set(sets_rows)
|
||||
rolling_series = compute_rolling_mean(annual_series, rolling_window)
|
||||
years = [year for year, _ in annual_series]
|
||||
annual_values = [value for _, value in annual_series]
|
||||
rolling_values = [value for _, value in rolling_series]
|
||||
|
||||
fig, ax = plt.subplots(figsize=(12, 6))
|
||||
ax.plot(years, annual_values, marker="o", color="#2ca02c", label="Moyenne annuelle (pièces/set)")
|
||||
ax.plot(
|
||||
years,
|
||||
rolling_values,
|
||||
marker="^",
|
||||
color="#9467bd",
|
||||
label=f"Moyenne glissante {rolling_window} ans (pièces/set)",
|
||||
)
|
||||
ax.set_xlabel("Année")
|
||||
ax.set_ylabel("Pièces par set")
|
||||
ax.set_title("Évolution de la taille moyenne des sets (thèmes filtrés)")
|
||||
ax.grid(True, linestyle="--", alpha=0.3)
|
||||
ax.set_xlim(min(years) - 0.4, max(years) + 0.4)
|
||||
ax.set_xticks(list(range(min(years), max(years) + 1)))
|
||||
ax.tick_params(axis="x", labelrotation=45)
|
||||
|
||||
peak = max(max(annual_values), max(rolling_values))
|
||||
top_limit = peak * 2
|
||||
milestones_in_range = sorted(
|
||||
[m for m in milestones if min(years) <= m["year"] <= max(years)],
|
||||
key=lambda m: (m["year"], m["description"]),
|
||||
)
|
||||
milestone_offsets: Dict[int, int] = {}
|
||||
offset_step = 0.4
|
||||
max_offset = 0
|
||||
for milestone in milestones_in_range:
|
||||
year = milestone["year"]
|
||||
count_for_year = milestone_offsets.get(year, 0)
|
||||
milestone_offsets[year] = count_for_year + 1
|
||||
horizontal_offset = offset_step * (count_for_year // 2 + 1)
|
||||
max_offset = max(max_offset, count_for_year)
|
||||
if count_for_year % 2 == 1:
|
||||
horizontal_offset *= -1
|
||||
text_x = year + horizontal_offset
|
||||
ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65)
|
||||
ax.text(
|
||||
text_x,
|
||||
top_limit,
|
||||
milestone["description"],
|
||||
rotation=90,
|
||||
verticalalignment="top",
|
||||
horizontalalignment="center",
|
||||
fontsize=8,
|
||||
color="#d62728",
|
||||
)
|
||||
|
||||
ax.set_ylim(0, top_limit * (1 + max_offset * 0.02))
|
||||
ax.legend(loc="upper left", bbox_to_anchor=(1.12, 1))
|
||||
|
||||
ensure_parent_dir(destination_path)
|
||||
fig.tight_layout()
|
||||
fig.savefig(destination_path, dpi=150)
|
||||
plt.close(fig)
|
||||
196
lib/plots/sets_per_year.py
Normal file
196
lib/plots/sets_per_year.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""Graphiques montrant le nombre de sets sortis par année."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Tuple
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
from lib.milestones import load_milestones
|
||||
from lib.rebrickable.stats import read_rows
|
||||
|
||||
|
||||
def compute_sets_per_year(rows: Iterable[dict]) -> List[Tuple[int, int]]:
|
||||
"""Retourne la liste (année, nombre de sets) triée chronologiquement."""
|
||||
counts: Dict[int, int] = {}
|
||||
for row in rows:
|
||||
year = int(row["year"])
|
||||
counts[year] = counts.get(year, 0) + 1
|
||||
return sorted(counts.items(), key=lambda item: item[0])
|
||||
|
||||
|
||||
def compute_parts_per_year(rows: Iterable[dict]) -> List[Tuple[int, int]]:
|
||||
"""Retourne la liste (année, total de pièces) triée chronologiquement."""
|
||||
totals: Dict[int, int] = {}
|
||||
for row in rows:
|
||||
year = int(row["year"])
|
||||
totals[year] = totals.get(year, 0) + int(row["num_parts"])
|
||||
return sorted(totals.items(), key=lambda item: item[0])
|
||||
|
||||
|
||||
def plot_sets_per_year(
|
||||
enriched_sets_path: Path,
|
||||
milestones_path: Path,
|
||||
destination_path: Path,
|
||||
) -> None:
|
||||
"""Génère un histogramme annuel avec la moyenne cumulative et les jalons."""
|
||||
sets_rows = read_rows(enriched_sets_path)
|
||||
milestones = load_milestones(milestones_path)
|
||||
raw_series = compute_sets_per_year(sets_rows)
|
||||
raw_parts_series = compute_parts_per_year(sets_rows)
|
||||
min_year = min(year for year, _ in raw_series)
|
||||
max_year = max(year for year, _ in raw_series)
|
||||
series = [(year, dict(raw_series).get(year, 0)) for year in range(min_year, max_year + 1)]
|
||||
parts_series = [(year, dict(raw_parts_series).get(year, 0)) for year in range(min_year, max_year + 1)]
|
||||
years = [year for year, _ in series]
|
||||
counts = [count for _, count in series]
|
||||
parts_totals = [total for _, total in parts_series]
|
||||
owned_counts_map: Dict[int, int] = {}
|
||||
owned_parts_map: Dict[int, int] = {}
|
||||
for row in sets_rows:
|
||||
year = int(row["year"])
|
||||
if row["in_collection"] == "true":
|
||||
owned_counts_map[year] = owned_counts_map.get(year, 0) + 1
|
||||
owned_parts_map[year] = owned_parts_map.get(year, 0) + int(row["num_parts"])
|
||||
owned_counts = [owned_counts_map.get(year, 0) for year in years]
|
||||
missing_counts = [total - owned for total, owned in zip(counts, owned_counts)]
|
||||
owned_parts = [owned_parts_map.get(year, 0) for year in years]
|
||||
missing_parts = [total - owned for total, owned in zip(parts_totals, owned_parts)]
|
||||
first_non_zero_index = next(index for index, value in enumerate(counts) if value > 0)
|
||||
cumulative_mean = []
|
||||
total = 0
|
||||
for index, count in enumerate(counts):
|
||||
total += count
|
||||
cumulative_mean.append(total / (index + 1))
|
||||
cumulative_parts_mean = []
|
||||
rolling_sets = 0
|
||||
rolling_parts = 0
|
||||
for index, (count, parts) in enumerate(zip(counts, parts_totals)):
|
||||
rolling_sets += count
|
||||
rolling_parts += parts
|
||||
if index < first_non_zero_index:
|
||||
cumulative_parts_mean.append(0)
|
||||
else:
|
||||
cumulative_parts_mean.append(rolling_parts / rolling_sets)
|
||||
|
||||
milestones_in_range = sorted(
|
||||
[m for m in milestones if min_year <= m["year"] <= max_year],
|
||||
key=lambda m: (m["year"], m["description"]),
|
||||
)
|
||||
|
||||
fig, ax = plt.subplots(figsize=(14, 6))
|
||||
bar_width = 0.35
|
||||
x_sets = [year - bar_width / 2 for year in years]
|
||||
bars_owned_sets = ax.bar(
|
||||
x_sets,
|
||||
owned_counts,
|
||||
width=bar_width,
|
||||
color="#1f77b4",
|
||||
alpha=0.9,
|
||||
label="Sets possédés",
|
||||
zorder=2,
|
||||
)
|
||||
bars_missing_sets = ax.bar(
|
||||
x_sets,
|
||||
missing_counts,
|
||||
width=bar_width,
|
||||
bottom=owned_counts,
|
||||
color="#9ecae1",
|
||||
alpha=0.8,
|
||||
label="Sets non possédés",
|
||||
)
|
||||
set_mean_line = ax.plot(
|
||||
years,
|
||||
cumulative_mean,
|
||||
color="#ff7f0e",
|
||||
marker="o",
|
||||
label="Moyenne cumulative (sets)",
|
||||
zorder=5,
|
||||
)
|
||||
ax2 = ax.twinx()
|
||||
x_parts = [year + bar_width / 2 for year in years]
|
||||
parts_bars_owned = ax2.bar(
|
||||
x_parts,
|
||||
owned_parts,
|
||||
width=bar_width,
|
||||
color="#2ca02c",
|
||||
alpha=0.9,
|
||||
label="Pièces (sets possédés)",
|
||||
zorder=2,
|
||||
)
|
||||
parts_bars_missing = ax2.bar(
|
||||
x_parts,
|
||||
missing_parts,
|
||||
width=bar_width,
|
||||
bottom=owned_parts,
|
||||
color="#c7e9c0",
|
||||
alpha=0.85,
|
||||
label="Pièces (sets non possédés)",
|
||||
)
|
||||
parts_mean_line = ax2.plot(
|
||||
years,
|
||||
cumulative_parts_mean,
|
||||
color="#9467bd",
|
||||
marker="^",
|
||||
label="Moyenne cumulative (pièces/set)",
|
||||
zorder=6,
|
||||
)
|
||||
parts_peak = max(parts_totals + [1])
|
||||
ax2.set_ylim(0, parts_peak * 1.1)
|
||||
ax.set_xlabel("Année")
|
||||
ax.set_ylabel("Nombre de sets")
|
||||
ax2.set_ylabel("Nombre de pièces")
|
||||
ax.set_title("Nombre de sets par année (thèmes filtrés)")
|
||||
ax.grid(True, linestyle="--", alpha=0.3)
|
||||
ax.set_xlim(min_year - 1, max_year + 0.4)
|
||||
ax.set_xticks(list(range(min_year, max_year + 1)))
|
||||
ax.tick_params(axis="x", labelrotation=45)
|
||||
|
||||
peak = max(max(counts), max(cumulative_mean))
|
||||
top_limit = peak * 2
|
||||
milestone_offsets: Dict[int, int] = {}
|
||||
offset_step = 0.3
|
||||
max_offset = 0
|
||||
for milestone in milestones_in_range:
|
||||
year = milestone["year"]
|
||||
count_for_year = milestone_offsets.get(year, 0)
|
||||
milestone_offsets[year] = count_for_year + 1
|
||||
max_offset = max(max_offset, count_for_year)
|
||||
horizontal_offset = offset_step * (count_for_year // 2 + 1)
|
||||
if count_for_year % 2 == 1:
|
||||
horizontal_offset *= -1
|
||||
text_x = year + horizontal_offset
|
||||
ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65)
|
||||
ax.text(
|
||||
text_x,
|
||||
top_limit,
|
||||
milestone["description"],
|
||||
rotation=90,
|
||||
verticalalignment="top",
|
||||
horizontalalignment="center",
|
||||
fontsize=8,
|
||||
color="#d62728",
|
||||
)
|
||||
|
||||
ax.set_ylim(0, top_limit * (1 + max_offset * 0.02))
|
||||
handles = [
|
||||
bars_owned_sets,
|
||||
bars_missing_sets,
|
||||
parts_bars_owned,
|
||||
parts_bars_missing,
|
||||
set_mean_line[0],
|
||||
parts_mean_line[0],
|
||||
]
|
||||
labels = [
|
||||
"Sets possédés",
|
||||
"Sets non possédés",
|
||||
"Pièces (sets possédés)",
|
||||
"Pièces (sets non possédés)",
|
||||
"Moyenne cumulative (sets)",
|
||||
"Moyenne cumulative (pièces/set)",
|
||||
]
|
||||
ax.legend(handles, labels, loc="upper left", bbox_to_anchor=(1.12, 1))
|
||||
ensure_parent_dir(destination_path)
|
||||
fig.tight_layout()
|
||||
fig.savefig(destination_path, dpi=150)
|
||||
plt.close(fig)
|
||||
1
lib/rebrickable/__init__.py
Normal file
1
lib/rebrickable/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Fonctionnalités liées aux données Rebrickable."""
|
||||
47
lib/rebrickable/downloader.py
Normal file
47
lib/rebrickable/downloader.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Outils de téléchargement pour les fichiers fournis par Rebrickable."""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Iterable, List
|
||||
import gzip
|
||||
import shutil
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
REBRICKABLE_BASE_URL = "https://cdn.rebrickable.com/media/downloads/"
|
||||
CHUNK_SIZE = 8192
|
||||
CACHE_TTL = 7
|
||||
|
||||
|
||||
def build_rebrickable_url(file_name: str) -> str:
|
||||
"""Construit l'URL complète d'un fichier Rebrickable à partir de son nom."""
|
||||
return f"{REBRICKABLE_BASE_URL}{file_name}"
|
||||
|
||||
|
||||
def download_rebrickable_file(file_name: str, destination_dir: Path) -> Path:
|
||||
"""Télécharge un fichier Rebrickable, le décompresse et supprime l'archive."""
|
||||
target_path = destination_dir / file_name
|
||||
destination_dir.mkdir(parents=True, exist_ok=True)
|
||||
decompressed_path = target_path.with_suffix("")
|
||||
if decompressed_path.exists():
|
||||
cache_age = datetime.now() - datetime.fromtimestamp(decompressed_path.stat().st_mtime)
|
||||
if cache_age <= timedelta(days=CACHE_TTL):
|
||||
if target_path.exists():
|
||||
target_path.unlink()
|
||||
return decompressed_path
|
||||
response = requests.get(build_rebrickable_url(file_name), stream=True)
|
||||
response.raise_for_status()
|
||||
with target_path.open("wb") as target_file:
|
||||
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
|
||||
target_file.write(chunk)
|
||||
with gzip.open(target_path, "rb") as compressed_file:
|
||||
with decompressed_path.open("wb") as decompressed_file:
|
||||
shutil.copyfileobj(compressed_file, decompressed_file)
|
||||
target_path.unlink()
|
||||
return decompressed_path
|
||||
|
||||
|
||||
def download_rebrickable_files(file_names: Iterable[str], destination_dir: Path) -> List[Path]:
|
||||
"""Télécharge en série plusieurs fichiers compressés fournis par Rebrickable."""
|
||||
return [download_rebrickable_file(file_name, destination_dir) for file_name in file_names]
|
||||
86
lib/rebrickable/enrich_sets.py
Normal file
86
lib/rebrickable/enrich_sets.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Enrichissement des sets LEGO avec des métadonnées Rebrickable et personnelles."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Set
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
|
||||
REBRICKABLE_SET_BASE_URL = "https://rebrickable.com/sets/"
|
||||
|
||||
|
||||
def extract_set_id(set_num: str) -> str:
|
||||
"""Extrait l'identifiant LEGO (partie avant la révision) depuis set_num."""
|
||||
return set_num.split("-", 1)[0]
|
||||
|
||||
|
||||
def build_rebrickable_set_url(set_num: str) -> str:
|
||||
"""Construit l'URL publique Rebrickable d'un set."""
|
||||
return f"{REBRICKABLE_SET_BASE_URL}{set_num}"
|
||||
|
||||
|
||||
def parse_set_collection_root(raw_value: str) -> Path | None:
|
||||
"""Prépare le chemin de collection, ou None si aucune collection n'est fournie."""
|
||||
cleaned = raw_value.strip()
|
||||
if not cleaned:
|
||||
print("La variable MY_SETS est vide, aucun set en collection.")
|
||||
return None
|
||||
return Path(cleaned)
|
||||
|
||||
|
||||
def load_owned_set_ids(collection_root: Path) -> Set[str]:
|
||||
"""Retourne l'ensemble des identifiants de sets présents dans un dossier de collection."""
|
||||
if not collection_root.exists():
|
||||
print(f"Le dossier {collection_root} n'existe pas, aucun set en collection.")
|
||||
return set()
|
||||
if not collection_root.is_dir():
|
||||
print(f"Le chemin {collection_root} n'est pas un dossier, aucun set en collection.")
|
||||
return set()
|
||||
entries = [path for path in collection_root.iterdir() if path.is_dir()]
|
||||
if not entries:
|
||||
print(f"Le dossier {collection_root} est vide, aucun set en collection.")
|
||||
return set()
|
||||
return {entry.name for entry in entries}
|
||||
|
||||
|
||||
def enrich_sets(
|
||||
source_path: Path,
|
||||
destination_path: Path,
|
||||
owned_set_ids: Iterable[str],
|
||||
) -> None:
|
||||
"""Ajoute les colonnes set_id, rebrickable_url et in_collection au catalogue filtré."""
|
||||
ensure_parent_dir(destination_path)
|
||||
owned_lookup = set(owned_set_ids)
|
||||
with source_path.open() as source_file, destination_path.open("w", newline="") as target_file:
|
||||
reader = csv.DictReader(source_file)
|
||||
fieldnames = reader.fieldnames + ["set_id", "rebrickable_url", "in_collection"]
|
||||
writer = csv.DictWriter(target_file, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for row in reader:
|
||||
set_id = extract_set_id(row["set_num"])
|
||||
writer.writerow(
|
||||
{
|
||||
**row,
|
||||
"set_id": set_id,
|
||||
"rebrickable_url": build_rebrickable_set_url(row["set_num"]),
|
||||
"in_collection": str(set_id in owned_lookup).lower(),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def write_missing_sets_markdown(enriched_path: Path, destination_path: Path) -> None:
|
||||
"""Génère un tableau Markdown listant les sets non possédés."""
|
||||
with enriched_path.open() as source_file:
|
||||
reader = csv.DictReader(source_file)
|
||||
rows = [
|
||||
row
|
||||
for row in reader
|
||||
if row["in_collection"] == "false"
|
||||
]
|
||||
ensure_parent_dir(destination_path)
|
||||
with destination_path.open("w") as target_file:
|
||||
target_file.write("| set_id | year | name |\n")
|
||||
target_file.write("| --- | --- | --- |\n")
|
||||
for row in rows:
|
||||
link = f"[{row['set_id']}]({row['rebrickable_url']})"
|
||||
target_file.write(f"| {link} | {row['year']} | {row['name']} |\n")
|
||||
41
lib/rebrickable/filter_sets.py
Normal file
41
lib/rebrickable/filter_sets.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""Filtrage des sets LEGO par identifiants de thèmes Rebrickable."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
|
||||
def parse_theme_ids(raw_value: str) -> List[str]:
|
||||
"""Extrait les identifiants de thèmes depuis une chaîne séparée par des virgules."""
|
||||
values = [value.strip() for value in raw_value.split(",") if value.strip()]
|
||||
if not values:
|
||||
raise ValueError("Au moins un identifiant de thème est requis.")
|
||||
return values
|
||||
|
||||
|
||||
def filter_sets_by_theme(
|
||||
source_path: Path,
|
||||
destination_path: Path,
|
||||
theme_ids: Iterable[str],
|
||||
overrides_path: Path,
|
||||
) -> None:
|
||||
"""Filtre le catalogue des sets en conservant uniquement les thèmes ciblés avec pièces."""
|
||||
ensure_parent_dir(destination_path)
|
||||
allowed_ids = set(theme_ids)
|
||||
overrides = load_num_parts_overrides(overrides_path)
|
||||
with source_path.open() as source_file, destination_path.open("w", newline="") as target_file:
|
||||
reader = csv.DictReader(source_file)
|
||||
writer = csv.DictWriter(target_file, fieldnames=reader.fieldnames)
|
||||
writer.writeheader()
|
||||
for row in reader:
|
||||
if row["theme_id"] in allowed_ids and int(row["num_parts"]) > 0:
|
||||
override = overrides.get(row["set_num"])
|
||||
writer.writerow({**row, "num_parts": override if override is not None else row["num_parts"]})
|
||||
|
||||
|
||||
def load_num_parts_overrides(overrides_path: Path) -> Dict[str, str]:
|
||||
"""Charge les corrections de nombre de pièces par set."""
|
||||
with overrides_path.open() as overrides_file:
|
||||
reader = csv.DictReader(overrides_file)
|
||||
return {row["set_num"]: row["num_parts"] for row in reader}
|
||||
107
lib/rebrickable/inventory_reconciliation.py
Normal file
107
lib/rebrickable/inventory_reconciliation.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Rapport des écarts entre catalogue et inventaire agrégé."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
|
||||
def load_sets(sets_path: Path) -> List[dict]:
|
||||
"""Charge les sets filtrés pour l'analyse."""
|
||||
with sets_path.open() as sets_file:
|
||||
reader = csv.DictReader(sets_file)
|
||||
return list(reader)
|
||||
|
||||
|
||||
def index_sets_by_num(sets: Iterable[dict]) -> Dict[str, dict]:
|
||||
"""Crée un index des sets par numéro complet."""
|
||||
return {row["set_num"]: row for row in sets}
|
||||
|
||||
|
||||
def compute_inventory_totals(parts_path: Path, include_spares: bool) -> Dict[str, int]:
|
||||
"""Calcule le total de pièces par set, avec ou sans rechanges."""
|
||||
totals: Dict[str, int] = {}
|
||||
with parts_path.open() as parts_file:
|
||||
reader = csv.DictReader(parts_file)
|
||||
for row in reader:
|
||||
if not include_spares and row["is_spare"] == "true":
|
||||
continue
|
||||
set_num = row["set_num"]
|
||||
totals[set_num] = totals.get(set_num, 0) + int(row["quantity_in_set"])
|
||||
return totals
|
||||
|
||||
|
||||
def compute_inventory_gaps(sets_path: Path, parts_path: Path) -> List[dict]:
|
||||
"""Liste les sets dont le total de pièces diffère du catalogue."""
|
||||
sets = load_sets(sets_path)
|
||||
totals_with_spares = compute_inventory_totals(parts_path, include_spares=True)
|
||||
totals_without_spares = compute_inventory_totals(parts_path, include_spares=False)
|
||||
gaps: List[dict] = []
|
||||
for set_row in sets:
|
||||
expected_parts = int(set_row["num_parts"])
|
||||
inventory_parts_with_spares = totals_with_spares[set_row["set_num"]]
|
||||
inventory_parts_non_spare = totals_without_spares[set_row["set_num"]]
|
||||
if expected_parts != inventory_parts_with_spares:
|
||||
gaps.append(
|
||||
{
|
||||
"set_num": set_row["set_num"],
|
||||
"set_id": set_row["set_id"],
|
||||
"expected_parts": expected_parts,
|
||||
"inventory_parts": inventory_parts_with_spares,
|
||||
"inventory_parts_non_spare": inventory_parts_non_spare,
|
||||
"delta": abs(expected_parts - inventory_parts_with_spares),
|
||||
"delta_non_spare": abs(expected_parts - inventory_parts_non_spare),
|
||||
"in_collection": set_row["in_collection"],
|
||||
}
|
||||
)
|
||||
return gaps
|
||||
|
||||
|
||||
def write_inventory_gaps_csv(destination_path: Path, gaps: Iterable[dict]) -> None:
|
||||
"""Écrit un CSV listant les sets en écart d'inventaire."""
|
||||
ensure_parent_dir(destination_path)
|
||||
with destination_path.open("w", newline="") as csv_file:
|
||||
fieldnames = [
|
||||
"set_num",
|
||||
"set_id",
|
||||
"expected_parts",
|
||||
"inventory_parts",
|
||||
"inventory_parts_non_spare",
|
||||
"delta",
|
||||
"delta_non_spare",
|
||||
"in_collection",
|
||||
]
|
||||
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for row in gaps:
|
||||
writer.writerow(row)
|
||||
|
||||
|
||||
def build_instructions_url(set_id: str) -> str:
|
||||
"""Construit un lien direct vers la page d'instructions LEGO du set."""
|
||||
return f"https://www.lego.com/service/buildinginstructions/{set_id}"
|
||||
|
||||
|
||||
def write_inventory_gaps_markdown(
|
||||
destination_path: Path,
|
||||
gaps: Iterable[dict],
|
||||
sets_by_num: Dict[str, dict],
|
||||
) -> None:
|
||||
"""Génère un tableau Markdown listant les sets en écart d'inventaire."""
|
||||
ensure_parent_dir(destination_path)
|
||||
with destination_path.open("w") as markdown_file:
|
||||
markdown_file.write(
|
||||
"| set_id | name | year | delta (spares inclus) | delta (spares exclus) | expected_parts | inventory_parts | inventory_parts_non_spare | in_collection | instructions |\n"
|
||||
)
|
||||
markdown_file.write("| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |\n")
|
||||
for row in gaps:
|
||||
if row["delta_non_spare"] == 0:
|
||||
continue
|
||||
set_row = sets_by_num[row["set_num"]]
|
||||
set_link = f"[{row['set_id']}]({set_row['rebrickable_url']})"
|
||||
instructions_link = f"[PDF]({build_instructions_url(row['set_id'])})"
|
||||
markdown_file.write(
|
||||
f"| {set_link} | {set_row['name']} | {set_row['year']} | {row['delta']} | {row['delta_non_spare']} | "
|
||||
f"{row['expected_parts']} | {row['inventory_parts']} | {row['inventory_parts_non_spare']} | "
|
||||
f"{row['in_collection']} | {instructions_link} |\n"
|
||||
)
|
||||
143
lib/rebrickable/parts_inventory.py
Normal file
143
lib/rebrickable/parts_inventory.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Construction d'un inventaire détaillé des pièces par set."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
|
||||
|
||||
def normalize_boolean(raw_value: str) -> str:
|
||||
"""Normalise une valeur booléenne en chaîne lowercase."""
|
||||
return raw_value.lower()
|
||||
|
||||
|
||||
def select_latest_inventories(inventories_path: Path) -> Dict[str, dict]:
|
||||
"""Retient pour chaque set l'inventaire avec la version la plus élevée."""
|
||||
latest_inventories: Dict[str, dict] = {}
|
||||
with inventories_path.open() as inventories_file:
|
||||
reader = csv.DictReader(inventories_file)
|
||||
for row in reader:
|
||||
current = latest_inventories.get(row["set_num"])
|
||||
if current is None or int(row["version"]) > int(current["version"]):
|
||||
latest_inventories[row["set_num"]] = {"id": row["id"], "version": row["version"]}
|
||||
return latest_inventories
|
||||
|
||||
|
||||
def build_color_lookup(colors_path: Path) -> Dict[str, dict]:
|
||||
"""Construit un index des couleurs par identifiant."""
|
||||
colors: Dict[str, dict] = {}
|
||||
with colors_path.open() as colors_file:
|
||||
reader = csv.DictReader(colors_file)
|
||||
for row in reader:
|
||||
colors[row["id"]] = {
|
||||
"rgb": row["rgb"],
|
||||
"is_translucent": normalize_boolean(row["is_trans"]),
|
||||
}
|
||||
return colors
|
||||
|
||||
|
||||
def index_inventory_parts_by_inventory(inventory_parts_path: Path) -> Dict[str, List[dict]]:
|
||||
"""Indexe les lignes d'inventaire par identifiant d'inventaire."""
|
||||
parts_by_inventory: Dict[str, List[dict]] = {}
|
||||
with inventory_parts_path.open() as parts_file:
|
||||
reader = csv.DictReader(parts_file)
|
||||
for row in reader:
|
||||
inventory_id = row["inventory_id"]
|
||||
if inventory_id not in parts_by_inventory:
|
||||
parts_by_inventory[inventory_id] = []
|
||||
parts_by_inventory[inventory_id].append(row)
|
||||
return parts_by_inventory
|
||||
|
||||
|
||||
def index_inventory_minifigs_by_inventory(inventory_minifigs_path: Path) -> Dict[str, List[dict]]:
|
||||
"""Indexe les minifigs par inventaire."""
|
||||
minifigs_by_inventory: Dict[str, List[dict]] = {}
|
||||
with inventory_minifigs_path.open() as minifigs_file:
|
||||
reader = csv.DictReader(minifigs_file)
|
||||
for row in reader:
|
||||
inventory_id = row["inventory_id"]
|
||||
if inventory_id not in minifigs_by_inventory:
|
||||
minifigs_by_inventory[inventory_id] = []
|
||||
minifigs_by_inventory[inventory_id].append(row)
|
||||
return minifigs_by_inventory
|
||||
|
||||
|
||||
def build_minifig_lookup(minifigs_path: Path) -> Dict[str, dict]:
|
||||
"""Construit un index des minifigs avec leur nombre de pièces."""
|
||||
minifigs: Dict[str, dict] = {}
|
||||
with minifigs_path.open() as minifigs_file:
|
||||
reader = csv.DictReader(minifigs_file)
|
||||
for row in reader:
|
||||
minifigs[row["fig_num"]] = row
|
||||
return minifigs
|
||||
|
||||
|
||||
def write_parts_filtered(
|
||||
sets_path: Path,
|
||||
inventories_path: Path,
|
||||
inventory_parts_path: Path,
|
||||
colors_path: Path,
|
||||
inventory_minifigs_path: Path,
|
||||
minifigs_path: Path,
|
||||
destination_path: Path,
|
||||
) -> None:
|
||||
"""Assemble un CSV agrégé listant les pièces par set et par couleur."""
|
||||
latest_inventories = select_latest_inventories(inventories_path)
|
||||
parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path)
|
||||
minifigs_by_inventory = index_inventory_minifigs_by_inventory(inventory_minifigs_path)
|
||||
minifigs = build_minifig_lookup(minifigs_path)
|
||||
colors = build_color_lookup(colors_path)
|
||||
ensure_parent_dir(destination_path)
|
||||
with sets_path.open() as sets_file, destination_path.open("w", newline="") as target_file:
|
||||
sets_reader = csv.DictReader(sets_file)
|
||||
fieldnames = [
|
||||
"part_num",
|
||||
"color_rgb",
|
||||
"is_translucent",
|
||||
"set_num",
|
||||
"set_id",
|
||||
"quantity_in_set",
|
||||
"is_spare",
|
||||
]
|
||||
writer = csv.DictWriter(target_file, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for set_row in sets_reader:
|
||||
inventory = latest_inventories[set_row["set_num"]]
|
||||
inventory_parts = parts_by_inventory[inventory["id"]]
|
||||
inventory_total_non_spare = sum(
|
||||
int(part_row["quantity"])
|
||||
for part_row in inventory_parts
|
||||
if normalize_boolean(part_row["is_spare"]) == "false"
|
||||
)
|
||||
expected_parts = int(set_row["num_parts"])
|
||||
for part_row in inventory_parts:
|
||||
color = colors[part_row["color_id"]]
|
||||
writer.writerow(
|
||||
{
|
||||
"part_num": part_row["part_num"],
|
||||
"color_rgb": color["rgb"],
|
||||
"is_translucent": color["is_translucent"],
|
||||
"set_num": set_row["set_num"],
|
||||
"set_id": set_row["set_id"],
|
||||
"quantity_in_set": part_row["quantity"],
|
||||
"is_spare": normalize_boolean(part_row["is_spare"]),
|
||||
}
|
||||
)
|
||||
if inventory_total_non_spare < expected_parts:
|
||||
for minifig_row in minifigs_by_inventory.get(inventory["id"], []):
|
||||
minifig_inventory = latest_inventories[minifig_row["fig_num"]]
|
||||
minifig_parts = parts_by_inventory[minifig_inventory["id"]]
|
||||
for part_row in minifig_parts:
|
||||
color = colors[part_row["color_id"]]
|
||||
writer.writerow(
|
||||
{
|
||||
"part_num": part_row["part_num"],
|
||||
"color_rgb": color["rgb"],
|
||||
"is_translucent": color["is_translucent"],
|
||||
"set_num": set_row["set_num"],
|
||||
"set_id": set_row["set_id"],
|
||||
"quantity_in_set": str(int(part_row["quantity"]) * int(minifig_row["quantity"])),
|
||||
"is_spare": normalize_boolean(part_row["is_spare"]),
|
||||
}
|
||||
)
|
||||
101
lib/rebrickable/parts_stats.py
Normal file
101
lib/rebrickable/parts_stats.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""Calculs de statistiques simples sur les pièces filtrées."""
|
||||
|
||||
import csv
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Sequence, Tuple
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
from lib.rebrickable.inventory_reconciliation import compute_inventory_gaps
|
||||
from lib.rebrickable.stats import read_rows as read_stats_rows
|
||||
|
||||
|
||||
def read_rows(path: Path) -> List[dict]:
|
||||
"""Charge un fichier CSV en mémoire sous forme de dictionnaires."""
|
||||
with path.open() as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
return list(reader)
|
||||
|
||||
|
||||
def select_non_spare_parts(rows: Iterable[dict]) -> List[dict]:
|
||||
"""Filtre les pièces en excluant les rechanges."""
|
||||
return [row for row in rows if row["is_spare"] == "false"]
|
||||
|
||||
|
||||
def variation_key(row: dict) -> Tuple[str, str, str]:
|
||||
"""Clé d'unicité pour une variation de pièce (référence + couleur)."""
|
||||
return (row["part_num"], row["color_rgb"], row["is_translucent"])
|
||||
|
||||
|
||||
def color_key(row: dict) -> Tuple[str, str]:
|
||||
"""Clé d'unicité pour une couleur."""
|
||||
return (row["color_rgb"], row["is_translucent"])
|
||||
|
||||
|
||||
def aggregate_quantities_by_variation(rows: Iterable[dict]) -> Dict[Tuple[str, str, str], int]:
|
||||
"""Calcule la quantité totale par variation de pièce (hors rechanges)."""
|
||||
quantities: Dict[Tuple[str, str, str], int] = defaultdict(int)
|
||||
for row in rows:
|
||||
quantities[variation_key(row)] += int(row["quantity_in_set"])
|
||||
return quantities
|
||||
|
||||
|
||||
def read_total_filtered_parts(stats_path: Path) -> int:
|
||||
"""Lit le total de pièces attendu pour les thèmes filtrés depuis stats.csv."""
|
||||
rows = read_stats_rows(stats_path)
|
||||
return int(
|
||||
next(row["valeur"] for row in rows if row["libelle"] == "Total de pièces pour les thèmes filtrés")
|
||||
)
|
||||
|
||||
|
||||
def build_stats(
|
||||
rows: Iterable[dict],
|
||||
sets_path: Path,
|
||||
parts_path: Path,
|
||||
stats_path: Path,
|
||||
) -> List[Tuple[str, str]]:
|
||||
"""Construit les statistiques principales sur les pièces filtrées et les écarts d'inventaire."""
|
||||
non_spares = select_non_spare_parts(rows)
|
||||
quantities = aggregate_quantities_by_variation(non_spares)
|
||||
total_variations = len(quantities)
|
||||
color_set = {color_key(row) for row in non_spares}
|
||||
least_used_key = min(quantities, key=quantities.get)
|
||||
most_used_key = max(quantities, key=quantities.get)
|
||||
least_used = quantities[least_used_key]
|
||||
most_used = quantities[most_used_key]
|
||||
total_non_spare = sum(quantities.values())
|
||||
gaps = compute_inventory_gaps(sets_path, parts_path)
|
||||
gap_count = len(gaps)
|
||||
worst_gap = max(gaps, key=lambda gap: gap["delta"]) if gap_count > 0 else {"set_id": "none", "delta": 0}
|
||||
catalog_total_parts = read_total_filtered_parts(stats_path)
|
||||
catalog_inventory_delta = catalog_total_parts - total_non_spare
|
||||
|
||||
return [
|
||||
("Total de variations de pièces (hors rechanges)", str(total_variations)),
|
||||
(
|
||||
"Pièce la moins utilisée (référence + couleur)",
|
||||
f"{least_used_key[0]} / {least_used_key[1]} / {least_used_key[2]} ({least_used})",
|
||||
),
|
||||
(
|
||||
"Pièce la plus commune (référence + couleur)",
|
||||
f"{most_used_key[0]} / {most_used_key[1]} / {most_used_key[2]} ({most_used})",
|
||||
),
|
||||
("Total de couleurs utilisées (hors rechanges)", str(len(color_set))),
|
||||
("Total de pièces hors rechanges", str(total_non_spare)),
|
||||
(
|
||||
"Ecart total catalogue (stats) - inventaire (hors rechanges)",
|
||||
str(catalog_inventory_delta),
|
||||
),
|
||||
("Nombre de sets en écart inventaire/catalogue", str(gap_count)),
|
||||
("Ecart maximal inventaire/catalogue", f"{worst_gap['set_id']} ({worst_gap['delta']})"),
|
||||
]
|
||||
|
||||
|
||||
def write_parts_stats(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None:
|
||||
"""Écrit les statistiques dans un CSV à deux colonnes."""
|
||||
ensure_parent_dir(destination_path)
|
||||
with destination_path.open("w", newline="") as csv_file:
|
||||
writer = csv.writer(csv_file)
|
||||
writer.writerow(["libelle", "valeur"])
|
||||
for label, value in stats:
|
||||
writer.writerow([label, value])
|
||||
122
lib/rebrickable/stats.py
Normal file
122
lib/rebrickable/stats.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Calcul des statistiques de base sur les sets LEGO filtrés."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Iterable, List, Sequence, Tuple
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
|
||||
def read_rows(path: Path) -> List[dict]:
|
||||
"""Charge un fichier CSV en mémoire sous forme de dictionnaires."""
|
||||
with path.open() as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
return list(reader)
|
||||
|
||||
|
||||
def write_stats_csv(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None:
|
||||
"""Écrit les statistiques dans un CSV à deux colonnes."""
|
||||
ensure_parent_dir(destination_path)
|
||||
with destination_path.open("w", newline="") as csv_file:
|
||||
writer = csv.writer(csv_file)
|
||||
writer.writerow(["libelle", "valeur"])
|
||||
for label, value in stats:
|
||||
writer.writerow([label, value])
|
||||
|
||||
|
||||
def compute_median(values: List[int]) -> float:
|
||||
"""Calcule la médiane d'une liste de valeurs entières."""
|
||||
sorted_values = sorted(values)
|
||||
middle = len(sorted_values) // 2
|
||||
if len(sorted_values) % 2 == 1:
|
||||
return float(sorted_values[middle])
|
||||
return (sorted_values[middle - 1] + sorted_values[middle]) / 2
|
||||
|
||||
|
||||
def compute_basic_stats(
|
||||
themes: Iterable[dict],
|
||||
all_sets: Iterable[dict],
|
||||
filtered_sets: Iterable[dict],
|
||||
enriched_sets: Iterable[dict],
|
||||
) -> List[Tuple[str, str]]:
|
||||
"""Calcule les statistiques principales à partir des sets chargés."""
|
||||
themes_list = list(themes)
|
||||
all_sets_list = list(all_sets)
|
||||
filtered_sets_list = list(filtered_sets)
|
||||
enriched_sets_list = list(enriched_sets)
|
||||
|
||||
theme_count_total = len(themes_list)
|
||||
total_sets = len(all_sets_list)
|
||||
filtered_sets_count = len(filtered_sets_list)
|
||||
avg_sets_per_theme = total_sets / theme_count_total
|
||||
percent_filtered = (filtered_sets_count / total_sets) * 100
|
||||
owned_sets_count = sum(1 for row in enriched_sets_list if row["in_collection"] == "true")
|
||||
missing_sets_count = sum(1 for row in enriched_sets_list if row["in_collection"] == "false")
|
||||
percent_owned = (owned_sets_count / filtered_sets_count) * 100
|
||||
parts_per_set = [int(row["num_parts"]) for row in filtered_sets_list]
|
||||
avg_parts_per_set = sum(parts_per_set) / filtered_sets_count
|
||||
median_parts_per_set = compute_median(parts_per_set)
|
||||
years = [int(row["year"]) for row in filtered_sets_list]
|
||||
avg_sets_per_year = filtered_sets_count / len(set(years))
|
||||
total_parts = sum(parts_per_set)
|
||||
theme_ids_filtered = {row["theme_id"] for row in filtered_sets_list}
|
||||
min_year = str(min(years))
|
||||
max_year = str(max(years))
|
||||
year_counts = {}
|
||||
for year in years:
|
||||
year_counts[year] = year_counts.get(year, 0) + 1
|
||||
prolific_year, prolific_count = max(year_counts.items(), key=lambda item: (item[1], -item[0]))
|
||||
richest_set = max(filtered_sets_list, key=lambda row: int(row["num_parts"]))
|
||||
lightest_set = min(filtered_sets_list, key=lambda row: int(row["num_parts"]))
|
||||
oldest_set = min(filtered_sets_list, key=lambda row: (int(row["year"]), row["set_num"]))
|
||||
latest_set = max(filtered_sets_list, key=lambda row: (int(row["year"]), row["set_num"]))
|
||||
owned_parts = [int(row["num_parts"]) for row in enriched_sets_list if row["in_collection"] == "true"]
|
||||
missing_parts = [int(row["num_parts"]) for row in enriched_sets_list if row["in_collection"] == "false"]
|
||||
avg_parts_owned = sum(owned_parts) / len(owned_parts)
|
||||
avg_parts_missing = sum(missing_parts) / len(missing_parts)
|
||||
total_parts_owned = sum(owned_parts)
|
||||
percent_parts_owned = (total_parts_owned / total_parts) * 100
|
||||
|
||||
return [
|
||||
("Nombre total de sets (catalogue complet)", str(total_sets)),
|
||||
("Nombre total de thèmes (catalogue complet)", str(theme_count_total)),
|
||||
("Nombre de sets après filtrage (thèmes ciblés)", str(filtered_sets_count)),
|
||||
("Nombre moyen de sets par thème (catalogue complet)", f"{avg_sets_per_theme:.2f}"),
|
||||
("Pourcentage des sets filtrés vs total", f"{percent_filtered:.2f}%"),
|
||||
("Taux de possession (thèmes filtrés)", f"{percent_owned:.2f}%"),
|
||||
("Sets dans la collection", str(owned_sets_count)),
|
||||
("Sets manquants pour la collection", str(missing_sets_count)),
|
||||
("Nombre moyen de pièces par set (thèmes filtrés)", f"{avg_parts_per_set:.2f}"),
|
||||
("Médiane de pièces par set (thèmes filtrés)", f"{median_parts_per_set:.2f}"),
|
||||
("Nombre moyen de sets commercialisés par an (thèmes filtrés)", f"{avg_sets_per_year:.2f}"),
|
||||
("Total de pièces pour les thèmes filtrés", str(total_parts)),
|
||||
("Total de pièces des sets possédés", str(total_parts_owned)),
|
||||
("Pourcentage de pièces possédées (thèmes filtrés)", f"{percent_parts_owned:.2f}%"),
|
||||
("Nombre de thèmes filtrés", str(len(theme_ids_filtered))),
|
||||
("Première année de sortie (thèmes filtrés)", min_year),
|
||||
("Dernière année de sortie (thèmes filtrés)", max_year),
|
||||
("Année la plus prolifique (thèmes filtrés)", f"{prolific_year} ({prolific_count} sets)"),
|
||||
(
|
||||
"Set avec le plus de pièces (thèmes filtrés)",
|
||||
f"{richest_set['set_num']} - {richest_set['name']} ({richest_set['num_parts']} pièces)",
|
||||
),
|
||||
(
|
||||
"Set avec le moins de pièces (thèmes filtrés)",
|
||||
f"{lightest_set['set_num']} - {lightest_set['name']} ({lightest_set['num_parts']} pièces)",
|
||||
),
|
||||
(
|
||||
"Set le plus ancien (thèmes filtrés)",
|
||||
f"{oldest_set['set_num']} - {oldest_set['name']} ({oldest_set['year']})",
|
||||
),
|
||||
(
|
||||
"Set le plus récent (thèmes filtrés)",
|
||||
f"{latest_set['set_num']} - {latest_set['name']} ({latest_set['year']})",
|
||||
),
|
||||
(
|
||||
"Nombre moyen de pièces des sets possédés",
|
||||
f"{avg_parts_owned:.2f}",
|
||||
),
|
||||
(
|
||||
"Nombre moyen de pièces des sets manquants",
|
||||
f"{avg_parts_missing:.2f}",
|
||||
),
|
||||
]
|
||||
Reference in New Issue
Block a user