"""Mesure la rareté des pièces présentes dans les sets filtrés.""" import csv from pathlib import Path from typing import Dict, Iterable, List, Sequence, Set from lib.filesystem import ensure_parent_dir from lib.rebrickable.parts_inventory import index_inventory_parts_by_inventory, select_latest_inventories from lib.rebrickable.stats import read_rows IGNORED_PART_CATEGORY_IDS = {"28", "58", "74", "75"} MINIFIG_PART_CATEGORY_IDS = {"13", "27", "59", "60", "61", "65", "70", "71", "72", "73"} def load_parts_catalog(path: Path) -> Dict[str, dict]: """Indexe les pièces par référence avec leur catégorie et leur nom.""" catalog: Dict[str, dict] = {} with path.open() as csv_file: reader = csv.DictReader(csv_file) for row in reader: catalog[row["part_num"]] = row return catalog def load_part_categories(path: Path) -> Dict[str, str]: """Associe les identifiants de catégorie à leur libellé.""" categories: Dict[str, str] = {} with path.open() as csv_file: reader = csv.DictReader(csv_file) for row in reader: categories[row["id"]] = row["name"] return categories def load_filtered_sets(path: Path) -> Dict[str, dict]: """Charge les sets filtrés avec leurs métadonnées.""" lookup: Dict[str, dict] = {} for row in read_rows(path): lookup[row["set_num"]] = row return lookup def aggregate_filtered_parts( rows: Iterable[dict], parts_catalog: Dict[str, dict], ignored_categories: Set[str] = IGNORED_PART_CATEGORY_IDS, ignored_minifig_categories: Set[str] = MINIFIG_PART_CATEGORY_IDS, ) -> Dict[str, dict]: """Agrège les quantités par pièce pour les sets filtrés (rechanges incluses).""" aggregated: Dict[str, dict] = {} for row in rows: if row["is_minifig_part"] == "true": continue part = parts_catalog[row["part_num"]] if part["part_cat_id"] in ignored_categories: continue if part["part_cat_id"] in ignored_minifig_categories: continue entry = aggregated.get(row["part_num"]) if entry is None: entry = {"quantity": 0, "set_numbers": set()} aggregated[row["part_num"]] = entry entry["quantity"] += int(row["quantity_in_set"]) entry["set_numbers"].add(row["set_num"]) return aggregated def compute_other_set_usage( inventories_path: Path, inventory_parts_path: Path, parts_catalog: Dict[str, dict], filtered_set_numbers: Set[str], ignored_categories: Set[str] = IGNORED_PART_CATEGORY_IDS, ignored_minifig_categories: Set[str] = MINIFIG_PART_CATEGORY_IDS, ) -> Dict[str, int]: """Compte les occurrences des pièces dans le reste du catalogue (rechanges incluses).""" inventories = select_latest_inventories(inventories_path) parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path) totals: Dict[str, int] = {} for set_num, inventory in inventories.items(): if set_num in filtered_set_numbers: continue for row in parts_by_inventory.get(inventory["id"], []): part = parts_catalog[row["part_num"]] if part["part_cat_id"] in ignored_categories: continue if part["part_cat_id"] in ignored_minifig_categories: continue totals[row["part_num"]] = totals.get(row["part_num"], 0) + int(row["quantity"]) return totals def build_part_rarity( parts_filtered_path: Path, inventories_path: Path, inventory_parts_path: Path, parts_catalog_path: Path, part_categories_path: Path, filtered_sets_path: Path, ) -> List[dict]: """Construit le classement de rareté des pièces filtrées.""" parts_catalog = load_parts_catalog(parts_catalog_path) categories = load_part_categories(part_categories_path) filtered_sets = load_filtered_sets(filtered_sets_path) filtered_set_numbers = set(filtered_sets.keys()) filtered_rows = read_rows(parts_filtered_path) filtered_usage = aggregate_filtered_parts(filtered_rows, parts_catalog) other_usage = compute_other_set_usage( inventories_path, inventory_parts_path, parts_catalog, filtered_set_numbers, ) rows: List[dict] = [] for part_num, entry in filtered_usage.items(): part = parts_catalog[part_num] other_quantity = other_usage.get(part_num, 0) total_quantity = entry["quantity"] + other_quantity sample_set_num = sorted(entry["set_numbers"])[0] sample_set_id = filtered_sets[sample_set_num]["set_id"] rows.append( { "part_num": part_num, "part_name": part["name"], "part_cat_id": part["part_cat_id"], "part_category": categories[part["part_cat_id"]], "sample_set_num": sample_set_num, "sample_set_id": sample_set_id, "filtered_quantity": str(entry["quantity"]), "filtered_set_count": str(len(entry["set_numbers"])), "other_sets_quantity": str(other_quantity), "catalog_total_quantity": str(total_quantity), "filtered_share": f"{entry['quantity'] / total_quantity:.4f}", } ) rows.sort(key=lambda row: (int(row["other_sets_quantity"]), int(row["catalog_total_quantity"]), row["part_num"])) return rows def write_part_rarity(destination_path: Path, rows: Sequence[dict]) -> None: """Écrit le CSV complet des pièces classées par rareté.""" ensure_parent_dir(destination_path) fieldnames = [ "part_num", "part_name", "part_cat_id", "part_category", "sample_set_num", "sample_set_id", "filtered_quantity", "filtered_set_count", "other_sets_quantity", "catalog_total_quantity", "filtered_share", ] with destination_path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row) def select_until_reused(rows: Sequence[dict]) -> List[dict]: """Retient les pièces exclusives puis la première réutilisée dans d’autres sets.""" selected: List[dict] = [] for row in rows: selected.append(row) if int(row["other_sets_quantity"]) > 0: break return selected def load_part_rarity(path: Path) -> List[dict]: """Charge le CSV de rareté des pièces.""" return read_rows(path)