1

150 lines
6.3 KiB
Python

"""Construction d'un inventaire détaillé des pièces par set."""
import csv
from pathlib import Path
from typing import Dict, List
from lib.filesystem import ensure_parent_dir
def normalize_boolean(raw_value: str) -> str:
    """Return ``raw_value`` converted to lowercase.

    Boolean-like CSV cells such as ``"True"`` or ``"FALSE"`` come back as
    ``"true"`` / ``"false"``; any other text is simply lowercased.
    """
    lowered = raw_value.lower()
    return lowered
def select_latest_inventories(inventories_path: Path) -> Dict[str, dict]:
    """Select, for every set, the inventory with the highest version.

    Args:
        inventories_path: CSV file providing at least the columns ``id``,
            ``version`` and ``set_num``.

    Returns:
        A mapping from ``set_num`` to ``{"id": ..., "version": ...}``, both
        kept as the raw strings read from the file. When several rows of a
        set share the highest version, the first one encountered is kept.

    Raises:
        KeyError: If one of the required columns is missing.
        ValueError: If a ``version`` cell cannot be parsed as an integer.
    """
    latest_inventories: Dict[str, dict] = {}
    # newline="" lets the csv module handle line endings itself, as its
    # documentation requires for file objects passed to a reader.
    with inventories_path.open(newline="") as inventories_file:
        for row in csv.DictReader(inventories_file):
            set_num = row["set_num"]
            current = latest_inventories.get(set_num)
            # Versions are compared numerically so that "10" beats "9".
            if current is None or int(row["version"]) > int(current["version"]):
                latest_inventories[set_num] = {
                    "id": row["id"],
                    "version": row["version"],
                }
    return latest_inventories
def build_color_lookup(colors_path: Path) -> Dict[str, dict]:
    """Build an index of colors keyed by color identifier.

    Args:
        colors_path: CSV file providing at least the columns ``id``, ``rgb``
            and ``is_trans``.

    Returns:
        A mapping from color ``id`` to ``{"rgb": ..., "is_translucent": ...}``
        where ``is_translucent`` is the lowercased ``is_trans`` cell. If an
        ``id`` appears more than once, the last row wins.

    Raises:
        KeyError: If one of the required columns is missing.
    """
    colors: Dict[str, dict] = {}
    # newline="" lets the csv module handle line endings itself, as its
    # documentation requires for file objects passed to a reader.
    with colors_path.open(newline="") as colors_file:
        for row in csv.DictReader(colors_file):
            colors[row["id"]] = {
                "rgb": row["rgb"],
                "is_translucent": normalize_boolean(row["is_trans"]),
            }
    return colors
def index_inventory_parts_by_inventory(inventory_parts_path: Path) -> Dict[str, List[dict]]:
    """Group inventory part rows by inventory identifier.

    Args:
        inventory_parts_path: CSV file providing at least an
            ``inventory_id`` column.

    Returns:
        A plain ``dict`` mapping each ``inventory_id`` to the list of its
        complete CSV rows, in file order. Inventories without any row are
        absent from the mapping, so looking them up raises ``KeyError``.

    Raises:
        KeyError: If the ``inventory_id`` column is missing.
    """
    parts_by_inventory: Dict[str, List[dict]] = {}
    # newline="" lets the csv module handle line endings itself, as its
    # documentation requires for file objects passed to a reader.
    with inventory_parts_path.open(newline="") as parts_file:
        for row in csv.DictReader(parts_file):
            # setdefault keeps the result a regular dict: callers rely on a
            # KeyError (not an implicit empty list) for unknown inventories.
            parts_by_inventory.setdefault(row["inventory_id"], []).append(row)
    return parts_by_inventory
def index_inventory_minifigs_by_inventory(inventory_minifigs_path: Path) -> Dict[str, List[dict]]:
    """Group inventory minifig rows by inventory identifier.

    Args:
        inventory_minifigs_path: CSV file providing at least an
            ``inventory_id`` column.

    Returns:
        A plain ``dict`` mapping each ``inventory_id`` to the list of its
        complete CSV rows, in file order. Inventories without any minifig
        are absent from the mapping.

    Raises:
        KeyError: If the ``inventory_id`` column is missing.
    """
    minifigs_by_inventory: Dict[str, List[dict]] = {}
    # newline="" lets the csv module handle line endings itself, as its
    # documentation requires for file objects passed to a reader.
    with inventory_minifigs_path.open(newline="") as minifigs_file:
        for row in csv.DictReader(minifigs_file):
            # setdefault keeps the result a regular dict (no implicit
            # creation of empty entries on lookup).
            minifigs_by_inventory.setdefault(row["inventory_id"], []).append(row)
    return minifigs_by_inventory
