Source code for cwatm

# -*- coding: utf-8 -*-
"""
Created on Tue Sep 17 16:58:10 2024

@author: Alexandre Kenshilik Coche
@contact: alexandre.co@hotmail.fr

PAGAIE_interface est une interface regroupant les fonctions de geohydroconvert
pertinentes pour géotraiter les données géographiques dans le cadre de la 
méthodologie Eau et Territoire (https://eau-et-territoire.org ).
A la difference de geohydroconvert, trajectoire_toolbox propose une sélection 
des fonctions les plus utiles de geohydroconvert, au sein d'une interface
traduite en Français.
"""

#%% IMPORTATIONS
import os
import re
from functools import wraps
from pathlib import Path
import datetime
import xarray as xr
import pandas as pd
import numpy as np
xr.set_options(keep_attrs = True)
from geop4th import geobricks as geo
from geop4th.workflows.standardize import standardize_fr as stzfr

#%% A DEPLACER !!!
# =============================================================================
# @formating
# def era5(data, *, 
#          mask=None, 
#          bounds=None, 
#          resolution=None, 
#          x0=None, 
#          y0=None,
#          base_template=None, 
#          **rio_kwargs,
#          ):     
#     # ERA5
#     if data_type.casefold().replace(' ', '').replace('-', '') in [
#             "era5", "era5land", "era"]:
#         data_folder, filelist = liste_fichiers(data, extension = '.nc')
# 
#         # outpath = outpath[:-3] # to remove '.nc'
#         outpath = os.path.splitext(filelist[0])[0]
#         outpath = os.path.join(data_folder, outpath)
#         # Corriger les données
#         ds = convertir_cwatm([os.path.join(data_folder, f) for f in filelist], data_type = "ERA5")
#         # Géoréférencer les données
#         ds = georeferencer(data = ds, data_type = "other", crs = 4326)
#         # Compression avec pertes (packing)
#         ds_comp = compresser(ds)
#         exporter(ds_comp, outpath + '_pack' + '.nc')
#         # Reprojections et exports
#         # rio_kwargs['dst_crs'] = 2154
#         for res in resolution:
#             if res is None:
#                 res_suffix = ''
#                 if 'resolution' in rio_kwargs:
#                     rio_kwargs.pop('resolution')
#             else:
#                 res_suffix = f'_{res}m'
#                 rio_kwargs['resolution'] = res
#             n_mask = 0
#             for m in mask:
#                 n_mask += 1
#                 ds_rprj = reprojeter(data = ds, mask = m, bounds = bounds,
#                                      x0 = x0, y0 = y0, base_template = base_template,
#                                      **rio_kwargs)
#                 exporter(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
#                 # exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # Pas de rundate car noms des fichiers trop longs...
#         
#         del ds_rprj
#         del ds
#         del ds_comp
# =============================================================================

#%% DECORATOR
def formating(func):
    @wraps(func)
    def wrapper(data, **kwargs):
        
        if 'resolution' not in kwargs:
            kwargs['resolution'] = None
        if 'mask' not in kwargs:
            kwargs['mask'] = None
        
        kwargs['rundate'] = datetime.datetime.now().strftime("%Y-%m-%d_%Hh%M")
        
        if isinstance(kwargs['resolution'], int) or (kwargs['resolution'] is None):
            kwargs['resolution'] = [kwargs['resolution']] # it can therefore be a list of tuples or a list of lists
            
        if not isinstance(kwargs['mask'], list):
            kwargs['mask'] = [kwargs['mask']]
        
        func(data, **kwargs)
        
    return wrapper
    

#%% FILE MANAGEMENT
def infer_outpath(path):
    
    # Safeguard
    try:
        path = Path(path)
    except TypeError as err:
        print(f"TypeError: `path` should be a valid path\n{err}\n")
        raise
        
    if path.is_file():
        stz_pattern = re.compile(".*stz.*")
        if (len(stz_pattern.findall(path.stem)) == 0) & (len(stz_pattern.findall(path.parent.stem)) == 0):
            print("Warning: make sure that the file defined by `path` has been standardized with geop4th standardize functions")
        outpath = path
        
    elif path.is_dir():
        _, filelist = geo.get_filelist(path, tag = 'stz')
        if len(filelist) == 0:
            print(f"Warning: `data` was passed as a folder, but no standardized files have been identified in this folder. Please consider using geop4th standardize functions beforehand. `outpath` is inferred from the first file: {filelist[0]}")
            _, filelist_all = geo.get_filelist(path)
            outpath = path / filelist_all[0]
        elif len(filelist) > 1:
            print(f"Warning: Several files were detected as standardized in the `data` folder. The outpath is inferred from the first file: {filelist[0]}")
            outpath = path / filelist[0]
        else:
            outpath = path / filelist[0]
    
    return outpath
            
            
        
