# -*- coding: utf-8 -*-
"""
SIM2 data standardization module.
Converts raw SIM2 CSV data into CF-compliant NetCDF with
standardized coordinates, units, and metadata. Follows the same
architecture as standardize_era5.py (ERA5StandardizerWL).
Created on 2026-03-11
@author: Bastien Boivin
@contact: bastien.boivin@proton.me
"""
import os
import pathlib
import logging
from typing import Union, Optional, Dict, List
import numpy as np
import pandas as pd
import xarray as xr
from geop4th import geobricks as geo
from geop4th.datasets import (
SIM2,
get_variables,
get_variable_info,
get_short_to_long_name_mapping,
get_temporal_aggregation_modes,
)
from geop4th.workflows.standardize._common import (
detect_frequency,
export_standardized,
add_cf_global_attrs,
clean_encoding_conflicts,
)
logger = logging.getLogger(__name__)
# SIM2 grid resolution in metres
_SIM2_RES = 8000
def standardize_sim2_dataframe(df, var_list=None):
    """
    Convert a raw SIM2 pandas DataFrame (from CSV) into a standardized
    xr.Dataset. This is the low-level routine used by both the
    SIM2Standardizer class and the SIM2Downloader.

    Parameters
    ----------
    df : pd.DataFrame
        Raw SIM2 data with columns LAMBX, LAMBY, DATE, and variable columns.
    var_list : str or list of str, optional
        Variables to keep. If None, all data columns are kept.

    Returns
    -------
    xr.Dataset
        Gridded dataset with standardized coordinates and units.
    """
    df = df.copy()

    # Resolve variable names: map user aliases to canonical SIM2 column
    # names, appending the '_Q' suffix when it is missing (same rule as
    # SIM2Standardizer._normalize_var_list).
    aliases = SIM2.get('name_aliases', {})
    if var_list is not None:
        if isinstance(var_list, str):
            var_list = [var_list]
        var_list = [
            aliases.get(v, v if v.endswith('_Q') else v + '_Q')
            for v in var_list
        ]

    # Rename spatial columns and convert Lambert coordinates to metres.
    if 'LAMBX' in df.columns:
        df = df.rename(columns={'LAMBX': 'x', 'LAMBY': 'y', 'DATE': 'time'})
        df[['x', 'y']] = df[['x', 'y']] * 100  # hectometres -> metres

    # Compute total precipitation when both components are present.
    if 'PRELIQ_Q' in df.columns and 'PRENEI_Q' in df.columns:
        if var_list is None or 'PRETOT_Q' in var_list:
            df['PRETOT_Q'] = df['PRELIQ_Q'] + df['PRENEI_Q']

    # Keep only the requested variables (plus coordinate columns).
    if var_list is not None:
        keep_cols = ['time', 'y', 'x'] + [v for v in var_list if v in df.columns]
        df = df[keep_cols]

    # Pivot to xarray. Non-inplace set_index avoids SettingWithCopyWarning
    # (and a potential silent no-op) when df is the column-selected copy
    # produced just above.
    ds = df.set_index(['time', 'y', 'x']).to_xarray()

    # Fill spatial axes to a continuous, regularly-spaced grid so missing
    # cells become NaN instead of being silently absent.
    ds = ds.reindex(
        x=np.arange(ds.x.min().values, ds.x.max().values + _SIM2_RES, _SIM2_RES),
    )
    ds = ds.reindex(
        y=np.arange(ds.y.min().values, ds.y.max().values + _SIM2_RES, _SIM2_RES),
    )

    # Embed CRS (EPSG:27572) and standardize the grid mapping.
    ds = geo.georef(ds, crs=27572)

    # Variable metadata and unit conversion. Merge all per-category info
    # tables into a single lookup keyed by canonical variable name.
    all_var_info = {}
    for cat in SIM2['temporal_variables'].values():
        all_var_info.update(cat)
    all_var_info.update(SIM2.get('secondary_variables', {}))
    for var in list(ds.data_vars):
        info = all_var_info.get(var)
        if info is None:
            continue
        # Unit conversion first (convert_units overwrites attrs)
        conversion = info.get('conversion')
        if conversion is not None:
            ds[var] = geo.convert_units(ds[var], conversion)
        # Metadata after conversion to ensure correct values
        ds[var].attrs['long_name'] = info['description']
        ds[var].attrs['units'] = info.get('standard_units', info['units'])
    return ds
