Source code for standardize_sim2

# -*- coding: utf-8 -*-
"""
SIM2 data standardization module.

Converts raw SIM2 CSV data into CF-compliant NetCDF with
standardized coordinates, units, and metadata. Follows the same
architecture as standardize_era5.py (ERA5StandardizerWL).

Created on 2026-03-11
@author: Bastien Boivin
@contact: bastien.boivin@proton.me
"""

import os
import pathlib
import logging
from typing import Union, Optional, Dict, List

import numpy as np
import pandas as pd
import xarray as xr

from geop4th import geobricks as geo
from geop4th.datasets import (
    SIM2,
    get_variables,
    get_variable_info,
    get_short_to_long_name_mapping,
    get_temporal_aggregation_modes,
)
from geop4th.workflows.standardize._common import (
    detect_frequency,
    export_standardized,
    add_cf_global_attrs,
    clean_encoding_conflicts,
)

logger = logging.getLogger(__name__)

# SIM2 grid resolution in metres
_SIM2_RES = 8000


def standardize_sim2_dataframe(df, var_list=None):
    """
    Convert a raw SIM2 pandas DataFrame (from CSV) into a standardized
    xr.Dataset. This is the low-level routine used by both the
    SIM2Standardizer class and the SIM2Downloader.

    Parameters
    ----------
    df : pd.DataFrame
        Raw SIM2 data with columns LAMBX, LAMBY, DATE, and variable columns.
    var_list : list of str, optional
        Variables to keep. If None, all data columns are kept.

    Returns
    -------
    xr.Dataset
        Gridded dataset with standardized coordinates and units.
    """
    df = df.copy()

    # Resolve variable names
    aliases = SIM2.get('name_aliases', {})
    if var_list is not None:
        if isinstance(var_list, str):
            var_list = [var_list]
        resolved = []
        for v in var_list:
            resolved.append(aliases.get(v, v if v.endswith('_Q') else v + '_Q'))
        var_list = resolved

    # Rename spatial columns
    if 'LAMBX' in df.columns:
        df.rename(columns={'LAMBX': 'x', 'LAMBY': 'y', 'DATE': 'time'}, inplace=True)
        df[['x', 'y']] = df[['x', 'y']] * 100  # hectometres -> metres

    # Compute PRETOT if components are present
    if 'PRELIQ_Q' in df.columns and 'PRENEI_Q' in df.columns:
        if var_list is None or 'PRETOT_Q' in var_list:
            df['PRETOT_Q'] = df['PRELIQ_Q'] + df['PRENEI_Q']

    # Filter columns
    if var_list is not None:
        keep_cols = ['time', 'y', 'x'] + [v for v in var_list if v in df.columns]
        df = df[keep_cols]

    # Pivot to xarray
    df.set_index(['time', 'y', 'x'], inplace=True)
    ds = df.to_xarray()

    # Fill spatial axes to a continuous grid
    ds = ds.reindex(
        x=np.arange(ds.x.min().values, ds.x.max().values + _SIM2_RES, _SIM2_RES),
    )
    ds = ds.reindex(
        y=np.arange(ds.y.min().values, ds.y.max().values + _SIM2_RES, _SIM2_RES),
    )

    # Embed CRS and standardize grid mapping
    ds = geo.georef(ds, crs=27572)

    # Variable metadata and unit conversion
    all_var_info = {}
    for cat in SIM2['temporal_variables'].values():
        all_var_info.update(cat)
    all_var_info.update(SIM2.get('secondary_variables', {}))

    for var in list(ds.data_vars):
        info = all_var_info.get(var)
        if info is None:
            continue

        # Unit conversion first (convert_units overwrites attrs)
        conversion = info.get('conversion')
        if conversion is not None:
            ds[var] = geo.convert_units(ds[var], conversion)

        # Metadata after conversion to ensure correct values
        ds[var].attrs['long_name'] = info['description']
        ds[var].attrs['units'] = info.get('standard_units', info['units'])

    return ds