#%% PORTAL FOR FORMATING
def format_data(data, 
                data_name, *, 
                mask=None, 
                bounds=None, 
                resolution=None, 
                x0=None, 
                y0=None,
                base_template=None, 
                **rio_kwargs,
                ):
    
    # ---- Données topographiques
    #%%% BD ALTI 25m
    if data_name.casefold().replace(' ', '') in ['bdalti', 'alti', 'ignalti']:
        return bdalti(data)
    
    # ---- Données climatiques
    #%%% DRIAS-Climat 2022 (netcdf)
    # Données SAFRAN (tas, prtot, rayonnement, vent... ) "EXPLORE2-Climat 2022" 
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            'drias2022', 'climat2022', 'driasclimat2022']:
        return explore2climat(data)
    
    #%%% DRIAS-Eau 2024 (netcdf)
    # Indicateurs SAFRAN (SWIAV, SSWI-3...) "EXPLORE2-SIM2 2024" 
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            'sim2024', 'indicateursim2024', 'indicateurssim2024',
            'driaseau2024', 'indicateurdriaseau2024',
            'indicateursdriaseau2024']:
        return explore2eau(data)
    
    #%%% SIM2 (csv to netCDF)
    # Réanalyse historique climatique Safran-Isba(-Modcou) "SIM2" 
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            'sim2', 'sim', 'safranisba', 'safranisbamodcou',
            ]:
        return sim2(data)
    
    # ---- Données d'usage de l'eau
    #%%% BNPE
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            "bnpe"]:
        return bnpe(data)
    
    # ---- Données d'occupation des sols
    #%%% OCS SCOT Pays Basque
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            "ocs", "ocspbs", "ocspaysbasque"]:
        return ocspaysbasque(data)
    
    
#%% FUNCTIONS FOR FORMATING
#%%% Topographic data

