Source code for download_sim2

# -*- coding: utf-8 -*-
"""
SIM2 data download from Météo-France (meteo.data.gouv.fr).

Downloads daily SAFRAN-ISBA-MODCOU reanalysis data as gzipped CSV
archives, converts them to NetCDF using the SIM2 standardizer,
and merges with any existing local data.

Created on 2026-03-11
@author: Bastien Boivin
@contact: bastien.boivin@proton.me
"""

import os
import re
import logging
import pathlib
from io import BytesIO
import gzip
from typing import Union, List, Optional, Dict

import numpy as np
import pandas as pd
import geopandas as gpd
import xarray as xr
xr.set_options(keep_attrs=True)

import requests

from geop4th import geobricks as geo
from geop4th.datasets import (
    SIM2,
    get_variables,
    get_short_to_long_name_mapping,
)
from geop4th.download._common import (
    validate_response,
    parse_date,
    resolve_area,
)

logger = logging.getLogger(__name__)

# SIM2 grid parameters (EPSG:2154)
_FRANCE_EXTENT = (56000.0, 1613000.0, 1200000.0, 2685000.0)
_SIM2_RESOLUTION = 8000  # metres


def _resolve_dynamic_dates(source_info):
    """Resolve dynamic date placeholders in download source config."""
    start = source_info['start_date']
    end = source_info['end_date']

    today = pd.to_datetime('today').normalize()
    yesterday = today - pd.Timedelta(1, 'D')
    first_of_month = yesterday.replace(day=1)
    end_of_prev_month = first_of_month - pd.Timedelta(1, 'D')

    if start == 'dynamic_current_month':
        start = first_of_month
    else:
        start = pd.to_datetime(start)

    if end == 'dynamic_yesterday':
        end = yesterday
    elif end == 'dynamic_latest_month':
        end = end_of_prev_month
    else:
        end = pd.to_datetime(end)

    return start, end


def _build_available_data():
    """Build a DataFrame of available SIM2 download sources."""
    rows = {}
    for key, info in SIM2['download_sources'].items():
        start, end = _resolve_dynamic_dates(info)
        rows[key] = {
            'url': info['url'],
            'size_gb': info['size_gb'],
            'start_date': start,
            'end_date': end,
        }
    return pd.DataFrame.from_dict(rows, orient='index')


def _normalize_var_list(var_list):
    """Ensure all variable names end with '_Q'."""
    if var_list is None:
        all_vars = get_variables(SIM2, variable_type='temporal_variables')
        var_list = list(all_vars.keys())
        # Add secondary variables
        for sec_name in SIM2.get('secondary_variables', {}):
            if sec_name not in var_list:
                var_list.append(sec_name)
        return var_list

    if isinstance(var_list, str):
        var_list = [var_list]
    else:
        var_list = list(var_list)

    # Resolve aliases
    aliases = SIM2.get('name_aliases', {})
    normalized = []
    for v in var_list:
        if v in aliases:
            normalized.append(aliases[v])
        elif not v.endswith('_Q'):
            normalized.append(v + '_Q')
        else:
            normalized.append(v)
    return normalized


