Source code for cwatm

# -*- coding: utf-8 -*-
"""
Created on Tue Sep 17 16:58:10 2024

@author: Alexandre Kenshilik Coche
@contact: alexandre.co@hotmail.fr

PAGAIE_interface is an interface gathering the geohydroconvert functions
relevant for geoprocessing geographic data within the framework of the
Eau et Territoire methodology (https://eau-et-territoire.org ).
Unlike geohydroconvert, trajectoire_toolbox offers a selection of the most
useful geohydroconvert functions, within an interface translated into French.
"""

#%% IMPORTS
import os
import re
from functools import wraps
from pathlib import Path
import datetime
from shapely.geometry import Polygon
import xarray as xr
import pandas as pd
from pandas.api.types import is_numeric_dtype
import numpy as np
xr.set_options(keep_attrs = True)
from rosetta import rosetta, SoilData
from geop4th import geobricks as geo
from geop4th.workflows.standardize import standardize_fr as stzfr
from geop4th.graphics import (
    ncplot as ncp,
    cmapgenerator as cmg,
    )


#%% TO BE MOVED !!!
# =============================================================================
# @formating
# def era5(data, *, 
#          mask=None, 
#          bounds=None, 
#          resolution=None, 
#          x0=None, 
#          y0=None,
#          base_template=None, 
#          **rio_kwargs,
#          ):     
#     # ERA5
#     if data_type.casefold().replace(' ', '').replace('-', '') in [
#             "era5", "era5land", "era"]:
#         data_folder, filelist = liste_fichiers(data, extension = '.nc')
# 
#         # outpath = outpath[:-3] # to remove '.nc'
#         outpath = os.path.splitext(filelist[0])[0]
#         outpath = os.path.join(data_folder, outpath)
#         # Correct the data
#         ds = convertir_cwatm([os.path.join(data_folder, f) for f in filelist], data_type = "ERA5")
#         # Georeference the data
#         ds = georeferencer(data = ds, data_type = "other", crs = 4326)
#         # Lossy compression (packing)
#         ds_comp = compresser(ds)
#         exporter(ds_comp, outpath + '_pack' + '.nc')
#         # Reprojections and exports
#         # rio_kwargs['dst_crs'] = 2154
#         for res in resolution:
#             if res is None:
#                 res_suffix = ''
#                 if 'resolution' in rio_kwargs:
#                     rio_kwargs.pop('resolution')
#             else:
#                 res_suffix = f'_{res}m'
#                 rio_kwargs['resolution'] = res
#             n_mask = 0
#             for m in mask:
#                 n_mask += 1
#                 ds_rprj = reprojeter(data = ds, mask = m, bounds = bounds,
#                                      x0 = x0, y0 = y0, base_template = base_template,
#                                      **rio_kwargs)
#                 exporter(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
#                 # exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # no rundate because the file names would be too long...
#         
#         del ds_rprj
#         del ds
#         del ds_comp
# =============================================================================

#%% DECORATOR
def formating(func):
    @wraps(func)
    def wrapper(data, **kwargs):
        
        # ---- Formatting `resolution`
        if 'resolution' not in kwargs:
            kwargs['resolution'] = None
        
        if isinstance(kwargs['resolution'], int) or (kwargs['resolution'] is None):
            kwargs['resolution'] = [kwargs['resolution']] # it can therefore be a list of tuples or a list of lists
            
            
        # ---- Formatting `mask` and `bounds`
        # The `mask` and `bounds` arguments cannot be passed together.
        # To homogenize cases, `bounds` will eventually be converted to `mask`
        if 'mask' not in kwargs:
            kwargs['mask'] = None
            
            if 'bounds' in kwargs:
                # `bounds` is converted to `mask`
                kwargs['mask'] = Polygon([
                    (kwargs['bounds'][0], kwargs['bounds'][1]),
                    (kwargs['bounds'][2], kwargs['bounds'][1]), 
                    (kwargs['bounds'][2], kwargs['bounds'][3]), 
                    (kwargs['bounds'][0], kwargs['bounds'][3]),
                    (kwargs['bounds'][0], kwargs['bounds'][1]),
                    ])
                kwargs['bounds'] = None
            
            else: # 'bounds' not in kwargs:
                kwargs['bounds'] = None
                
        else: # 'mask' in kwargs
            if 'bounds' in kwargs:
                print("Warning: the arguments `mask` and `bounds` are not intended to be passed together. Only `mask` will be used.")
                kwargs['bounds'] = None
            
        if not isinstance(kwargs['mask'], list):
            kwargs['mask'] = [kwargs['mask']]
        
        # ---- Formatting `rundate`
        kwargs['rundate'] = datetime.datetime.now().strftime("%Y-%m-%d_%Hh%M")
        
        
        return func(data, **kwargs)
        
    return wrapper
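
# A minimal usage sketch of the `formating` decorator (the decorated function
# and the values are hypothetical, for illustration only): a scalar
# `resolution` and a single `mask` are wrapped into lists, and `bounds` is
# converted into a shapely Polygon mask, so that wrapped functions can always
# iterate over `resolution` and `mask`.
# =============================================================================
# @formating
# def show_kwargs(data, **kwargs):
#     return kwargs['resolution'], kwargs['mask'], kwargs['bounds']
#
# show_kwargs(None, resolution = 500, bounds = (0, 0, 10, 10))
# # -> ([500], [<shapely Polygon built from the bounds>], None)
# =============================================================================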
    

#%% FILE MANAGEMENT
def infer_outpath(path):
    
    # Safeguard
    try:
        path = Path(path)
    except TypeError as err:
        print(f"TypeError: `path` should be a valid path\n{err}\n")
        raise
        
    if path.is_file():
        stz_pattern = re.compile(".*stz.*")
        if (len(stz_pattern.findall(path.stem)) == 0) & (len(stz_pattern.findall(path.parent.stem)) == 0):
            print("Warning: make sure that the file defined by `path` has been standardized with geop4th standardize functions")
        outpath = path
        
    elif path.is_dir():
        _, filelist = geo.get_filelist(path, tag = 'stz')
        if len(filelist) == 0:
            _, filelist_all = geo.get_filelist(path)
            print(f"Warning: `data` was passed as a folder, but no standardized files have been identified in this folder. Please consider using geop4th standardize functions beforehand. `outpath` is inferred from the first file: {filelist_all[0]}")
            outpath = path / filelist_all[0]
        elif len(filelist) > 1:
            print(f"Warning: Several files were detected as standardized in the `data` folder. The outpath is inferred from the first file: {filelist[0]}")
            outpath = path / filelist[0]
        else:
            outpath = path / filelist[0]
    
    return outpath
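
# Usage sketch for `infer_outpath` (paths are hypothetical):
# =============================================================================
# infer_outpath(r"D:\DEM\BDALTIV2_stz.tif")  # file case: returned as-is (a warning
#                                            # is printed if neither the file name nor
#                                            # its parent folder contains 'stz')
# infer_outpath(r"D:\DEM")                   # folder case: returns the first file
#                                            # tagged 'stz' found in the folder
# =============================================================================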
            
            
        
#%% PORTAL FOR FORMATTING
def format_data(data, 
                data_name, *, 
                mask=None, 
                bounds=None, 
                resolution=None, 
                x0=None, 
                y0=None,
                base_template=None, 
                **rio_kwargs,
                ):
    
    # ---- Topographic data
    #%%% BD ALTI 25m
    if data_name.casefold().replace(' ', '') in ['bdalti', 'alti', 'ignalti']:
        return dem(data)
    
    # ---- Climatic data
    #%%% DRIAS-Climat 2022 (netcdf)
    # SAFRAN data (tas, prtot, radiation, wind...) "EXPLORE2-Climat 2022"
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            'drias2022', 'climat2022', 'driasclimat2022']:
        return explore2climat(data)
    
    #%%% DRIAS-Eau 2024 (netcdf)
    # SAFRAN indicators (SWIAV, SSWI-3...) "EXPLORE2-SIM2 2024"
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            'sim2024', 'indicateursim2024', 'indicateurssim2024',
            'driaseau2024', 'indicateurdriaseau2024',
            'indicateursdriaseau2024']:
        return explore2eau(data)
    
    #%%% SIM2 (csv to netCDF)
    # Historical climate reanalysis Safran-Isba(-Modcou) "SIM2"
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            'sim2', 'sim', 'safranisba', 'safranisbamodcou',
            ]:
        return meteo(data)
    
    # ---- Water use data
    #%%% BNPE
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            "bnpe"]:
        return withdrawal(data)
    
    # ---- Landcover data
    #%%% OCS SCOT Pays Basque
    elif data_name.casefold().replace(' ', '').replace('-', '') in [
            "ocs", "ocspbs", "ocspaysbasque"]:
        return ocspaysbasque(data)
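
# Dispatch sketch (hypothetical path): `data_name` is normalized with
# `casefold()` and by removing spaces (and hyphens for most entries) before
# matching, so the following calls are equivalent:
# =============================================================================
# format_data(r"D:\DRIAS\tasAdjust_file.nc", "DRIAS-Climat 2022")
# format_data(r"D:\DRIAS\tasAdjust_file.nc", "driasclimat2022")
# =============================================================================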
    
    
#%% FUNCTIONS FOR FORMATTING
#%%% Topographic data

@formating
def dem(data, *,
        mask=None,
        bounds=None,
        resolution=None,
        x0=None,
        y0=None,
        base_template=None,
        outpath=None,
        **rio_kwargs,
        ):
    r"""
    Format standardized DEM data for CWatM:

    - reproject and clip them according to the passed ``resolution``, ``mask``, ``dst_crs``, ``x0``, ``y0``...
    - generate matching maps of flow directions
    - generate matching maps of standard deviations of elevation

    Parameters
    ----------
    data : path (str or pathlib.Path), or variable (numpy.ndarray, xarray.Dataset or xarray.DataArray)
        DEM dataset retrieved from the IGN "ALTI" database
        https://geoservices.ign.fr/bdalti
    mask : (list of) str, pathlib.Path, shapely.geometry, xarray.DataArray or geopandas.GeoDataFrame, optional
        Filepath of the mask used to clip the data.
    bounds : iterable or None, optional, default None
        Boundaries of the target domain as a tuple (x_min, y_min, x_max, y_max).
    resolution : (list of) int, optional
        List of resolutions to use for data formatting.
    x0 : number, optional, default None
        Origin of the X-axis, used to align the reprojection grid.
    y0 : number, optional, default None
        Origin of the Y-axis, used to align the reprojection grid.
    base_template : str, pathlib.Path, xarray.DataArray or geopandas.GeoDataFrame, optional
        Filepath, used as a template for the spatial profile. Supported file
        formats are *.tif*, *.nc* and vector formats supported by geopandas
        (*.shp*, *.json*, ...).
    outpath : str, optional
        Output path stem, required when ``data`` is passed as a GEOP4TH
        variable instead of a filepath.
    **rio_kwargs : keyword args, optional
        Arguments passed to the ``xarray.Dataset.rio.reproject()`` function call.
        **Note**: These arguments take priority over ``base_template`` attributes.
        May contain:

        - ``dst_crs`` : str
        - ``resolution`` : float or tuple
        - ``shape`` : tuple (int, int)
        - ``transform`` : Affine
        - ``nodata`` : float or None
        - ``resampling`` :
            - see ``help(rasterio.enums.Resampling)``
            - most common are: ``5`` (average), ``13`` (sum), ``0`` (nearest),
              ``9`` (min), ``8`` (max), ``1`` (bilinear), ``2`` (cubic)...
            - the functionality ``'std'`` (standard deviation) is also available
        - see ``help(xarray.Dataset.rio.reproject)``

    Returns
    -------
    None. Generates the formatted files, with filepaths derived from ``data``
    or from ``outpath``.

    Examples
    --------
    >>> cwatm.dem(
    ...     r"E:\Inputs\DEM\IGN\BDALTIV2_folder\BDALTIV2_stz.tif",
    ...     resolution = [500, 1000],
    ...     x0 = 0,
    ...     y0 = 0,
    ...     dst_crs = 27572,
    ...     mask = r"E:\Inputs\masks\MyMask.shp")
    Exporting...
    _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\BDALTIV2_stz_CWATM_1000m_mask1.tif'
    Exporting...
    _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\ElevationStD_BDALTIV2_stz_CWATM_1000m_mask1.tif'
    Exporting...
    _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\BDALTIV2_stz_CWATM_500m_mask1.tif'
    Exporting...
    _ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\ElevationStD_BDALTIV2_stz_CWATM_500m_mask1.tif'
    """
    # ---- Load
    # Initialize the output filepath(s)
    if isinstance(data, xr.Dataset):
        if outpath is None:
            print("Error: if `data` is a xarray.Dataset, the argument `outpath` needs to be passed")
            return
        else:
            outpath_stem = Path(outpath)
    elif isinstance(data, (str, Path)):
        outpath_stem = infer_outpath(data)
        data = outpath_stem
    outpath = outpath_stem.with_suffix('.tif').with_stem(outpath_stem.stem + '_CWATM')

    # Load
    data_ds = geo.load_any(data)

    # Impose reprojection resampling to be the minimum value
    rio_kwargs['resampling'] = 9  # 9: minimum

    # ---- Replace 0 (sea level) with nodata
    main_var = geo.main_vars(data_ds)[0]
    encod = data_ds[main_var].encoding  # store encodings (_FillValue, compression...)
    data_ds[main_var] = data_ds[main_var].where(data_ds[main_var] != 0)
    data_ds[main_var].encoding = encod  # transfer encodings

    # ---- Reproject
    for res in resolution:
        if res is None:
            res_suffix = ''
            if 'resolution' in rio_kwargs:
                rio_kwargs.pop('resolution')
        else:
            res_suffix = f'_{res}m'
            rio_kwargs['resolution'] = res

        n_mask = 0
        for m in mask:
            n_mask += 1
            ds_rprj = geo.reproject(data_ds, mask = m, bounds = bounds,
                                    x0 = x0, y0 = y0, base_template = base_template,
                                    **rio_kwargs)  # x0 = 12.5, y0 = 12.5
            geo.export(ds_rprj,
                       outpath.with_stem(outpath.stem + res_suffix + f'_mask{n_mask}'))

            # ---- Flow directions
            ldd = geo.compute_ldd(ds_rprj, dirmap = '1-9', engine = 'pysheds')
            geo.export(ldd,
                       outpath.with_stem('LDD_' + outpath.stem + res_suffix + f'_mask{n_mask}'))

            # ---- Cell area
            area = geo.cell_area(ds_rprj, engine = 'equal earth')
            geo.export(area,
                       outpath.with_stem('Area_' + outpath.stem + res_suffix + f'_mask{n_mask}'))

            # ---- Standard deviation on each cell
            if res is not None:
                # impose a resampling according to the standard deviation
                if 'resampling' in rio_kwargs:
                    rio_kwargs.pop('resampling')
                elevstd = geo.reproject(data_ds, mask = m, bounds = bounds,
                                        x0 = x0, y0 = y0, base_template = base_template,
                                        resampling = 'std',  # standard deviation
                                        **rio_kwargs)
                geo.export(elevstd,
                           outpath.with_stem('ElevationStD_' + outpath.stem + res_suffix + f'_mask{n_mask}'))
            else:
                print("Err: Standard Deviation can only be computed if there is a downscaling. A `resolution` argument should be passed")
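
# Output naming sketch for `dem` (derived from the export calls above, for an
# inferred outpath 'BDALTIV2_stz_CWATM.tif', res = 500 and the first mask):
#   BDALTIV2_stz_CWATM_500m_mask1.tif               (clipped elevation)
#   LDD_BDALTIV2_stz_CWATM_500m_mask1.tif           (flow directions)
#   Area_BDALTIV2_stz_CWATM_500m_mask1.tif          (cell area)
#   ElevationStD_BDALTIV2_stz_CWATM_500m_mask1.tif  (standard deviation of elevation)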
#%%% Climatic data
@formating
def explore2climat(data, *,
                   mask=None,
                   bounds=None,
                   resolution=None,
                   x0=None,
                   y0=None,
                   base_template=None,
                   **rio_kwargs,
                   ):
    # DRIAS-Climat 2022 (netcdf)
    # SAFRAN data (tas, prtot, radiation, wind...) "EXPLORE2-Climat 2022"
    data_folder, filelist = geo.get_filelist(data, extension = '.nc')

    for file_name in filelist:
        data = os.path.join(data_folder, file_name)
        # Shorten the name
        motif1 = re.compile('(.*Adjust_France)')
        motif2 = re.compile('(historical|rcp25|rcp45|rcp85)')
        motif3 = re.compile(r'(\d{4,4}-\d{4,4}.*)')
        str1 = motif1.split(file_name)[1][:-7]
        str2 = motif2.split(file_name)[1]
        str3 = motif3.split(file_name)[1]
        outpath = '_'.join([str1, str2, str3])
        # outpath = outpath[:-3] # to remove '.nc'
        outpath = os.path.splitext(outpath)[0]
        outpath = os.path.join(data_folder, outpath)
        # Correct the data
        ds = stzfr.explore2climat(data)
        # Georeference the data
        ds = geo.georef(data = ds, data_type = 'drias2022')
        # Lossy compression (packing)
        ds_comp = geo.pack(ds)
        geo.export(ds_comp, outpath + '_pack' + '.nc')
        # Reprojections and exports
        # rio_kwargs['dst_crs'] = 2154
        for res in resolution:
            if res is None:
                res_suffix = ''
                if 'resolution' in rio_kwargs:
                    rio_kwargs.pop('resolution')
            else:
                res_suffix = f'_{res}m'
                rio_kwargs['resolution'] = res
            n_mask = 0
            for m in mask:
                n_mask += 1
                ds_rprj = geo.reproject(ds, mask = m, bounds = bounds,
                                        x0 = x0, y0 = y0, base_template = base_template,
                                        **rio_kwargs)
                geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
                # geo.export(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # no rundate because the file names would be too long...

        del ds_rprj
        del ds
        del ds_comp


@formating
def explore2eau(data, *,
                mask=None,
                bounds=None,
                resolution=None,
                x0=None,
                y0=None,
                base_template=None,
                **rio_kwargs,
                ):
    # DRIAS-Eau 2024 (netcdf)
    # SAFRAN indicators (SWIAV, SSWI-3...) "EXPLORE2-SIM2 2024"
    data_folder, filelist = geo.get_filelist(data, extension = '.nc')

    for file_name in filelist:
        data = os.path.join(data_folder, file_name)
        # Shorten the name
        motif1 = re.compile('(.*_France)')
        motif2 = re.compile('(historical|rcp25|rcp45|rcp85)')
        motif3 = re.compile(r'(\d{4,4}-\d{4,4}.*)')
        str1 = motif1.split(file_name)[1][:-7]
        str2 = motif2.split(file_name)[1]
        str3 = motif3.split(file_name)[1]
        outpath = '_'.join([str1, str2, str3])
        # outpath = outpath[:-3] # to remove '.nc'
        outpath = os.path.splitext(outpath)[0]
        outpath = os.path.join(data_folder, outpath)
        # Correct the data
        ds = stzfr.explore2eau(data)
        # Georeference the data
        ds = geo.georef(data = ds, data_type = "DRIAS-Eau 2024")
        # Lossy compression (packing)
        ds_comp = geo.pack(ds)
        geo.export(ds_comp, outpath + '_pack' + '.nc')
        # Reprojections and exports
        # rio_kwargs['dst_crs'] = 2154
        for res in resolution:
            if res is None:
                res_suffix = ''
                if 'resolution' in rio_kwargs:
                    rio_kwargs.pop('resolution')
            else:
                res_suffix = f'_{res}m'
                rio_kwargs['resolution'] = res
            n_mask = 0
            for m in mask:
                n_mask += 1
                ds_rprj = geo.reproject(data = ds, mask = m, bounds = bounds,
                                        x0 = x0, y0 = y0, base_template = base_template,
                                        **rio_kwargs)
                geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
                # geo.export(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # no rundate because the file names would be too long...

        del ds_rprj
        del ds
        del ds_comp
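
# Usage sketch for the EXPLORE2 formatters (hypothetical paths): `data` is a
# folder of netCDF files; each file is standardized, packed, then reprojected
# for every (resolution, mask) combination:
# =============================================================================
# explore2climat(r"D:\DRIAS\EXPLORE2-Climat2022",
#                resolution = [1000, 8000],
#                mask = r"D:\masks\basin.shp",
#                dst_crs = 2154)
# =============================================================================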
@formating
def meteo(data, *,
          mask=None,
          bounds=None,
          resolution=None,
          x0=None,
          y0=None,
          base_template=None,
          **rio_kwargs,
          ):
    # SIM2 (csv to netCDF)
    # Historical climate reanalysis Safran-Isba(-Modcou) "SIM2"

    # Load data
# =============================================================================
#     sim2_pattern = re.compile("_SIM2_(.*)")
# =============================================================================
    if isinstance(data, list):
        data_ds = geo.merge_data(data)
        if os.path.isdir(data[0]):
            outpath = data[0]
# =============================================================================
#             suffix = datetime.datetime.now().strftime("%Y-%m-%d_%Hh%M")
# =============================================================================
        elif os.path.isfile(data[0]):
            outpath = os.path.split(data[0])[0]
# =============================================================================
#             suffix = sim2_pattern.findall(os.path.splitext(os.path.split(data)[-1])[0])[0]
# =============================================================================
    elif isinstance(data, (str, Path)):
        if os.path.isfile(data):
            data_ds = geo.load_any(data, sep = ';')
            outpath = os.path.split(data)[0]
# =============================================================================
#             suffix = sim2_pattern.findall(os.path.splitext(os.path.split(data)[-1])[0])[0]
# =============================================================================
        elif os.path.isdir(data):
            # First, look for .csv files
            outpath, filelist = geo.get_filelist(data, extension = '.csv')
            if len(filelist) > 0:
                data_ds = geo.merge_data([os.path.join(outpath, f) for f in filelist])
            # If there are none, then look for .nc files (one file per variable)
            else:
                data_ds = xr.Dataset()
                for var in ['PRENEI_Q', 'PRELIQ_Q', 'PRETOT_Q', 'T_Q',
                            'TINF_H_Q', 'TSUP_H_Q', 'ETP_Q', 'Q_Q',
                            'FF_Q', 'DLI_Q', 'SSI_Q', 'HU_Q']:
                    outpath, filelist = geo.get_filelist(data, extension = '.nc', tag = var)
                    if len(filelist) > 0:
                        ds_temp = geo.merge_data([os.path.join(outpath, f) for f in filelist])
                        data_ds[var] = ds_temp[var]
    else:
        data_ds = geo.load_any(data)

    if isinstance(data_ds, pd.DataFrame):
        # data_ds = geo.convert_to_cwatm(data_ds, data_type = "SIM2")
        data_ds = stzfr.sim2(data_ds)

    # Georeference the data
    data_ds = geo.georef(data_ds)

    # Prepare the outpath
    time_var = geo.main_time_dims(data_ds)[0]
    print('---')
    print(data_ds[time_var].min())
    start_year = pd.to_datetime(data_ds[time_var].min().values).strftime("%Y-%m")
    end_year = pd.to_datetime(data_ds[time_var].max().values).strftime("%Y-%m")

    var_list = geo.main_vars(data_ds)
    for var in var_list:
        var_ds = data_ds[[var]].copy()

        # Reprojections and exports
        # rio_kwargs['dst_crs'] = 2154
        for res in resolution:
            if res is None:
                res_suffix = ''
                if 'resolution' in rio_kwargs:
                    rio_kwargs.pop('resolution')
            else:
                res_suffix = f'_{res}m'
                rio_kwargs['resolution'] = res
            n_mask = 0
            for m in mask:
                n_mask += 1
                ds_rprj = geo.reproject(data = var_ds, mask = m, bounds = bounds,
                                        x0 = x0, y0 = y0, base_template = base_template,
                                        **rio_kwargs)
                geo.export(ds_rprj,
                           os.path.join(outpath,
                                        '_'.join([var, start_year, end_year]) + res_suffix + f'_mask{n_mask}.nc'))
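
# Usage sketch for `meteo` (hypothetical paths): `data` can be a SIM2 CSV file
# (';'-separated), a folder of such CSV files, or a folder of per-variable
# netCDF files; one netCDF per variable is exported, named
# '<var>_<start>_<end><res_suffix>_mask<n>.nc':
# =============================================================================
# meteo(r"D:\SIM2\SIM2_1958_2024.csv",
#       resolution = [8000],
#       mask = r"D:\masks\basin.shp",
#       dst_crs = 2154)
# =============================================================================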
#%%% Landcover data
@formating
def landcover(data, *,
              year = None,
              sep = '|',
              decimal = '.',
              csv_date_format = "%y-%m-%d",
              mask = None,
              bounds = None,
              resolution = None,
              x0 = None,
              y0 = None,
              base_template = None,
              outpath = None,
              polygon_file = None,
              **rio_kwargs,
              ):
    """
    This function processes a guideline file, typically a tabular CSV file,
    which describes where and how to process the data to create CWatM
    landcover parameters.

    Therefore, besides this guideline file, ``cwatm.landcover()`` will also
    need a vector file with landcover polygons (or a raster file with area
    values), and some tabular CSV files containing cropCoefficient and
    interceptCap timeseries over one year (these timeseries can be sparse:
    they do not need to be complete nor regular).

    With these elements, this function generates the following files:

    - at the root (*cwatm/* folder):

        - '<guideline_filename>_area_<year>_<mask>_<resolution>.nc': raster file with, in each cell, the area of each landcover described in the guideline file
        - '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc': a one-year netCDF with the ``cropgroupnumber`` values in each cell
        - '<guideline_filename>_fracLandcover_<year>_<mask>_<resolution>.nc': a raster file with the fraction of CWatM landcovers in each cell

    - in landcover sub-folders (*grassland/*, *forest/*, *irrNonPaddy/*, *irrPaddy/*):

        - '<guideline_filename>_cropCoefficient_<year>_<mask>_<resolution>.nc'
        - '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc'
        - '<guideline_filename>_interceptCap_<year>_<mask>_<resolution>.nc'
        - '<guideline_filename>_maxRootDepth_<year>_<mask>_<resolution>.nc'

    Nested use
    ^^^^^^^^^^
    For each `data`, this function creates netCDF files in the root folder and
    in the subfolders of each of the 6 CWatM landcovers.

    It can be used in a nested way: this function can be called with `data`
    guidelines relative to a sub-landcover (for example a 'crop.csv' `data`
    defining 'maize' and 'wheat'), and the generated files (here for example
    .\\cwatm\\grassland\\crop_maxRootDepth.nc) can be used as arguments in the
    guidelines of a 'final.csv' `data` (which defines e.g. 'wetland',
    'pasture', 'crop'... and uses the netCDF above for 'crop' values).

    In this example, the 'crop.csv' call will generate netCDF files in CWatM
    landcover subfolders ('forest', 'grassland', 'irrNonPaddy', ...), which
    will be used as arguments in 'final.csv', but also some netCDF files in
    the root folder, such as 'crop_fracLandcover.nc' or
    'crop_cropgroupnumber.nc'. These root files are useless, as they are
    intended to give values weighted by CWatM landcovers, but the 'crop.csv'
    `data` does not include all CWatM landcovers.

    The files that should be used in CWatM are 'final_fracLandcover.nc' etc.
    in the root folder, and 'final_maxRootDepth.nc' etc. in the 6 landcover
    subfolders.

    Parameters
    ----------
    data : pandas.DataFrame, or str or pathlib.Path
        Tabular file listing the rules to differentiate landcovers and to
        compute CWatM landcover parameters.
    year : int, optional
        Year used to select data in time-indexed inputs. The default is the
        current year (to have a consistent time axis).
    sep : str, optional, default '|'
        Column separator of the guideline (and parameter) CSV files.
    decimal : str, optional, default '.'
        Decimal character of the CSV files.
    csv_date_format : str, optional, default "%y-%m-%d"
        Date format of the timeseries CSV files.
    mask : str, Path, shapely.geometry, xarray.DataArray or geopandas.GeoDataFrame, optional
        Filepath of the mask used to clip the data.
    bounds : iterable or None, optional, default None
        Boundaries of the target domain as a tuple (x_min, y_min, x_max, y_max).
        The values are expected to be given according to ``bounds_crs`` if it
        is not None. If ``bounds_crs`` is None, ``bounds`` are expected to be
        given according to the destination CRS ``dst_crs`` if it is not None.
        If ``dst_crs`` is also None, ``bounds`` are then expected to be given
        according to the source CRS (``src_crs``, or ``data``'s CRS).
    resolution : float or tuple, optional
        Resolution(s) to use for data formatting. The default is None.
    x0 : number, optional
        Origin of the X-axis, used to align the reprojection grid.
    y0 : number, optional
        Origin of the Y-axis, used to align the reprojection grid.
    base_template : str, Path, xarray.DataArray or geopandas.GeoDataFrame, optional
        Filepath, used as a template for the spatial profile. Supported file
        formats are *.tif*, *.nc* and vector formats supported by geopandas
        (*.shp*, *.json*, ...).
    outpath : str or pathlib.Path, optional
        Required when ``data`` is not a filepath. Defines the location where
        the outputs will be exported (inside a *cwatm/* sub-folder). If
        ``outpath`` is None and ``data`` is a filepath, the outputs will be
        exported at the root of ``data`` (inside a *cwatm/* sub-folder).
    polygon_file : str or pathlib.Path, optional
        If passed, overrides the 'polygon_file' column of the guideline file.
    **rio_kwargs : keyword args, optional
        Arguments passed to the ``xarray.Dataset.rio.reproject()`` function call.
        **Note**: These arguments take priority over ``base_template`` attributes.
        May contain:

        - ``dst_crs`` : str
        - ``resolution`` : float or tuple
        - ``shape`` : tuple (int, int) of (height, width)
        - ``transform`` : Affine
        - ``nodata`` : float or None
        - ``resampling`` :
            - see ``help(rasterio.enums.Resampling)``
            - most common are: ``5`` (average), ``13`` (sum), ``0`` (nearest),
              ``9`` (min), ``8`` (max), ``1`` (bilinear), ``2`` (cubic)...
            - the functionality ``'std'`` (standard deviation) is also available
        - see ``help(xarray.Dataset.rio.reproject)``

    Returns
    -------
    None. Generates the landcover files required by CWatM:

    - at the root (*cwatm/* folder):

        - '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc': a one-year netCDF with the ``cropgroupnumber`` values in each cell
        - '<guideline_filename>_fracLandcover_<year>_<mask>_<resolution>.nc': a raster file with the fraction of CWatM landcovers in each cell
        - '<guideline_filename>_area_<year>_<mask>_<resolution>.nc': raster file with, in each cell, the area of each landcover described in the guideline file (not required by CWatM, but useful for formatting other inputs)

    - in landcover sub-folders (*grassland/*, *forest/*, *irrNonPaddy/*, *irrPaddy/*):

        - '<guideline_filename>_cropCoefficient_<year>_<mask>_<resolution>.nc'
        - '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc'
        - '<guideline_filename>_interceptCap_<year>_<mask>_<resolution>.nc'
        - '<guideline_filename>_maxRootDepth_<year>_<mask>_<resolution>.nc'
    """
    # ---- Initializations
    landcover_list = ['forest', 'grassland', 'irrPaddy', 'irrNonPaddy',
                      'sealed', 'water']

    guidelines = geo.load_any(data, sep = sep, decimal = decimal,
                              header = 0, index_col = 0)

    year_arg = year  # backup of the `year` argument
    if year is None:
        year = datetime.datetime.now().year  # to have a consistent time axis

    # ---- Folders
    if isinstance(data, (str, Path)):
        data = Path(data)
        outpath = data.parent / 'cwatm'
        outname = data.stem
    else:
        if outpath is None:
            print("Err: `outpath` argument is required when `data` is not a file")
            return
        else:
            outname = 'landcover'

    for var in ['area', 'cropgroupnumber', 'fracLandcover']:
        if not (outpath / var).exists():
            os.makedirs(outpath / var)
    for landcover in landcover_list:
        if not (outpath / landcover).exists():
            os.makedirs(outpath / landcover)
        for var in ['cropCoefficient', 'cropgroupnumber', 'interceptCap', 'maxRootDepth']:
            if not (outpath / landcover / var).exists():
                os.makedirs(outpath / landcover / var)

    # ---- Rasterize
    area_ds = xr.Dataset()
    load_dict = {}

    #-------------------------
    ### 1} For each resolution
    #-------------------------
    for res in resolution:
        if res is None:
            print('Err: the `resolution` argument is mandatory')
            return

        n_mask = 0
        #-------------------
        ### 2} For each mask
        #-------------------
        for m in mask:
            n_mask += 1
            areaLandcover = xr.Dataset({'forest': 0,
                                        'grassland': 0,
                                        'irrPaddy': 0,
                                        'irrNonPaddy': 0,
                                        'sealed': 0,
                                        'water': 0})
            cropgroupnumber = areaLandcover.copy(deep = True)

            print(f"\nresolution {res} m | mask {n_mask} | area & fracLandcover")

            #--------------------------------------------------
            ### 3} For each row (= rule) in `data` (guidelines)
            #--------------------------------------------------
            for idx in guidelines.index:
                # to avoid loading the same file several times
                if guidelines.loc[idx, 'polygon_file'] in load_dict:
                    # use the previously loaded file
                    vect_ds = load_dict[guidelines.loc[idx, 'polygon_file']]
                else:
                    if polygon_file is None:
                        # load the file...
                        polyg_path = Path(guidelines.loc[idx, 'polygon_file'])
                        if not polyg_path.is_absolute():
                            polyg_path = (data.parent / polyg_path).resolve()
                        vect_ds = geo.load_any(polyg_path)
                        # ...and then store it, for potential later use
                        load_dict[guidelines.loc[idx, 'polygon_file']] = vect_ds.copy()
                    else:
                        vect_ds = geo.load_any(polygon_file)
                        load_dict[guidelines.loc[idx, 'polygon_file']] = vect_ds.copy()

                if isinstance(vect_ds, pd.DataFrame):
                    # Interpreting the condition (building the selection)
                    cond = guidelines.loc[idx, 'column_name: code']
                    ini = True
                    if 'or' in cond:
                        cond_list = cond.split(' or ')
                    else:
                        cond_list = [cond]
                    for subcond in cond_list:
                        if ' in ' in subcond:
                            in_pattern = re.compile(r"(.*)( in )[\(\)\[\]](.*)[\(\)\[\]]")
                            col_name, _, val = in_pattern.findall(subcond)[0]
                            val = val.replace(' ', '').split(',')
                        else:
                            col_name, val = subcond.replace('=', ':').replace(' ', '').split(':')
                            val = [val]
                        # Correct the dtypes
                        if is_numeric_dtype(vect_ds[col_name]):
                            val = [float(v) for v in val]
                        # Extracting rows
                        subsel = vect_ds[vect_ds[col_name] == val[0]].index
                        # Safeguard for mixed dtypes
                        if len(subsel) == 0:
                            try:
                                subsel = vect_ds[vect_ds[col_name] == float(val[0])].index
                            except:
                                pass
                        if len(val) > 1:
                            for v in val:
                                add_subsel = vect_ds[vect_ds[col_name] == v].index
                                # Safeguard for mixed dtypes
                                if len(add_subsel) == 0:
                                    try:
                                        add_subsel = vect_ds[vect_ds[col_name] == float(v)].index
                                    except:
                                        pass
                                # Merge the indexes
                                subsel = subsel.union(add_subsel)
                        # Combining the subcond
                        if ini:
                            sel = subsel
                            ini = False
                        else:
                            sel = sel.union(subsel)

                    # Adding time filter
                    time_var = geo.main_time_dims(vect_ds)[0]
                    # if there is no time axis in `vect_ds` (polygon_file)
                    # or if `vect_ds` contains only one year, the year condition
                    # is ignored or useless, meaning that the index selection is
                    # kept as it is (all previously selected
                    # rows from `vect_ds` will be considered)
                    if time_var is None:
                        pass
                    elif len(vect_ds.indexes[time_var].year.unique()) <= 1:
                        # info
                        if year != vect_ds.indexes[time_var].year.unique()[0]:
                            print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the 'polygon_file'. Year condition is ignored.")
                        pass
                    # Otherwise, the year condition will be applied:
                    else:
                        # safeguard
                        if any(vect_ds[time_var].dt.year == year):
                            sel = sel.intersection(
                                vect_ds[vect_ds[time_var].dt.year == year].index)
                        else:
                            print(f"Warning: the specified `year` ({year_arg}) does not exist in 'polygon_file' {guidelines.loc[idx, 'polygon_file']}")
                            print("Which year from the 'polygon_file' should be used instead?")
                            replacement_year = input('- ' + '\n- '.join(vect_ds.indexes[time_var].year.astype(str).unique().values) + '\n')
                            sel = sel.intersection(
                                vect_ds[vect_ds[time_var].dt.year == replacement_year].index)

                    # Apply selection
                    vect_cond_ds = vect_ds.loc[sel, ['geometry']]
# =============================================================================
#                     vect_cond_ds[idx] = 1 # dummy numeric column for rasterization
# =============================================================================
                    # Safeguard
                    if len(vect_cond_ds) > 0:
                        # Rasterize data
                        area = geo.rasterize(vect_cond_ds,
                                             mask = m,
                                             bounds = bounds,
                                             resolution = res,
                                             x0 = x0,
                                             y0 = y0,
                                             base_template = base_template,
                                             rasterize_mode = 'coverage',
                                             force_polygon = True,
                                             **rio_kwargs,
                                             )
                        # Add the time axis
                        area = area.expand_dims({'time': [pd.to_datetime(f"{year}-01-01", format = "%Y-%m-%d")]})
                        # Group all `area[idx]` [xr.DataArray] into the same `area_ds` [xr.Dataset]
                        area_ds[idx] = area[geo.main_vars(area)[0]]

                elif isinstance(vect_ds, xr.Dataset):
                    area_ds = vect_ds
                    if not set(guidelines.index) == set(geo.main_vars(area_ds)):
                        print("Err: `guidelines` and raster area file do not contain the same variables")
                        return

                    # Adding time filter
                    time_var = geo.main_time_dims(area_ds)[0]
                    # if there is no time axis in `vect_ds` (polygon_file)
                    # or if `vect_ds` contains only one year, the year condition
                    # is ignored or useless, meaning that the index selection is
                    # kept as it is (all previously selected
                    # rows from `vect_ds` will be considered)
                    if time_var is None:
                        pass
                    elif len(area_ds.indexes[time_var].year.unique()) <= 1:
                        # info
                        if year != vect_ds.indexes[time_var].year.unique()[0]:
                            print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the 'polygon_file'. Year condition is ignored.")
                        pass
                    # Otherwise, the year condition will be applied:
                    else:
                        # safeguard
                        if any(vect_ds[time_var].dt.year == year):
                            area_ds = area_ds.loc[{time_var: slice(str(year), str(year))}]
                        else:
                            print(f"Warning: the specified `year` ({year_arg}) does not exist in 'polygon_file' {guidelines.loc[idx, 'polygon_file']}")
                            print("Which year from the 'polygon_file' should be used instead?")
                            replacement_year = input('- ' + '\n- '.join(vect_ds.indexes[time_var].year.astype(str).unique().values) + '\n')
                            area_ds = area_ds.loc[{time_var: slice(str(replacement_year), str(replacement_year))}]

                # Complete the landcover fractions
                areaLandcover[guidelines.loc[idx, 'cwatm_class']] = areaLandcover[guidelines.loc[idx, 'cwatm_class']] + area_ds[idx]

            # ---- Export base results
            area_ds = geo.georef(area_ds, crs = area_ds.rio.crs)
            # area_ds = geo.georef(area_ds)
            area_ds = geo.cropfill(area_ds, m)
            geo.export(area_ds,
                       outpath / 'area' / (outname + '_area' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))

            # ---- Export landcover fractions
            areaLandcover = geo.georef(areaLandcover, crs = area_ds.rio.crs)
            areaLandcover = geo.cropfill(areaLandcover, m)
            fractionLandcover = areaLandcover.copy(deep = True)
            for lc in fractionLandcover.data_vars:
                fractionLandcover = fractionLandcover.rename({lc: 'frac' + lc})
            fractionLandcover = fractionLandcover / fractionLandcover.to_array().sum("variable")
            fractionLandcover = geo.georef(fractionLandcover, crs = area_ds.rio.crs)
            geo.export(fractionLandcover,
                       outpath / 'fracLandcover' / (outname + '_fracLandcover' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))

            # ---- Combine parameters
            param_list = ['maxRootDepth', 'rootFraction1', 'cropgroupnumber',
                          'arnoBeta', 'cropCoefficient', 'interceptCap']

            #-----------------------------------------------------
            ### 3'} For each CWatM landcover class with vegetation
            #-----------------------------------------------------
            # forest, grassland, irrPaddy, irrNonPaddy
            for cwatm_class in set(guidelines.cwatm_class.unique()) - {'sealed', 'water'}:
                # Extract guidelines for each class (forest, grassland, irrPaddy, irrNonPaddy, sealed, water)
                class_indexes = guidelines[guidelines.cwatm_class == cwatm_class].index

                # Average parameter values according to area coverage, and group them into the `class_param` xr.Dataset
                class_param = xr.Dataset()
                class_param_dynamic = xr.Dataset()

                #-----------------------------------
                ### 4'} For each landcover parameter
                #-----------------------------------
                for param in set(param_list).intersection(set(guidelines.columns)):
                    print(f"\nresolution {res} m | mask {n_mask} | {param}")
                    ini_idx = True  # `ini_idx` is True for the first iteration

                    #------------------------------------------------
                    ### 5'} For each row (= rule) in this CWatM class
                    #------------------------------------------------
                    for idx in class_indexes:
                        # Retrieve parameter values
                        raw_val = guidelines.loc[idx, param]
                        # Safeguard for columns with mixed dtypes (numeric and paths)
                        try:
                            raw_val = float(raw_val)
                        except:
                            pass

                        if isinstance(raw_val, (int, float)):
                            if np.isnan(raw_val):
                                raw_val = 0
                            val = raw_val

                        elif isinstance(raw_val, (str, Path)):
                            # load the file...
                            param_path = Path(raw_val)
                            if not param_path.is_absolute():
                                param_path = (data.parent / param_path).resolve()
                            val = geo.load_any(param_path,
                                               # next arguments are only used for CSV data
                                               sep = sep, decimal = decimal,
                                               header = 0, index_col = 0,
                                               date_format = csv_date_format)

                            # Adding time filter
                            if isinstance(val, xr.Dataset):
                                # Convert xr.Dataset into xr.DataArray
                                val = val[geo.main_vars(val)[0]]
                                param_time_var = geo.main_time_dims(val)[0]
                                # if there is no time axis in `val` or if `val`
                                # contains only one year, the `year` condition
                                # is ignored or useless
                                if param_time_var is None:
                                    pass
                                elif len(val.indexes[param_time_var].year.unique()) <= 1:
                                    # info
                                    if year != val.indexes[param_time_var].year.unique()[0]:
                                        print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the parameter file {param_path.name}. Year condition is ignored.")
                                        val[param_time_var] = pd.to_datetime(val[param_time_var]) + pd.DateOffset(years = year - val[param_time_var][0].dt.year.item())
                                # Otherwise, the year condition will be applied:
                                else:
                                    # safeguard
                                    if any(val[param_time_var].dt.year == year):
                                        val = val.loc[{param_time_var: slice(f"{year}-01-01", f"{year}-12-31")}]
                                    else:
                                        print(f"Warning: the specified `year` ({year_arg}) does not exist in {param_path.name}")
                                        print("Which year should be used instead?")
                                        replacement_year = input('- ' + '\n- '.join(val.indexes[param_time_var].year.astype(str).unique().values) + '\n')
                                        val = val.loc[{param_time_var: slice(f"{replacement_year}-01-01", f"{replacement_year}-12-31")}]
                                # Standardize the date to the first of January of `year`
                                if param not in ['cropCoefficient', 'interceptCap']:
                                    # yearly values
                                    val = val[{param_time_var: 0}]  # Only a single date is kept
                                    val[param_time_var] = pd.to_datetime(f"{year}-01-01")  # this date is normalized to 1st January

                            elif isinstance(val, pd.DataFrame):
                                # the time axis should be the index
                                # Format the time dimension
                                # Concatenate 3 arrays to be sure to have a full year starting from 1st January to 31st December
                                val = pd.concat([val,
                                                 val.set_index(val.index + pd.DateOffset(years=1)),
                                                 val.set_index(val.index + pd.DateOffset(years=2))])
                                ini_year = val.index[0].year
                                # Fill the gaps in the index
                                val = val.reindex(pd.date_range(start = val.index[0],
                                                                end = val.index[-1],
                                                                freq = '1D'))
                                val.index.name = 'time'
                                # Fill the gaps in the values, by interpolating according to the time axis
                                val = val.interpolate(method = 'time')
                                # Keep only a full year starting from 1st January to 31st December
                                full_year = val[(val.index.day == 1) & (val.index.month == 1)].index.year[0].item()
                                val = val.loc[f'{full_year}-01-01': f'{full_year}-12-31']
                                # Change first year to first year of initial data
                                val.index = val.index + pd.DateOffset(years = ini_year - val.index[0].year)
                                # Keep only one value "every 10 days" (more exactly, keep only the 1st, 11th and 21st of each month)
                                val = val[(val.index.day == 1) | (val.index.day == 11) | (val.index.day == 21)]
                                # Export (for info, not necessary for CWatM)
                                if not (param_path.parent / '10day').exists():
                                    os.makedirs(param_path.parent / '10day')
                                geo.export(val, param_path.parent / '10day' / param_path.name,
                                           sep = sep, decimal = decimal,
                                           date_format = csv_date_format)
                                # Convert pd.DataFrame into pd.Series
                                val = val[geo.main_vars(val)[0]]
                                # safeguard
                                param_time_var = val.index.name
                                if param_time_var != 'time':
                                    print(f"Info: {raw_val} may not contain time as index")
                                # if `val` contains only one year, the `year`
                                # condition is ignored or useless
                                if len(val.index.year.unique()) <= 1:
                                    # info
                                    if year != val.index.year.unique()[0]:
                                        print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the parameter file {param_path.name}. Year condition is ignored.")
                                    val.index = val.index + pd.DateOffset(years = year - val.index[0].year)
                                # Otherwise, the year condition will be applied:
                                else:
                                    # safeguard
                                    if any(val.index.year == year):
                                        val = val[f"{year}-01-01":f"{year}-12-31"]
                                    else:
                                        print(f"Warning: the specified `year` ({year_arg}) does not exist in {param_path.name}")
                                        print("Which year should be used instead?")
                                        replacement_year = input('- ' + '\n- '.join(val.index.year.astype(str).unique().values) + '\n')
                                        val = val[f"{replacement_year}-01-01":f"{replacement_year}-12-31"]
                                # Standardize the date to the first of January of `year`
                                if param not in ['cropCoefficient', 'interceptCap']:
                                    # yearly values
                                    val = val.iloc[[0]]  # Only a single date is kept
                                    val.index = [pd.to_datetime(f"{year}-01-01")]  # this date is normalized to 1st January
                                # Convert to xr.DataArray
                                val = xr.DataArray.from_series(val)

                        else:
                            # raw_val is NaN
                            pass

                        # Compute area-weighted averages
                        if ini_idx:
                            if param in ['cropCoefficient', 'interceptCap']:
                                # daily values
                                class_param_dynamic[param] = area_ds[idx].loc[{'time': f"{year}-01-01"}] * val  # note how the time dimension is dropped here
                            else:
                                # yearly values
                                class_param[param] = area_ds[idx].loc[{'time': [f"{year}-01-01"]}] * val
                            sum_area = area_ds[idx].rename('area')
                        else:
                            if param in ['cropCoefficient', 'interceptCap']:
                                # daily values
                                class_param_dynamic[param] = class_param_dynamic[param] + area_ds[idx].loc[{'time': f"{year}-01-01"}] * val  # note how the time dimension is dropped here
                            else:
                                # yearly values
                                class_param[param] = class_param[param] + area_ds[idx].loc[{'time': [f"{year}-01-01"]}] * val
                            sum_area = sum_area + area_ds[idx].rename('area')
                        ini_idx = False  # after the first iteration, `ini_idx` becomes False

                    # `sum_area` is reinitialized and computed for each param, but it is equal for each param
                    # Ideally it should be computed only once

                # Normalize by the sum of area to get the weighted average by coverage
                class_param = class_param / sum_area
                # class_param = class_param.where(sum_area != 0, np.nan) # replace Inf with 0
                class_param_dynamic = class_param_dynamic / sum_area[{'time': 0}]
                # class_param_dynamic = class_param_dynamic.where(sum_area[{'time': 0}] != 0, np.nan) # replace Inf with 0

                # Georeference
                class_param = geo.georef(class_param, crs = area_ds.rio.crs)  # geo.georef(class_param)
                class_param = geo.cropfill(class_param, m)  # fill previous nan with mean value
                class_param_dynamic = geo.georef(class_param_dynamic, crs = area_ds.rio.crs)  # geo.georef(class_param)
                class_param_dynamic = geo.cropfill(class_param_dynamic, m)  # fill nan with mean value

                # Complete cropgroupnumber (for global result weighted by landcover)
                for param in set(param_list).intersection(set(guidelines.columns)):
                    if param == 'cropgroupnumber':
                        cropgroupnumber[cwatm_class] = class_param[param]

                    # ---- Export parameters
                    if param in ['cropCoefficient', 'interceptCap']:
                        geo.export(class_param_dynamic[[param]],
                                   outpath / cwatm_class / param / (outname + f'_{param}' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
                    else:
                        # 'cropgroupnumber', 'maxRootDepth'
                        geo.export(class_param[[param]],
                                   outpath / cwatm_class / param / (outname + f'_{param}' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))

            # ---- Compute cropgroupnumber
            # Weight the cropgroupnumber of each class with the corresponding area
            cropgroupnumber = cropgroupnumber * areaLandcover
            # Divide by the sum of the area of all classes, in order to retrieve a
            # cropgroupnumber weighted by the area fraction of the 4 classes WITH VEGETATION
            # (it is different from fractionLandcover, which takes the 6 landcovers into account)
            cropgroupnumber = cropgroupnumber / areaLandcover[['forest', 'grassland', 'irrPaddy', 'irrNonPaddy']].to_array().sum("variable")
            cropgroupnumber_mean = xr.Dataset({'cropgroupnumber': cropgroupnumber.to_array().sum('variable')})
            # Mask and georef
            cropgroupnumber_mean = cropgroupnumber_mean.where(areaLandcover['forest'].notnull())
            cropgroupnumber_mean = geo.georef(cropgroupnumber_mean, crs = area_ds.rio.crs)
            geo.export(cropgroupnumber_mean,
                       outpath / 'cropgroupnumber' / (outname + '_cropgroupnumber' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
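
# Guideline sketch for `landcover` (column layout inferred from the code above;
# names and values are hypothetical). With the default `sep = '|'`, the first
# column is the index, 'polygon_file' points to the vector (or raster area)
# file, 'column_name: code' encodes the selection condition, and the remaining
# columns give parameter values or paths to timeseries CSV files:
# =============================================================================
# name  |polygon_file|column_name: code      |cwatm_class|maxRootDepth|cropCoefficient|interceptCap
# maize |ocs.shp     |CODE_US in [LC21, LC22]|irrNonPaddy|1.0         |kc_maize.csv   |ic_maize.csv
# meadow|ocs.shp     |CODE_US: LC18          |grassland  |0.6         |kc_grass.csv   |ic_grass.csv
# =============================================================================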
# TODO: to remove? # Obsolete... @formating def ocspaysbasque(data, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, **rio_kwargs, ): # OCS SCOT Pays Basque data_folder, filelist = geo.get_filelist(data, extension = '.nc') # outpath = outpath[:-3] # to remove '.nc' outpath = os.path.splitext(filelist[0])[0] outpath = os.path.join(data_folder, outpath) # Corriger les données ds = stzfr.ocspaysbasque([os.path.join(data_folder, f) for f in filelist]) # Géoréférencer les données ds = geo.georef(data = ds, data_type = "other", crs = 4326) # Compression avec pertes (packing) ds_comp = geo.pack(ds) geo.export(ds_comp, outpath + '_pack' + '.nc') # Reprojections et exports # rio_kwargs['dst_crs'] = 2154 for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' rio_kwargs['resolution'] = res n_mask = 0 for m in mask: n_mask += 1 ds_rprj = geo.reproject(data = ds, mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, **rio_kwargs) geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc') # exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # Pas de rundate car noms des fichiers trop longs... del ds_rprj del ds del ds_comp #%%% Soil data @formating def soil(data, # vector_data, tabular_data, fractionLandcover_data, landcover_folder = None, maxRootDepth_forest = None, maxRootDepth_grassland = None, maxRootDepth_irrPaddy = None, maxRootDepth_irrNonPaddy = None, *, mask=None, bounds=None, resolution=None, x0=None, y0=None, base_template=None, outpath=None, **rio_kwargs, ): """ Note: This function takes a lot of time (it can take 1h for a whole department). Parameters ---------- data : TYPE DESCRIPTION. tabular_data : TYPE DESCRIPTION. fractionLandcover_data : TYPE DESCRIPTION. landcover_folder : TYPE, optional DESCRIPTION. The default is None. maxRootDepth_forest : TYPE, optional DESCRIPTION. The default is None. maxRootDepth_grassland : TYPE, optional DESCRIPTION. The default is None. maxRootDepth_irrPaddy : TYPE, optional DESCRIPTION. The default is None. maxRootDepth_irrNonPaddy : TYPE, optional DESCRIPTION. The default is None. * : TYPE DESCRIPTION. mask : TYPE, optional DESCRIPTION. The default is None. bounds : TYPE, optional DESCRIPTION. The default is None. resolution : TYPE, optional DESCRIPTION. The default is None. x0 : TYPE, optional DESCRIPTION. The default is None. y0 : TYPE, optional DESCRIPTION. The default is None. base_template : TYPE, optional DESCRIPTION. The default is None. outpath : TYPE, optional DESCRIPTION. The default is None. **rio_kwargs : TYPE DESCRIPTION. : TYPE DESCRIPTION. Returns ------- None. 
""" # ---- Load data if isinstance(data, (str, Path)): data = Path(data) outpath = data.parent / 'cwatm' if not outpath.exists(): os.makedirs(outpath) for lc in ['forest', 'general']: if not (outpath / lc).exists(): os.makedirs(outpath / lc) else: if outpath is None: print("Err: `outpath` argument is required when `data` is not a file") return vector_ds = geo.load_any(data) tabular_ds = geo.load_any(tabular_data, sep = ';') fractionLc_ds = geo.load_any(fractionLandcover_data) x_var1, y_var1 = geo.main_space_dims(fractionLc_ds)[0] #------------------------- ### 1} For each resolution #------------------------- for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' n_mask = 0 #------------------- ### 2} For each mask #------------------- for m in mask: n_mask += 1 vector_reprj = geo.reproject(vector_ds, mask = m, bounds = bounds, resolution = res, x0 = x0, y0 = y0, base_template = base_template, **rio_kwargs) # Loading maxRootDepth data, depending if user passed a folder or paths for each landcover maxRootDepth_ds = xr.Dataset() maxRootDepth_path_dict = {'forest': maxRootDepth_forest, 'grassland': maxRootDepth_grassland, 'irrNonPaddy': maxRootDepth_irrNonPaddy, 'irrPaddy': maxRootDepth_irrPaddy, } # dict of file paths ## Case where user passes a folder # folders have to have standard CWatM landcover names, and files inside # should be named 'maxRootDepth.nc' if landcover_folder is not None: landcover_folder = Path(landcover_folder) for lc in maxRootDepth_path_dict.keys(): if (landcover_folder / lc / 'maxRootDepth.nc').is_file(): temp = geo.load_any(landcover_folder / lc / 'maxRootDepth.nc') main_var = geo.main_vars(temp)[0] maxRootDepth_ds[lc] = temp[main_var] ## Case where user passes multiple file paths else: for lc in maxRootDepth_path_dict.keys(): if maxRootDepth_path_dict[lc] is not None: if Path(maxRootDepth_path_dict[lc]).is_file(): temp = geo.load_any(maxRootDepth_path_dict[lc]) main_var = geo.main_vars(temp)[0] maxRootDepth_ds[lc] = temp[main_var] # Remove the time axis t_var2 = geo.main_time_dims(maxRootDepth_ds)[0] if t_var2 is not None: maxRootDepth_ds = maxRootDepth_ds[{t_var2: 0}].drop_vars(t_var2) # Compute the average over all landcovers that have common parameters # 'forest' stands aside, 'grassland' averages 'grassland', 'irrNonPaddy' and 'irrPaddy' maxRootDepth_ds['general'] = maxRootDepth_ds[set(maxRootDepth_ds.data_vars) - {'forest'}].to_array().mean('variable') # maxRootDepth_ds = maxRootDepth_ds.drop_vars(['irrNonPaddy', 'irrPaddy']) maxRootDepth_ds = maxRootDepth_ds[['forest', 'general']] # Clean cut of maxRootDepth # ============================================================================= # maxRootDepth_ds = maxRootDepth_ds.where(geo.load_any(m)) # ============================================================================= x_var2, y_var2 = geo.main_space_dims(maxRootDepth_ds)[0] # generate a raster dataset containing the area covered by soil UCS polygons on each cell vector_area = vector_reprj[['geometry']].copy() vector_area.loc[:, 'area'] = 1 raster_area = geo.reproject(vector_area, mask = m, bounds = bounds, resolution = res, x0 = x0, y0 = y0, base_template = base_template, rasterize = True, rasterize_mode = {'area': 'coverage'}, **rio_kwargs) raster_area = raster_area.rename({'area = 1': 'area'}) # Columns to use for ROSETTA transfer functions # (only columns present in data will be used) col_to_extract = ['TAUX ARGILE_val_mod', # [g/kg] --> needs to divide by 10 to 
convert to % 'TAUX LIMON_val_mod', # [g/kg] 'TAUX SABLE_val_mod', # [g/kg] 'DENS_APPAR_val_mod', # [g/cm3] 'HUMID_PF42_val_mod', # [g/kg] water moisture at permanent wilting point (-1500 kPa) # --> needs to be divided by 1000 to convert to cm3/cm3 'HUMID_PF2_val_mod', # [g/kg] water moisture at field capacity (-33 kPa) ] var_list = list(set(col_to_extract).intersection(tabular_ds.columns)) # ---- Create a vector grid shape = raster_area.rio.shape # height, width x_res, _, x_min, _, y_res, y_max, _, _, _ = raster_area.rio.transform() grid_gdf, xy_coords = geo.vector_grid(height = shape[0], width = shape[1], x_min = x_min, y_max = y_max, x_res = x_res, y_res = y_res, crs = vector_reprj.crs) coords = [xy_coords['y_coords'], xy_coords['x_coords']] dims = ['y', 'x'] # ---- Computations results = {'forest': maxRootDepth_ds[['forest']].rename({'forest': 'soildepth1'}).copy(deep = True), 'general': maxRootDepth_ds[['general']].rename({'general': 'soildepth1'}).copy(deep = True) } for lc in ['forest', 'general']: results[lc]['soildepth_total'] = results['forest']['soildepth1'] * 0 # initialization into a null array for var in var_list: for i in ['1', '2', '3']: # CWatM soil layer number results[lc][var+i] = results['forest']['soildepth1'] * 0 # initialization into a null array # ============================================================================= # lay_1 = xr.Dataset() # first soil layer in CWatM # lay_2 = xr.Dataset() # second layer # lay_3 = xr.Dataset() # third layer # ============================================================================= for ucs in tabular_ds.no_ucs.unique(): print(f" . UCS {ucs}") # Create a raster map of area covered by this specific UCS on each cell # (only few cells will be non-null) ucs_area = geo.rasterize_categorical(vector_reprj[vector_reprj.no_ucs == ucs], grid_gdf, is_polygon = True, ) da_stacked = ucs_area.where(ucs_area > 0, drop = True).stack(c = [x_var2, y_var2]) loc = da_stacked[da_stacked.notnull()].c.values # Selection for one UCS sub_tabular_ds = tabular_ds[tabular_ds.no_ucs == ucs] uts_list = sub_tabular_ds.id_uts.unique() for uts in uts_list: # Selection for one UTS uts_ds = sub_tabular_ds[sub_tabular_ds.id_uts == uts].copy() uts_ds.set_index('no_strate', inplace = True) uts_ds.loc[0] = 0 # add a first null ligne uts_ds.sort_values(by = 'prof_inf_moy', inplace = True) # Safeguard for missing data uts_ds.reset_index(inplace = True, drop = True, names = 'no_strate') # n_strates = uts_ds.no_strate.max() # number of strates for this UTS n_strates = uts_ds.index.max() # number of strates for this UTS # ============================================================================= # # Create an xr.DataArray with strate values on the z-axis # strate_ds = ucs_area.copy(deep = True) # strate_ds = strate_ds.expand_dims(dim = {'strate': range(1, n_strates+1)}, axis = 0).copy() # ============================================================================= # ============================================================================= # for no_st in strate_ds.strate.values: # ============================================================================= # ============================================================================= # strate_ds.loc[{'strate': no_st}] = uts_ds[uts_ds.no_strate == no_st]['prof_inf_moy'].item() # ============================================================================= # strate_ds = strate_ds.where(strate_ds.strate == st, uts_ds[uts_ds.no_strate == st]['prof_inf_moy'].item()) # 
============================================================================= # for lc in landcover: # for var in var_list: # diff = maxRootDepth_ds[lc] - uts_ds.loc[no_st, 'prof_inf_moy'] # cond = diff >= 0 # lay_2[var] = lay_2[var] + xr.where( # cond, # uts_ds.loc[no_st, var] * uts_ds.loc[no_st, 'prof_inf_moy'], # ... # ) # # lay_3[var] = lay_3[var] + xr.where( # cond, # -diff * uts_ds.loc[no_st, var]) + # (uts_ds.loc[i, 'prof_inf_moy'] - uts_ds.loc[i-1, 'prof_inf_moy']) * uts_ds.loc[i, var] for i in range(no_st, strate_ds.strate.max()) # ============================================================================= # Compute soil depth results['forest']['soildepth_total'] = results['forest']['soildepth_total'] + ucs_area * uts_ds['prof_inf_moy'].max()*uts_ds['pourcent'].max()/100 # average soil depth weighted by the coverage of each UTS # Dividing by the total coverage (raster_area) will be done at the end, # to do it at once, and to avoid nan (0 / 0). for lc in maxRootDepth_ds: # Compute texture for var in var_list: for cell in loc: root_depth = maxRootDepth_ds[lc].loc[{x_var2: cell[0], y_var2: cell[1]}].item() val1 = 0 # Soil strate number, notconsidering the first strate (layer 0 added by the script to the data) no_st = 1 # uts_ds.index.min()+1 strate_depths = uts_ds.loc[:, 'prof_inf_moy'].values # 5 cm, first soil layer of CWatM while (0.05 > strate_depths[no_st]) \ & (no_st < uts_ds.index.max()): val1 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] no_st += 1 if no_st >= uts_ds.index.max(): val1 += (0.05 - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] # val1 = val1 / strate_depths[no_st] val1 = val1 / 0.05 val2 = val1 val3 = val1 else: val1 += (0.05 - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] val2 = (strate_depths[no_st] - 0.05) * uts_ds.loc[no_st, var] no_st += 1 while (root_depth > strate_depths[no_st]) \ & (no_st < uts_ds.index.max()): val2 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] no_st += 1 if no_st >= uts_ds.index.max(): val1 = val1 / 0.05 val2 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] val2 = val2 / (strate_depths[no_st] - 0.05) val3 = val2 else: val2 += (root_depth - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] val3 = (strate_depths[no_st] - root_depth) * uts_ds.loc[no_st, var] no_st += 1 while (strate_depths[no_st] < strate_depths.max()) \ & (no_st < uts_ds.index.max()): # depth of the third soil layer = max depth val3 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] no_st += 1 val3 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var] val1 = val1 / 0.05 val2 = val2 / (root_depth - 0.05) val3 = val3 / (strate_depths.max() - root_depth) for i, val in {1: val1, 2: val2, 3:val3}.items(): results[lc][var + str(i)].loc[{x_var2: cell[0], y_var2: cell[1]}] \ = results[lc][var + str(i)].loc[{x_var2: cell[0], y_var2: cell[1]}] \ + ucs_area.loc[{x_var2: cell[0], y_var2: cell[1]}] * val * uts_ds['pourcent'].max()/100 # Finish computing soil depth # So fat total soil depth is computed the same way for forest and grassland results['general']['soildepth_total'] = results['forest']['soildepth_total'] # same total soil depth # ---- Normalize by total area for lc in ['forest', 'general']: # Soil depth results[lc]['soildepth_total'] = results[lc]['soildepth_total'] / raster_area['area'] # Corrections when maxRootDepths deeper than max soil depth: results[lc]['soildepth1'] = results[lc]['soildepth1'].where( results[lc]['soildepth1'] < 
results[lc]['soildepth_total'], results[lc]['soildepth_total']) results[lc]['soildepth2'] = results[lc]['soildepth_total'] - results[lc]['soildepth1'] # results[lc]['soildepth2'] = results[lc]['soildepth2'].where(results[lc]['soildepth2'] >= 0, 0) # replace negative values with 0 # Soil texture for var in var_list: for i in ['1', '2', '3']: # CWatM soil layer number results[lc][var+i] = results[lc][var+i] / raster_area['area'] # ---- Compute soil parameters with transfer functions for i in ['1', '2', '3']: # CWatM soil layer number # soildata must be a list of lists like [# [%sand, %silt, %clay, (bulk density, th33, th1500)]] # see https://github.com/usda-ars-ussl/rosetta-soil # Safeguard if percentages do not sum up to 100% norm = results[lc]['TAUX SABLE_val_mod'+i].values.flatten(order = 'C') + results[lc]['TAUX LIMON_val_mod'+i].values.flatten(order = 'C') \ + results[lc]['TAUX ARGILE_val_mod'+i].values.flatten(order = 'C') soildata = SoilData.from_array( np.array([ results[lc]['TAUX SABLE_val_mod'+i].values.flatten(order = 'C')/norm*100, # [g/kg] -->[-] --> [%] results[lc]['TAUX LIMON_val_mod'+i].values.flatten(order = 'C')/norm*100, results[lc]['TAUX ARGILE_val_mod'+i].values.flatten(order = 'C')/norm*100, # results[lc]['DENS_APPAR_val_mod'+i].values.flatten(order = 'C'), # faulty in RRP Pays Basque ]).transpose().tolist() ) mean, _, code = rosetta(3, soildata) results[lc]['KSat'+i] = ([y_var2, x_var2], 10**mean.transpose()[4].reshape(results[lc]['soildepth_total'].shape)) # [cm/d] results[lc]['alpha'+i] = ([y_var2, x_var2], 10**mean.transpose()[2].reshape(results[lc]['soildepth_total'].shape)) # [1/cm] results[lc]['lambda'+i] = ([y_var2, x_var2], 10**mean.transpose()[3].reshape(results[lc]['soildepth_total'].shape) - 1) # [-] results[lc]['thetas'+i] = ([y_var2, x_var2], mean.transpose()[1].reshape(results[lc]['soildepth_total'].shape)) # [-] results[lc]['thetar'+i] = ([y_var2, x_var2], mean.transpose()[0].reshape(results[lc]['soildepth_total'].shape)) # [-] # ---- Correction and export # results[lc] = geo.georef(results[lc], crs = results[lc].rio.crs) for i in ['3', '2', '1']: for var in ['soildepth', 'soildepth_total', 'KSat', 'alpha', 'lambda', 'thetas', 'thetar',] + var_list: if (var in ['soildepth', 'soildepth_total']) & (i == '3'): # does not exist continue # Safeguard for holes in results if var != 'soildepth': if i in ['1', '2']: # fill with lower layer results[lc][var+i] = results[lc][var+i].where( results[lc][var+i].notnull() | results[lc]['soildepth1'].isnull(), results[lc][var+str(int(i)+1)]) # ============================================================================= # # Fill with average value # results[lc][var+i] = results[lc][var+i].where( # results[lc][var+i].notnull() | results[lc]['soildepth1'].isnull(), results[lc][var+i].mean()) # ============================================================================= # ============================================================================= # # Mask data # results[lc][var+i] = results[lc][var+i].where(results[lc]['soildepth1'].notnull()) # ============================================================================= # Georef results[lc][var+i] = geo.georef(results[lc][var+i]) # Fill with average value results[lc][var+i] = geo.cropfill(results[lc][var+i], m) # Export geo.export( results[lc][var+i], outpath / lc / (var+i + f'_mask{n_mask}' + f'_{res}m' + '.nc')) #%%% Water Use data @formating def withdrawal(data, *, domes_efficiency, indus_efficiency, yearly_to_monthly = None, mask = None, bounds = None, resolution = 
#%%% Water Use data
@formating
def withdrawal(data, *,
               domes_efficiency,
               indus_efficiency,
               yearly_to_monthly = None,
               mask = None,
               bounds = None,
               resolution = None,
               x0 = None,
               y0 = None,
               base_template = None,
               outpath = None,
               **rio_kwargs,
               ):
    # ---- Initialisations
    # Load
    gdf = geo.load_any(data)
    gdf['volume'] = gdf['volume'] / 1e6 # convert m3 into Mm3
    # TODO: make it possible to generalize these column names
    gdf_dict = dict()
    gdf_dict['surface'] = gdf[gdf['libelle_type_milieu'] == 'Surface continental']
    gdf_dict['groundwater'] = gdf[gdf['libelle_type_milieu'] == 'Souterrain']
    gdf_dict['domestic'] = gdf[gdf['libelle_usage'] == 'EAU POTABLE']
    gdf_dict['industry'] = gdf[gdf['libelle_usage'] == 'INDUSTRIE et ACTIVITES ECONOMIQUES (hors irrigation, hors énergie)']

    # Folders
    if isinstance(data, (str, Path)):
        data = Path(data)
        outpath = data.parent / 'cwatm'
        outname = data.stem
        if not outpath.exists():
            os.makedirs(outpath)
    else:
        if outpath is None:
            print("Err: `outpath` argument is required when `data` is not a file")
            return
        else:
            outpath = Path(outpath) # ensure Path for the export joins below
            outname = 'withdrawal'

    # Time
    time_dim = geo.main_time_dims(gdf)[0]
    year_start = int(gdf[time_dim].dt.year.min())
    year_end = int(gdf[time_dim].dt.year.max())

    # Gross/Netto coefficients
    efficiency_dict = dict()
    efficiency_dict['domestic'] = domes_efficiency
    efficiency_dict['industry'] = indus_efficiency

    # ---- Format yearly_to_monthly
    if yearly_to_monthly is not None:
        # Load files, and disqualify numerics
        if isinstance(yearly_to_monthly, (int, float)):
            print("Err: `yearly_to_monthly` argument should be an iterable or an xarray variable with a length of 12. "
                  f"As it is a {type(yearly_to_monthly)}, it is ignored.")
            yearly_to_monthly = None
        elif isinstance(yearly_to_monthly, (str, Path)):
            yearly_to_monthly = geo.load_any(yearly_to_monthly)
            if isinstance(yearly_to_monthly, xr.DataArray):
                yearly_to_monthly = yearly_to_monthly.to_dataset(name = 'volume_coeff')
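# =============================================================================
# Illustrative sketch (not executed): typical accepted forms for
# `yearly_to_monthly`, based on the type checks handled below. The file path
# and coefficient values are hypothetical.
#
# withdrawal(gdf, domes_efficiency = 0.8, indus_efficiency = 0.9,
#            yearly_to_monthly = [0.06, 0.06, 0.07, 0.08, 0.09, 0.10,
#                                 0.12, 0.12, 0.10, 0.08, 0.06, 0.06])
# withdrawal(gdf, domes_efficiency = 0.8, indus_efficiency = 0.9,
#            yearly_to_monthly = "monthly_coeff.csv")   # loaded via geo.load_any
# withdrawal(gdf, domes_efficiency = 0.8, indus_efficiency = 0.9,
#            yearly_to_monthly = coeff_series)          # pandas.Series with a DatetimeIndex
# =============================================================================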
" f"As its actual lenght is {len(yearly_to_monthly)}, it is truncated.") yearly_to_monthly = yearly_to_monthly[0:12] # Convert list to DataArray yearly_to_monthly = xr.DataArray( coords = {'time': pd.date_range( start = '2000-01-01', end = '2000-12-01', freq = 'MS')}, data = yearly_to_monthly, name = 'volume_coeff' ) elif len(yearly_to_monthly.time) < 12: print(f"Err: `yearly_to_monthly` argument is ignored because it should have a length of 12, not {len(yearly_to_monthly.time)}") yearly_to_monthly = None else: # Convert list to DataArray yearly_to_monthly = xr.DataArray( coords = {'time': pd.date_range( start = '2000-01-01', end = '2000-12-01', freq = 'MS')}, data = yearly_to_monthly, name = 'volume_coeff' ) elif isinstance(yearly_to_monthly, (pd.DataFrame, pd.Series)): # Convert to numpy.array and correct values if isinstance(yearly_to_monthly, pd.DataFrame): # Convert pd.DataFrames to pd.Series yearly_to_monthly = yearly_to_monthly[geo.main_vars(yearly_to_monthly)[0]] if len(yearly_to_monthly) > 12: print("Warning: `yearly_to_monthly` argument should have a length of 12. " f"As its actual lenght is {len(yearly_to_monthly)}, it is truncated.") yearly_to_monthly = yearly_to_monthly[0:12] elif len(yearly_to_monthly.time) < 12: print(f"Err: `yearly_to_monthly` argument is ignored because it should have a length of 12, not {len(yearly_to_monthly.time)}") yearly_to_monthly = None if yearly_to_monthly is not None: if isinstance(yearly_to_monthly.index, pd.DatetimeIndex): # If there is an index, reoerder it (just in case it is aligned # with an hydrological or meteorological year instead of a calendar year) yearly_to_monthly.set_index(pd.to_datetime({ 'year': 2000, 'month': yearly_to_monthly.index.month, 'day': yearly_to_monthly.index.day}).sort_values(), inplace = True) # Convert to DataArray yearly_to_monthly = yearly_to_monthly.to_xarray() else: print("Err: `yearly_to_monthly` is ingored as its type is not recognized. 
") yearly_to_monthly = None # Compute values so that the distribution sums up to 1 if yearly_to_monthly is not None: yearly_to_monthly = yearly_to_monthly / (yearly_to_monthly.sum(dim = 'time')) # For each mask n_mask = 0 for m in mask: n_mask += 1 # TODO # ---- Cut json files on masks --> move to stzfr.bnpe() # ============================================================================= # print(f"\nCut shapefile over mask {n_mask}/{len(mask)}:") # reprj_gdf = geo.reproject(gdf, # # main_vars = 1, # useless # mask = m) # # geo.export(reprj_gdf, # concat_folder / Path(f"mask{n_mask}") / f"{year_start}-{year_end}_clipped.json", # encoding='utf-8') # ============================================================================= # TODO # ---- CSV summary --> move to stzfr.bnpe() # ============================================================================= # # Reorder columns (optional) # col_names_base = ['code_ouvrage', 'nom_commune', 'volume', 'code_usage', 'libelle_usage', 'prelevement_ecrasant'] + [time_dim] # # if ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' in gdf.columns): # col_names = col_names_base + ['code_type_milieu', 'libelle_type_milieu'] # elif ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' not in gdf.columns): # col_names = col_names_base + ['libelle_type_milieu'] # elif ('libelle_type_milieu' not in gdf.columns) & ('code_type_milieu' in gdf.columns): # col_names = col_names_base + ['code_type_milieu'] # # ============================================================================= # # df = pd.DataFrame(reprj_gdf[col_names]) # # ============================================================================= # # col_names = [time_dim] + col_names # remaining_col = list(set(gdf.columns) - set(col_names)) # # ============================================================================= # # df = df[[time_dim] + col_names # # ============================================================================= # df = pd.DataFrame(gdf[col_names + remaining_col]) # # # Remove useless rows # df = df[df.volume > 0] # # # Sort by nom_commune # df.sort_values(by = 'nom_commune', inplace = True) # # # Export du fichier récapitulatif csv # df.to_csv(concat_folder / "recap.csv", sep = ';', encoding = 'utf-8') # ============================================================================= # For each resolution for res in resolution: if res is None: res_suffix = '' if 'resolution' in rio_kwargs: rio_kwargs.pop('resolution') else: res_suffix = f'_{res}m' rio_kwargs['resolution'] = res # Rasterization ds_dict = dict() # ---- [Rasterization] Static map of ratio surface/groundwater abstraction daily = False # Determine time step monthly = False yearly = False time_step = (gdf_dict['surface'][time_dim][1] - gdf_dict['surface'][time_dim][0]).total_seconds() if round(time_step / 86400) == 1: daily = True elif round(time_step / 2592000) == 1: monthly = True elif round(time_step / 31557600) == 1: yearly = True for origin in ['surface', 'groundwater']: ds_dict[origin] = geo.reproject(data = gdf_dict[origin], mask = m, bounds = bounds, x0 = x0, y0 = y0, base_template = base_template, rasterize = True, main_var_list = 'volume', rasterize_mode = 'sum', **rio_kwargs) x_var, y_var = geo.main_space_dims(ds_dict[origin])[0] ds_dict['swAbstractionFrac'] = ( ds_dict['surface'] / (ds_dict['surface'] + ds_dict['groundwater']) ).mean(dim = time_dim) ds_dict['swAbstractionFrac'] = ds_dict['swAbstractionFrac'].where( (ds_dict['groundwater'].mean(dim = time_dim) > 0) | 
    # For each mask
    n_mask = 0
    for m in mask:
        n_mask += 1

        # TODO
        # ---- Cut json files on masks --> move to stzfr.bnpe()
# =============================================================================
#         print(f"\nCut shapefile over mask {n_mask}/{len(mask)}:")
#         reprj_gdf = geo.reproject(gdf,
#                                   # main_vars = 1, # useless
#                                   mask = m)
#
#         geo.export(reprj_gdf,
#                    concat_folder / Path(f"mask{n_mask}") / f"{year_start}-{year_end}_clipped.json",
#                    encoding='utf-8')
# =============================================================================

        # TODO
        # ---- CSV summary --> move to stzfr.bnpe()
# =============================================================================
#         # Reorder columns (optional)
#         col_names_base = ['code_ouvrage', 'nom_commune', 'volume', 'code_usage', 'libelle_usage', 'prelevement_ecrasant'] + [time_dim]
#
#         if ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' in gdf.columns):
#             col_names = col_names_base + ['code_type_milieu', 'libelle_type_milieu']
#         elif ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' not in gdf.columns):
#             col_names = col_names_base + ['libelle_type_milieu']
#         elif ('libelle_type_milieu' not in gdf.columns) & ('code_type_milieu' in gdf.columns):
#             col_names = col_names_base + ['code_type_milieu']
#
#         remaining_col = list(set(gdf.columns) - set(col_names))
#         df = pd.DataFrame(gdf[col_names + remaining_col])
#
#         # Remove useless rows
#         df = df[df.volume > 0]
#
#         # Sort by nom_commune
#         df.sort_values(by = 'nom_commune', inplace = True)
#
#         # Export the summary csv file
#         df.to_csv(concat_folder / "recap.csv", sep = ';', encoding = 'utf-8')
# =============================================================================

        # For each resolution
        for res in resolution:
            if res is None:
                res_suffix = ''
                if 'resolution' in rio_kwargs:
                    rio_kwargs.pop('resolution')
            else:
                res_suffix = f'_{res}m'
                rio_kwargs['resolution'] = res

            # Rasterization
            ds_dict = dict()

            # ---- [Rasterization] Static map of ratio surface/groundwater abstraction
            # Determine time step
            daily = False
            monthly = False
            yearly = False
            time_step = (gdf_dict['surface'][time_dim][1] - gdf_dict['surface'][time_dim][0]).total_seconds()
            if round(time_step / 86400) == 1:
                daily = True
            elif round(time_step / 2592000) == 1:
                monthly = True
            elif round(time_step / 31557600) == 1:
                yearly = True

            for origin in ['surface', 'groundwater']:
                ds_dict[origin] = geo.reproject(data = gdf_dict[origin],
                                                mask = m,
                                                bounds = bounds,
                                                x0 = x0,
                                                y0 = y0,
                                                base_template = base_template,
                                                rasterize = True,
                                                main_var_list = 'volume',
                                                rasterize_mode = 'sum',
                                                **rio_kwargs)
                x_var, y_var = geo.main_space_dims(ds_dict[origin])[0]

            ds_dict['swAbstractionFrac'] = (
                ds_dict['surface'] / (ds_dict['surface'] + ds_dict['groundwater'])
                ).mean(dim = time_dim)
            ds_dict['swAbstractionFrac'] = ds_dict['swAbstractionFrac'].where(
                (ds_dict['groundwater'].mean(dim = time_dim) > 0) | (ds_dict['groundwater'].mean(dim = time_dim).isnull()),
                1)

            # ---- [Rasterization] Monthly data of domestic and industry withdrawals
            for use in ['domestic', 'industry']:
                gdf_dict[use].set_index('time', inplace = True)
                ds_dict[use] = geo.reproject(data = gdf_dict[use],
                                             mask = m,
                                             bounds = bounds,
                                             x0 = x0,
                                             y0 = y0,
                                             base_template = base_template,
                                             rasterize = True,
                                             main_var_list = 'volume',
                                             rasterize_mode = 'sum',
                                             **rio_kwargs)

                # Annual mean
                if yearly:
                    # yearly sum --> monthly average
                    monthly_ds = ds_dict[use].copy(deep = True)
                    monthly_ds = monthly_ds.reindex({time_dim: pd.date_range(
                        start = f'{year_start}-01-01', end = f'{year_end}-12-01', freq = 'MS')})
                    for date in ds_dict[use][time_dim]:
                        year = str(date.dt.year.item())
                        year_array = ds_dict[use]['volume'].loc[
                            {time_dim: slice(f'{year}-01-01', f'{year}-12-31')}]
                        if yearly_to_monthly is None:
                            # Without monthly coefficients, spread the yearly sum uniformly
                            year_array = year_array / 12
                        for month in range(1, 13):
                            monthly_ds['volume'].loc[
                                {time_dim: pd.to_datetime([f'{year}-{month}-01'])}
                                ] = year_array.assign_coords(
                                    {time_dim: pd.to_datetime([f"{year}-{month}-01"])}
                                    )
# ========== previous method: much slower ====================================
#                     monthly_list = list()
#                     for date in ds_dict[use][time_dim]:
#                         year = date.dt.year.item()
#                         year_array = ds_dict[use][['volume']].loc[
#                             {time_dim: slice(f'{year}-01-01', f'{year}-12-31')}] / 12
#                         for month in range(1, 13):
#                             monthly_list.append(
#                                 year_array.assign_coords(
#                                     {time_dim: pd.to_datetime([f"{year}-{month}-01"])}
#                                     )
#                                 )
#                     monthly_ds = geo.merge_data(monthly_list)
# =============================================================================

                        # Apply monthly variations
                        if yearly_to_monthly is not None:
                            monthly_ds.loc[
                                {time_dim: slice(year, year)}
                                ] = monthly_ds.loc[
                                    {time_dim: slice(year, year)}
                                    ] * yearly_to_monthly['volume_coeff'].assign_coords(
                                        {time_dim: pd.to_datetime({'year': int(year),
                                                                   'month': yearly_to_monthly.time.dt.month.values,
                                                                   'day': yearly_to_monthly.time.dt.day.values,
                                                                   })
                                         }
                                        )

                    # Compute gross and netto demands
                    monthly_ds = monthly_ds.rename({'volume': f'{use}GrossDemand'})
                    monthly_ds[f'{use}NettoDemand'] = monthly_ds[f'{use}GrossDemand'] * efficiency_dict[use]

                    # Update ds_dict
                    ds_dict[use] = monthly_ds

            # ---- Export
            for var in ['domestic', 'industry', 'swAbstractionFrac']:
                if 'dst_crs' in rio_kwargs:
                    ds_dict[var] = geo.georef(data = ds_dict[var], crs = rio_kwargs['dst_crs'])
                else:
                    ds_dict[var] = geo.georef(data = ds_dict[var])
                geo.export(ds_dict[var],
                           outpath / (outname + f"_{var}" + f"_{year_start}-{year_end}" + res_suffix + f'_mask{n_mask}' + '.nc'),
                           )
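# =============================================================================
# Illustrative sketch (not executed): swAbstractionFrac above is the mean
# fraction of surface-water withdrawals over total withdrawals, forced to 1
# where no groundwater withdrawal exists. Hypothetical one-cell example:
#
# surface_mean = 3.0      # [Mm3]
# groundwater_mean = 1.0  # [Mm3]
# sw_frac = surface_mean / (surface_mean + groundwater_mean)   # -> 0.75
# =============================================================================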
@formating
def withdrawal_modflow(data, *,
                       mask = None,
                       bounds = None,
                       resolution = None,
                       x0 = None,
                       y0 = None,
                       base_template = None,
                       outpath = None,
                       **rio_kwargs,
                       ):
    # ---- Initialisations
    # Load
    gdf = geo.load_any(data)

    # Folders
    if isinstance(data, (str, Path)):
        data = Path(data)
        outpath = data.parent / 'cwatm'
        outname = data.stem
    else:
        if outpath is None:
            print("Err: `outpath` argument is required when `data` is not a file")
            return
        else:
            outpath = Path(outpath) # ensure Path for the export joins below
            outname = 'withdrawal'

    # Time
    time_dim = geo.main_time_dims(gdf)[0]

    # Select groundwater pumpings
    gdf_groundwater = gdf[gdf['libelle_type_milieu'] == 'Souterrain']
    # gdf_groundwater = gdf_groundwater.drop(time_dim, axis = 1)
    # (Does not improve much the computation time)

    # For each mask
    n_mask = 0
    for m in mask:
        n_mask += 1
        # For each resolution
        for res in resolution:
            if res is None:
                res_suffix = ''
                if 'resolution' in rio_kwargs:
                    rio_kwargs.pop('resolution')
            else:
                res_suffix = f'_{res}m'
                rio_kwargs['resolution'] = res

            ds_reprj = geo.reproject(data = gdf_groundwater,
                                     mask = m,
                                     bounds = bounds,
                                     x0 = x0,
                                     y0 = y0,
                                     base_template = base_template,
                                     rasterize = True,
                                     main_var_list = 'volume',
                                     rasterize_mode = 'count',
                                     **rio_kwargs)

            ds = ds_reprj.fillna(0).sum(dim = time_dim)
            ds = xr.where(ds > 0, 1, 0, keep_attrs = True).astype(bool, keep_attrs = True)
            # ds = ds.where(ds_reprj[{time_dim: 0}].notnull())
            # (better to stick to the bool type, with no nan)

            # ---- Export
            if 'dst_crs' in rio_kwargs:
                ds = geo.georef(data = ds, crs = rio_kwargs['dst_crs'])
            else:
                ds = geo.georef(data = ds)
            geo.export(ds,
                       outpath / (outname + "_wells_mask_modflow" + res_suffix + f'_mask{n_mask}' + '.nc'),
                       )
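# =============================================================================
# Illustrative sketch (not executed): the boolean wells mask built above marks
# every cell containing at least one groundwater withdrawal point over the
# whole period. Hypothetical 2x2 example:
#
# counts = xr.DataArray([[0, 2], [1, 0]], dims = ('y', 'x'))
# wells = xr.where(counts > 0, 1, 0).astype(bool)
# # -> [[False, True], [True, False]]
# =============================================================================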
#%% TOOLS
#%%% EXPAND INPUTS
def plot_annual_coeff(filepath, *,
                      extension = '.csv',
                      date_format = "%d/%m",
                      to_file = False,
                      **kwargs):
    """
    Convert sparse tabular timeseries data (a file or a set of files) into
    10-day timeseries, and plot them.
    Particularly handy to get a quick overview of *cropCoefficients* and
    *interceptCap* annual timeseries.

    Important: in this kind of sparse timeseries, it is assumed that values
    correspond to their first occurrence and are propagated forward
    (``01/10 | 5, 20/10 | 2`` means that the value ``5`` applies from 01/10
    to 19/10, followed by the value ``2``).

    Parameters
    ----------
    filepath : str or pathlib.Path
        File (tabular data) or folder.
    extension : str, optional, default '.csv'
        Used only when ``filepath`` is a folder. Defines the file types that
        will be taken into account.
    date_format : str, optional, default "%d/%m"
        Argument to load data (with ``pandas.read_csv``).
    to_file : bool or path (str or pathlib.Path), default False
        If True and if ``filepath`` is a path (str or pathlib.Path), the
        resulting dataset will be exported to the same location as
        ``filepath``, while appending '_prolonged' to its name.
        If ``to_file`` is a path, the resulting dataset will be exported to
        this specified filepath.
    **kwargs :
        Keyword arguments for ``pandas.read_csv()``:

        - ``sep``
        - ``index_col``
        - ``decimal``
        - ...

    Returns
    -------
    Creates 10-day timeseries (in a *10day* folder) as well as a html figure
    (in a *continuous* folder).
    """
    # ---- Initialization
    if not isinstance(filepath, (str, Path)):
        print("Err: only a file or a folder is supported so far.")
        return

    filepath = Path(filepath)
    if filepath.is_dir():
        root, filelist = geo.get_filelist(filepath, extension = extension)
    elif filepath.is_file():
        root = filepath.parent
        filelist = [filepath.name]

    kwargs['date_format'] = date_format

    df_all = pd.DataFrame()

    if not (root / '10day').exists():
        os.makedirs(root / '10day')
    if not (root / 'continuous').exists():
        os.makedirs(root / 'continuous')

    for file in filelist:
        print('\n----\n', file)
        df = geo.load_any(root / file, **kwargs)

        # Safeguard
        if not isinstance(df, (pd.DataFrame, pd.Series)):
            print("Err: file format not supported yet. Only tabular data is supported.")
            return
Only tabular data is supported.") return varname = df.columns[0] # Concatenate 3 arrays to be sure to have a full year starting from 1st January to 31st December df = pd.concat([df, df.set_index(df.index + pd.DateOffset(years=1)), df.set_index(df.index + pd.DateOffset(years=2))]) # Fill the gaps in the index df = df.reindex(pd.date_range(start = df.index[0], end = df.index[-1], freq = '1D')) df.index.name = 'time' # Fill the gaps in the values, by interpolating according to the time axis df = df.interpolate(method = 'time') # Keep only a full year starting from 1st January to 31st December full_year = df[(df.index.day == 1) & (df.index.month == 1)].index.year[0].item() df = df.loc[f'{full_year}-01-01': f'{full_year}-12-31'] # Change year to 2000 (for homogeneity with other CWatM inputs) df.index = df.index + pd.DateOffset(years = 2000 - df.index[0].year) # Keep only one value "each 10 days" (or more exaclty, keep only 01st, 11st and 21st of each month) df = df[(df.index.day == 1) | (df.index.day == 11) | (df.index.day == 21)] geo.export(df, root / '10day' / file) # Rename col and concatenante df_all.index = df.index df_all[file] = df[varname] # df.iloc[:,0] # df_all = geo.merge_data(root / 'continuous') cmap = np.vstack([cmg.discrete(sequence_name = 'wong', alpha = 0.8, black = False, alternate = False, color_format = 'float'), cmg.discrete(sequence_name = 'wong', alpha = 0.8, black = False, alternate = False, color_format = 'float'), cmg.discrete(sequence_name = 'wong', alpha = 0.8, black = False, alternate = False, color_format = 'float')]) _, _, figweb = ncp.plot_time_series(data = df_all, labels = df_all.columns.values, lstyles = ['-']*9 + ['--']*9 + ['dotted']*9, lwidths = 1.5, linecolors = cmap, ) # ---- Layout figweb.update_layout(#font_family = 'Open Sans', title = {'font': {'size': 20}, 'text': "cropCoefficients", 'xanchor': 'center', 'x': 0.45, }, legend = {'title': {'text': 'Légende'}, 'xanchor': 'right', 'y': 1.1, 'xref': 'container', 'yanchor': 'top', 'bgcolor': 'rgba(255, 255, 255, 0.2)', }, plot_bgcolor = "white", # legend = {'groupclick': 'togglegroup'}, width = 1700, height = 800, ) figweb.write_html(root / 'continuous' / f'{varname}.html')
#%%% STANDARDIZE OUTPUTS
# TODO: placeholder function that currently relies on geobricks.georef()
# It may be better to move this functionality from georef() to here
def output(data, *, data_type = 'CWatM'): # data_type can also be 'MODFLOW'
    return geo.georef(data)


#%%% OTHER FUNCTIONS
def zones_basses(mnt_raster):
    # Generate a raster flagging the pixels below sea level, from a
    # digital elevation model (DEM) raster
    with xr.open_dataset(mnt_raster,
                         decode_times = False,
                         decode_coords = 'all') as mnt_ds:
        mnt_ds.load()
    # Minimal completion (assumption): flag the cells whose elevation is negative
    mvar = geo.main_var(mnt_ds)[0]
    return xr.where(mnt_ds[mvar] < 0, 1, 0, keep_attrs = True)


def dummy_fractionLandcover(base, first_year = 2020, last_year = 2020):
    frac0 = geo.dummy_input(base, 0)
    frac1 = geo.dummy_input(base, 1)
    mvar = geo.main_var(frac0)[0]
    data_ds = frac0.copy()
    data_ds = data_ds.rename({mvar: 'fracforest'})
    data_ds['fracgrassland'] = frac1[mvar]
    data_ds['fracirrNonPaddy'] = frac0[mvar]
    data_ds['fracirrPaddy'] = frac0[mvar]
    data_ds['fracsealed'] = frac0[mvar]
    data_ds['fracwater'] = frac0[mvar]
    data_ds = data_ds.expand_dims(
        dim = {'time': pd.date_range(start = f"{first_year}-01-01",
                                     periods = last_year - first_year + 1,
                                     freq = 'YE')},
        axis = 0)
    data_ds = geo.georef(data_ds, crs = data_ds.rio.crs)
    data_ds, _ = geo.standard_fill_value(data_ds)
    for var in geo.main_var(data_ds):
        data_ds[var].attrs['long_name'] = var
    return data_ds


def dummy_dzRel(base, out_path):
    array = geo.dummy_input(base, 1)
    mvar = geo.main_var(array)[0]
    data_ds = xr.Dataset()
    for n in [0, 1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]:
        var = f'dzRel{n:04}'
        data_ds[var] = array[mvar]
        data_ds[var].attrs['long_name'] = var
    data_ds = geo.georef(data_ds, crs = data_ds.rio.crs)
    data_ds, _ = geo.standard_fill_value(data_ds)
    geo.export(data_ds, Path(out_path)/"dzRel_dummy.nc")
    return data_ds


def dummy_lakeres(base, out_path):
    # array = geo.dummy_input(base, 1)
    # mvar = geo.main_var(array)[0]
    print("Warning: the input `base` should necessarily be the mask")
    data_ds = geo.dummy_input(base, 0)
    mvar = geo.main_var(data_ds)[0]
    xvar, yvar = geo.main_space_dims(data_ds)[0]
    stack_da = data_ds[mvar].stack(xy = [xvar, yvar])
    # Determine the coordinates of the first not-NaN cell
    x, y = stack_da[stack_da.notnull()].xy[0].item()
    data_ds[mvar].loc[{xvar: x, yvar: y}] = 1
    data_ds = geo.georef(data_ds, crs = data_ds.rio.crs)
    data_ds, _ = geo.standard_fill_value(data_ds)
    geo.export(data_ds, Path(out_path)/"lakeres_dummy.nc")
    return data_ds


def dummy_routing(base, out_path):
    ds_dict = {}
    ds_dict['chanMan'] = geo.dummy_input(base, 0.3)
    ds_dict['chanGrad'] = geo.dummy_input(base, 0.02)
    ds_dict['chanGradMin'] = geo.dummy_input(base, 0.0001)
    ds_dict['chanLength'] = geo.dummy_input(base, 1400)
    ds_dict['chanWidth'] = geo.dummy_input(base, 17)
    ds_dict['chanDepth'] = geo.dummy_input(base, 1)
    for var in ds_dict:
        ds_dict[var] = geo.georef(ds_dict[var], crs = ds_dict[var].rio.crs)
        ds_dict[var], _ = geo.standard_fill_value(ds_dict[var])
        geo.export(ds_dict[var], Path(out_path)/f"{var}_dummy.nc")
    return ds_dict
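# =============================================================================
# Illustrative usage sketch (not executed); the paths are hypothetical:
#
# base = r"D:\CWatM\inputs\mask_500m.tif"
# dummy_fractionLandcover(base, first_year = 2020, last_year = 2022)
# dummy_routing(base, r"D:\CWatM\inputs\routing")
# =============================================================================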