"""Tests du module de téléchargement Rebrickable.""" import gzip from pathlib import Path import responses from lib.rebrickable.downloader import ( build_rebrickable_url, download_rebrickable_file, download_rebrickable_files, ) def test_build_rebrickable_url() -> None: """Construit l'URL complète vers Rebrickable.""" assert build_rebrickable_url("themes.csv.gz") == ( "https://cdn.rebrickable.com/media/downloads/themes.csv.gz" ) @responses.activate def test_download_rebrickable_file(tmp_path: Path) -> None: """Télécharge, enregistre et décompresse le fichier compressé.""" file_name = "themes.csv.gz" uncompressed_content = b"compressed-data" compressed_body = gzip.compress(uncompressed_content) responses.add( responses.GET, build_rebrickable_url(file_name), body=compressed_body, status=200, ) target_path = download_rebrickable_file(file_name, tmp_path) assert target_path == tmp_path / "themes.csv" assert target_path.read_bytes() == uncompressed_content assert not (tmp_path / file_name).exists() @responses.activate def test_download_skips_when_cache_is_fresh(tmp_path: Path) -> None: """Ne retélécharge pas un fichier récent et conserve le contenu.""" file_name = "themes.csv.gz" cached_path = tmp_path / "themes.csv" cached_path.write_bytes(b"cached") target_path = download_rebrickable_file(file_name, tmp_path) assert target_path == cached_path assert target_path.read_bytes() == b"cached" assert not (tmp_path / file_name).exists() assert len(responses.calls) == 0 @responses.activate def test_download_multiple_rebrickable_files(tmp_path: Path) -> None: """Télécharge plusieurs fichiers compressés et les décompresse.""" file_names = [ "inventories.csv.gz", "inventory_parts.csv.gz", "parts.csv.gz", "colors.csv.gz", ] compressed_bodies = {} for file_name in file_names: uncompressed_content = file_name.encode() compressed_body = gzip.compress(uncompressed_content) compressed_bodies[file_name] = compressed_body responses.add( responses.GET, build_rebrickable_url(file_name), body=compressed_body, status=200, ) downloaded_paths = download_rebrickable_files(file_names, tmp_path) assert downloaded_paths == [ tmp_path / "inventories.csv", tmp_path / "inventory_parts.csv", tmp_path / "parts.csv", tmp_path / "colors.csv", ] assert len(responses.calls) == len(file_names) for file_name in file_names: target_path = tmp_path / file_name decompressed_path = target_path.with_suffix("") assert decompressed_path.read_bytes() == file_name.encode() assert not target_path.exists()