Add part category analysis
lib/rebrickable/part_categories.py | 240 | Normal file
@@ -0,0 +1,240 @@
"""Aggregate parts by category for the filtered sets."""

import csv
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Tuple

from lib.filesystem import ensure_parent_dir
from lib.rebrickable.color_ignores import is_ignored_part_category
from lib.rebrickable.stats import read_rows


def load_parts_catalog(path: Path) -> Dict[str, dict]:
    """Index parts by reference, with their category."""
    catalog: Dict[str, dict] = {}
    with path.open() as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            catalog[row["part_num"]] = row
    return catalog


def load_category_names(path: Path) -> Dict[str, str]:
    """Map each category id to its label."""
    names: Dict[str, str] = {}
    with path.open() as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            names[row["id"]] = row["name"]
    return names


def load_sets_enriched(path: Path) -> Dict[str, dict]:
    """Index the enriched sets by full set number."""
    sets: Dict[str, dict] = {}
    with path.open() as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            sets[row["set_num"]] = row
    return sets


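# The two catalogs loaded above are expected to follow the Rebrickable CSV
# dumps: parts.csv must expose at least "part_num" and "part_cat_id", and
# part_categories.csv must expose "id" and "name" (these are exactly the
# columns the lookups in this module rely on).

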
def group_rows_by_set(rows: Iterable[dict]) -> Dict[str, List[dict]]:
    """Group the parts_filtered rows by set."""
    grouped: Dict[str, List[dict]] = {}
    for row in rows:
        set_rows = grouped.get(row["set_num"])
        if set_rows is None:
            set_rows = []
            grouped[row["set_num"]] = set_rows
        set_rows.append(row)
    return grouped


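# Shape of a parts_filtered row as consumed below; the column names come from
# this file, the values are hypothetical:
#   {"set_num": "75915-1", "part_num": "3001", "quantity_in_set": "4",
#    "is_spare": "false", "is_minifig_part": "false", "year": "2015"}

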
def build_category_totals(
    grouped_parts: Dict[str, List[dict]],
    parts_catalog: Dict[str, dict],
    category_names: Dict[str, str],
) -> Tuple[List[dict], List[dict]]:
    """Build the aggregates per set, then per year."""
    categories_by_set: List[dict] = []
    categories_by_year: Dict[Tuple[str, str], dict] = {}
    totals_by_set: Dict[str, int] = {}
    totals_minifig_by_set: Dict[str, int] = {}
    for set_num, rows in grouped_parts.items():
        total_non_spare = sum(int(row["quantity_in_set"]) for row in rows if row["is_spare"] == "false")
        totals_by_set[set_num] = total_non_spare
        totals_minifig_by_set[set_num] = sum(
            int(row["quantity_in_set"])
            for row in rows
            if row["is_spare"] == "false" and row["is_minifig_part"] == "true"
        )
        by_category: Dict[str, dict] = {}
        for row in rows:
            if row["is_spare"] == "true":
                continue
            part = parts_catalog[row["part_num"]]
            cat_id = part["part_cat_id"]
            cat_name = category_names[cat_id]
            entry = by_category.get(cat_id)
            if entry is None:
                entry = {
                    "category_id": cat_id,
                    "category_name": cat_name,
                    "quantity_non_spare": 0,
                    "quantity_minifig": 0,
                    "quantity_non_minifig": 0,
                }
                by_category[cat_id] = entry
            quantity = int(row["quantity_in_set"])
            entry["quantity_non_spare"] += quantity
            if row["is_minifig_part"] == "true":
                entry["quantity_minifig"] += quantity
            else:
                entry["quantity_non_minifig"] += quantity
        for cat_id, entry in by_category.items():
            categories_by_set.append(
                {
                    "set_num": set_num,
                    "category_id": cat_id,
                    "category_name": entry["category_name"],
                    "quantity_non_spare": str(entry["quantity_non_spare"]),
                    "quantity_minifig": str(entry["quantity_minifig"]),
                    "quantity_non_minifig": str(entry["quantity_non_minifig"]),
                    "share_non_spare": f"{entry['quantity_non_spare'] / total_non_spare:.4f}",
                }
            )
            year = rows[0]["year"]
            key = (year, cat_id)
            year_entry = categories_by_year.get(key)
            if year_entry is None:
                year_entry = {
                    "year": year,
                    "category_id": cat_id,
                    "category_name": entry["category_name"],
                    "quantity_non_spare": 0,
                }
                categories_by_year[key] = year_entry
            year_entry["quantity_non_spare"] += entry["quantity_non_spare"]
    categories_by_set.sort(key=lambda row: (row["set_num"], row["category_name"]))
    categories_year_rows = []
    totals_by_year: Dict[str, int] = {}
    for (year, _), entry in categories_by_year.items():
        totals_by_year[year] = totals_by_year.get(year, 0) + entry["quantity_non_spare"]
    for key, entry in categories_by_year.items():
        total_year = totals_by_year[key[0]]
        categories_year_rows.append(
            {
                "year": entry["year"],
                "category_id": entry["category_id"],
                "category_name": entry["category_name"],
                "quantity_non_spare": str(entry["quantity_non_spare"]),
                "share_non_spare": f"{entry['quantity_non_spare'] / total_year:.4f}",
                "is_structural": "true" if is_ignored_part_category(entry["category_id"]) else "false",
            }
        )
    categories_year_rows.sort(key=lambda row: (int(row["year"]), row["category_name"]))
    return categories_by_set, categories_year_rows


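# Worked example of the share computation above, with made-up numbers: if the
# 2015 rows total 200 non-spare parts split as Bricks=120 and Plates=80, then
# total_year == 200 and share_non_spare comes out as "0.6000" and "0.4000",
# so the shares within one year sum to 1. The per-set share_non_spare follows
# the same reasoning against total_non_spare.

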
def enrich_categories_with_sets(rows: Iterable[dict], sets_lookup: Dict[str, dict]) -> List[dict]:
    """Add set metadata to the per-category aggregates."""
    enriched: List[dict] = []
    for row in rows:
        set_row = sets_lookup[row["set_num"]]
        enriched.append(
            {
                "set_num": row["set_num"],
                "set_id": set_row["set_id"],
                "name": set_row["name"],
                "year": set_row["year"],
                "in_collection": set_row["in_collection"],
                "category_id": row["category_id"],
                "category_name": row["category_name"],
                "quantity_non_spare": row["quantity_non_spare"],
                "quantity_minifig": row["quantity_minifig"],
                "quantity_non_minifig": row["quantity_non_minifig"],
                "share_non_spare": row["share_non_spare"],
                "is_structural": "true" if is_ignored_part_category(row["category_id"]) else "false",
            }
        )
    enriched.sort(key=lambda row: (row["set_num"], row["category_name"]))
    return enriched


def build_global_totals(rows: Iterable[dict]) -> List[dict]:
    """Aggregate quantities by category across all the filtered sets."""
    totals: Dict[str, dict] = {}
    grand_total = 0
    for row in rows:
        entry = totals.get(row["category_id"])
        if entry is None:
            entry = {
                "category_id": row["category_id"],
                "category_name": row["category_name"],
                "quantity_non_spare": 0,
                "is_structural": row["is_structural"],
            }
            totals[row["category_id"]] = entry
        value = int(row["quantity_non_spare"])
        entry["quantity_non_spare"] += value
        grand_total += value
    global_rows: List[dict] = []
    for entry in totals.values():
        global_rows.append(
            {
                "category_id": entry["category_id"],
                "category_name": entry["category_name"],
                "quantity_non_spare": str(entry["quantity_non_spare"]),
                "share_non_spare": f"{entry['quantity_non_spare'] / grand_total:.4f}",
                "is_structural": entry["is_structural"],
            }
        )
    global_rows.sort(key=lambda row: (-int(row["quantity_non_spare"]), row["category_name"]))
    return global_rows


def write_categories_by_set(destination_path: Path, rows: Sequence[dict]) -> None:
    """Write the per-set, per-category CSV."""
    ensure_parent_dir(destination_path)
    fieldnames = [
        "set_num",
        "set_id",
        "name",
        "year",
        "in_collection",
        "category_id",
        "category_name",
        "quantity_non_spare",
        "quantity_minifig",
        "quantity_non_minifig",
        "share_non_spare",
        "is_structural",
    ]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


def write_categories_by_year(destination_path: Path, rows: Sequence[dict]) -> None:
    """Write the per-category, per-year parts CSV."""
    ensure_parent_dir(destination_path)
    fieldnames = ["year", "category_id", "category_name", "quantity_non_spare", "share_non_spare", "is_structural"]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


def write_categories_global(destination_path: Path, rows: Sequence[dict]) -> None:
    """Write the globally aggregated CSV."""
    ensure_parent_dir(destination_path)
    fieldnames = ["category_id", "category_name", "quantity_non_spare", "share_non_spare", "is_structural"]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
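
# A minimal wiring sketch, not part of this module: the file paths below are
# hypothetical, and the exact signature of read_rows() (imported above) is
# assumed here to take a CSV path and yield dict rows.
#
#   catalog = load_parts_catalog(Path("data/parts.csv"))
#   names = load_category_names(Path("data/part_categories.csv"))
#   grouped = group_rows_by_set(read_rows(Path("data/parts_filtered.csv")))
#   by_set, by_year = build_category_totals(grouped, catalog, names)
#   sets_lookup = load_sets_enriched(Path("data/sets_enriched.csv"))
#   write_categories_by_set(Path("out/categories_by_set.csv"),
#                           enrich_categories_with_sets(by_set, sets_lookup))
#   write_categories_by_year(Path("out/categories_by_year.csv"), by_year)
#   write_categories_global(Path("out/categories_global.csv"),
#                           build_global_totals(by_year))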