[docs] @formating def dem(data, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, outpath = None, **rio_kwargs, ): r""" Format standardized DEM data for CWatM: - reproject and clip them into passed ``resolution``, ``mask``, ``dst_crs``, ``x0``, ``y0`` ... - generate matching maps of flow directions - generate matching maps of standard deviations of elevation Parameters ---------- data : path (str or pathlib.Path), or variable (numpy.ndarray, xarray.Dataset or xarray.DataArray) MNT Dataset retrieved from IGN "ALTI" database https://geoservices.ign.fr/bdalti mask : (list of) str, pathlib.Path, shapely.geometry, xarray.DataArray or geopandas.GeoDataFrame, optional Filepath of mask used to clip the data. bounds : iterable or None, optional, default None Boundaries of the target domain as a tuple (x_min, y_min, x_max, y_max). resolution : (list of) int, optional List of resolutions to use for data formating. x0: number, optional, default None Origin of the X-axis, used to align the reprojection grid. y0: number, optional, default None Origin of the Y-axis, used to align the reprojection grid. base_template : str, pathlib.Path, xarray.DataArray or geopandas.GeoDataFrame, optional Filepath, used as a template for spatial profile. Supported file formats are *.tif*, *.nc* and vector formats supported by geopandas (*.shp*, *.json*, ...). outpath : str, optional Outpath stem when ``data`` is passed as a GEOP4TH variable instead of a filepath. **rio_kwargs : keyword args, optional Argument passed to the ``xarray.Dataset.rio.reproject()`` function call. **Note**: These arguments are prioritary over ``base_template`` attributes. May contain: - ``dst_crs`` : str - ``resolution`` : float or tuple - ``shape`` : tuple (int, int) - ``transform`` : Affine - ``nodata`` : float or None - ``resampling`` : - see ``help(rasterio.enums.Resampling)`` - most common are: ``5`` (average), ``13`` (sum), ``0`` (nearest), ``9`` (min), ``8`` (max), ``1`` (bilinear), ``2`` (cubic)... - the functionality ``'std'`` (standard deviation) is also available - see ``help(xarray.Dataset.rio.reproject)`` Returns ------- None. Generate the formated files, with filepaths derived from ``data`` or from ``outpath``. Examples --------- >>> cwatm.dem( r"E:\Inputs\DEM\IGN\BDALTIV2_folder\BDALTIV2_stz.tif", resolution = [500, 1000], x0 = 0, y0 = 0, dst_crs = 27572, mask = r"E:\Inputs\masks\MyMask.shp" ) Exporting... _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\BDALTIV2_stz_CWATM_1000m_mask1.tif' Exporting... _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\ElevationStD_BDALTIV2_stz_CWATM_1000m_mask1.tif' Exporting... _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\BDALTIV2_stz_CWATM_500m_mask1.tif' Exporting... _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\ElevationStD_BDALTIV2_stz_CWATM_500m_mask1.tif' """ # ---- Load # Initialize the output filepath(s) if isinstance(data, xr.Dataset): if outpath is None: print("Error: if `data` is a xarray.Dataset, the argument `outpath` needs to be passed") return else: outpath_stem = Path(outpath) elif isinstance(data, (str, Path)): outpath_stem = infer_outpath(data) data = outpath_stem outpath = outpath_stem.with_suffix('.tif').with_stem(outpath_stem.stem + '_CWATM') # Load data_ds = geo.load_any(data) # Impose reprojection resampling to be the minimum value rio_kwargs['resampling'] = 9, # 9: minimum # ---- Replace 0 (sea level) with nodata main_var = geo.main_vars(data_ds)[0] encod = data_ds[main_var].encoding # store encodings (_FillValue, compression...) data_ds[main_var] = data_ds[main_var].where(data_ds[main_var] != 0) data_ds[main_var].encoding = encod # transfer encodings # ---- Reproject for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' rio_kwargs['resolution'] = res n_mask = 0 for m in mask: n_mask += 1 ds_rprj = geo.reproject(data_ds, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, **rio_kwargs) # x0 = 12.5, y0 = 12.5 geo.export(ds_rprj, outpath.with_stem(outpath.stem + res_suffix + f'_mask{n_mask}') ) # ---- Flow directions ldd = geo.compute_ldd(ds_rprj, dirmap = '1-9', engine = 'pysheds') geo.export(ldd, outpath.with_stem('LDD_' + outpath.stem + res_suffix + f'_mask{n_mask}') ) # ---- Standard deviation on each cell if res is not None: # impose a resampling according to the standard deviation if 'resampling' in rio_kwargs: rio_kwargs.pop('resampling') elevstd = geo.reproject(data_ds, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, resampling = 'std', # standard deviation **rio_kwargs) geo.export(elevstd, outpath.with_stem('ElevationStD_' + outpath.stem + res_suffix + f'_mask{n_mask}')) else: print("Err: Standard Deviation can only be computed if there is a downscaling. A resolution argument should be passed")
#%%% Données climatiques @formating def explore2climat(data, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, **rio_kwargs, ): # DRIAS-Climat 2022 (netcdf) # Données SAFRAN (tas, prtot, rayonnement, vent... ) "EXPLORE2-Climat 2022" data_folder, filelist = geo.get_filelist(data, extension = '.nc') for file_name in filelist: data = os.path.join(data_folder, file_name) # Raccourcir le nom motif1 = re.compile('(.*Adjust_France)') motif2 = re.compile('(historical|rcp25|rcp45|rcp85)') motif3 = re.compile('(\d{4,4}-\d{4,4}.*)') str1 = motif1.split(file_name)[1][:-7] str2 = motif2.split(file_name)[1] str3 = motif3.split(file_name)[1] outpath = '_'.join([str1, str2, str3]) # outpath = outpath[:-3] # to remove '.nc' outpath = os.path.splitext(outpath)[0] outpath = os.path.join(data_folder, outpath) # Corriger les données ds = stzfr.explore2climat(data) # Géoréférencer les données ds = geo.georef(data = ds, data_type = 'drias2022') # Compression avec pertes (packing) ds_comp = geo.pack(ds) geo.export(ds_comp, outpath + '_pack' + '.nc') # Reprojections et exports # rio_kwargs['dst_crs'] = 2154 for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' rio_kwargs['resolution'] = res n_mask = 0 for m in mask: n_mask += 1 ds_rprj = geo.reproject(ds, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, **rio_kwargs) geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc') # exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # Pas de rundate car noms des fichiers trop longs... del ds_rprj del ds del ds_comp @formating def explore2eau(data, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, **rio_kwargs, ): # DRIAS-Eau 2024 (netcdf) # Indicateurs SAFRAN (SWIAV, SSWI-3...) "EXPLORE2-SIM2 2024" data_folder, filelist = geo.get_filelist(data, extension = '.nc') for file_name in filelist: data = os.path.join(data_folder, file_name) # Raccourcir le nom motif1 = re.compile('(.*_France)') motif2 = re.compile('(historical|rcp25|rcp45|rcp85)') motif3 = re.compile('(\d{4,4}-\d{4,4}.*)') str1 = motif1.split(file_name)[1][:-7] str2 = motif2.split(file_name)[1] str3 = motif3.split(file_name)[1] outpath = '_'.join([str1, str2, str3]) # outpath = outpath[:-3] # to remove '.nc' outpath = os.path.splitext(outpath)[0] outpath = os.path.join(data_folder, outpath) # Corriger les données ds = stzfr.explore2eau(data) # Géoréférencer les données ds = geo.georef(data = ds, data_type = "DRIAS-Eau 2024") # Compression avec pertes (packing) ds_comp = geo.pack(ds) geo.export(ds_comp, outpath + '_pack' + '.nc') # Reprojections et exports # rio_kwargs['dst_crs'] = 2154 for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' rio_kwargs['resolution'] = res n_mask = 0 for m in mask: n_mask += 1 ds_rprj = geo.reproject(data = ds, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, **rio_kwargs) geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc') # exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # Pas de rundate car noms des fichiers trop longs... del ds_rprj del ds del ds_comp
[docs] @formating def meteo(data, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, **rio_kwargs, ): # SIM2 (csv to netCDF) # Réanalyse historique climatique Safran-Isba(-Modcou) "SIM2" # Load data # ============================================================================= # sim2_pattern = re.compile("_SIM2_(.*)") # ============================================================================= if isinstance(data, list): data_ds = geo.merge_data(data) if os.path.isdir(data[0]): outpath = data # ============================================================================= # suffix = datetime.datetime.now().strftime("%Y-%m-%d_%Hh%M") # ============================================================================= elif os.path.isfile(data[0]): outpath = os.path.split(data)[0] # ============================================================================= # suffix = sim2_pattern.findall(os.path.splitext(os.path.split(data)[-1])[0])[0] # ============================================================================= elif isinstance(data, (str, Path)): if os.path.isfile(data): data_ds = geo.load_any(data, sep = ';') outpath = os.path.split(data)[0] # ============================================================================= # suffix = sim2_pattern.findall(os.path.splitext(os.path.split(data)[-1])[0])[0] # ============================================================================= elif os.path.isdir(data): # Recherche en 1er les fichiers .csv outpath, filelist = geo.get_filelist(data, extension = '.csv') if len(filelist) > 0: data_ds = geo.merge_data(os.path.join(outpath, filelist)) # Si aucun, recherche en 2e les fichiers .nc (1 fichier par variable) else: data_ds = xr.Dataset() for var in ['PRENEI_Q', 'PRELIQ_Q', 'PRETOT_Q', 'T_Q', 'TINF_H_Q', 'TSUP_H_Q', 'ETP_Q', 'Q_Q', 'FF_Q', 'DLI_Q', 'SSI_Q', 'HU_Q']: outpath, filelist = geo.get_filelist(data, extension = '.nc', tag = var) if (filelist) > 0: ds_temp = geo.merge(os.path.join(outpath, filelist)) data_ds[var] = ds_temp[var] else: data_ds = geo.load_any(data) if isinstance(data_ds, pd.DataFrame): # data_ds = geo.convert_to_cwatm(data_ds, data_type = "SIM2") data_ds = stzfr.sim2(data_ds) # Géoréférencer les données data_ds = geo.georef(data_ds) # Préparation outpath time_var = geo.main_time_dims(data_ds)[0] print('---') print(data_ds[time_var].min()) start_year = pd.to_datetime(data_ds[time_var].min().values).strftime("%Y-%m") end_year = pd.to_datetime(data_ds[time_var].max().values).strftime("%Y-%m") var_list = geo.main_vars(data_ds) for var in var_list: var_ds = data_ds[[var]].copy() # Reprojections et exports # rio_kwargs['dst_crs'] = 2154 for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' rio_kwargs['resolution'] = res n_mask = 0 for m in mask: n_mask += 1 ds_rprj = geo.reproject(data = var_ds, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, **rio_kwargs) geo.export(ds_rprj, os.path.join( outpath, '_'.join([var, start_year, end_year]) + res_suffix + f'_mask{n_mask}.nc') )
#%%% Données d'occupation du sol @formating def ocspaysbasque(data, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, **rio_kwargs, ): # OCS SCOT Pays Basque data_folder, filelist = geo.get_filelist(data, extension = '.nc') # outpath = outpath[:-3] # to remove '.nc' outpath = os.path.splitext(filelist[0])[0] outpath = os.path.join(data_folder, outpath) # Corriger les données ds = stzfr.ocspaysbasque([os.path.join(data_folder, f) for f in filelist]) # Géoréférencer les données ds = geo.georef(data = ds, data_type = "other", crs = 4326) # Compression avec pertes (packing) ds_comp = geo.pack(ds) geo.export(ds_comp, outpath + '_pack' + '.nc') # Reprojections et exports # rio_kwargs['dst_crs'] = 2154 for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' rio_kwargs['resolution'] = res n_mask = 0 for m in mask: n_mask += 1 ds_rprj = geo.reproject(data = ds, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, **rio_kwargs) geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc') # exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # Pas de rundate car noms des fichiers trop longs... del ds_rprj del ds del ds_comp #%%% Données d'usages de l'eau @formating def bnpe(data, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, **rio_kwargs, ): # BNPE # ---- Creer les fichiers qui combinent les info sur les chroniques et les infos sur les ouvrages gdf = stzfr.bnpe(data = data) # ---- Initialisations concat_folder = Path(data).with_name("concat") year_start = int(gdf.time.dt.year.min()) year_end = int(gdf.time.dt.year.max()) time_dim = geo.main_time_dims(gdf)[0] n_mask = 0 for m in mask: n_mask += 1 # ---- Manage folders if not (concat_folder / Path(f"mask{n_mask}")).exists(): os.makedirs(concat_folder / Path(f"mask{n_mask}")) # ---- Cut json files on masks print(f"\nCut shapefile over mask {n_mask}/{len(mask)}:") reprj_gdf = geo.reproject(gdf, # main_vars = 1, # useless mask = m) geo.export(reprj_gdf, concat_folder / Path(f"mask{n_mask}") / f"{year_start}-{year_end}_clipped.json", encoding='utf-8') # ---- Fichier récapitulatif .csv # Reorder columns (optional) col_names_base = ['code_ouvrage', 'nom_commune', 'volume', 'code_usage', 'libelle_usage', 'prelevement_ecrasant'] + [time_dim] if ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' in gdf.columns): col_names = col_names_base + ['code_type_milieu', 'libelle_type_milieu'] elif ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' not in gdf.columns): col_names = col_names_base + ['libelle_type_milieu'] elif ('libelle_type_milieu' not in gdf.columns) & ('code_type_milieu' in gdf.columns): col_names = col_names_base + ['code_type_milieu'] # ============================================================================= # df = pd.DataFrame(reprj_gdf[col_names]) # ============================================================================= col_names = [time_dim] + col_names remaining_col = list(set(gdf.columns) - set(col_names)) # ============================================================================= # df = df[[time_dim] + col_names # ============================================================================= df = pd.DataFrame(reprj_gdf[col_names + remaining_col]) # Remove useless rows df = df[df.volume > 0] # Sort by nom_commune df.sort_values(by = 'nom_commune', inplace = True) # Export du fichier récapitulatif csv df.to_csv(concat_folder / "recap.csv", sep = ';', encoding = 'utf-8') # ---- Rastérisation for res in resolution: if res is None: res_suffix = '' res_folder = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' res_folder = f"{res}m" rio_kwargs['resolution'] = res # Folders if not (concat_folder / "netcdf" / res_folder / f"mask{n_mask}").exists(): os.makedirs(concat_folder / "netcdf" / res_folder / f"mask{n_mask}") # Reprojection ds_rprj = geo.reproject(data = gdf, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, rasterize = True, main_vars = None, **rio_kwargs) if 'dst_crs' in rio_kwargs: ds_rprj = geo.georef(data = ds_rprj, include_crs = True, crs = rio_kwargs['dst_crs']) else: ds_rprj = geo.georef(data = ds_rprj) # Export de chaque fichier annuel reprojeté geo.export(ds_rprj, concat_folder / "netcdf" / res_folder / f"mask{n_mask}" / f"{year_start}-{year_end}_clipped.nc", ) #%% STANDARDIZE OUTPUTS def output(data, *, data_type = 'CWatM'): # data_type peut être aussi = 'MODFLOW' 0 #%% AUTRES FONCTIONS def zones_basses(mnt_raster): # Génère un raster indiquant les pixels sous le niveau de la mer, à partir # d'un raster de modèle numérique de terrain with xr.open_dataset(mnt_raster, decode_times = False, decode_coords = 'all') as mnt_ds: mnt_ds.load() def dummy_fractionLandcover(base, first_year = 2020, last_year = 2020): frac0 = geo.dummy_input(base, 0) frac1 = geo.dummy_input(base, 1) mvar = geo.main_var(frac0)[0] data_ds = frac0.copy() data_ds = data_ds.rename({mvar: 'fracforest'}) data_ds['fracgrassland'] = frac1[mvar] data_ds['fracirrNonPaddy'] = frac0[mvar] data_ds['fracirrPaddy'] = frac0[mvar] data_ds['fracsealed'] = frac0[mvar] data_ds['fracwater'] = frac0[mvar] data_ds = data_ds.expand_dims(dim = {'time': pd.date_range(start = f"{first_year}-01-01", periods = last_year - first_year + 1, freq = 'YE') }, axis = 0) data_ds = geo.georef(data_ds, crs = data_ds.rio.crs) data_ds, _ = geo.standard_fill_value(data_ds) for var in geo.main_var(data_ds): data_ds[var].attrs['long_name'] = var return data_ds def dummy_dzRel(base, out_path): array = geo.dummy_input(base, 1) mvar = geo.main_var(array)[0] data_ds = xr.Dataset() for n in [0, 1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]: var = f'dzRel{n:04}' data_ds[var] = array[mvar] data_ds[var].attrs['long_name'] = var data_ds = geo.georef(data_ds, crs = data_ds.rio.crs) data_ds, _ = geo.standard_fill_value(data_ds) geo.export(data_ds, Path(out_path)/"dzRel_dummy.nc") return data_ds def dummy_lakeres(base, out_path): # array = geo.dummy_input(base, 1) # mvar = geo.main_var(array)[0] print("Warning: The input `base` should necessary be the mask") data_ds = geo.dummy_input(base, 0) mvar = geo.main_var(data_ds)[0] xvar, yvar = geo.main_space_dims(data_ds) stack_da = data_ds[mvar].stack(xy = [xvar, yvar]) # Determine the coordinates of the first not-NaN cell x, y = stack_da[stack_da.notnull()].xy[0].item() data_ds[mvar].loc[{xvar: x, yvar: y}] = 1 data_ds = geo.georef(data_ds, crs = data_ds.rio.crs) data_ds, _ = geo.standard_fill_value(data_ds) geo.export(data_ds, Path(out_path)/"lakeres_dummy.nc") return data_ds def dummy_routing(base, out_path): ds_dict = {} ds_dict['chanMan'] = geo.dummy_input(base, 0.3) ds_dict['chanGrad'] = geo.dummy_input(base, 0.02) ds_dict['chanGradMin'] = geo.dummy_input(base, 0.0001) ds_dict['chanLength'] = geo.dummy_input(base, 1400) ds_dict['chanWidth'] = geo.dummy_input(base, 17) ds_dict['chanDepth'] = geo.dummy_input(base, 1) for var in ds_dict: ds_dict[var] = geo.georef(ds_dict[var], crs = ds_dict[var].rio.crs) ds_dict[var], _ = geo.standard_fill_value(ds_dict[var]) geo.export(ds_dict[var], Path(out_path)/f"{var}_dummy.nc") return ds_dict