class SIM2Standardizer:
    """
    Standardize SIM2 data for hydrological modeling.

    Handles CSV files, directories of CSVs, and xr.Datasets already in memory.

    Parameters
    ----------
    data : str, pathlib.Path, pd.DataFrame, or xr.Dataset
        Input data.
        - Directory path: all CSV files inside are merged and standardized.
        - File path: single CSV file.
        - DataFrame: raw SIM2 tabular data.
        - Dataset: already-gridded data (only metadata/units are fixed).
    var_list : list of str, optional
        Variables to standardize.
    output_path : bool, str, pathlib.Path, or None
        - False: return dataset only, don't write to disk.
        - None or True: write next to the input data.
        - str/Path: write to that directory.
    output_prefix : str
        Filename prefix. Default 'SIM2'.
    low_memory : bool
        Process CSV files one at a time (slower but less RAM).
    """

    def __init__(
        self,
        data,
        var_list=None,
        output_path=None,
        output_prefix='SIM2',
        low_memory=False,
    ):
        self.data = data
        self.var_list = self._normalize_var_list(var_list)
        self.output_prefix = output_prefix
        self.low_memory = low_memory

        # Output settings
        if output_path is False:
            self.output_dir = None
            self._want_save = False
        elif isinstance(output_path, (str, pathlib.Path)):
            self.output_dir = pathlib.Path(output_path)
            self._want_save = True
        else:
            self.output_dir = None
            self._want_save = True

    @staticmethod
    def _normalize_var_list(var_list):
        """Normalize variable names."""
        if var_list is None:
            return None
        if isinstance(var_list, str):
            var_list = [var_list]
        else:
            var_list = list(var_list)
        aliases = SIM2.get('name_aliases', {})
        return [aliases.get(v, v if v.endswith('_Q') else v + '_Q') for v in var_list]

    def standardize(self):
        """
        Run the standardization workflow.

        Returns
        -------
        xr.Dataset
            Standardized dataset.
        """
        # Load data depending on input type
        if isinstance(self.data, xr.Dataset):
            ds = self._standardize_dataset(self.data)
        elif isinstance(self.data, pd.DataFrame):
            ds = standardize_sim2_dataframe(self.data, var_list=self.var_list)
            ds = self._finalize(ds)
        elif isinstance(self.data, (str, pathlib.Path)):
            ds = self._standardize_from_path(pathlib.Path(self.data))
        else:
            raise ValueError(f"Unsupported data type: {type(self.data)}")

        # Export
        if self._want_save:
            self._save(ds)

        return ds

    def _standardize_from_path(self, path):
        """Load from file or directory, then standardize."""
        if path.is_dir():
            return self._standardize_directory(path)
        elif path.suffix.lower() == '.csv':
            return self._standardize_csv(path)
        elif path.suffix.lower() == '.nc':
            ds = geo.load(str(path))
            return self._standardize_dataset(ds)
        else:
            raise ValueError(f"Unsupported file format: {path.suffix}")

    def _standardize_csv(self, filepath):
        """Standardize a single CSV file."""
        df = pd.read_csv(
            filepath, sep=';', header=0, decimal='.',
            parse_dates=['DATE'],
        )
        ds = standardize_sim2_dataframe(df, var_list=self.var_list)
        return self._finalize(ds)

    def _standardize_directory(self, dirpath):
        """Merge and standardize all CSVs in a directory."""
        _, filelist = geo.get_filelist(str(dirpath), extension='.csv')
        _, latest = geo.get_filelist(str(dirpath), extension='.csv', tag='latest')
        n_files = len(filelist) - len(latest)

        if n_files == 0:
            raise ValueError(f"No CSV files found in {dirpath}")

        print(f"Merging {n_files} SIM2 CSV files (~{n_files * 7 + 7} GB RAM recommended)")
        if self.low_memory:
            print("Low-memory mode: processing one file at a time")

        if not self.low_memory:
            df = geo.merge(
                str(dirpath), extension='.csv',
                sep=';', header=0, decimal='.', parse_dates=['DATE'],
            )
            ds = standardize_sim2_dataframe(df, var_list=self.var_list)
        else:
            # Process one file at a time
            data_folder, files = geo.get_filelist(str(dirpath), extension='.csv')
            ds = None
            for f in files:
                fpath = os.path.join(data_folder, f)
                df = pd.read_csv(
                    fpath, sep=';', header=0, decimal='.', parse_dates=['DATE'],
                )
                chunk_ds = standardize_sim2_dataframe(df, var_list=self.var_list)
                if ds is None:
                    ds = chunk_ds
                else:
                    ds = geo.merge([ds, chunk_ds], update_val=True)

        return self._finalize(ds)

    def _standardize_dataset(self, ds):
        """Standardize an already-loaded xr.Dataset."""
        all_var_info = {}
        for cat in SIM2['temporal_variables'].values():
            all_var_info.update(cat)
        all_var_info.update(SIM2.get('secondary_variables', {}))

        # Compute PRETOT if missing and components exist
        if 'PRELIQ_Q' in ds.data_vars and 'PRENEI_Q' in ds.data_vars:
            if 'PRETOT_Q' not in ds.data_vars:
                if self.var_list is None or 'PRETOT_Q' in self.var_list:
                    ds['PRETOT_Q'] = ds['PRELIQ_Q'] + ds['PRENEI_Q']
                    print("  Computed PRETOT_Q = PRELIQ_Q + PRENEI_Q")

        # Ensure CRS is embedded
        if ds.rio.crs is None:
            ds = geo.georef(ds, crs=27572)

        # Fix metadata and units
        for var in list(ds.data_vars):
            info = all_var_info.get(var)
            if info is None:
                continue

            # Check current units before overwriting to decide if conversion needed
            conversion = info.get('conversion')
            if conversion is not None:
                current_units = ds[var].attrs.get('units', '')
                target_units = info.get('standard_units', info['units'])
                if current_units != target_units:
                    ds[var] = geo.convert_units(ds[var], conversion)

            # Metadata after conversion
            ds[var].attrs['long_name'] = info['description']
            ds[var].attrs['units'] = info.get('standard_units', info['units'])

        return self._finalize(ds)

    def _finalize(self, ds):
        """Add CF metadata and clean encoding."""
        ds = add_cf_global_attrs(
            ds,
            dataset_name='SIM2 Reanalysis',
            source_institution='Météo-France',
            processor_name='SIM2Standardizer',
            frequency='daily',
        )

        ds.attrs['standardization_applied'] = 'True'
        ds.attrs['standardization_date'] = pd.Timestamp.now().isoformat()
        ds.attrs['target_frequency'] = 'daily'

        ds = clean_encoding_conflicts(ds)
        return ds

    def _save(self, ds):
        """Save the dataset to disk."""
        if self.output_dir is None:
            if isinstance(self.data, (str, pathlib.Path)):
                p = pathlib.Path(self.data)
                out_dir = p.parent if p.is_file() else p
            else:
                out_dir = pathlib.Path.cwd()
        else:
            out_dir = self.output_dir

        if len(ds.data_vars) == 1:
            var_name = list(ds.data_vars)[0]
        else:
            var_name = 'data'

        export_standardized(
            ds, out_dir,
            prefix=self.output_prefix,
            variable_name=var_name,
            frequency='daily',
            processing_level='STD',
        )


[docs] def standardize_sim2( data, var_list=None, output_path=None, output_prefix='SIM2', low_memory=False, ): """ Standardize SIM2 reanalysis data. Parameters ---------- data : str, pathlib.Path, pd.DataFrame, or xr.Dataset Input data (directory of CSVs, single CSV, DataFrame, or Dataset). var_list : list of str, optional Variables to standardize. output_path : bool, str, pathlib.Path, or None Output control (False = memory only, None = next to input, path = there). output_prefix : str Filename prefix. Default 'SIM2'. low_memory : bool Process files one at a time. Returns ------- xr.Dataset Standardized dataset. """ standardizer = SIM2Standardizer( data=data, var_list=var_list, output_path=output_path, output_prefix=output_prefix, low_memory=low_memory, ) return standardizer.standardize()