def build_minifig_lookup(minifigs_path: Path) -> Dict[str, dict]:
    """Build an index of minifigs keyed by ``fig_num``.

    Args:
        minifigs_path: CSV file providing at least a ``fig_num`` column.

    Returns:
        A mapping from ``fig_num`` to the complete CSV row of that minifig
        (every column of the file, as strings). If a ``fig_num`` appears
        more than once, the last row wins.

    Raises:
        KeyError: If the ``fig_num`` column is missing.
    """
    # newline="" lets the csv module handle line endings itself; without it,
    # newlines embedded in quoted fields are not preserved correctly.
    with minifigs_path.open(newline="") as minifigs_file:
        return {row["fig_num"]: row for row in csv.DictReader(minifigs_file)}
def _build_part_row(
    part_row: dict,
    color: dict,
    set_row: dict,
    quantity: str,
    is_minifig_part: str,
) -> dict:
    """Build one output row describing a part/color entry of a set."""
    return {
        "part_num": part_row["part_num"],
        "color_rgb": color["rgb"],
        "is_translucent": color["is_translucent"],
        "set_num": set_row["set_num"],
        "set_id": set_row["set_id"],
        "year": set_row["year"],
        "quantity_in_set": quantity,
        "is_spare": normalize_boolean(part_row["is_spare"]),
        "is_minifig_part": is_minifig_part,
    }


def write_parts_filtered(
    sets_path: Path,
    inventories_path: Path,
    inventory_parts_path: Path,
    colors_path: Path,
    inventory_minifigs_path: Path,
    minifigs_path: Path,
    destination_path: Path,
) -> None:
    """Write an aggregated CSV listing the parts of every set, by color.

    For each row of the sets file, the latest inventory of the set is looked
    up and one output row is written per part row of that inventory. When the
    sum of the non-spare quantities of that inventory is lower than the set's
    ``num_parts``, the parts of the minifigs attached to the inventory are
    written as well, with their quantity multiplied by the minifig quantity
    and ``is_minifig_part`` set to ``"true"``.

    Args:
        sets_path: CSV of sets; must provide ``set_num``, ``set_id``,
            ``year`` and ``num_parts``.
        inventories_path: CSV of inventories (``id``, ``version``,
            ``set_num``); minifig inventories are looked up in it by
            ``fig_num``.
        inventory_parts_path: CSV of inventory parts (``inventory_id``,
            ``part_num``, ``color_id``, ``quantity``, ``is_spare``).
        colors_path: CSV of colors (``id``, ``rgb``, ``is_trans``).
        inventory_minifigs_path: CSV linking inventories to minifigs
            (``inventory_id``, ``fig_num``, ``quantity``).
        minifigs_path: Kept for backward compatibility of the signature; the
            file is not read because none of its content is needed to build
            the output.
        destination_path: Output CSV path; ``ensure_parent_dir`` is called on
            it before writing (presumably creating the parent directory).

    Raises:
        KeyError: If a set or minifig has no inventory, an inventory has no
            part rows, a color id is unknown, or a required column is missing.
        ValueError: If a quantity, version or ``num_parts`` cell is not an
            integer.
    """
    latest_inventories = select_latest_inventories(inventories_path)
    parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path)
    minifigs_by_inventory = index_inventory_minifigs_by_inventory(inventory_minifigs_path)
    colors = build_color_lookup(colors_path)
    ensure_parent_dir(destination_path)

    fieldnames = [
        "part_num",
        "color_rgb",
        "is_translucent",
        "set_num",
        "set_id",
        "year",
        "quantity_in_set",
        "is_spare",
        "is_minifig_part",
    ]
    # newline="" on both files: the csv module handles line endings itself.
    with sets_path.open(newline="") as sets_file, destination_path.open("w", newline="") as target_file:
        writer = csv.DictWriter(target_file, fieldnames=fieldnames)
        writer.writeheader()
        for set_row in csv.DictReader(sets_file):
            inventory = latest_inventories[set_row["set_num"]]
            inventory_parts = parts_by_inventory[inventory["id"]]
            inventory_total_non_spare = sum(
                int(part_row["quantity"])
                for part_row in inventory_parts
                if normalize_boolean(part_row["is_spare"]) == "false"
            )
            expected_parts = int(set_row["num_parts"])

            for part_row in inventory_parts:
                writer.writerow(
                    _build_part_row(
                        part_row,
                        colors[part_row["color_id"]],
                        set_row,
                        part_row["quantity"],
                        "false",
                    )
                )

            # Minifig parts are only expanded when the set's own inventory
            # does not already account for the announced number of parts.
            if inventory_total_non_spare >= expected_parts:
                continue

            for minifig_row in minifigs_by_inventory.get(inventory["id"], []):
                minifig_inventory = latest_inventories[minifig_row["fig_num"]]
                minifig_parts = parts_by_inventory[minifig_inventory["id"]]
                minifig_count = int(minifig_row["quantity"])
                for part_row in minifig_parts:
                    writer.writerow(
                        _build_part_row(
                            part_row,
                            colors[part_row["color_id"]],
                            set_row,
                            # Each copy of the minifig contributes its parts.
                            str(int(part_row["quantity"]) * minifig_count),
                            "true",
                        )
                    )