# -*- coding: utf-8 -*-
"""
SIM2 data download from Météo-France (meteo.data.gouv.fr).
Downloads daily SAFRAN-ISBA-MODCOU reanalysis data as gzipped CSV
archives, converts them to NetCDF using the SIM2 standardizer,
and merges with any existing local data.
Created on 2026-03-11
@author: Bastien Boivin
@contact: bastien.boivin@proton.me
"""
import os
import re
import logging
import pathlib
from io import BytesIO
import gzip
from typing import Union, List, Optional, Dict
import numpy as np
import pandas as pd
import geopandas as gpd
import xarray as xr
# Module-wide side effect: keep dataset/variable attributes across xarray
# operations, so metadata on the downloaded data survives merging/clipping.
xr.set_options(keep_attrs=True)
import requests
from geop4th import geobricks as geo
from geop4th.datasets import (
SIM2,
get_variables,
get_short_to_long_name_mapping,
)
from geop4th.download._common import (
validate_response,
parse_date,
resolve_area,
)
logger = logging.getLogger(__name__)
# SIM2 grid parameters: whole-France bounding box (xmin, ymin, xmax, ymax)
# in metres, and the 8 km cell size.
# NOTE(review): the original comment said EPSG:2154, but the y range
# (~1.6e6-2.7e6 m) and the dst_crs=27572 reprojections used elsewhere in this
# module point to EPSG:27572 (Lambert II etendu) - confirm the intended CRS.
_FRANCE_EXTENT = (56000.0, 1613000.0, 1200000.0, 2685000.0)
_SIM2_RESOLUTION = 8000 # metres
def _resolve_dynamic_dates(source_info):
    """Translate symbolic date placeholders of a download-source entry
    into concrete timestamps.

    Parameters
    ----------
    source_info : dict
        Must contain 'start_date' and 'end_date'. Besides literal dates,
        'start_date' accepts 'dynamic_current_month' (first day of
        yesterday's month) and 'end_date' accepts 'dynamic_yesterday'
        (yesterday) or 'dynamic_latest_month' (last day of the month
        before yesterday's month).

    Returns
    -------
    (pd.Timestamp, pd.Timestamp)
        Resolved (start, end) dates.
    """
    # Reference points, all normalized to midnight
    today = pd.to_datetime('today').normalize()
    yesterday = today - pd.Timedelta(1, 'D')
    month_start = yesterday.replace(day=1)
    prev_month_end = month_start - pd.Timedelta(1, 'D')

    raw_start = source_info['start_date']
    raw_end = source_info['end_date']

    start = (
        month_start
        if raw_start == 'dynamic_current_month'
        else pd.to_datetime(raw_start)
    )
    if raw_end == 'dynamic_yesterday':
        end = yesterday
    elif raw_end == 'dynamic_latest_month':
        end = prev_month_end
    else:
        end = pd.to_datetime(raw_end)
    return start, end
def _build_available_data():
    """Assemble the table of downloadable SIM2 archives.

    Returns
    -------
    pd.DataFrame
        Indexed by archive name, with columns
        ['url', 'size_gb', 'start_date', 'end_date'] where the date
        columns have dynamic placeholders already resolved.
    """
    def _as_row(info):
        # One table row per download source, with resolved dates
        first, last = _resolve_dynamic_dates(info)
        return {
            'url': info['url'],
            'size_gb': info['size_gb'],
            'start_date': first,
            'end_date': last,
        }

    table = {name: _as_row(info)
             for name, info in SIM2['download_sources'].items()}
    return pd.DataFrame.from_dict(table, orient='index')
def _normalize_var_list(var_list):
    """Normalize requested variable names to canonical SIM2 form.

    Parameters
    ----------
    var_list : str, iterable of str, or None
        Requested variable name(s). None selects every temporal variable
        plus all secondary (derived) variables.

    Returns
    -------
    list of str
        Names with aliases resolved and the '_Q' suffix guaranteed.
    """
    if var_list is None:
        selected = list(
            get_variables(SIM2, variable_type='temporal_variables').keys()
        )
        # Append derived (secondary) variables not already selected
        selected.extend(
            name for name in SIM2.get('secondary_variables', {})
            if name not in selected
        )
        return selected

    names = [var_list] if isinstance(var_list, str) else list(var_list)
    aliases = SIM2.get('name_aliases', {})
    normalized = []
    for name in names:
        if name in aliases:
            # Alias table wins over suffix handling
            normalized.append(aliases[name])
        elif name.endswith('_Q'):
            normalized.append(name)
        else:
            normalized.append(name + '_Q')
    return normalized