class SIM2Standardizer:
    """
    Standardize SIM2 data for hydrological modeling.

    Handles CSV files, directories of CSVs, and xr.Datasets already in memory.

    Parameters
    ----------
    data : str, pathlib.Path, pd.DataFrame, or xr.Dataset
        Input data.
        - Directory path: all CSV files inside are merged and standardized.
        - File path: single CSV file.
        - DataFrame: raw SIM2 tabular data.
        - Dataset: already-gridded data (only metadata/units are fixed).
    var_list : list of str, optional
        Variables to standardize.
    output_path : bool, str, pathlib.Path, or None
        - False: return dataset only, don't write to disk.
        - None or True: write next to the input data.
        - str/Path: write to that directory.
    output_prefix : str
        Filename prefix. Default 'SIM2'.
    low_memory : bool
        Process CSV files one at a time (slower but less RAM).
    """
    def __init__(
        self,
        data,
        var_list=None,
        output_path=None,
        output_prefix='SIM2',
        low_memory=False,
    ):
        self.data = data
        self.var_list = self._normalize_var_list(var_list)
        self.output_prefix = output_prefix
        self.low_memory = low_memory
        # Output settings: False disables saving; an explicit path targets
        # that directory; anything else (None/True) saves next to the input.
        if output_path is False:
            self.output_dir = None
            self._want_save = False
        elif isinstance(output_path, (str, pathlib.Path)):
            self.output_dir = pathlib.Path(output_path)
            self._want_save = True
        else:
            self.output_dir = None
            self._want_save = True

    @staticmethod
    def _normalize_var_list(var_list):
        """Map user-supplied variable names (or aliases) to canonical SIM2
        column names, appending the '_Q' suffix when missing.

        Returns None when *var_list* is None (meaning 'all variables')."""
        if var_list is None:
            return None
        if isinstance(var_list, str):
            var_list = [var_list]
        else:
            var_list = list(var_list)
        aliases = SIM2.get('name_aliases', {})
        return [aliases.get(v, v if v.endswith('_Q') else v + '_Q') for v in var_list]

    def standardize(self):
        """
        Run the standardization workflow.

        Returns
        -------
        xr.Dataset
            Standardized dataset.

        Raises
        ------
        ValueError
            If the input data type is not supported.
        """
        # Load data depending on input type
        if isinstance(self.data, xr.Dataset):
            ds = self._standardize_dataset(self.data)
        elif isinstance(self.data, pd.DataFrame):
            ds = standardize_sim2_dataframe(self.data, var_list=self.var_list)
            ds = self._finalize(ds)
        elif isinstance(self.data, (str, pathlib.Path)):
            ds = self._standardize_from_path(pathlib.Path(self.data))
        else:
            raise ValueError(f"Unsupported data type: {type(self.data)}")
        # Export
        if self._want_save:
            self._save(ds)
        return ds

    def _standardize_from_path(self, path):
        """Load from file or directory, then standardize.

        Raises ValueError for unsupported file extensions."""
        if path.is_dir():
            return self._standardize_directory(path)
        elif path.suffix.lower() == '.csv':
            return self._standardize_csv(path)
        elif path.suffix.lower() == '.nc':
            ds = geo.load(str(path))
            return self._standardize_dataset(ds)
        else:
            raise ValueError(f"Unsupported file format: {path.suffix}")

    def _standardize_csv(self, filepath):
        """Standardize a single semicolon-separated SIM2 CSV file."""
        df = pd.read_csv(
            filepath, sep=';', header=0, decimal='.',
            parse_dates=['DATE'],
        )
        ds = standardize_sim2_dataframe(df, var_list=self.var_list)
        return self._finalize(ds)

    def _standardize_directory(self, dirpath):
        """Merge and standardize all CSVs in a directory.

        In low-memory mode, files are converted one at a time and merged
        incrementally; otherwise all CSVs are merged first.

        Raises ValueError when the directory contains no usable CSV files."""
        _, filelist = geo.get_filelist(str(dirpath), extension='.csv')
        _, latest = geo.get_filelist(str(dirpath), extension='.csv', tag='latest')
        # 'latest' files duplicate data already covered by the archive files,
        # so they are excluded from the count.
        n_files = len(filelist) - len(latest)
        if n_files == 0:
            raise ValueError(f"No CSV files found in {dirpath}")
        # Use the module logger (defined at file top) rather than print(),
        # so callers control verbosity through the logging configuration.
        logger.info(
            "Merging %d SIM2 CSV files (~%d GB RAM recommended)",
            n_files, n_files * 7 + 7,
        )
        if self.low_memory:
            logger.info("Low-memory mode: processing one file at a time")
        if not self.low_memory:
            df = geo.merge(
                str(dirpath), extension='.csv',
                sep=';', header=0, decimal='.', parse_dates=['DATE'],
            )
            ds = standardize_sim2_dataframe(df, var_list=self.var_list)
        else:
            # Process one file at a time
            data_folder, files = geo.get_filelist(str(dirpath), extension='.csv')
            ds = None
            for f in files:
                fpath = os.path.join(data_folder, f)
                df = pd.read_csv(
                    fpath, sep=';', header=0, decimal='.', parse_dates=['DATE'],
                )
                chunk_ds = standardize_sim2_dataframe(df, var_list=self.var_list)
                if ds is None:
                    ds = chunk_ds
                else:
                    ds = geo.merge([ds, chunk_ds], update_val=True)
        return self._finalize(ds)

    def _standardize_dataset(self, ds):
        """Standardize an already-loaded xr.Dataset (metadata/units only)."""
        all_var_info = {}
        for cat in SIM2['temporal_variables'].values():
            all_var_info.update(cat)
        all_var_info.update(SIM2.get('secondary_variables', {}))
        # Compute PRETOT if missing and components exist
        if 'PRELIQ_Q' in ds.data_vars and 'PRENEI_Q' in ds.data_vars:
            if 'PRETOT_Q' not in ds.data_vars:
                if self.var_list is None or 'PRETOT_Q' in self.var_list:
                    ds['PRETOT_Q'] = ds['PRELIQ_Q'] + ds['PRENEI_Q']
                    logger.info("Computed PRETOT_Q = PRELIQ_Q + PRENEI_Q")
        # Ensure CRS is embedded
        if ds.rio.crs is None:
            ds = geo.georef(ds, crs=27572)
        # Fix metadata and units
        for var in list(ds.data_vars):
            info = all_var_info.get(var)
            if info is None:
                continue
            # Check current units before overwriting to decide if conversion needed
            conversion = info.get('conversion')
            if conversion is not None:
                current_units = ds[var].attrs.get('units', '')
                target_units = info.get('standard_units', info['units'])
                if current_units != target_units:
                    ds[var] = geo.convert_units(ds[var], conversion)
            # Metadata after conversion
            ds[var].attrs['long_name'] = info['description']
            ds[var].attrs['units'] = info.get('standard_units', info['units'])
        return self._finalize(ds)

    def _finalize(self, ds):
        """Add CF global metadata and clean encoding conflicts."""
        ds = add_cf_global_attrs(
            ds,
            dataset_name='SIM2 Reanalysis',
            source_institution='Météo-France',
            processor_name='SIM2Standardizer',
            frequency='daily',
        )
        ds.attrs['standardization_applied'] = 'True'
        ds.attrs['standardization_date'] = pd.Timestamp.now().isoformat()
        ds.attrs['target_frequency'] = 'daily'
        ds = clean_encoding_conflicts(ds)
        return ds

    def _save(self, ds):
        """Save the dataset to disk.

        Output directory resolution: explicit output_dir if given, else next
        to a path-like input, else the current working directory."""
        if self.output_dir is None:
            if isinstance(self.data, (str, pathlib.Path)):
                p = pathlib.Path(self.data)
                out_dir = p.parent if p.is_file() else p
            else:
                out_dir = pathlib.Path.cwd()
        else:
            out_dir = self.output_dir
        # Single-variable datasets are named after the variable; multi-variable
        # datasets fall back to the generic 'data' tag.
        if len(ds.data_vars) == 1:
            var_name = list(ds.data_vars)[0]
        else:
            var_name = 'data'
        export_standardized(
            ds, out_dir,
            prefix=self.output_prefix,
            variable_name=var_name,
            frequency='daily',
            processing_level='STD',
        )
def standardize_sim2(
    data,
    var_list=None,
    output_path=None,
    output_prefix='SIM2',
    low_memory=False,
):
    """
    Standardize SIM2 reanalysis data.

    Thin functional wrapper around SIM2Standardizer.

    Parameters
    ----------
    data : str, pathlib.Path, pd.DataFrame, or xr.Dataset
        Input data (directory of CSVs, single CSV, DataFrame, or Dataset).
    var_list : list of str, optional
        Variables to standardize.
    output_path : bool, str, pathlib.Path, or None
        Output control (False = memory only, None = next to input, path = there).
    output_prefix : str
        Filename prefix. Default 'SIM2'.
    low_memory : bool
        Process files one at a time.

    Returns
    -------
    xr.Dataset
        Standardized dataset.
    """
    # NOTE: a stray '[docs]' line (a Sphinx-HTML scraping artifact) preceded
    # this function; it raised NameError at import time and was removed.
    standardizer = SIM2Standardizer(
        data=data,
        var_list=var_list,
        output_path=output_path,
        output_prefix=output_prefix,
        low_memory=low_memory,
    )
    return standardizer.standardize()