class SIM2Downloader:
    """
    Download SIM2 reanalysis data from meteo.data.gouv.fr.

    Parameters
    ----------
    dst_folder : str or pathlib.Path
        Output directory. Existing files are detected and only
        missing time periods are downloaded.
    variables : str, list of str, or None
        Variable names (with or without the '_Q' suffix).
        If None, all variables are downloaded.
    start_date : str, int, or pd.Timestamp
        Start of the period. Supports 'YYYY', 'YYYY-MM', 'YYYY-MM-DD'.
    end_date : str, int, or pd.Timestamp, optional
        End of the period. Defaults to yesterday.
    mask : str, pathlib.Path, or gpd.GeoDataFrame, optional
        Spatial mask to clip data to a subregion (saves disk space).
    extension : str
        Output format ('.nc' or '.csv'). Default '.nc'.
    """

    def __init__(
        self,
        dst_folder,
        variables=None,
        start_date=None,
        end_date=None,
        mask=None,
        extension='.nc',
    ):
        self.dst_folder = pathlib.Path(dst_folder)
        self.dst_folder.mkdir(parents=True, exist_ok=True)

        self.var_list = _normalize_var_list(variables)
        self.extension = extension if extension.startswith('.') else '.' + extension
        self.mask = mask

        # Date range
        if start_date is None:
            self.first_date = pd.to_datetime('1958-08-01')
        else:
            self.first_date = parse_date(start_date, is_end_date=False)

        if end_date is None:
            self.last_date = pd.to_datetime('today').normalize() - pd.Timedelta(1, 'D')
        else:
            self.last_date = parse_date(end_date, is_end_date=True)

        self.first_year = self.first_date.year
        self.last_year = self.last_date.year

        # Build source table
        self.available_data = _build_available_data()

    def download(self):
        """
        Execute the download workflow.

        Returns
        -------
        dict
            Mapping of variable name -> output file path.
        """
        print("\nInitializing SIM2 download...")
        print(f"  Variables : {len(self.var_list)}")
        print(f"  Period    : {self.first_date.date()} to {self.last_date.date()}")
        print(f"  Output    : {self.dst_folder}")

        # Scan existing local data
        local_data = self._scan_local_data()

        # Determine which archives need downloading
        to_download = self._determine_downloads(local_data)

        if len(to_download) == 0:
            print("No additional data needs to be downloaded.")
            return {}

        total_size = self.available_data.loc[to_download, 'size_gb'].sum()
        print(f"\nDownloading {len(to_download)} archives (~{total_size:.1f} GB compressed)...")

        # Download, convert, clip, merge
        data_chunks = self._download_archives(to_download, local_data)

        if len(data_chunks) == 0:
            print("No data was downloaded successfully.")
            return {}

        # Export per variable
        output_files = self._export(data_chunks, local_data)
        return output_files

    # --- internal methods ---

    def _scan_local_data(self):
        """Scan dst_folder for existing SIM2 files."""
        local_data = pd.DataFrame(
            index=self.var_list,
            columns=['file', 'start_date', 'end_date', 'extent'],
        )
        local_data['extent'] = False

        _, filelist = geo.get_filelist(self.dst_folder, extension=self.extension)
        if len(filelist) == 0:
            return local_data

        sim_pattern = re.compile(r'(.*)_SIM2_')
        for filepath in filelist:
            filename = os.path.split(filepath)[-1]
            match = sim_pattern.findall(filename)
            if not match:
                continue
            var = match[0].replace('_QUOT', '')
            if var not in local_data.index:
                continue

            local_data.loc[var, 'file'] = filepath

            if self.extension == '.nc':
                try:
                    with geo.load(filepath, decode_coords='all', decode_times=True) as ds:
                        if pd.date_range(
                            start=ds.time[0].item(),
                            end=ds.time[-1].item(),
                            freq='D',
                        ).size == ds.time.size:
                            local_data.loc[var, 'start_date'] = pd.to_datetime(ds.time[0].item())
                            local_data.loc[var, 'end_date'] = pd.to_datetime(ds.time[-1].item())

                        res = abs(ds.rio.resolution()[0])
                        ds_ext = np.array(ds.rio.bounds())
                        mask_ext = self._get_mask_extent(ds_ext, res)

                        if (ds_ext[0:2] <= mask_ext[0:2]).any() or (ds_ext[2:4] >= mask_ext[2:4]).any():
                            local_data.loc[var, 'extent'] = True
                        else:
                            print(f"  Local {filename} does not cover desired extent, will re-download")
                            os.remove(filepath)
                except Exception:
                    pass

        # Invalidate rows where extent is insufficient
        local_data.loc[
            local_data.index[local_data['extent'] == False],
            local_data.columns[0:3],
        ] = np.nan

        return local_data

    def _get_mask_extent(self, ds_extent, resolution):
        """Compute target spatial extent from mask or default to France."""
        if self.mask is None:
            return np.array(_FRANCE_EXTENT)

        mask_gdf = geo.load(self.mask)
        if isinstance(mask_gdf, gpd.GeoDataFrame):
            mask_gdf = geo.reproject(mask_gdf, dst_crs=27572)
            mask_ext = mask_gdf.total_bounds.copy()
        elif isinstance(mask_gdf, xr.Dataset):
            mask_gdf = geo.reproject(mask_gdf, dst_crs=27572)
            mask_ext = np.array(mask_gdf.rio.bounds())
        else:
            return np.array(_FRANCE_EXTENT)

        # Snap to SIM2 grid
        for i, func in enumerate([np.floor, np.floor, np.ceil, np.ceil]):
            snap = geo.nearest(
                x=mask_ext[i] if i % 2 == 0 else None,
                y=mask_ext[i] if i % 2 == 1 else None,
                x0=ds_extent[0], y0=ds_extent[1], res=resolution,
            )
            mask_ext[i] = snap
        return mask_ext

    def _determine_downloads(self, local_data):
        """Determine which archive files need to be downloaded."""
        if local_data['file'].isnull().values.any():
            # Missing variables -> download everything in the date range
            return self.available_data.index[
                (self.available_data['end_date'] > self.first_date)
                & (self.available_data['start_date'] < self.last_date)
            ].tolist()

        min_core = local_data['start_date'][self.var_list].max()
        max_core = local_data['end_date'][self.var_list].min()

        to_download = set()
        if self.first_date < min_core:
            idx = self.available_data.index[
                (self.available_data['start_date'] < min_core)
                & (self.available_data['end_date'] > self.first_date)
            ]
            to_download.update(idx)

        if self.last_date > max_core:
            idx = self.available_data.index[
                (self.available_data['end_date'] > max_core)
                & (self.available_data['start_date'] < self.last_date)
            ]
            to_download.update(idx)

        return list(to_download)

    def _download_archives(self, to_download, local_data):
        """Download and parse CSV archives into xr.Dataset chunks."""
        from geop4th.workflows.standardize.standardize_sim2 import standardize_sim2_dataframe

        data_chunks = []

        # Variables that need updating
        var_sublist = local_data.loc[self.var_list].index[
            local_data['start_date'][self.var_list].isna()
            | local_data['end_date'][self.var_list].isna()
        ].tolist()
        # Ensure we also get vars with missing files
        var_sublist += local_data[local_data['file'].isna()].index.tolist()
        var_sublist = list(set(var_sublist))

        # CSV columns to read (secondary vars like PRETOT_Q need their components)
        secondary = SIM2.get('secondary_variables', {})
        csv_vars = []
        for v in self.var_list:
            if v in secondary:
                csv_vars.extend(secondary[v].get('requires', []))
            elif v != 'PRETOT_Q':
                csv_vars.append(v)
            else:
                csv_vars.extend(['PRELIQ_Q', 'PRENEI_Q'])
        csv_vars = list(set(csv_vars))

        for dataname in to_download:
            url = self.available_data.loc[dataname, 'url']
            print(f"\nDownloading {dataname}...")
            print("  (may take several minutes)")

            try:
                response = requests.get(url)
            except Exception as e:
                print(f"  Download error: {e}")
                continue

            if not validate_response(response):
                print(f"  Invalid response for {dataname}")
                continue

            try:
                with gzip.open(BytesIO(response.content), 'rt') as f:
                    df = pd.read_csv(
                        f, sep=';',
                        usecols=['LAMBX', 'LAMBY', 'DATE'] + csv_vars,
                        header=0, decimal='.',
                        parse_dates=['DATE'],
                    )
            except Exception as e:
                print(f"  Parse error: {e}")
                continue

            # Filter date range
            df = df[(df['DATE'] >= self.first_date) & (df['DATE'] <= self.last_date)]
            if df.empty:
                continue

            # Compute secondary variables in the dataframe
            if 'PRETOT_Q' in self.var_list:
                if 'PRELIQ_Q' in df.columns and 'PRENEI_Q' in df.columns:
                    df['PRETOT_Q'] = df['PRELIQ_Q'] + df['PRENEI_Q']

            # Convert to NetCDF via standardizer
            if self.extension == '.nc':
                ds = standardize_sim2_dataframe(df, var_list=self.var_list)
                data_chunks.append(ds)
            else:
                # CSV path: basic renaming
                df.rename(columns={'LAMBX': 'x', 'LAMBY': 'y', 'DATE': 'time'}, inplace=True)
                df[['x', 'y']] = df[['x', 'y']] * 100
                data_chunks.append(df)

            print(f"  OK: {len(df)} rows")

        return data_chunks

    def _export(self, data_chunks, local_data):
        """Clip, merge, and export per variable."""
        output_files = {}

        if self.extension == '.csv':
            return self._export_csv(data_chunks)

        # NetCDF: one file per variable
        if len(data_chunks) == 0:
            return output_files

        sample_vars = list(data_chunks[0].data_vars)
        for var in sample_vars:
            print(f"\nProcessing {var}...")

            # Clip
            clipped = []
            if self.mask is not None:
                mask_ds = geo.load(self.mask)
                if isinstance(mask_ds, gpd.GeoDataFrame):
                    bounds = mask_ds.total_bounds
                    try:
                        mask_crs = mask_ds.crs
                    except AttributeError:
                        mask_crs = None
                elif isinstance(mask_ds, xr.Dataset):
                    mask_ds = mask_ds.where(mask_ds > 0, drop=True)
                    bounds = mask_ds.rio.bounds()
                    try:
                        mask_crs = mask_ds.rio.crs
                    except AttributeError:
                        mask_crs = None
                else:
                    bounds = None
                    mask_crs = None

                for chunk in data_chunks:
                    if bounds is not None:
                        clipped.append(geo.clip(chunk[[var]], bounds=bounds, bounds_crs=mask_crs))
                    else:
                        clipped.append(chunk[[var]])
            else:
                clipped = [chunk[[var]] for chunk in data_chunks]

            # Add existing local files for this variable
            _, varfiles = geo.get_filelist(
                self.dst_folder, extension='.nc', tag='^' + var,
            )
            for lf in varfiles:
                clipped.append(geo.load(os.path.join(self.dst_folder, lf)))

            # Merge
            print(f"  Merging {len(clipped)} chunks...")
            merged = geo.merge(clipped, update_val=True)

            # Remove old files
            for lf in varfiles:
                os.remove(os.path.join(self.dst_folder, lf))

            # Export
            outpath = self.dst_folder / f"{var}_SIM2_{self.first_year}-{self.last_year}.nc"
            print(f"  Exporting to {outpath.name}")
            geo.export(merged, str(outpath))
            output_files[var] = outpath

        return output_files

    def _export_csv(self, data_chunks):
        """Merge and export CSV format."""
        if self.mask is not None:
            clipped = []
            for d in data_chunks:
                clipped.append(geo.clip(d, src_crs=27572, mask=self.mask))
        else:
            clipped = data_chunks

        merged = geo.merge(clipped)
        merged[['x', 'y']] = merged[['x', 'y']] / 100
        merged.rename(columns={'x': 'LAMBX', 'y': 'LAMBY', 'time': 'DATE'}, inplace=True)

        outpath = self.dst_folder / f"QUOT_SIM2_{self.first_year}-{self.last_year}.csv"
        merged.to_csv(outpath, sep=';', header=True, decimal='.', index=False)
        print(f"Exported to {outpath}")
        return {'csv': outpath}


[docs] def download_sim2( dst_folder, variables=None, start_date=None, end_date=None, mask=None, extension='.nc', ): """ Download SIM2 reanalysis data from meteo.data.gouv.fr. Parameters ---------- dst_folder : str or pathlib.Path Output directory. variables : str, list of str, or None Variables to download. Accepts names with or without '_Q' suffix. If None, all variables are downloaded. start_date : str, int, or datetime-like, optional Start of the period (default: 1958). end_date : str, int, or datetime-like, optional End of the period (default: yesterday). mask : str, pathlib.Path, or gpd.GeoDataFrame, optional Spatial mask for clipping. extension : str Output format ('.nc' or '.csv'). Default '.nc'. Returns ------- dict Mapping of variable name -> output file path. """ downloader = SIM2Downloader( dst_folder=dst_folder, variables=variables, start_date=start_date, end_date=end_date, mask=mask, extension=extension, ) return downloader.download()