class SIM2Downloader:
    """
    Download SIM2 reanalysis data from meteo.data.gouv.fr.

    Existing local files are scanned first so that only missing time
    periods are fetched; downloaded gzipped CSV archives are converted
    to the requested output format and merged with the local data.

    Parameters
    ----------
    dst_folder : str or pathlib.Path
        Output directory. Existing files are detected and only
        missing time periods are downloaded.
    variables : str, list of str, or None
        Variable names (with or without the '_Q' suffix).
        If None, all variables are downloaded.
    start_date : str, int, or pd.Timestamp
        Start of the period. Supports 'YYYY', 'YYYY-MM', 'YYYY-MM-DD'.
    end_date : str, int, or pd.Timestamp, optional
        End of the period. Defaults to yesterday.
    mask : str, pathlib.Path, or gpd.GeoDataFrame, optional
        Spatial mask to clip data to a subregion (saves disk space).
    extension : str
        Output format ('.nc' or '.csv'). Default '.nc'.
    """

    def __init__(
        self,
        dst_folder,
        variables=None,
        start_date=None,
        end_date=None,
        mask=None,
        extension='.nc',
    ):
        self.dst_folder = pathlib.Path(dst_folder)
        self.dst_folder.mkdir(parents=True, exist_ok=True)
        self.var_list = _normalize_var_list(variables)
        # Accept both 'nc' and '.nc'
        self.extension = extension if extension.startswith('.') else '.' + extension
        self.mask = mask
        # Date range: SIM2 starts on 1958-08-01; default end is yesterday
        if start_date is None:
            self.first_date = pd.to_datetime('1958-08-01')
        else:
            self.first_date = parse_date(start_date, is_end_date=False)
        if end_date is None:
            self.last_date = pd.to_datetime('today').normalize() - pd.Timedelta(1, 'D')
        else:
            self.last_date = parse_date(end_date, is_end_date=True)
        self.first_year = self.first_date.year
        self.last_year = self.last_date.year
        # Table of remote archives with their (resolved) date coverage
        self.available_data = _build_available_data()

    def download(self):
        """
        Execute the download workflow.

        Returns
        -------
        dict
            Mapping of variable name -> output file path. Empty when
            nothing needed downloading or every download failed.
        """
        print("\nInitializing SIM2 download...")
        print(f" Variables : {len(self.var_list)}")
        print(f" Period : {self.first_date.date()} to {self.last_date.date()}")
        print(f" Output : {self.dst_folder}")
        # Scan existing local data
        local_data = self._scan_local_data()
        # Determine which archives need downloading
        to_download = self._determine_downloads(local_data)
        if len(to_download) == 0:
            print("No additional data needs to be downloaded.")
            return {}
        total_size = self.available_data.loc[to_download, 'size_gb'].sum()
        print(f"\nDownloading {len(to_download)} archives (~{total_size:.1f} GB compressed)...")
        # Download, convert, clip, merge
        data_chunks = self._download_archives(to_download, local_data)
        if len(data_chunks) == 0:
            print("No data was downloaded successfully.")
            return {}
        # Export per variable
        return self._export(data_chunks, local_data)

    # --- internal methods ---
    def _scan_local_data(self):
        """Scan ``dst_folder`` for existing SIM2 files.

        Returns
        -------
        pd.DataFrame
            One row per requested variable with columns
            ``['file', 'start_date', 'end_date', 'extent']``. Rows whose
            local file does not spatially cover the target extent are
            blanked out so the variable gets re-downloaded.
        """
        local_data = pd.DataFrame(
            index=self.var_list,
            columns=['file', 'start_date', 'end_date', 'extent'],
        )
        local_data['extent'] = False
        _, filelist = geo.get_filelist(self.dst_folder, extension=self.extension)
        if len(filelist) == 0:
            return local_data
        # Variable name is whatever precedes '_SIM2_' in the file name
        sim_pattern = re.compile(r'(.*)_SIM2_')
        for filepath in filelist:
            filename = os.path.split(filepath)[-1]
            match = sim_pattern.findall(filename)
            if not match:
                continue
            var = match[0].replace('_QUOT', '')
            if var not in local_data.index:
                continue
            local_data.loc[var, 'file'] = filepath
            if self.extension == '.nc':
                try:
                    with geo.load(filepath, decode_coords='all', decode_times=True) as ds:
                        # Only a gap-free daily time axis can be merged safely
                        if pd.date_range(
                            start=ds.time[0].item(),
                            end=ds.time[-1].item(),
                            freq='D',
                        ).size == ds.time.size:
                            local_data.loc[var, 'start_date'] = pd.to_datetime(ds.time[0].item())
                            local_data.loc[var, 'end_date'] = pd.to_datetime(ds.time[-1].item())
                            res = abs(ds.rio.resolution()[0])
                            ds_ext = np.array(ds.rio.bounds())
                            mask_ext = self._get_mask_extent(ds_ext, res)
                            # Coverage requires BOTH lower bounds <= the
                            # target's and BOTH upper bounds >= the target's.
                            # (Bug fix: was `.any() or .any()`, which accepted
                            # files covering only one axis or one bound.)
                            if (ds_ext[0:2] <= mask_ext[0:2]).all() and (ds_ext[2:4] >= mask_ext[2:4]).all():
                                local_data.loc[var, 'extent'] = True
                        else:
                            # Gapped daily series: delete and re-download.
                            # (Bug fix: message used to print the literal
                            # "(unknown)" and blame the spatial extent.)
                            print(f" Local {var} data has gaps in its time axis, will re-download")
                            os.remove(filepath)
                except Exception:
                    # Best effort: an unreadable file is treated as absent,
                    # but leave a trace instead of silently swallowing.
                    logger.warning("Could not inspect local file %s", filepath, exc_info=True)
        # Variables with insufficient spatial coverage are re-downloaded:
        # wipe file/start_date/end_date so they look missing downstream.
        local_data.loc[
            local_data.index[local_data['extent'] == False],
            local_data.columns[0:3],
        ] = np.nan
        return local_data

    def _get_mask_extent(self, ds_extent, resolution):
        """Compute the target spatial extent, snapped to the SIM2 grid.

        Parameters
        ----------
        ds_extent : array-like of 4 floats
            Bounds (xmin, ymin, xmax, ymax) of the reference dataset;
            its origin anchors the grid snapping.
        resolution : float
            Grid cell size in metres.

        Returns
        -------
        np.ndarray
            (xmin, ymin, xmax, ymax). Falls back to the whole-France
            extent when no mask is set or the mask type is unsupported.
        """
        if self.mask is None:
            return np.array(_FRANCE_EXTENT)
        mask_data = geo.load(self.mask)
        # Masks are reprojected to EPSG:27572 (Lambert II etendu),
        # consistent with the LAMBX/LAMBY coordinates of the SIM2 CSVs.
        if isinstance(mask_data, gpd.GeoDataFrame):
            mask_data = geo.reproject(mask_data, dst_crs=27572)
            mask_ext = mask_data.total_bounds.copy()
        elif isinstance(mask_data, xr.Dataset):
            mask_data = geo.reproject(mask_data, dst_crs=27572)
            mask_ext = np.array(mask_data.rio.bounds())
        else:
            return np.array(_FRANCE_EXTENT)
        # Snap each bound onto the SIM2 grid (even indices are x, odd are y).
        # NOTE(review): the original enumerated [floor, floor, ceil, ceil]
        # without ever calling them (dead locals, removed); geo.nearest
        # presumably rounds internally - confirm that mins are snapped
        # outward (down) and maxs outward (up).
        for i in range(4):
            mask_ext[i] = geo.nearest(
                x=mask_ext[i] if i % 2 == 0 else None,
                y=mask_ext[i] if i % 2 == 1 else None,
                x0=ds_extent[0], y0=ds_extent[1], res=resolution,
            )
        return mask_ext

    def _determine_downloads(self, local_data):
        """Determine which remote archives must be downloaded.

        Parameters
        ----------
        local_data : pd.DataFrame
            Output of :meth:`_scan_local_data`.

        Returns
        -------
        list
            Index labels of ``self.available_data`` to fetch.
        """
        if local_data['file'].isnull().values.any():
            # At least one variable has no usable local file: take every
            # archive overlapping the requested period.
            return self.available_data.index[
                (self.available_data['end_date'] > self.first_date)
                & (self.available_data['start_date'] < self.last_date)
            ].tolist()
        # Period jointly covered by ALL variables: [min_core, max_core]
        min_core = local_data['start_date'][self.var_list].max()
        max_core = local_data['end_date'][self.var_list].min()
        to_download = set()
        if self.first_date < min_core:
            # Extend coverage backwards in time
            idx = self.available_data.index[
                (self.available_data['start_date'] < min_core)
                & (self.available_data['end_date'] > self.first_date)
            ]
            to_download.update(idx)
        if self.last_date > max_core:
            # Extend coverage forwards in time
            idx = self.available_data.index[
                (self.available_data['end_date'] > max_core)
                & (self.available_data['start_date'] < self.last_date)
            ]
            to_download.update(idx)
        return list(to_download)

    def _download_archives(self, to_download, local_data):
        """Download gzipped CSV archives and parse them into data chunks.

        Parameters
        ----------
        to_download : list
            Archive names (index labels of ``self.available_data``).
        local_data : pd.DataFrame
            Output of :meth:`_scan_local_data`. Currently unused here;
            kept for interface stability (a per-variable update list was
            computed from it previously but never used - dead code removed).

        Returns
        -------
        list
            ``xr.Dataset`` chunks ('.nc' output) or ``pd.DataFrame``
            chunks ('.csv' output). Failed archives are skipped.
        """
        from geop4th.workflows.standardize.standardize_sim2 import standardize_sim2_dataframe
        data_chunks = []
        # CSV columns to read (secondary vars like PRETOT_Q need their components)
        secondary = SIM2.get('secondary_variables', {})
        csv_vars = []
        for v in self.var_list:
            if v in secondary:
                csv_vars.extend(secondary[v].get('requires', []))
            elif v != 'PRETOT_Q':
                csv_vars.append(v)
            else:
                # Total precipitation = liquid + snow components
                csv_vars.extend(['PRELIQ_Q', 'PRENEI_Q'])
        csv_vars = list(set(csv_vars))
        for dataname in to_download:
            url = self.available_data.loc[dataname, 'url']
            print(f"\nDownloading {dataname}...")
            print(" (may take several minutes)")
            try:
                # NOTE: the whole (multi-GB) archive is held in memory.
                # Timeout added (connect, per-read) so a stalled connection
                # cannot hang the workflow forever.
                response = requests.get(url, timeout=(30, 600))
            except Exception as e:
                print(f" Download error: {e}")
                continue
            if not validate_response(response):
                print(f" Invalid response for {dataname}")
                continue
            try:
                with gzip.open(BytesIO(response.content), 'rt') as f:
                    df = pd.read_csv(
                        f, sep=';',
                        usecols=['LAMBX', 'LAMBY', 'DATE'] + csv_vars,
                        header=0, decimal='.',
                        parse_dates=['DATE'],
                    )
            except Exception as e:
                print(f" Parse error: {e}")
                continue
            # Keep only the requested date range
            df = df[(df['DATE'] >= self.first_date) & (df['DATE'] <= self.last_date)]
            if df.empty:
                continue
            # Derive secondary variables in the dataframe
            if 'PRETOT_Q' in self.var_list:
                if 'PRELIQ_Q' in df.columns and 'PRENEI_Q' in df.columns:
                    df['PRETOT_Q'] = df['PRELIQ_Q'] + df['PRENEI_Q']
            if self.extension == '.nc':
                # Convert to a standardized xr.Dataset
                ds = standardize_sim2_dataframe(df, var_list=self.var_list)
                data_chunks.append(ds)
            else:
                # CSV path: rename coordinates; LAMBX/LAMBY are in
                # hectometres, convert to metres
                df.rename(columns={'LAMBX': 'x', 'LAMBY': 'y', 'DATE': 'time'}, inplace=True)
                df[['x', 'y']] = df[['x', 'y']] * 100
                data_chunks.append(df)
            print(f" OK: {len(df)} rows")
        return data_chunks

    def _export(self, data_chunks, local_data):
        """Clip, merge with local files, and export one file per variable.

        Parameters
        ----------
        data_chunks : list
            Output of :meth:`_download_archives`.
        local_data : pd.DataFrame
            Output of :meth:`_scan_local_data` (unused for NetCDF export;
            kept for interface stability).

        Returns
        -------
        dict
            Mapping variable name -> output path.
        """
        output_files = {}
        if self.extension == '.csv':
            return self._export_csv(data_chunks)
        # NetCDF: one file per variable
        if len(data_chunks) == 0:
            return output_files
        # Resolve clipping bounds once - they are loop-invariant
        # (previously recomputed, with a mask reload, for every variable)
        bounds = None
        mask_crs = None
        if self.mask is not None:
            mask_ds = geo.load(self.mask)
            if isinstance(mask_ds, gpd.GeoDataFrame):
                bounds = mask_ds.total_bounds
                try:
                    mask_crs = mask_ds.crs
                except AttributeError:
                    mask_crs = None
            elif isinstance(mask_ds, xr.Dataset):
                # Gridded mask: keep only strictly positive cells
                mask_ds = mask_ds.where(mask_ds > 0, drop=True)
                bounds = mask_ds.rio.bounds()
                try:
                    mask_crs = mask_ds.rio.crs
                except AttributeError:
                    mask_crs = None
        sample_vars = list(data_chunks[0].data_vars)
        for var in sample_vars:
            print(f"\nProcessing {var}...")
            # Clip each chunk to the mask bounds (if any)
            if bounds is not None:
                clipped = [
                    geo.clip(chunk[[var]], bounds=bounds, bounds_crs=mask_crs)
                    for chunk in data_chunks
                ]
            else:
                clipped = [chunk[[var]] for chunk in data_chunks]
            # Add existing local files for this variable.
            # NOTE(review): entries are joined onto dst_folder here, whereas
            # _scan_local_data uses get_filelist entries directly as paths -
            # confirm which convention geo.get_filelist follows.
            _, varfiles = geo.get_filelist(
                self.dst_folder, extension='.nc', tag='^' + var,
            )
            for lf in varfiles:
                clipped.append(geo.load(os.path.join(self.dst_folder, lf)))
            # Merge new chunks with the local data (new values win)
            print(f" Merging {len(clipped)} chunks...")
            merged = geo.merge(clipped, update_val=True)
            # Remove superseded files.
            # NOTE(review): if geo.load is lazy, deleting before export could
            # fail - confirm geo.merge materializes the data.
            for lf in varfiles:
                os.remove(os.path.join(self.dst_folder, lf))
            # Export
            outpath = self.dst_folder / f"{var}_SIM2_{self.first_year}-{self.last_year}.nc"
            print(f" Exporting to {outpath.name}")
            geo.export(merged, str(outpath))
            output_files[var] = outpath
        return output_files

    def _export_csv(self, data_chunks):
        """Merge dataframe chunks and export a single SIM2-style CSV.

        Returns
        -------
        dict
            ``{'csv': output path}``.
        """
        if self.mask is not None:
            # SIM2 coordinates are EPSG:27572 (Lambert II etendu)
            clipped = [geo.clip(d, src_crs=27572, mask=self.mask) for d in data_chunks]
        else:
            clipped = data_chunks
        merged = geo.merge(clipped)
        # Back to native SIM2 conventions: hectometre coordinates and
        # LAMBX/LAMBY/DATE column names
        merged[['x', 'y']] = merged[['x', 'y']] / 100
        merged.rename(columns={'x': 'LAMBX', 'y': 'LAMBY', 'time': 'DATE'}, inplace=True)
        outpath = self.dst_folder / f"QUOT_SIM2_{self.first_year}-{self.last_year}.csv"
        merged.to_csv(outpath, sep=';', header=True, decimal='.', index=False)
        print(f"Exported to {outpath}")
        return {'csv': outpath}
