"""Détection d'événements météorologiques et extraction de segments alignés.""" from __future__ import annotations from typing import Sequence import numpy as np import pandas as pd from .core import _ensure_datetime_index, _infer_time_step __all__ = ['detect_threshold_events', 'build_event_aligned_segments'] def detect_threshold_events( series: pd.Series, *, threshold: float, min_duration: pd.Timedelta, min_gap: pd.Timedelta, ) -> list[tuple[pd.Timestamp, pd.Timestamp]]: """ Détecte des événements où `series > threshold` (après remplissage des NaN par False) durant au moins `min_duration`. Les événements séparés d'un intervalle < min_gap sont fusionnés. """ if not isinstance(series.index, pd.DatetimeIndex): raise TypeError("series doit être indexée par le temps.") mask = (series > threshold).fillna(False) if not mask.any(): return [] groups = (mask != mask.shift()).cumsum() time_step = _infer_time_step(series.index) raw_events: list[tuple[pd.Timestamp, pd.Timestamp]] = [] for group_id, group_mask in mask.groupby(groups): if not group_mask.iloc[0]: continue start = group_mask.index[0] end = group_mask.index[-1] + time_step duration = end - start if duration >= min_duration: raw_events.append((start, end)) if not raw_events: return [] merged: list[tuple[pd.Timestamp, pd.Timestamp]] = [] for start, end in raw_events: if not merged: merged.append((start, end)) continue prev_start, prev_end = merged[-1] if start - prev_end < min_gap: merged[-1] = (prev_start, max(prev_end, end)) else: merged.append((start, end)) return merged def build_event_aligned_segments( df: pd.DataFrame, events: Sequence[tuple[pd.Timestamp, pd.Timestamp]], columns: Sequence[str], *, window_before_minutes: int, window_after_minutes: int, resample_minutes: int = 1, ) -> pd.DataFrame: """ Extrait, pour chaque événement, les séries centrées sur son début et retourne un DataFrame MultiIndex (event_id, offset_minutes). """ if not events: return pd.DataFrame(columns=columns) index = _ensure_datetime_index(df) data = df[columns].sort_index() freq = pd.Timedelta(minutes=resample_minutes) if resample_minutes > 1: data = data.resample(freq).mean() before = pd.Timedelta(minutes=window_before_minutes) after = pd.Timedelta(minutes=window_after_minutes) segments: list[pd.DataFrame] = [] for event_id, (start, _end) in enumerate(events): window_start = start - before window_end = start + after window_index = pd.date_range(window_start, window_end, freq=freq) segment = data.reindex(window_index) if segment.empty: continue offsets = ((segment.index - start) / pd.Timedelta(minutes=1)).astype(float) multi_index = pd.MultiIndex.from_arrays( [np.full(len(segment), event_id), offsets], names=["event_id", "offset_minutes"], ) segment.index = multi_index segments.append(segment) if not segments: return pd.DataFrame(columns=columns) aligned = pd.concat(segments) return aligned