
Add part category analysis

Richard Dern 2025-12-02 17:07:41 +01:00
parent d067e2075f
commit 9d1f2c3089
7 changed files with 632 additions and 0 deletions


@@ -298,3 +298,17 @@ The computation reads `data/intermediate/colors_by_set.csv` and `data/intermediate/sets_e
- `data/intermediate/color_richness_by_year.csv`: yearly aggregate (mean, median, diversity and concentration bounds).
The plots `figures/step28/color_richness_boxplot.png`, `figures/step28/color_richness_top_sets.png` and `figures/step28/color_concentration_scatter.png` show, respectively, the yearly distribution, the most colorful sets, and palette concentration (share of the 3 dominant colors vs. number of colors).
### Step 29: breakdown by part category (structural vs. aesthetic)
1. `source .venv/bin/activate`
2. `python -m scripts.compute_part_categories`
3. `python -m scripts.plot_part_categories`
The computation reads `data/intermediate/parts_filtered.csv`, `data/raw/parts.csv`, `data/raw/part_categories.csv` and `data/intermediate/sets_enriched.csv` to measure each part category's share (spares excluded), flag the categories considered structural/Technic (the `IGNORED_PART_CATEGORY_IDS` list in `lib/rebrickable/color_ignores.py`), and produce:
- `data/intermediate/part_categories_by_set.csv`: shares per set and per category, with ownership status and a structural flag.
- `data/intermediate/part_categories_by_year.csv`: yearly shares per category.
- `data/intermediate/part_categories_global.csv`: global shares per category.
The visuals `figures/step29/top_part_categories_area.png`, `figures/step29/part_categories_heatmap.png` and `figures/step29/structural_share_timeline.png` show, respectively, the evolution of the main categories (stacked area), an exhaustive category × year heatmap, and the trajectory of the structural parts share.
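As a quick sanity check after step 29, the global CSV can be inspected directly. The following minimal sketch is not part of the pipeline; it only assumes the columns listed above and prints the top five categories with their share and structural flag:

```python
import csv
from pathlib import Path

GLOBAL_PATH = Path("data/intermediate/part_categories_global.csv")

with GLOBAL_PATH.open() as csv_file:
    # Rows are written in descending quantity_non_spare order by the pipeline.
    for row in list(csv.DictReader(csv_file))[:5]:
        flag = "structural" if row["is_structural"] == "true" else "aesthetic"
        print(f"{row['category_name']:<30} {float(row['share_non_spare']):7.2%} ({flag})")
```

Run it from the repository root once step 29 has produced the CSV.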


@@ -0,0 +1,130 @@
"""Visualisations des parts par catégorie."""
from pathlib import Path
from typing import Dict, List, Sequence
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable
from matplotlib.patches import Patch
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows
def load_rows(path: Path) -> List[dict]:
"""Charge un CSV en mémoire."""
return read_rows(path)
def extract_years(rows: Sequence[dict]) -> List[str]:
"""Récupère la liste ordonnée des années présentes."""
years = {row["year"] for row in rows}
return sorted(years, key=int)
def build_shares_by_year(rows: Sequence[dict]) -> Dict[tuple[str, str], float]:
"""Indexe share_non_spare par (year, category_id)."""
return {(row["year"], row["category_id"]): float(row["share_non_spare"]) for row in rows}
def plot_top_part_categories_area(
categories_by_year_path: Path,
categories_global_path: Path,
destination_path: Path,
top_n: int = 8,
) -> None:
"""Trace l'évolution des catégories principales en parts empilées."""
yearly_rows = load_rows(categories_by_year_path)
global_rows = load_rows(categories_global_path)
years = extract_years(yearly_rows)
top_categories = global_rows[:top_n]
    labels = [row["category_name"] for row in top_categories] + ["Others"]
shares_lookup = build_shares_by_year(yearly_rows)
series: List[List[float]] = []
for top in top_categories:
series.append([shares_lookup.get((year, top["category_id"]), 0.0) for year in years])
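    # Fold everything outside the top N into an "Others" residual so the
    # stacked areas always sum to the full yearly share.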
other_series: List[float] = []
    for j, year in enumerate(years):
        year_total = sum(value for (yr, _), value in shares_lookup.items() if yr == year)
        top_sum = sum(values[j] for values in series)
        other_series.append(max(0.0, year_total - top_sum))
series.append(other_series)
x = np.arange(len(years))
fig, ax = plt.subplots(figsize=(12, 7))
colors = plt.get_cmap("tab20").colors
ax.stackplot(x, series, labels=labels, colors=colors[: len(labels)], alpha=0.9, linewidth=0.6)
ax.set_xticks(x)
ax.set_xticklabels(years, rotation=45, ha="right")
ax.set_ylabel("Part des pièces (hors rechanges)")
ax.set_title("Part des principales catégories de pièces (par année)")
ax.legend(loc="upper left", frameon=False, ncol=2)
ax.grid(axis="y", linestyle="--", alpha=0.35)
ensure_parent_dir(destination_path)
fig.tight_layout()
fig.savefig(destination_path, dpi=170)
plt.close(fig)
def plot_part_categories_heatmap(categories_by_year_path: Path, destination_path: Path) -> None:
"""Heatmap des parts par catégorie et par année."""
rows = load_rows(categories_by_year_path)
years = extract_years(rows)
totals: Dict[str, int] = {}
for row in rows:
totals[row["category_id"]] = totals.get(row["category_id"], 0) + int(row["quantity_non_spare"])
categories = sorted(totals.keys(), key=lambda cat_id: -totals[cat_id])
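    # Matrix rows follow categories sorted by total volume; columns follow chronological years.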
matrix = np.zeros((len(categories), len(years)))
lookup = {(row["year"], row["category_id"]): float(row["share_non_spare"]) for row in rows}
for i, cat_id in enumerate(categories):
for j, year in enumerate(years):
matrix[i, j] = lookup.get((year, cat_id), 0.0)
fig, ax = plt.subplots(figsize=(12, 10))
cmap = plt.get_cmap("viridis")
im = ax.imshow(matrix, aspect="auto", cmap=cmap, norm=Normalize(vmin=0, vmax=matrix.max()))
ax.set_xticks(np.arange(len(years)))
ax.set_xticklabels(years, rotation=45, ha="right")
labels = {row["category_id"]: row["category_name"] for row in rows}
ax.set_yticks(np.arange(len(categories)))
ax.set_yticklabels([labels[cat_id] for cat_id in categories])
ax.set_xlabel("Année")
ax.set_ylabel("Catégorie de pièce")
ax.set_title("Part des catégories de pièces par année")
cbar = fig.colorbar(ScalarMappable(norm=im.norm, cmap=cmap), ax=ax, fraction=0.025, pad=0.015)
    cbar.ax.set_ylabel("Share of parts", rotation=90)
ensure_parent_dir(destination_path)
fig.tight_layout()
fig.savefig(destination_path, dpi=170)
plt.close(fig)
def plot_structural_share_timeline(categories_by_year_path: Path, destination_path: Path) -> None:
"""Trace l'évolution de la part des catégories structurelles."""
rows = load_rows(categories_by_year_path)
years = extract_years(rows)
structural_share: Dict[str, float] = {}
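    # Sum the yearly shares of every category flagged structural (booleans are stored as strings).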
for row in rows:
if row["is_structural"] != "true":
continue
share = structural_share.get(row["year"], 0.0)
structural_share[row["year"]] = share + float(row["share_non_spare"])
x = np.arange(len(years))
y = [structural_share.get(year, 0.0) for year in years]
fig, ax = plt.subplots(figsize=(11, 6))
ax.plot(x, y, color="#d62728", linewidth=2.2)
ax.fill_between(x, y, color="#d62728", alpha=0.18)
ax.set_xticks(x)
ax.set_xticklabels(years, rotation=45, ha="right")
ax.set_ylabel("Part des pièces structurelles")
ax.set_title("Evolution de la part des pièces structurelles")
ax.grid(True, linestyle="--", alpha=0.35)
    legend = [Patch(facecolor="#d62728", edgecolor="none", alpha=0.6, label="Structural / Technic")]
ax.legend(handles=legend, loc="upper right", frameon=False)
ensure_parent_dir(destination_path)
fig.tight_layout()
fig.savefig(destination_path, dpi=170)
plt.close(fig)


@@ -0,0 +1,240 @@
"""Agrégation des parts par catégorie pour les sets filtrés."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Tuple
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.color_ignores import is_ignored_part_category
from lib.rebrickable.stats import read_rows
def load_parts_catalog(path: Path) -> Dict[str, dict]:
"""Indexe les pièces par référence avec leur catégorie."""
catalog: Dict[str, dict] = {}
with path.open() as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
catalog[row["part_num"]] = row
return catalog
def load_category_names(path: Path) -> Dict[str, str]:
"""Associe chaque catégorie à son libellé."""
names: Dict[str, str] = {}
with path.open() as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
names[row["id"]] = row["name"]
return names
def load_sets_enriched(path: Path) -> Dict[str, dict]:
"""Indexe les sets enrichis par numéro complet."""
sets: Dict[str, dict] = {}
with path.open() as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
sets[row["set_num"]] = row
return sets
def group_rows_by_set(rows: Iterable[dict]) -> Dict[str, List[dict]]:
"""Regroupe les lignes parts_filtered par set."""
    grouped: Dict[str, List[dict]] = {}
    for row in rows:
        grouped.setdefault(row["set_num"], []).append(row)
    return grouped
def build_category_totals(
grouped_parts: Dict[str, List[dict]],
parts_catalog: Dict[str, dict],
category_names: Dict[str, str],
) -> Tuple[List[dict], List[dict]]:
"""Construit les agrégats par set puis par année."""
categories_by_set: List[dict] = []
categories_by_year: Dict[Tuple[str, str], dict] = {}
totals_by_set: Dict[str, int] = {}
totals_minifig_by_set: Dict[str, int] = {}
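    # For each set: compute non-spare totals (minifig parts tracked separately),
    # then accumulate quantities per category.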
for set_num, rows in grouped_parts.items():
total_non_spare = sum(int(row["quantity_in_set"]) for row in rows if row["is_spare"] == "false")
totals_by_set[set_num] = total_non_spare
totals_minifig_by_set[set_num] = sum(
int(row["quantity_in_set"])
for row in rows
if row["is_spare"] == "false" and row["is_minifig_part"] == "true"
)
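        # Accumulate this set's non-spare quantities per category.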
by_category: Dict[str, dict] = {}
for row in rows:
if row["is_spare"] == "true":
continue
part = parts_catalog[row["part_num"]]
cat_id = part["part_cat_id"]
cat_name = category_names[cat_id]
entry = by_category.get(cat_id)
if entry is None:
entry = {
"category_id": cat_id,
"category_name": cat_name,
"quantity_non_spare": 0,
"quantity_minifig": 0,
"quantity_non_minifig": 0,
}
by_category[cat_id] = entry
quantity = int(row["quantity_in_set"])
entry["quantity_non_spare"] += quantity
if row["is_minifig_part"] == "true":
entry["quantity_minifig"] += quantity
else:
entry["quantity_non_minifig"] += quantity
for cat_id, entry in by_category.items():
categories_by_set.append(
{
"set_num": set_num,
"category_id": cat_id,
"category_name": entry["category_name"],
"quantity_non_spare": str(entry["quantity_non_spare"]),
"quantity_minifig": str(entry["quantity_minifig"]),
"quantity_non_minifig": str(entry["quantity_non_minifig"]),
"share_non_spare": f"{entry['quantity_non_spare'] / total_non_spare:.4f}",
}
)
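            # All rows of a set share the same year; fold this category into the yearly aggregate.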
year = rows[0]["year"]
key = (year, cat_id)
year_entry = categories_by_year.get(key)
if year_entry is None:
year_entry = {
"year": year,
"category_id": cat_id,
"category_name": entry["category_name"],
"quantity_non_spare": 0,
}
categories_by_year[key] = year_entry
year_entry["quantity_non_spare"] += entry["quantity_non_spare"]
categories_by_set.sort(key=lambda row: (row["set_num"], row["category_name"]))
categories_year_rows = []
totals_by_year: Dict[str, int] = {}
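    # Two passes: first the yearly grand totals, then each category's share of them.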
for (year, _), entry in categories_by_year.items():
totals_by_year[year] = totals_by_year.get(year, 0) + entry["quantity_non_spare"]
for key, entry in categories_by_year.items():
total_year = totals_by_year[key[0]]
categories_year_rows.append(
{
"year": entry["year"],
"category_id": entry["category_id"],
"category_name": entry["category_name"],
"quantity_non_spare": str(entry["quantity_non_spare"]),
"share_non_spare": f"{entry['quantity_non_spare'] / total_year:.4f}",
"is_structural": "true" if is_ignored_part_category(entry["category_id"]) else "false",
}
)
categories_year_rows.sort(key=lambda row: (int(row["year"]), row["category_name"]))
return categories_by_set, categories_year_rows
def enrich_categories_with_sets(rows: Iterable[dict], sets_lookup: Dict[str, dict]) -> List[dict]:
"""Ajoute les métadonnées de set aux agrégats par catégorie."""
enriched: List[dict] = []
for row in rows:
set_row = sets_lookup[row["set_num"]]
enriched.append(
{
"set_num": row["set_num"],
"set_id": set_row["set_id"],
"name": set_row["name"],
"year": set_row["year"],
"in_collection": set_row["in_collection"],
"category_id": row["category_id"],
"category_name": row["category_name"],
"quantity_non_spare": row["quantity_non_spare"],
"quantity_minifig": row["quantity_minifig"],
"quantity_non_minifig": row["quantity_non_minifig"],
"share_non_spare": row["share_non_spare"],
"is_structural": "true" if is_ignored_part_category(row["category_id"]) else "false",
}
)
enriched.sort(key=lambda row: (row["set_num"], row["category_name"]))
return enriched
def build_global_totals(rows: Iterable[dict]) -> List[dict]:
"""Agrège les quantités par catégorie pour l'ensemble des sets filtrés."""
totals: Dict[str, dict] = {}
grand_total = 0
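    # Single pass over the enriched per-set rows: per-category totals and the grand total together.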
for row in rows:
entry = totals.get(row["category_id"])
if entry is None:
entry = {
"category_id": row["category_id"],
"category_name": row["category_name"],
"quantity_non_spare": 0,
"is_structural": row["is_structural"],
}
totals[row["category_id"]] = entry
value = int(row["quantity_non_spare"])
entry["quantity_non_spare"] += value
grand_total += value
global_rows: List[dict] = []
for entry in totals.values():
global_rows.append(
{
"category_id": entry["category_id"],
"category_name": entry["category_name"],
"quantity_non_spare": str(entry["quantity_non_spare"]),
"share_non_spare": f"{entry['quantity_non_spare'] / grand_total:.4f}",
"is_structural": entry["is_structural"],
}
)
global_rows.sort(key=lambda row: (-int(row["quantity_non_spare"]), row["category_name"]))
return global_rows
def write_categories_by_set(destination_path: Path, rows: Sequence[dict]) -> None:
"""Écrit le CSV par set et par catégorie."""
ensure_parent_dir(destination_path)
fieldnames = [
"set_num",
"set_id",
"name",
"year",
"in_collection",
"category_id",
"category_name",
"quantity_non_spare",
"quantity_minifig",
"quantity_non_minifig",
"share_non_spare",
"is_structural",
]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)
def write_categories_by_year(destination_path: Path, rows: Sequence[dict]) -> None:
"""Écrit le CSV des parts par catégorie et par année."""
ensure_parent_dir(destination_path)
fieldnames = ["year", "category_id", "category_name", "quantity_non_spare", "share_non_spare", "is_structural"]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)
def write_categories_global(destination_path: Path, rows: Sequence[dict]) -> None:
"""Écrit le CSV agrégé globalement."""
ensure_parent_dir(destination_path)
fieldnames = ["category_id", "category_name", "quantity_non_spare", "share_non_spare", "is_structural"]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)


@@ -0,0 +1,50 @@
"""Calcule la répartition des pièces par catégories."""
from pathlib import Path
from lib.rebrickable.part_categories import (
    build_category_totals,
    build_global_totals,
    enrich_categories_with_sets,
    group_rows_by_set,
    load_category_names,
    load_parts_catalog,
    load_sets_enriched,
    write_categories_by_set,
    write_categories_by_year,
    write_categories_global,
)
from lib.rebrickable.stats import read_rows
PARTS_PATH = Path("data/intermediate/parts_filtered.csv")
PARTS_CATALOG_PATH = Path("data/raw/parts.csv")
PART_CATEGORIES_PATH = Path("data/raw/part_categories.csv")
SETS_ENRICHED_PATH = Path("data/intermediate/sets_enriched.csv")
CATEGORIES_BY_SET_PATH = Path("data/intermediate/part_categories_by_set.csv")
CATEGORIES_BY_YEAR_PATH = Path("data/intermediate/part_categories_by_year.csv")
CATEGORIES_GLOBAL_PATH = Path("data/intermediate/part_categories_global.csv")
def main() -> None:
"""Construit les agrégats par set, par année et globaux."""
parts_rows = read_rows(PARTS_PATH)
parts_catalog = load_parts_catalog(PARTS_CATALOG_PATH)
category_names = load_category_names(PART_CATEGORIES_PATH)
sets_lookup = load_sets_enriched(SETS_ENRICHED_PATH)
    # Reuse the shared grouping helper instead of re-implementing it inline.
    grouped_parts = group_rows_by_set(parts_rows)
categories_by_set_raw, categories_by_year = build_category_totals(grouped_parts, parts_catalog, category_names)
categories_by_set = enrich_categories_with_sets(categories_by_set_raw, sets_lookup)
categories_global = build_global_totals(categories_by_set)
write_categories_by_set(CATEGORIES_BY_SET_PATH, categories_by_set)
write_categories_by_year(CATEGORIES_BY_YEAR_PATH, categories_by_year)
write_categories_global(CATEGORIES_GLOBAL_PATH, categories_global)
if __name__ == "__main__":
main()


@@ -0,0 +1,27 @@
"""Trace les graphiques de répartition par catégories de pièces."""
from pathlib import Path
from lib.plots.part_categories import (
plot_part_categories_heatmap,
plot_structural_share_timeline,
plot_top_part_categories_area,
)
CATEGORIES_BY_YEAR_PATH = Path("data/intermediate/part_categories_by_year.csv")
CATEGORIES_GLOBAL_PATH = Path("data/intermediate/part_categories_global.csv")
AREA_DESTINATION = Path("figures/step29/top_part_categories_area.png")
HEATMAP_DESTINATION = Path("figures/step29/part_categories_heatmap.png")
STRUCTURAL_DESTINATION = Path("figures/step29/structural_share_timeline.png")
def main() -> None:
"""Génère les visuels de répartition par catégorie."""
plot_top_part_categories_area(CATEGORIES_BY_YEAR_PATH, CATEGORIES_GLOBAL_PATH, AREA_DESTINATION)
plot_part_categories_heatmap(CATEGORIES_BY_YEAR_PATH, HEATMAP_DESTINATION)
plot_structural_share_timeline(CATEGORIES_BY_YEAR_PATH, STRUCTURAL_DESTINATION)
if __name__ == "__main__":
main()


@@ -0,0 +1,126 @@
"""Tests des agrégats par catégorie de pièce."""
import csv
from pathlib import Path
from lib.rebrickable.part_categories import (
build_category_totals,
build_global_totals,
enrich_categories_with_sets,
group_rows_by_set,
load_category_names,
load_parts_catalog,
)
from lib.rebrickable.stats import read_rows
def write_csv(path: Path, headers: list[str], rows: list[list[str]]) -> None:
"""Écrit un CSV simple pour les besoins de tests."""
with path.open("w", newline="") as csv_file:
writer = csv.writer(csv_file)
writer.writerow(headers)
writer.writerows(rows)
def test_build_category_totals_and_enrichment(tmp_path: Path) -> None:
"""Calcule les parts par catégorie et marque les catégories structurelles."""
parts_path = tmp_path / "parts_filtered.csv"
write_csv(
parts_path,
[
"part_num",
"color_rgb",
"is_translucent",
"set_num",
"set_id",
"year",
"quantity_in_set",
"is_spare",
"is_minifig_part",
],
[
["p1", "aaaaaa", "false", "1000-1", "1000", "2020", "3", "false", "false"],
["p2", "bbbbbb", "false", "1000-1", "1000", "2020", "2", "false", "true"],
["p3", "cccccc", "false", "2000-1", "2000", "2021", "4", "false", "false"],
],
)
parts_catalog_path = tmp_path / "parts.csv"
    write_csv(
        parts_catalog_path,
        ["part_num", "name", "part_cat_id"],
        [["p1", "Brick A", "1"], ["p2", "Head", "99"], ["p3", "Slope", "99"]],
    )
categories_path = tmp_path / "part_categories.csv"
write_csv(categories_path, ["id", "name"], [["1", "Baseplates"], ["99", "Bricks"]])
sets_path = tmp_path / "sets_enriched.csv"
write_csv(
sets_path,
["set_num", "set_id", "name", "year", "in_collection"],
[["1000-1", "1000", "Set A", "2020", "true"], ["2000-1", "2000", "Set B", "2021", "false"]],
)
parts_rows = read_rows(parts_path)
grouped = group_rows_by_set(parts_rows)
categories_by_set_raw, categories_by_year = build_category_totals(
grouped,
load_parts_catalog(parts_catalog_path),
load_category_names(categories_path),
)
enriched = enrich_categories_with_sets(categories_by_set_raw, {row["set_num"]: row for row in read_rows(sets_path)})
global_rows = build_global_totals(enriched)
assert categories_by_set_raw == [
{
"set_num": "1000-1",
"category_id": "1",
"category_name": "Baseplates",
"quantity_non_spare": "3",
"quantity_minifig": "0",
"quantity_non_minifig": "3",
"share_non_spare": "0.6000",
},
{
"set_num": "1000-1",
"category_id": "99",
"category_name": "Bricks",
"quantity_non_spare": "2",
"quantity_minifig": "2",
"quantity_non_minifig": "0",
"share_non_spare": "0.4000",
},
{
"set_num": "2000-1",
"category_id": "99",
"category_name": "Bricks",
"quantity_non_spare": "4",
"quantity_minifig": "0",
"quantity_non_minifig": "4",
"share_non_spare": "1.0000",
},
]
assert categories_by_year == [
{
"year": "2020",
"category_id": "1",
"category_name": "Baseplates",
"quantity_non_spare": "3",
"share_non_spare": "0.6000",
"is_structural": "true",
},
{
"year": "2020",
"category_id": "99",
"category_name": "Bricks",
"quantity_non_spare": "2",
"share_non_spare": "0.4000",
"is_structural": "false",
},
{
"year": "2021",
"category_id": "99",
"category_name": "Bricks",
"quantity_non_spare": "4",
"share_non_spare": "1.0000",
"is_structural": "false",
},
]
assert enriched[0]["is_structural"] == "true"
assert enriched[1]["is_structural"] == "false"
assert global_rows[0]["category_id"] == "99"
assert global_rows[1]["category_id"] == "1"


@@ -0,0 +1,45 @@
"""Tests des graphiques de répartition par catégorie."""
import matplotlib
from pathlib import Path
from lib.plots.part_categories import (
plot_part_categories_heatmap,
plot_structural_share_timeline,
plot_top_part_categories_area,
)
matplotlib.use("Agg")
def test_plot_part_categories_outputs_images(tmp_path: Path) -> None:
"""Génère les trois visuels principaux."""
by_year = tmp_path / "part_categories_by_year.csv"
by_global = tmp_path / "part_categories_global.csv"
by_year.write_text(
"year,category_id,category_name,quantity_non_spare,share_non_spare,is_structural\n"
"2020,1,Baseplates,5,0.5,true\n"
"2020,2,Bricks,5,0.5,false\n"
"2021,1,Baseplates,2,0.25,true\n"
"2021,2,Bricks,6,0.75,false\n"
)
by_global.write_text(
"category_id,category_name,quantity_non_spare,share_non_spare,is_structural\n"
"2,Bricks,11,0.6875,false\n"
"1,Baseplates,7,0.4375,true\n"
)
area_dest = tmp_path / "figures" / "step29" / "top_part_categories_area.png"
heatmap_dest = tmp_path / "figures" / "step29" / "part_categories_heatmap.png"
structural_dest = tmp_path / "figures" / "step29" / "structural_share_timeline.png"
plot_top_part_categories_area(by_year, by_global, area_dest, top_n=2)
plot_part_categories_heatmap(by_year, heatmap_dest)
plot_structural_share_timeline(by_year, structural_dest)
assert area_dest.exists()
assert heatmap_dest.exists()
assert structural_dest.exists()
assert area_dest.stat().st_size > 0
assert heatmap_dest.stat().st_size > 0
assert structural_dest.stat().st_size > 0