def download_sim2(
    dst_folder,
    variables=None,
    start_date=None,
    end_date=None,
    mask=None,
    extension='.nc',
):
    """
    Download SIM2 reanalysis data from meteo.data.gouv.fr.

    Convenience wrapper around :class:`SIM2Downloader`.

    Parameters
    ----------
    dst_folder : str or pathlib.Path
        Output directory.
    variables : str, list of str, or None
        Variables to download. Accepts names with or without '_Q' suffix.
        If None, all variables are downloaded.
    start_date : str, int, or datetime-like, optional
        Start of the period (default: 1958).
    end_date : str, int, or datetime-like, optional
        End of the period (default: yesterday).
    mask : str, pathlib.Path, or gpd.GeoDataFrame, optional
        Spatial mask for clipping.
    extension : str
        Output format ('.nc' or '.csv'). Default '.nc'.

    Returns
    -------
    dict
        Mapping of variable name -> output file path.
    """
    # (Bug fix: a stray Sphinx "[docs]" extraction artifact preceding this
    # function was removed - it evaluated the undefined name `docs` and
    # raised NameError at import time.)
    downloader = SIM2Downloader(
        dst_folder=dst_folder,
        variables=variables,
        start_date=start_date,
        end_date=end_date,
        mask=mask,
        extension=extension,
    )
    return downloader.download()