Source code for download_fr

# -*- coding: utf-8 -*-
"""
Created on Tue Sep 17 16:58:10 2024

@author: Alexandre Kenshilik Coche
@contact: alexandre.co@hotmail.fr

download_fr.py is a set of functions for automatically downloading the French
datasets required for hydrological modelling of socio-ecosystems.
In particular, this tool is intended to gather the standard input data needed
by the CWatM model used within the Eau et Territoire methodology
(https://eau-et-territoire.org ).
"""

#%% IMPORTATIONS
import os
import re
import requests
import datetime
from io import (BytesIO, StringIO)
import gzip
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import xarray as xr
xr.set_options(keep_attrs = True)
from geop4th import geobricks as geo
from geop4th.workflows.standardize import standardize_fr as stzfr

#%% LIST
def list():
    excluded_funcs = {
        'valid_request',
        'valid_single_request',
        'list',
        '',
        }

    # Inspired by https://www.mostlypython.com/building-a-checklist-of-all-functions-in-a-file/
    import importlib.resources
    name = __name__.split(".")
    folder = ".".join(name[0:-1])
    module = ".".join([name[-1], "py"])
    contents = importlib.resources.read_text(folder, module)
    func_pattern = r".*def ([a-z0-9_]*)\("
    func_list = set(re.findall(func_pattern, contents)).difference(excluded_funcs)
    
    print(f"List of {name[-1]} supported datasets:")
    print('  - ' + '()\n  - '.join(sorted(func_list)) + '()')
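
# Example (a usage sketch; the import path is assumed by analogy with the
# `standardize_fr` import above and may differ in your installation):
#   >>> from geop4th.workflows.download import download_fr
#   >>> download_fr.list()   # prints the supported datasets (bnpe, hydrometry, sim2...)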

#%% Utilities to check if a request is valid
def valid_request(response, file_format = 'geojson', prev_frame = None):
    """
    Utility to check if a request is valid.

    Parameters
    ----------
    response : requests.Response
        `requests` object that contains a server's response to an HTTP request.
        This ``response`` variable can be obtained with ``requests.get(<url>)``.
    file_format : {'csv', 'json', 'geojson'}, default 'geojson'
        File format in which the data will be retrieved.
    prev_frame : pandas.DataFrame or geopandas.GeoDataFrame, optional
        Frame used for the recursive run (when request results span several pages).

    Returns
    -------
    isvalid : bool
        Flag indicating whether the request has succeeded.
    frame : pandas.DataFrame or geopandas.GeoDataFrame
        Frame containing the downloaded values.
        If ``file_format`` is 'geojson', returns a geopandas.GeoDataFrame,
        otherwise returns a pandas.DataFrame (non-geospatialized data).
    """

    isvalid = False
    frame = prev_frame

    if (response.status_code == 200):
        # all correct, one single page
        _, frame = valid_single_request(response, file_format)

    elif (response.status_code == 206):
        # correct but results are on several pages
        isvalid, frame = valid_single_request(response, file_format)

        # Safeguard against technical limitations of the BNPE API (max request size):
        # the BNPE API cannot handle more than 20000 results in one request
        if 'last' in response.links:
            page_pattern = re.compile(r"page=(\d*)")
            n_page = page_pattern.findall(response.links['last']['url'])
            if len(n_page) > 0:
                n_results = int(n_page[0]) * frame.shape[0]
                if n_results > 20000:
                    print(f"\nErr: Up to {n_results} results were asked but BNPE API cannot retrieve more than 20000 results in one request. If you passed a `masks` argument, please provide smaller masks.")
                    return False, None

        if prev_frame is not None:
            frame = pd.concat([prev_frame, frame],
                              axis = 0,
                              ignore_index = True).drop_duplicates()

        if 'next' in response.links:
            response = requests.get(response.links['next']['url'])
            _, frame = valid_request(response, file_format, prev_frame = frame)

    elif (response.status_code == 400):
        print("Err: Incorrect request")
    elif (response.status_code == 401):
        print("Err: Unauthorized")
    elif (response.status_code == 403):
        print("Err: Forbidden")
    elif (response.status_code == 404):
        print("Err: Not Found")
    elif (response.status_code == 500):
        print("Err: Server internal error")

    # if frame is not None
    if isinstance(frame, pd.DataFrame):
        isvalid = True

    return isvalid, frame
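
# Example of the request/validation pattern used throughout this module (the
# Hub'Eau URL below is one of the endpoints queried by bnpe(), with example
# year and department values):
#   url = ("https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques"
#          "?annee=2020&code_departement=22&format=geojson&size=5000")
#   response = requests.get(url)
#   isvalid, frame = valid_request(response, file_format = 'geojson')
#   if isvalid:
#       geo.export(frame, "dpmt22_2020.json")  # hypothetical output path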
def valid_single_request(response, file_format = 'geojson'):
    """
    Utility to check if a single request (one page of results) is valid.

    Parameters
    ----------
    response : requests.Response
        `requests` object that contains a server's response to an HTTP request.
        This ``response`` variable can be obtained with ``requests.get(<url>)``.
    file_format : {'csv', 'json', 'geojson'}, default 'geojson'
        File format in which the data will be retrieved.

    Returns
    -------
    isvalid : bool
        Flag indicating whether the request has succeeded.
    frame : pandas.DataFrame or geopandas.GeoDataFrame
        Frame containing the downloaded values.
        If ``file_format`` is 'geojson', returns a geopandas.GeoDataFrame,
        otherwise returns a pandas.DataFrame (non-geospatialized data).
    """

    # ---- Determine validity of downloaded data
    isvalid = False
    if file_format == 'csv':
        if response.text != '': # not empty
            isvalid = True
    elif file_format in ['json', 'geojson']:
        if response.json()['count'] != 0: # not empty
            isvalid = True

    if isvalid:
        # ---- Retrieve results as a dataframe
        # Save data content into a DataFrame (or a GeoDataFrame)
        if file_format == 'csv':
            frame = pd.read_csv(BytesIO(response.content), sep = ";")

        elif file_format == 'geojson':
            # For some datasets, the 'GeoJSON' format option is not available,
            # only classic JSON is retrieved.
            # In these cases, if the user requires a GeoJSON, the JSON dict
            # needs to be converted to GeoJSON before export:
            if 'type' in response.json():
                if response.json()['type'] == 'FeatureCollection':
                    # Response content is truly a GeoJSON
                    json_to_geojson = False
                else:
                    json_to_geojson = True
            else:
                json_to_geojson = True

            if json_to_geojson:
                # An additional georeferencing step is necessary
                json_df = pd.DataFrame.from_dict(response.json()['data'])
                geometry = [Point(xy) for xy in zip(json_df.longitude, json_df.latitude)]
                frame = gpd.GeoDataFrame(json_df, crs = 4326, geometry = geometry)
            else:
                frame = gpd.read_file(response.text, driver = 'GeoJSON')

        elif file_format == 'json':
            # =============== useless =====================================================
            # data_stations = pd.DataFrame.from_dict(
            #     {i: response.json()['data'][i] for i in range(len(response.json()['data']))},
            #     orient = 'index')
            # =============================================================================
            frame = pd.DataFrame.from_dict(response.json()['data'])

    else:
        frame = None

    return isvalid, frame

#%% BNPE (Water withdrawal data)
def bnpe(*, dst_folder, start_year = None, end_year = None, masks = None,
         departments = None, communes = None, file_formats = 'geojson'):
    """
    Function to facilitate the downloading of French water withdrawal data
    from the BNPE API.

    Parameters
    ----------
    dst_folder : str or pathlib.Path
        Destination folder in which the downloaded data will be stored.
    start_year : int, optional
        Year from which the data will be retrieved.
    end_year : int, optional
        Year until which the data will be retrieved.
    masks : str, pathlib.Path, xarray.Dataset, xarray.DataArray or geopandas.GeoDataFrame, or list of these, optional
        Mask(s) on which the data will be retrieved.
        At least one parameter among ``masks``, ``departments`` or ``communes`` should be passed.
    departments : int or list of int, optional
        INSEE code(s) of department(s) for which data will be retrieved.
        At least one parameter among ``masks``, ``departments`` or ``communes`` should be passed.
    communes : int or list of int, optional
        INSEE code(s) of commune(s) for which data will be retrieved.
        At least one parameter among ``masks``, ``departments`` or ``communes`` should be passed.
    file_formats : str or list of str, {'csv', 'json', 'geojson'}, default 'geojson'
        File format(s) in which the data will be retrieved.

    Returns
    -------
    None. Downloaded data is stored in ``dst_folder``.
    """

    # ---- Argument retrieving
    # Safeguard
    if (departments is None) & (communes is None) & (masks is None):
        print("Err: It is required to specify at least one area with the arguments `departments` or `communes` or `masks`")
        return

    if start_year is None:
        start_year = 2008
    if end_year is None:
        end_year = datetime.datetime.today().year

    if isinstance(start_year, (str, float)):
        start_year = int(start_year)
    if end_year is None: # and start_year is not None, see previous case
        years = [start_year]
    else:
        if isinstance(end_year, (str, float)):
            end_year = int(end_year)
        years = list(range(start_year, end_year + 1))

    if masks is not None:
        if isinstance(masks, tuple):
            masks = list(masks)
        elif not isinstance(masks, list):
            masks = [masks]

    if departments is not None:
        if isinstance(departments, (str, float)):
            departments = [int(departments)]
        elif isinstance(departments, int):
            departments = [departments]
        elif isinstance(departments, tuple):
            departments = list(departments)

    if communes is not None:
        if isinstance(communes, (str, float)):
            communes = [int(communes)]
        elif isinstance(communes, int):
            communes = [communes]
        elif isinstance(communes, tuple):
            communes = list(communes)

    if isinstance(file_formats, str):
        file_formats = [file_formats]
    for i in range(0, len(file_formats)):
        # file_formats = file_formats.replace('.', '')
        if file_formats[i][0] == '.':
            file_formats[i] = file_formats[i][1:]

    outdir = os.path.join(dst_folder, "originaux")
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    outdir_ouvrages = os.path.join(dst_folder, "originaux", "ouvrages")
    if not os.path.exists(outdir_ouvrages):
        os.makedirs(outdir_ouvrages)

    # ---- Requests
    print("\nDownloading...")
    for y in years:
        y = int(y)
        print(f"\n . {y}")

        if departments is not None:
            for d in departments:
                d = f"{d:02.0f}" # format '00'
                for f in file_formats:
                    if f == 'csv':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques.csv?annee={y}&code_departement={d}&size=5000"
                        outpath = os.path.join(outdir, f"dpmt{d}_{y}.csv")
                    elif f == 'geojson':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques?annee={y}&code_departement={d}&format=geojson&size=5000"
                        outpath = os.path.join(outdir, f"dpmt{d}_{y}.json")
                    elif f == 'json':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques?annee={y}&code_departement={d}&size=5000"
                        outpath = os.path.join(outdir, f"dpmt{d}_{y}.json")

                    response = requests.get(url)
                    isvalid, frame = valid_request(response, f)

                    if isvalid:
                        # Export
                        if f == 'csv':
                            geo.export(frame, outpath, sep = ';', encoding = 'utf-8', index = False)
                        else:
                            geo.export(frame, outpath)

        if communes is not None:
            for c in communes:
                c = f"{c:05.0f}" # zero-padded format '00000'
                for f in file_formats:
                    if f == 'csv':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques.csv?annee={y}&code_commune_insee={c}&size=5000"
                        outpath = os.path.join(outdir, f"cmne{c}_{y}.csv")
                    elif f == 'geojson':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques?annee={y}&code_commune_insee={c}&format=geojson&size=5000"
                        outpath = os.path.join(outdir, f"cmne{c}_{y}.json")
                    elif f == 'json':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques?annee={y}&code_commune_insee={c}&size=5000"
                        outpath = os.path.join(outdir, f"cmne{c}_{y}.json")

                    response = requests.get(url)
                    isvalid, frame = valid_request(response, f)

                    if isvalid:
                        # Export
                        if f == 'csv':
                            geo.export(frame, outpath, sep = ';', encoding = 'utf-8', index = False)
                        else:
                            geo.export(frame, outpath)

        if masks is not None:
            i_mask = 0
            for m in masks:
                i_mask += 1
                mask_ds = geo.load_any(m)
                mask_ds = geo.reproject(mask_ds, dst_crs = 4326)
                for f in file_formats:
                    if f == 'csv':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques.csv?annee={y}&bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&size=5000"
                        outpath = os.path.join(outdir, f"mask{i_mask}_{y}.csv")
                    elif f == 'geojson':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques?annee={y}&bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&format=geojson&size=5000"
                        outpath = os.path.join(outdir, f"mask{i_mask}_{y}.json")
                    elif f == 'json':
                        url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/chroniques?annee={y}&bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&size=5000"
                        outpath = os.path.join(outdir, f"mask{i_mask}_{y}.json")

                    response = requests.get(url)
                    isvalid, frame = valid_request(response, f)

                    # ======= {draft} to automatically handle too large requests ==================
                    # isvalid, frame, exceeds = valid_request(response, f)
                    #
                    # if exceeds:
                    #     masks = [*masks[0:i_mask-1], split(masks[i_mask-1]), *masks[i_mask:]]
                    #     bnpe(dst_folder = dst_folder, start_year = start_year,
                    #          end_year = end_year, masks = masks,
                    #          departments = departments, communes = communes,
                    #          file_formats = file_formats)
                    #     return
                    # =============================================================================

                    if isvalid:
                        # In the `mask` case, the data is clipped to the mask
                        if isinstance(frame, gpd.GeoDataFrame):
                            frame = geo.reproject(frame, mask = m)
                        else: # frame is only a pd.DataFrame
                            # First, frame is converted to a GeoDataFrame
                            geometry = [Point(xy) for xy in zip(frame.longitude, frame.latitude)]
                            gdf = gpd.GeoDataFrame(frame, crs = 4326, geometry = geometry)
                            # Then it is clipped
                            gdf = geo.reproject(gdf, mask = m)
                            # Finally it is converted back to a DataFrame
                            frame = pd.DataFrame(gdf.drop(columns = 'geometry'))

                        # Export
                        if f == 'csv':
                            geo.export(frame, outpath, sep = ';', encoding = 'utf-8', index = False)
                        else:
                            geo.export(frame, outpath)

    # Withdrawal structures ('ouvrages') referential
    if departments is not None:
        for d in departments:
            d = f"{d:02.0f}" # format '00'
            for f in file_formats:
                if f == 'csv':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages.csv?code_departement={d}&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"dpmt{d}_ouvrages.csv")
                elif f == 'geojson':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages?code_departement={d}&format=geojson&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"dpmt{d}_ouvrages.json")
                elif f == 'json':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages?code_departement={d}&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"dpmt{d}_ouvrages.json")

                response = requests.get(url)
                isvalid, frame = valid_request(response, f)

                if isvalid:
                    # Export
                    if f == 'csv':
                        geo.export(frame, outpath, sep = ';', encoding = 'utf-8', index = False)
                    else:
                        geo.export(frame, outpath)

    if communes is not None:
        for c in communes:
            c = f"{c:05.0f}" # zero-padded format '00000'
            for f in file_formats:
                if f == 'csv':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages.csv?code_commune_insee={c}&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"cmne{c}_ouvrages.csv")
                elif f == 'geojson':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages?code_commune_insee={c}&format=geojson&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"cmne{c}_ouvrages.json")
                elif f == 'json':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages?code_commune_insee={c}&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"cmne{c}_ouvrages.json")

                response = requests.get(url)
                isvalid, frame = valid_request(response, f)

                if isvalid:
                    # Export
                    if f == 'csv':
                        geo.export(frame, outpath, sep = ';', encoding = 'utf-8', index = False)
                    else:
                        geo.export(frame, outpath)

    if masks is not None:
        i_mask = 0
        for m in masks:
            i_mask += 1
            mask_ds = geo.load_any(m)
            mask_ds = geo.reproject(mask_ds, dst_crs = 4326)
            for f in file_formats:
                if f == 'csv':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages.csv?bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"mask{i_mask}_ouvrages.csv")
                elif f == 'geojson':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages?bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&format=geojson&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"mask{i_mask}_ouvrages.json")
                elif f == 'json':
                    url = rf"https://hubeau.eaufrance.fr/api/v1/prelevements/referentiel/ouvrages?bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&size=5000"
                    outpath = os.path.join(outdir_ouvrages, f"mask{i_mask}_ouvrages.json")

                response = requests.get(url)
                isvalid, frame = valid_request(response, f)

                if isvalid:
                    # In the `mask` case, the data is clipped to the mask
                    if isinstance(frame, gpd.GeoDataFrame):
                        frame = geo.reproject(frame, mask = m)
                    else: # frame is only a pd.DataFrame
                        # First, frame is converted to a GeoDataFrame
                        geometry = [Point(xy) for xy in zip(frame.longitude, frame.latitude)]
                        gdf = gpd.GeoDataFrame(frame, crs = 4326, geometry = geometry)
                        # Then it is clipped
                        gdf = geo.reproject(gdf, mask = m)
                        # Finally it is converted back to a DataFrame
                        frame = pd.DataFrame(gdf.drop(columns = 'geometry'))

                    # Export
                    if f == 'csv':
                        geo.export(frame, outpath, sep = ';', encoding = 'utf-8', index = False)
                    else:
                        geo.export(frame, outpath)
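
# Example usage of bnpe() (a minimal sketch; the destination folder and INSEE
# codes below are illustrative):
#   bnpe(dst_folder = "/path/to/bnpe",       # hypothetical destination folder
#        start_year = 2015,
#        end_year = 2020,
#        departments = [22, 29],             # example INSEE department codes
#        file_formats = ['csv', 'geojson'])
# Withdrawal chronicles are written under <dst_folder>/originaux/ and the
# structures ('ouvrages') referential under <dst_folder>/originaux/ouvrages/.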
#%% Hydrometry (Discharge measurements)
def hydrometry(*, dst_folder, masks, start_year, end_year = None,
               quantities = 'QmnJ', file_formats = 'geojson'):
    """
    Function to facilitate the downloading of French hydrometry data
    from the EauFrance (Hub'Eau) API.

    Parameters
    ----------
    dst_folder : str or pathlib.Path
        Destination folder in which the downloaded data will be stored.
    masks : str, pathlib.Path, xarray.Dataset, xarray.DataArray or geopandas.GeoDataFrame, or list of these
        Mask(s) on which the data will be retrieved.
    start_year : int
        Year from which the data will be retrieved.
    end_year : int, optional
        Year until which the data will be retrieved.
    quantities : str or list of str, default 'QmnJ'
        - 'QmnJ': average daily discharge
        - 'QmM': average monthly discharge
        - 'HIXM': maximum instant height per month
        - 'HIXnJ': maximum instant height per day
        - 'QINM': minimum instant discharge per month
        - 'QINnJ': minimum instant discharge per day
        - 'QixM': maximum instant discharge per month
        - 'QIXnJ': maximum instant discharge per day
    file_formats : str or list of str, {'csv', 'json', 'geojson'}, default 'geojson'
        File format(s) in which the data will be retrieved.

    Returns
    -------
    None. Downloaded data is stored in ``dst_folder``.
    """

    # ---- Argument retrieving
    if isinstance(start_year, (str, float)):
        start_year = int(start_year)
    if end_year is None:
        years = [start_year]
    else:
        if isinstance(end_year, (str, float)):
            end_year = int(end_year)
        years = list(range(start_year, end_year + 1))

    if masks is not None:
        if isinstance(masks, tuple):
            masks = list(masks)
        elif not isinstance(masks, list):
            masks = [masks]

    if isinstance(file_formats, str):
        file_formats = [file_formats]
    for i in range(0, len(file_formats)):
        # file_formats = file_formats.replace('.', '')
        if file_formats[i][0] == '.':
            file_formats[i] = file_formats[i][1:]

    if isinstance(quantities, str):
        quantities = [quantities]

    outdir = os.path.join(dst_folder, "originaux")
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    outdir_stations = os.path.join(dst_folder, "originaux", "stations")
    if not os.path.exists(outdir_stations):
        os.makedirs(outdir_stations)

    # ---- Requests
    print("\nDownloading...")
    i_mask = 0
    for m in masks:
        i_mask += 1
        mask_ds = geo.load_any(m)
        mask_ds = geo.reproject(mask_ds, dst_crs = 4326)

        # List of stations
        for f in file_formats:
            if f == 'csv':
                url = rf"https://hubeau.eaufrance.fr/api/v2/hydrometrie/referentiel/stations.csv?bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&size=10000"
                outpath = os.path.join(outdir_stations, f"mask{i_mask}_stations.csv")
            elif f == 'geojson':
                url = rf"https://hubeau.eaufrance.fr/api/v2/hydrometrie/referentiel/stations?bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&format=geojson&size=10000"
                outpath = os.path.join(outdir_stations, f"mask{i_mask}_stations.json")
            elif f == 'json':
                url = rf"https://hubeau.eaufrance.fr/api/v2/hydrometrie/referentiel/stations?bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&size=10000"
                outpath = os.path.join(outdir_stations, f"mask{i_mask}_stations.json")

            response = requests.get(url)
            isvalid, data_stations = valid_request(response, f)

            if isvalid:
                # Data is clipped to the mask
                if isinstance(data_stations, gpd.GeoDataFrame):
                    data_stations = geo.reproject(data_stations, mask = m)
                else: # data_stations is only a pd.DataFrame
                    # First it is converted to a GeoDataFrame
                    data_stations.loc[:, ['longitude', 'latitude']] = data_stations['coordLatLon'].str.split(
                        ',', expand = True).rename(columns = {0: 'latitude', 1: 'longitude'}).astype(float)
                    geometry = [Point(xy) for xy in zip(data_stations.longitude, data_stations.latitude)]
                    gdf = gpd.GeoDataFrame(data_stations, crs = 4326, geometry = geometry)
                    # Then it is clipped
                    gdf = geo.reproject(gdf, mask = m)
                    # Finally it is converted back to a DataFrame
                    data_stations = pd.DataFrame(gdf.drop(columns = ['geometry', 'latitude', 'longitude']))

                # Export
                if f == 'csv':
                    geo.export(data_stations, outpath, sep = ';', encoding = 'utf-8', index = False)
                else:
                    geo.export(data_stations, outpath)

        # Discharge data
        for y in years:
            y = int(y)
            print(f" . {y}")
            for station_id in data_stations['code_station']:
                for f in file_formats:
                    for q in quantities:
                        if f == 'csv':
                            url = rf"https://hubeau.eaufrance.fr/api/v2/hydrometrie/obs_elab.csv?code_entite={station_id}&bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&date_debut_obs_elab={y}-01-01&date_fin_obs_elab={y}-12-31&grandeur_hydro_elab={q}&size=20000"
                            outpath = os.path.join(outdir, f"{q}_{station_id}_mask{i_mask}_{y}.csv")
                        # ============ apparently format option is not available ======================
                        # elif f == 'geojson':
                        #     url = rf"https://hubeau.eaufrance.fr/api/v2/hydrometrie/obs_elab?code_entite={station_id}&bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&date_debut_obs_elab={y}-01-01&date_fin_obs_elab={y}-12-31&grandeur_hydro_elab={q}&format=geojson&size=20000"
                        #     outpath = os.path.join(outdir, f"{q}_{station_id}_mask{i_mask}_{y}.json")
                        # =============================================================================
                        elif f in ['geojson', 'json']:
                            url = rf"https://hubeau.eaufrance.fr/api/v2/hydrometrie/obs_elab?code_entite={station_id}&bbox={mask_ds.total_bounds[0]}&bbox={mask_ds.total_bounds[1]}&bbox={mask_ds.total_bounds[2]}&bbox={mask_ds.total_bounds[3]}&date_debut_obs_elab={y}-01-01&date_fin_obs_elab={y}-12-31&grandeur_hydro_elab={q}&size=20000"
                            outpath = os.path.join(outdir, f"{q}_{station_id}_mask{i_mask}_{y}.json")

                        response = requests.get(url)
                        isvalid, frame = valid_request(response, f)

                        if isvalid:
                            # Data is clipped to the mask
                            if isinstance(frame, gpd.GeoDataFrame):
                                frame = geo.reproject(frame, mask = m)
                            else: # frame is only a pd.DataFrame
                                # First, frame is converted to a GeoDataFrame
                                geometry = [Point(xy) for xy in zip(frame.longitude, frame.latitude)]
                                gdf = gpd.GeoDataFrame(frame, crs = 4326, geometry = geometry)
                                # Then it is clipped
                                gdf = geo.reproject(gdf, mask = m)
                                # Finally it is converted back to a DataFrame
                                frame = pd.DataFrame(gdf.drop(columns = 'geometry'))

                            # Export
                            if f == 'csv':
                                geo.export(frame, outpath, sep = ';', encoding = 'utf-8', index = False)
                            else:
                                geo.export(frame, outpath)
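
# Example usage of hydrometry() (a minimal sketch; the paths are illustrative
# and the mask can be any input accepted by geo.load_any()):
#   hydrometry(dst_folder = "/path/to/hydrometry",   # hypothetical destination folder
#              masks = "/path/to/watershed.shp",     # hypothetical mask file
#              start_year = 2010,
#              end_year = 2020,
#              quantities = ['QmnJ', 'QmM'],
#              file_formats = 'csv')
# Station referentials are written under <dst_folder>/originaux/stations/ and
# the elaborated observations under <dst_folder>/originaux/.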
#%% SIM2 (Météo-France historical climate reanalysis data)
def sim2(*, dst_folder: str, var_list = None, first_year: int = None,
         last_year: int = None, extension = '.nc', mask = None):
    """
    Function to facilitate the downloading of SIM2 data (Météo-France
    historical climate reanalysis).

    Since 13/12/2023, these data are openly available on
    https://meteo.data.gouv.fr (tab "Données climatologiques de référence pour
    le changement climatique", then "Données changement climatique - SIM
    quotidienne": https://meteo.data.gouv.fr/datasets/6569b27598256cc583c917a7 )

    Parameters
    ----------
    dst_folder : str or pathlib.Path
        Path to the output folder containing the clipped SIM2 netCDF files.
        According to the files already present in this folder, only the
        necessary complementary files will be downloaded.
    var_list : iterable, optional
        List of variable names. Works with SIM2 variable names:
        'DRAINC_Q' | 'RUNC_Q' | 'EVAP_Q' | 'PRETOT_Q' | 'T_Q' ...
        The '_Q' suffix is optional ('DRAINC', 'RUNC'... also work).
        If None, all variables will be retrieved.
    first_year : int, optional
        Data will be extracted from 1st January of ``first_year`` to
        31st December of ``last_year``.
        If None, the earliest available date will be considered.
    last_year : int, optional
        End date of data to be extracted. If None, the current date will be
        used instead.
    extension : {'.nc', '.csv'}, default '.nc'
        Output file type.
    mask : str or pathlib.Path, optional
        Shapefile path indicating how to clip the CSV or netCDF files that
        will be exported to the ``dst_folder`` folder.
        Note: the only purpose of using ``mask`` at this step is to save disk space.

    Returns
    -------
    None. Creates or updates the netCDF files.
    """

    # ---- Initialization
    print("\nInitializing...")

    # Variables
    if var_list is None:
        var_list = ['PRENEI_Q', 'PRELIQ_Q', 'T_Q', 'FF_Q', 'Q_Q', 'DLI_Q',
                    'SSI_Q', 'HU_Q', 'EVAP_Q', 'ETP_Q', 'PE_Q', 'SWI_Q',
                    'DRAINC_Q', 'RUNC_Q', 'RESR_NEIGE_Q', 'RESR_NEIGE6_Q',
                    'HTEURNEIGE_Q', 'HTEURNEIGE6_Q', 'HTEURNEIGEX_Q',
                    'SNOW_FRAC_Q', 'ECOULEMENT_Q', 'WG_RACINE_Q',
                    'WGI_RACINE_Q', 'TINF_H_Q', 'TSUP_H_Q'] # all variables
# =============================================================================
#     if ('PRETOT' in var_list) | ('PRETOT_Q' in var_list):
#         var_list = var_list + ['PRENEI', 'PRELIQ']
# =============================================================================
    for i in range(0, len(var_list)):
        if var_list[i][-2:] != '_Q':
            var_list[i] = var_list[i] + '_Q'
    var_sublist = []

    # Folders
    if not os.path.isdir(dst_folder):
        os.makedirs(dst_folder)

    if extension[0] != '.':
        extension = '.' + extension

    # Dates
    if first_year is None:
        first_year = 1958
    first_date = pd.to_datetime(f"{first_year}-01-01", format = "%Y-%m-%d")
    if last_year is None:
        last_date = pd.to_datetime('today').normalize()
        last_year = last_date.year
    else:
        last_date = pd.to_datetime(f"{last_year}-12-31", format = "%Y-%m-%d")

# =============================================================================
#     # Mask
#     if mask is not None:
#         if not os.path.splitext(mask)[-1] == '.shp':
#             print("Err: The mask value should point to a .shp file. Otherwise, use mask=None to deactivate clipping")
#             return
# =============================================================================

    france_extent = (56000.0, 1613000.0, 1200000.0, 2685000.0) # whole France limits for SIM2 data in epsg:2154

    data_to_merge = []

    # ---- Determine which data needs to be downloaded
    # Data already available for each variable
    local_data = pd.DataFrame(index = var_list,
                              columns = ['file', 'start_date', 'end_date', 'extent'])
    local_data.extent = False
    sim_pattern = re.compile('(.*)_SIM2_')
    _, filelist = geo.get_filelist(dst_folder, extension = extension)
# =============================================================================
#     filelist = [os.path.join(dst_folder, f) \
#                 for f in os.listdir(dst_folder) \
#                 if os.path.isfile(os.path.join(dst_folder, f))]
# =============================================================================

    if len(filelist) > 0: # folder is not empty
        for file in filelist:
            filename = os.path.split(file)[-1]
            sim2_match = sim_pattern.findall(filename)
            var = sim2_match[0].replace("_QUOT", "")
            local_data.loc[var, 'file'] = file
            remove_file = False

            if extension == '.csv':
                df_temp = pd.read_csv(os.path.join(dst_folder, file),
                                      sep = ';',
                                      # usecols = ['LAMBX', 'LAMBY', 'DATE'] + var_list,
                                      header = 0,
                                      decimal = '.',
                                      parse_dates = ['DATE'],
                                      # date_format='%Y%m%d', # Not available before pandas 2.0.0
                                      )
                # Variables
                if len(set(var_list).intersection(set(df_temp.columns))) < len(var_list):
                    print(f"   Local data {os.path.split(file)[-1]} does not include all desired variables")
                    remove_file = True # the file will be deleted (outside this section)
                # Dates
                if pd.date_range(start = df_temp.DATE.iloc[0],
                                 end = df_temp.DATE.iloc[-1],
                                 freq = 'D').size == df_temp.DATE.size:
                    # all time values are contiguous
                    local_data.loc[var, 'start_date'] = pd.to_datetime(df_temp.DATE.iloc[0])
                    local_data.loc[var, 'end_date'] = pd.to_datetime(df_temp.DATE.iloc[-1])

            elif extension == '.nc':
                with geo.load_any(os.path.join(dst_folder, file),
                                  decode_coords = 'all',
                                  decode_times = True) as ds_temp:
                    # Dates
                    if pd.date_range(start = ds_temp.time[0].item(),
                                     end = ds_temp.time[-1].item(),
                                     freq = 'D').size == ds_temp.time.size:
                        # all time values are contiguous
                        local_data.loc[var, 'start_date'] = pd.to_datetime(ds_temp.time[0].item())
                        local_data.loc[var, 'end_date'] = pd.to_datetime(ds_temp.time[-1].item())

                    # Spatial extent
                    resolution = abs(ds_temp.rio.resolution()[0])
                    ds_extent = np.array(ds_temp.rio.bounds())

                    if mask is not None:
                        mask_gdf = geo.load_any(mask)
                        mask_gdf = geo.reproject(mask_gdf, dst_crs = ds_temp.rio.crs)
                        # mask_extent = mask_gdf.buffer(resolution).total_bounds
                        if isinstance(mask_gdf, gpd.GeoDataFrame):
                            mask_extent = mask_gdf.total_bounds
                        elif isinstance(mask_gdf, xr.Dataset):
                            mask_extent = mask_gdf.rio.bounds()
                        mask_extent[0] = geo.nearest(x = mask_extent[0], x0 = ds_extent[0], y0 = ds_extent[1], res = resolution)
                        if mask_extent[0] > mask_gdf.total_bounds[0]:
                            mask_extent[0] -= resolution
                        mask_extent[1] = geo.nearest(y = mask_extent[1], x0 = ds_extent[0], y0 = ds_extent[1], res = resolution)
                        if mask_extent[1] > mask_gdf.total_bounds[1]:
                            mask_extent[1] -= resolution
                        mask_extent[2] = geo.nearest(x = mask_extent[2], x0 = ds_extent[0], y0 = ds_extent[1], res = resolution)
                        if mask_extent[2] < mask_gdf.total_bounds[2]:
                            mask_extent[2] += resolution
                        mask_extent[3] = geo.nearest(y = mask_extent[3], x0 = ds_extent[0], y0 = ds_extent[1], res = resolution)
                        if mask_extent[3] < mask_gdf.total_bounds[3]:
                            mask_extent[3] += resolution
                    else:
                        mask_extent = france_extent # whole France

                    if (ds_extent[0:2] <= mask_extent[0:2]).any() | (ds_extent[2:4] >= mask_extent[2:4]).any():
                        local_data.loc[var, 'extent'] = True
                    else:
                        print(f"   Local data {os.path.split(file)[-1]} does not cover desired spatial extent")
                        remove_file = True # the file will be deleted (outside this 'with' section)

            if remove_file:
                os.remove(os.path.join(dst_folder, file))

    # local_data.iloc[:, 0:3][local_data.extent == True] = np.nan
    local_data.loc[local_data.index[local_data.extent == False], local_data.columns[0:3]] = np.nan

    # ---- Download
    print("\nDownloading...")
    # Download only the necessary data files from the Météo-France API.
    # Until the access to SIM2 data is implemented through the Météo-France
    # API (https://portail-api.meteofrance.fr), the current stable urls are used.
    stable_urls = {
        'QUOT_SIM2_1958-1959': ('https://www.data.gouv.fr/fr/datasets/r/5dfb33b3-fae5-4d0e-882d-7db74142bcae',
                                0.16,
                                pd.to_datetime('1958-08-01', format = "%Y-%m-%d"),
                                pd.to_datetime('1959-12-31', format = "%Y-%m-%d")),
        'QUOT_SIM2_1960-1969': ('https://www.data.gouv.fr/fr/datasets/r/eb0d6e42-cee6-4d7c-bc5b-646be4ced72e',
                                1.1,
                                pd.to_datetime('1960-01-01', format = "%Y-%m-%d"),
                                pd.to_datetime('1969-12-31', format = "%Y-%m-%d")),
        'QUOT_SIM2_1970-1979': ('https://www.data.gouv.fr/fr/datasets/r/33417617-c0dd-4513-804e-c3f563cb81b4',
                                1.1,
                                pd.to_datetime('1970-01-01', format = "%Y-%m-%d"),
                                pd.to_datetime('1979-12-31', format = "%Y-%m-%d")),
        'QUOT_SIM2_1980-1989': ('https://www.data.gouv.fr/fr/datasets/r/08ad5936-cb9e-4284-a6fc-36b29aca9607',
                                1.1,
                                pd.to_datetime('1980-01-01', format = "%Y-%m-%d"),
                                pd.to_datetime('1989-12-31', format = "%Y-%m-%d")),
        'QUOT_SIM2_1990-1999': ('https://www.data.gouv.fr/fr/datasets/r/ad584d65-7d2d-4ff1-bc63-4f93357ed196',
                                1.1,
                                pd.to_datetime('1990-01-01', format = "%Y-%m-%d"),
                                pd.to_datetime('1999-12-31', format = "%Y-%m-%d")),
        'QUOT_SIM2_2000-2009': ('https://www.data.gouv.fr/fr/datasets/r/10d2ce77-5c3b-44f8-bb46-4df27ed48595',
                                1.1,
                                pd.to_datetime('2000-01-01', format = "%Y-%m-%d"),
                                pd.to_datetime('2009-12-31', format = "%Y-%m-%d")),
        'QUOT_SIM2_2010-2019': ('https://www.data.gouv.fr/fr/datasets/r/da6cd598-498b-4e39-96ea-fae89a4a8a46',
                                1.1,
                                pd.to_datetime('2010-01-01', format = "%Y-%m-%d"),
                                pd.to_datetime('2019-12-31', format = "%Y-%m-%d")),
        'QUOT_SIM2_latest_period': ('https://www.data.gouv.fr/fr/datasets/r/92065ec0-ea6f-4f5e-8827-4344179c0a7f',
                                    1.1,
                                    pd.to_datetime('2020-01-01', format = "%Y-%m-%d"),
                                    (pd.to_datetime('today').normalize() - pd.Timedelta(1, 'D')).replace(day = 1) - pd.Timedelta(1, 'D')),
        'QUOT_SIM2_latest_days': (#'https://www.data.gouv.fr/fr/datasets/r/ff8e9fc6-d269-45e8-a3c3-a738195ea92a',
                                  'https://www.data.gouv.fr/fr/datasets/r/adcca99a-6db0-495a-869f-40c888174a57',
                                  0.1,
                                  (pd.to_datetime('today').normalize() - pd.Timedelta(1, 'D')).replace(day = 1),
                                  pd.to_datetime('today').normalize() - pd.Timedelta(1, 'D')),
        }

    available_data = pd.DataFrame.from_dict(data = stable_urls,
                                            orient = 'index',
                                            columns = ['url', 'size_Go', 'start_date', 'end_date'])

    ### Identify which files will be needed
    # -------------------------------------
    # If there is no netcdf file already
    if local_data.file.isnull().values.any():
        # Then all the data files will be downloaded
        to_download = available_data.index[
            (available_data.end_date > first_date) \
            & (available_data.start_date < last_date)]
    else:
        # The "core period" is the period that is covered by local data for
        # the specified variables
        min_core_date = local_data.start_date[var_list].max()
        max_core_date = local_data.end_date[var_list].min()

        if first_date < min_core_date:
            inf_idx = available_data.index[(available_data.start_date < min_core_date)
                                           & (available_data.end_date > first_date)]
        else:
            inf_idx = []
        if last_date > max_core_date:
            sup_idx = available_data.index[(available_data.end_date > max_core_date)
                                           & (available_data.start_date < last_date)]
        else:
            sup_idx = []

        to_download = list(set(inf_idx).union(set(sup_idx)))
        # Files to download to cover the required period
        # (Note that if the specified spatial extent is not covered, the files
        # have been previously deleted in the initialization step above)

    if len(to_download) > 0:
        # print(f"The following .csv datasets will be downoladed: {', '.join([dataname + '(' + self.available_data.loc[dataname, 'size_Go'] + ')' for dataname in to_download])}")
        ram_space = available_data.loc[to_download, 'size_Go'].max()
        disk_space = available_data.loc[to_download, 'size_Go'].sum()/2.5*len(var_list) \
            - sum(os.path.getsize(os.path.join(dst_folder, f)) \
                  for f in os.listdir(dst_folder) \
                  if os.path.isfile(os.path.join(dst_folder, f)))/1073741824
        disk_space = np.max([0, disk_space])
        print(f"The following CSV datasets will be downloaded to RAM and exported to netCDF files to cover the required period and area: {', '.join(to_download)}")
        print(f"(required RAM: < {ram_space} Go, required space: < {disk_space:.2f} Go)")

    ### Download the required files
    # -----------------------------
    if len(to_download) > 0:
        for dataname in to_download:
            print(f"\nDownloading {dataname}...")
            print(" (can take several min, depending on internet speed)")
            response = requests.get(available_data.loc[dataname, 'url'])

            if (response.status_code == 200) & (os.path.splitext(response.url)[-1] != '.tmp'):
                # Decompress gzip content
                with gzip.open(BytesIO(response.content), 'rt') as f:
                    # Determine variables to extract from the current file
                    var_sublist = local_data.loc[var_list].index[
                        (local_data.start_date[var_list] > available_data.loc[dataname, 'start_date']) \
                        | (local_data.end_date[var_list] < available_data.loc[dataname, 'end_date'])]
                    var_sublist = var_sublist.to_list() + local_data[local_data.file.isnull()].index.to_list()

                    # Replace 'PRETOT_Q' with 'PRELIQ_Q' and 'PRENEI_Q' if needed
                    var_list_csv = []
                    for v in var_list:
                        if v != 'PRETOT_Q':
                            var_list_csv += [v]
                        else:
                            var_list_csv += ['PRELIQ_Q', 'PRENEI_Q']

                    # Read .csv file and export desired variables
                    df = pd.read_csv(f,
                                     sep = ';',
                                     usecols = ['LAMBX', 'LAMBY', 'DATE'] + var_list_csv,
                                     header = 0,
                                     decimal = '.',
                                     parse_dates = ['DATE'],
                                     # date_format='%Y%m%d', # Not available before pandas 2.0.0
                                     )
                    df = df[(df.DATE >= first_date) & (df.DATE <= last_date)]

                    # Cumulated values (day 1) are summed from 06:00 UTC (day 1)
                    # to 06:00 UTC (day 2).
                    # Therefore, days correspond to Central Standard Time days.
# =============================================================================
#                     df['DATE'] = df['DATE'].tz_localize('Etc/GMT-6')
# =============================================================================

                    if 'PRETOT_Q' in var_list:
                        df['PRETOT_Q'] = df['PRELIQ_Q'] + df['PRENEI_Q']
                        if 'PRELIQ_Q' not in var_list:
                            df.drop(columns = 'PRELIQ_Q', inplace = True)
                        if 'PRENEI_Q' not in var_list:
                            df.drop(columns = 'PRENEI_Q', inplace = True)

                    if extension == '.csv':
                        # Standardization step necessary for further clipping
                        # (this step is realized in stzfr.sim2(df) when extension == '.nc' (see below))
                        df.rename(columns = {'LAMBX': 'x', 'LAMBY': 'y', 'DATE': 'time'}, inplace = True)
                        df[['x', 'y']] = df[['x', 'y']]*100 # convert hm to m
                        data_to_merge.append(df)
                    elif extension == '.nc':
                        ds = stzfr.sim2(df)
                        data_to_merge.append(ds)

            else:
                print(f" *****\n Error while downloading the file {dataname}.csv\n *****")

    else:
        print("No additional original dataset needs to be downloaded.")
        return

    # Export preparation
    name_pattern = re.compile(r"(.*)\d{4,6}-\d{4,6}")
    basename = name_pattern.findall(dataname)
    if len(basename) > 0:
        basename = basename[0]
    else:
        basename = 'QUOT_SIM2_'
    outpath = os.path.join(dst_folder, basename + f"{first_year}-{last_year}")

    if extension == '.csv':
# =============================================================================
#         if mask is not None:
#             print(f"Warning: As `extension` for output is {extension}, the `mask` will not be taken into account.")
#             print("If you want to save the results clipped over the specified mask, please pass `extension = '.nc'`")
# =============================================================================
        # ---- Clip data
        clipped_df = []
        if mask is not None:
            maskname = os.path.splitext(os.path.split(mask)[-1])[0]
            print(f"   _ clipping on {maskname}")
            i = 0
            for d in data_to_merge:
                i += 1
                print(f"   . {i}/{len(data_to_merge)}")
                clipped_df.append(geo.clip(d, src_crs = 27572, mask = mask))
        else:
            clipped_df = data_to_merge

        # ---- Merge pandas DataFrames
        print("Merging...")
        merged_df = geo.merge_data(clipped_df)
        # Revert temporary corrections on x and y dimensions
        merged_df[['x', 'y']] = merged_df[['x', 'y']]/100 # convert m back to hm
        merged_df.rename(columns = {'x': 'LAMBX', 'y': 'LAMBY', 'time': 'DATE'}, inplace = True)

        # ---- Export
        print("\nExporting...")
        print(f"   _ to {outpath}.csv")
        merged_df.to_csv(outpath + '.csv',
                         sep = ';',
                         header = True,
                         decimal = '.',
                         index = False)

    elif extension == '.nc':
        # Clip and merge files in folder
# =============================================================================
#         if 'PRETOT' in data_to_merge[0]:
#             var_list = var_list + ['PRETOT']
# =============================================================================
        for var in data_to_merge[0].data_vars:
            print(f"\nProcessing {var}...")

            # ---- Clip data
            clipped_ds = []
            if mask is not None:
                maskname = os.path.splitext(os.path.split(mask)[-1])[0]
                mask_ds = geo.load_any(mask)
                if isinstance(mask_ds, xr.Dataset):
                    mask_ds = mask_ds.where(mask_ds > 0, drop = True)
                    bounds = mask_ds.rio.bounds()
                    try:
                        mask_crs = mask_ds.rio.crs
                    except:
                        mask_crs = None
                elif isinstance(mask_ds, gpd.GeoDataFrame):
                    bounds = mask_ds.total_bounds
                    try:
                        mask_crs = mask_ds.crs
                    except:
                        mask_crs = None

                print(f"   _ clipping on {maskname}")
                i = 0
                for d in data_to_merge:
                    i += 1
                    print(f"   . {i}/{len(data_to_merge)}")
                    clipped_ds.append(geo.clip(d[[var]], bounds = bounds, bounds_crs = mask_crs))
            else:
                clipped_ds = data_to_merge

            # ---- Merge netCDF files
            # Add local files
            _, varfilelist = geo.get_filelist(dst_folder, extension = ".nc", tag = '^' + var)
            # It is necessary to add '^' before the string so that only files
            # that start with var are detected, to avoid confusion between
            # 'T_Q' and 'PRETOT_Q' (which also contains 'T_Q')
            for localfile in varfilelist:
                clipped_ds.append(geo.load_any(os.path.join(dst_folder, localfile)))

            print(f"   _ merging {len(clipped_ds)} files")
            merged_ds = geo.merge_data(clipped_ds, update_val = True)
            # update_val = True because SIM2 original datasets can sometimes overlap

            for localfile in varfilelist:
                os.remove(os.path.join(dst_folder, localfile))

            # ---- Export
            print("\nExporting...")
            var_outpath = os.path.join(os.path.split(outpath)[0],
                                       f"{var}_" + os.path.split(outpath)[-1]) + ".nc"
            print(f"   _ to {var_outpath}")
            geo.export(merged_ds, var_outpath)
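
# Example usage of sim2() (a minimal sketch; the paths are illustrative):
#   sim2(dst_folder = "/path/to/sim2",         # hypothetical destination folder
#        var_list = ['PRETOT', 'ETP', 'T'],    # '_Q' suffix is added automatically
#        first_year = 2000,
#        last_year = 2020,
#        extension = '.nc',
#        mask = "/path/to/watershed.shp")      # optional clip, to save disk space
# One netCDF file per variable (e.g. T_Q_QUOT_SIM2_2000-2020.nc) is created or
# updated in dst_folder.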