# -*- coding: utf-8 -*-
"""
Created on Tue Sep 17 16:58:10 2024
@author: Alexandre Kenshilik Coche
@contact: alexandre.co@hotmail.fr
PAGAIE_interface est une interface regroupant les fonctions de geohydroconvert
pertinentes pour géotraiter les données géographiques dans le cadre de la
méthodologie Eau et Territoire (https://eau-et-territoire.org ).
A la difference de geohydroconvert, trajectoire_toolbox propose une sélection
des fonctions les plus utiles de geohydroconvert, au sein d'une interface
traduite en Français.
"""
#%% IMPORTS
import os
import re
from functools import wraps
from pathlib import Path
import datetime
from shapely.geometry import Polygon
import xarray as xr
import pandas as pd
from pandas.api.types import is_numeric_dtype
import numpy as np
xr.set_options(keep_attrs = True)
from rosetta import rosetta, SoilData
from geop4th import geobricks as geo
from geop4th.workflows.standardize import standardize_fr as stzfr
from geop4th.graphics import (
ncplot as ncp,
cmapgenerator as cmg,
)
#%% TO BE MOVED !!!
# =============================================================================
# @formating
# def era5(data, *,
# mask=None,
# bounds=None,
# resolution=None,
# x0=None,
# y0=None,
# base_template=None,
# **rio_kwargs,
# ):
# # ERA5
# if data_type.casefold().replace(' ', '').replace('-', '') in [
# "era5", "era5land", "era"]:
# data_folder, filelist = liste_fichiers(data, extension = '.nc')
#
# # outpath = outpath[:-3] # to remove '.nc'
# outpath = os.path.splitext(filelist[0])[0]
# outpath = os.path.join(data_folder, outpath)
# # Correct the data
# ds = convertir_cwatm([os.path.join(data_folder, f) for f in filelist], data_type = "ERA5")
# # Georeference the data
# ds = georeferencer(data = ds, data_type = "other", crs = 4326)
# # Lossy compression (packing)
# ds_comp = compresser(ds)
# exporter(ds_comp, outpath + '_pack' + '.nc')
# # Reprojections and exports
# # rio_kwargs['dst_crs'] = 2154
# for res in resolution:
# if res is None:
# res_suffix = ''
# if 'resolution' in rio_kwargs:
# rio_kwargs.pop('resolution')
# else:
# res_suffix = f'_{res}m'
# rio_kwargs['resolution'] = res
# n_mask = 0
# for m in mask:
# n_mask += 1
# ds_rprj = reprojeter(data = ds, mask = m, bounds = bounds,
# x0 = x0, y0 = y0, base_template = base_template,
# **rio_kwargs)
# exporter(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
# # exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # No rundate because the file names would be too long...
#
# del ds_rprj
# del ds
# del ds_comp
# =============================================================================
#%% DECORATOR
def formating(func):
@wraps(func)
def wrapper(data, **kwargs):
# ---- Formatting `resolution`
if 'resolution' not in kwargs:
kwargs['resolution'] = None
if not isinstance(kwargs['resolution'], list):
kwargs['resolution'] = [kwargs['resolution']] # a single int, float or tuple is wrapped; lists (of ints, of tuples...) are kept as-is
# ---- Formatting `mask` and `bounds`
# The `mask` and `bounds` arguments cannot be passed together.
# To homogenize both cases, `bounds` is converted into a `mask` if needed.
if 'mask' not in kwargs:
kwargs['mask'] = None
if 'bounds' in kwargs:
# `bounds` is converted to `mask`
kwargs['mask'] = Polygon([
(kwargs['bounds'][0], kwargs['bounds'][1]),
(kwargs['bounds'][2], kwargs['bounds'][1]),
(kwargs['bounds'][2], kwargs['bounds'][3]),
(kwargs['bounds'][0], kwargs['bounds'][3]),
(kwargs['bounds'][0], kwargs['bounds'][1]),
])
kwargs['bounds'] = None
else: # 'bounds' not in kwargs:
kwargs['bounds'] = None
else: # 'mask' in kwargs
if 'bounds' in kwargs:
print("Warning: the arguments `mask` and `bounds` are not intended to be passed together. Only `mask` will be used.")
kwargs['bounds'] = None
if not isinstance(kwargs['mask'], list):
kwargs['mask'] = [kwargs['mask']]
# ---- Formatting `rundate`
kwargs['rundate'] = datetime.datetime.now().strftime("%Y-%m-%d_%Hh%M")
return func(data, **kwargs)
return wrapper
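# A minimal usage sketch of the decorator (hypothetical function and values),
# showing how `resolution` is wrapped into a list and how `bounds` is turned
# into a `mask` polygon, so decorated functions can always iterate over both:
# =============================================================================
# @formating
# def show_kwargs(data, **kwargs):
#     print(kwargs['resolution'], kwargs['mask'], kwargs['bounds'])
#
# show_kwargs('dummy.nc', resolution = 500, bounds = (0, 0, 10, 10))
# # -> [500] [<POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))>] None
# =============================================================================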
#%% FILE MANAGEMENT
def infer_outpath(path):
# Safeguard
try:
path = Path(path)
except TypeError as err:
print(f"TypeError: `path` should be a valid path\n{err}\n")
raise
if path.is_file():
stz_pattern = re.compile(".*stz.*")
if (len(stz_pattern.findall(path.stem)) == 0) and (len(stz_pattern.findall(path.parent.stem)) == 0):
print("Warning: make sure that the file defined by `path` has been standardized with geop4th standardize functions")
outpath = path
elif path.is_dir():
_, filelist = geo.get_filelist(path, tag = 'stz')
if len(filelist) == 0:
_, filelist_all = geo.get_filelist(path)
print(f"Warning: `data` was passed as a folder, but no standardized files were identified in this folder. Please consider using geop4th standardize functions beforehand. `outpath` is inferred from the first file found: {filelist_all[0]}")
outpath = path / filelist_all[0]
elif len(filelist) > 1:
print(f"Warning: Several files were detected as standardized in the `data` folder. The outpath is inferred from the first file: {filelist[0]}")
outpath = path / filelist[0]
else:
outpath = path / filelist[0]
return outpath
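# Example behaviour (hypothetical paths): a standardized file keeps its own
# path, while a folder resolves to the first file tagged 'stz' inside it.
# =============================================================================
# infer_outpath(r"E:\Inputs\DEM\BDALTIV2_stz.tif") # -> the same path
# infer_outpath(r"E:\Inputs\DEM") # -> E:\Inputs\DEM\<first 'stz' file>
# =============================================================================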
#%% PORTAL FOR FORMATTING
def format_data(data,
data_name, *,
mask=None,
bounds=None,
resolution=None,
x0=None,
y0=None,
base_template=None,
**rio_kwargs,
):
# ---- Topographic data
#%%% BD ALTI 25m
if data_name.casefold().replace(' ', '') in ['bdalti', 'alti', 'ignalti']:
return bdalti(data)
# ---- Données climatiques
#%%% DRIAS-Climat 2022 (netcdf)
# SAFRAN data (tas, prtot, radiation, wind...) "EXPLORE2-Climat 2022"
elif data_name.casefold().replace(' ', '').replace('-', '') in [
'drias2022', 'climat2022', 'driasclimat2022']:
return explore2climat(data)
#%%% DRIAS-Eau 2024 (netcdf)
# SAFRAN indicators (SWIAV, SSWI-3...) "EXPLORE2-SIM2 2024"
elif data_name.casefold().replace(' ', '').replace('-', '') in [
'sim2024', 'indicateursim2024', 'indicateurssim2024',
'driaseau2024', 'indicateurdriaseau2024',
'indicateursdriaseau2024']:
return explore2eau(data)
#%%% SIM2 (csv to netCDF)
# Historical climate reanalysis Safran-Isba(-Modcou) "SIM2"
elif data_name.casefold().replace(' ', '').replace('-', '') in [
'sim2', 'sim', 'safranisba', 'safranisbamodcou',
]:
return sim2(data)
# ---- Water use data
#%%% BNPE
elif data_name.casefold().replace(' ', '').replace('-', '') in [
"bnpe"]:
return bnpe(data)
# ---- Landcover data
#%%% OCS SCOT Pays Basque
elif data_name.casefold().replace(' ', '').replace('-', '') in [
"ocs", "ocspbs", "ocspaysbasque"]:
return ocspaysbasque(data)
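# Usage sketch (hypothetical paths): `format_data` only dispatches on
# `data_name`; each target function handles the formatting itself.
# =============================================================================
# format_data(r"E:\Inputs\IGN\BDALTI", 'bd alti')
# format_data(r"E:\Inputs\DRIAS", 'drias2022')
# =============================================================================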
#%% FUNCTIONS FOR FORMATTING
#%%% Topographic data
@formating
def dem(data, *,
mask=None,
bounds=None,
resolution=None,
x0=None,
y0=None,
base_template=None,
outpath = None,
**rio_kwargs,
):
r"""
Format standardized DEM data for CWatM:
- reproject and clip them into passed ``resolution``, ``mask``, ``dst_crs``, ``x0``, ``y0`` ...
- generate matching maps of flow directions
- generate matching maps of standard deviations of elevation
Parameters
----------
data : path (str or pathlib.Path), or variable (numpy.ndarray, xarray.Dataset or xarray.DataArray)
MNT Dataset retrieved from IGN "ALTI" database
https://geoservices.ign.fr/bdalti
mask : (list of) str, pathlib.Path, shapely.geometry, xarray.DataArray or geopandas.GeoDataFrame, optional
Filepath of mask used to clip the data.
bounds : iterable or None, optional, default None
Boundaries of the target domain as a tuple (x_min, y_min, x_max, y_max).
resolution : (list of) int, optional
List of resolutions to use for data formatting.
x0: number, optional, default None
Origin of the X-axis, used to align the reprojection grid.
y0: number, optional, default None
Origin of the Y-axis, used to align the reprojection grid.
base_template : str, pathlib.Path, xarray.DataArray or geopandas.GeoDataFrame, optional
Filepath, used as a template for spatial profile. Supported file formats
are *.tif*, *.nc* and vector formats supported by geopandas (*.shp*, *.json*, ...).
outpath : str, optional
Outpath stem when ``data`` is passed as a GEOP4TH variable instead of a filepath.
**rio_kwargs : keyword args, optional
Argument passed to the ``xarray.Dataset.rio.reproject()`` function call.
**Note**: These arguments take priority over ``base_template`` attributes.
May contain:
- ``dst_crs`` : str
- ``resolution`` : float or tuple
- ``shape`` : tuple (int, int)
- ``transform`` : Affine
- ``nodata`` : float or None
- ``resampling`` :
- see ``help(rasterio.enums.Resampling)``
- most common are: ``5`` (average), ``13`` (sum), ``0`` (nearest),
``9`` (min), ``8`` (max), ``1`` (bilinear), ``2`` (cubic)...
- the functionality ``'std'`` (standard deviation) is also available
- see ``help(xarray.Dataset.rio.reproject)``
Returns
-------
None. Generates the formatted files, with filepaths derived from ``data`` or from
``outpath``.
Examples
---------
>>> cwatm.dem(
r"E:\Inputs\DEM\IGN\BDALTIV2_folder\BDALTIV2_stz.tif",
resolution = [500, 1000],
x0 = 0,
y0 = 0,
dst_crs = 27572,
mask = r"E:\Inputs\masks\MyMask.shp"
)
Exporting...
_ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\BDALTIV2_stz_CWATM_1000m_mask1.tif'
Exporting...
_ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\ElevationStD_BDALTIV2_stz_CWATM_1000m_mask1.tif'
Exporting...
_ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\BDALTIV2_stz_CWATM_500m_mask1.tif'
Exporting...
_ Success: The data has been exported to the file 'E:\Inputs\DEM\IGN\BDALTIV2\ElevationStD_BDALTIV2_stz_CWATM_500m_mask1.tif'
"""
# ---- Load
# Initialize the output filepath(s)
if isinstance(data, xr.Dataset):
if outpath is None:
print("Error: if `data` is a xarray.Dataset, the argument `outpath` needs to be passed")
return
else:
outpath_stem = Path(outpath)
elif isinstance(data, (str, Path)):
outpath_stem = infer_outpath(data)
data = outpath_stem
outpath = outpath_stem.with_suffix('.tif').with_stem(outpath_stem.stem + '_CWATM')
# Load
data_ds = geo.load_any(data)
# Impose the reprojection resampling to be the minimum value
rio_kwargs['resampling'] = 9 # 9: minimum
# ---- Replace 0 (sea level) with nodata
main_var = geo.main_vars(data_ds)[0]
encod = data_ds[main_var].encoding # store encodings (_FillValue, compression...)
data_ds[main_var] = data_ds[main_var].where(data_ds[main_var] != 0)
data_ds[main_var].encoding = encod # transfer encodings
# ---- Reproject
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
rio_kwargs['resolution'] = res
n_mask = 0
for m in mask:
n_mask += 1
ds_rprj = geo.reproject(data_ds, mask = m, bounds = bounds,
x0 = x0, y0 = y0, base_template = base_template,
**rio_kwargs) # x0 = 12.5, y0 = 12.5
geo.export(ds_rprj,
outpath.with_stem(outpath.stem + res_suffix + f'_mask{n_mask}')
)
# ---- Flow directions
ldd = geo.compute_ldd(ds_rprj, dirmap = '1-9', engine = 'pysheds')
geo.export(ldd,
outpath.with_stem('LDD_' + outpath.stem + res_suffix + f'_mask{n_mask}')
)
# ---- Cell area
area = geo.cell_area(ds_rprj, engine = 'equal earth')
geo.export(area,
outpath.with_stem('Area_' + outpath.stem + res_suffix + f'_mask{n_mask}')
)
# ---- Standard deviation on each cell
if res is not None:
# impose a resampling according to the standard deviation
if 'resampling' in rio_kwargs:
rio_kwargs.pop('resampling')
elevstd = geo.reproject(data_ds, mask = m, bounds = bounds,
x0 = x0, y0 = y0, base_template = base_template,
resampling = 'std', # standard deviation
**rio_kwargs)
geo.export(elevstd, outpath.with_stem('ElevationStD_' + outpath.stem + res_suffix + f'_mask{n_mask}'))
else:
print("Err: Standard Deviation can only be computed if there is a downscaling. A resolution argument should be passed")
#%%% Climatic data
@formating
def explore2climat(data, *,
mask=None,
bounds=None,
resolution=None,
x0=None,
y0=None,
base_template=None,
**rio_kwargs,
):
# DRIAS-Climat 2022 (netcdf)
# SAFRAN data (tas, prtot, radiation, wind...) "EXPLORE2-Climat 2022"
data_folder, filelist = geo.get_filelist(data, extension = '.nc')
for file_name in filelist:
data = os.path.join(data_folder, file_name)
# Shorten the name
motif1 = re.compile('(.*Adjust_France)')
motif2 = re.compile('(historical|rcp26|rcp45|rcp85)')
motif3 = re.compile(r'(\d{4}-\d{4}.*)')
str1 = motif1.split(file_name)[1][:-7]
str2 = motif2.split(file_name)[1]
str3 = motif3.split(file_name)[1]
outpath = '_'.join([str1, str2, str3])
# outpath = outpath[:-3] # to remove '.nc'
outpath = os.path.splitext(outpath)[0]
outpath = os.path.join(data_folder, outpath)
# Correct the data
ds = stzfr.explore2climat(data)
# Georeference the data
ds = geo.georef(data = ds, data_type = 'drias2022')
# Lossy compression (packing)
ds_comp = geo.pack(ds)
geo.export(ds_comp, outpath + '_pack' + '.nc')
# Reprojections and exports
# rio_kwargs['dst_crs'] = 2154
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
rio_kwargs['resolution'] = res
n_mask = 0
for m in mask:
n_mask += 1
ds_rprj = geo.reproject(ds, mask = m, bounds = bounds,
x0 = x0, y0 = y0, base_template = base_template,
**rio_kwargs)
geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
# exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # No rundate because the file names would be too long...
del ds_rprj
del ds
del ds_comp
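# Example call (hypothetical paths): every .nc file of the folder is
# standardized, packed, then reprojected once per resolution and per mask.
# =============================================================================
# explore2climat(r"E:\Inputs\DRIAS-Climat",
#                resolution = [8000],
#                mask = r"E:\Inputs\masks\MyMask.shp",
#                dst_crs = 2154)
# =============================================================================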
@formating
def explore2eau(data, *,
mask=None,
bounds=None,
resolution=None,
x0=None,
y0=None,
base_template=None,
**rio_kwargs,
):
# DRIAS-Eau 2024 (netcdf)
# SAFRAN indicators (SWIAV, SSWI-3...) "EXPLORE2-SIM2 2024"
data_folder, filelist = geo.get_filelist(data, extension = '.nc')
for file_name in filelist:
data = os.path.join(data_folder, file_name)
# Shorten the name
motif1 = re.compile('(.*_France)')
motif2 = re.compile('(historical|rcp26|rcp45|rcp85)')
motif3 = re.compile(r'(\d{4}-\d{4}.*)')
str1 = motif1.split(file_name)[1][:-7]
str2 = motif2.split(file_name)[1]
str3 = motif3.split(file_name)[1]
outpath = '_'.join([str1, str2, str3])
# outpath = outpath[:-3] # to remove '.nc'
outpath = os.path.splitext(outpath)[0]
outpath = os.path.join(data_folder, outpath)
# Correct the data
ds = stzfr.explore2eau(data)
# Georeference the data
ds = geo.georef(data = ds, data_type = "DRIAS-Eau 2024")
# Lossy compression (packing)
ds_comp = geo.pack(ds)
geo.export(ds_comp, outpath + '_pack' + '.nc')
# Reprojections and exports
# rio_kwargs['dst_crs'] = 2154
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
rio_kwargs['resolution'] = res
n_mask = 0
for m in mask:
n_mask += 1
ds_rprj = geo.reproject(data = ds, mask = m, bounds = bounds,
x0 = x0, y0 = y0, base_template = base_template,
**rio_kwargs)
geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
# exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # No rundate because the file names would be too long...
del ds_rprj
del ds
del ds_comp
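# Example call (hypothetical paths), following the same logic as `explore2climat`:
# =============================================================================
# explore2eau(r"E:\Inputs\DRIAS-Eau", resolution = [8000],
#             mask = r"E:\Inputs\masks\MyMask.shp", dst_crs = 2154)
# =============================================================================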
@formating
def meteo(data, *,
mask=None,
bounds=None,
resolution=None,
x0=None,
y0=None,
base_template=None,
**rio_kwargs,
):
# SIM2 (csv to netCDF)
# Historical climate reanalysis Safran-Isba(-Modcou) "SIM2"
# Load data
# =============================================================================
# sim2_pattern = re.compile("_SIM2_(.*)")
# =============================================================================
if isinstance(data, list):
data_ds = geo.merge_data(data)
if os.path.isdir(data[0]):
outpath = data[0]
# =============================================================================
# suffix = datetime.datetime.now().strftime("%Y-%m-%d_%Hh%M")
# =============================================================================
elif os.path.isfile(data[0]):
outpath = os.path.split(data[0])[0]
# =============================================================================
# suffix = sim2_pattern.findall(os.path.splitext(os.path.split(data)[-1])[0])[0]
# =============================================================================
elif isinstance(data, (str, Path)):
if os.path.isfile(data):
data_ds = geo.load_any(data, sep = ';')
outpath = os.path.split(data)[0]
# =============================================================================
# suffix = sim2_pattern.findall(os.path.splitext(os.path.split(data)[-1])[0])[0]
# =============================================================================
elif os.path.isdir(data):
# First look for .csv files
outpath, filelist = geo.get_filelist(data, extension = '.csv')
if len(filelist) > 0:
data_ds = geo.merge_data([os.path.join(outpath, f) for f in filelist])
# If none is found, then look for .nc files (one file per variable)
else:
data_ds = xr.Dataset()
for var in ['PRENEI_Q', 'PRELIQ_Q', 'PRETOT_Q', 'T_Q', 'TINF_H_Q',
'TSUP_H_Q', 'ETP_Q', 'Q_Q', 'FF_Q', 'DLI_Q', 'SSI_Q', 'HU_Q']:
outpath, filelist = geo.get_filelist(data, extension = '.nc', tag = var)
if len(filelist) > 0:
ds_temp = geo.merge_data([os.path.join(outpath, f) for f in filelist])
data_ds[var] = ds_temp[var]
else:
data_ds = geo.load_any(data)
if isinstance(data_ds, pd.DataFrame):
# data_ds = geo.convert_to_cwatm(data_ds, data_type = "SIM2")
data_ds = stzfr.sim2(data_ds)
# Georeference the data
data_ds = geo.georef(data_ds)
# Prepare outpath
time_var = geo.main_time_dims(data_ds)[0]
start_year = pd.to_datetime(data_ds[time_var].min().values).strftime("%Y-%m")
end_year = pd.to_datetime(data_ds[time_var].max().values).strftime("%Y-%m")
var_list = geo.main_vars(data_ds)
for var in var_list:
var_ds = data_ds[[var]].copy()
# Reprojections and exports
# rio_kwargs['dst_crs'] = 2154
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
rio_kwargs['resolution'] = res
n_mask = 0
for m in mask:
n_mask += 1
ds_rprj = geo.reproject(data = var_ds, mask = m, bounds = bounds,
x0 = x0, y0 = y0, base_template = base_template,
**rio_kwargs)
geo.export(ds_rprj, os.path.join(
outpath, '_'.join([var, start_year, end_year]) + res_suffix + f'_mask{n_mask}.nc')
)
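# Example call (hypothetical paths): `data` can be a single CSV file, a folder
# of SIM2 CSV or netCDF files, or a list of them; one netCDF file is exported
# per variable, per resolution and per mask.
# =============================================================================
# meteo(r"E:\Inputs\SIM2", resolution = [1000],
#       mask = r"E:\Inputs\masks\MyMask.shp", dst_crs = 2154)
# =============================================================================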
#%%% Landcover data
@formating
def landcover(data, *,
year = None,
sep = '|',
decimal = '.',
csv_date_format = "%y-%m-%d",
mask = None,
bounds = None,
resolution = None,
x0 = None,
y0 = None,
base_template = None,
outpath = None,
polygon_file = None,
**rio_kwargs,
):
"""
This function process a guideline file, typically a tabular CSV file, in which are
described where and how to process the data to create CWatM landcover parameters.
Therefore, besides this guideline file, ``cwatm.landcover()`` will also need a vector
file with landcover polygons (or a raster file with area values), and some
tabular CSV files containing cropCoefficient and interceptCap timeseries
during one year (these timeseries can be parse, they do not need to be complete nor regular).
With these elements, this function generates the following files:
- at the root (*cwatm/* folder):
- '<guideline_filename>_area_<year>_<mask>_<resolution>.nc':
raster file with, in each cell, the area of each landcover described
in the guideline file
- '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc':
a one-year netCDF with the ``cropgroupnumber`` values in each cell
- '<guideline_filename>_fracLandcover_<year>_<mask>_<resolution>.nc':
a raster file with the fraction of CWatM landcovers in each cell
- in landcover sub-folders (*grassland/*, *forest/*, *irrNonPaddy/*, *irrPaddy/*):
- '<guideline_filename>_cropCoefficient_<year>_<mask>_<resolution>.nc'
- '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc'
- '<guideline_filename>_interceptCap_<year>_<mask>_<resolution>.nc'
- '<guideline_filename>_maxRootDepth_<year>_<mask>_<resolution>.nc'
Nested use
^^^^^^^^^^
For each `data`, this function creates netCDF files in the root folder and in
the subfolders of each of the 6 CWatM landcovers. It can be used in a nested way:
this function can be called with `data` guidelines relative to a sub-landcover
(for example a 'crop.csv' `data` defining 'maize' and 'wheat'), and the generated files
(here for example .\cwatm\grassland\crop_maxRootDepth.nc) can be used as
arguments in the guidelines of a 'final.csv' `data` (which defines e.g.
'wetland', 'pasture', 'crop' ... and uses the netCDF above for 'crop' values).
In this example, the 'crop.csv' call will generate netCDF files in
CWatM landcover subfolders ('forest', 'grassland', 'irrNonPaddy', ...), which
will be used as arguments in 'final.csv', but also some netCDF files in the
root folder, such as 'crop_fracLandcover.nc' or 'crop_cropgroupnumber.nc'.
These root files are useless, as they are intended to give values weighted
by CWatM landcovers, but the 'crop.csv' `data` does not include all CWatM
landcovers. The files that should be used in CWatM are 'final_fracLandcover.nc' etc.
in the root folder, and 'final_maxRootDepth.nc' etc. in the 6 landcover subfolders.
Parameters
----------
data : pandas.DataFrame, or str or pathlib.Path
Tabular files listing the rules to differentiate landcovers and to
compute CWatM landcover parameters.
mask : str, Path, shapely.geometry, xarray.DataArray or geopandas.GeoDataFrame, optional
Filepath of mask used to clip the data.
bounds : iterable or None, optional, default None
Boundaries of the target domain as a tuple (x_min, y_min, x_max, y_max).
The values are expected to be given according to ``bounds_crs`` if it is not None.
If ``bounds_crs`` is None, ``bounds`` are expected to be given according to the destination
CRS ``dst_crs`` if it is not None. If ``dst_crs`` is also None, ``bounds`` are then
expected to be given according to the source CRS (``src_crs`` or ``data``'s CRS).
resolution : (list of) int, optional
List of resolutions to use for data formatting. The default is None.
x0: number, optional
Origin of the X-axis, used to align the reprojection grid.
y0: number, optional
Origin of the Y-axis, used to align the reprojection grid.
base_template : str, Path, xarray.DataArray or geopandas.GeoDataFrame, optional
Filepath, used as a template for spatial profile. Supported file formats
are *.tif*, *.nc* and vector formats supported by geopandas (*.shp*, *.json*, ...).
outpath : str or pathlib.Path, optional
Required when ``data`` is not a filepath. Define the location where the
outputs will be exported (inside a *cwatm/* sub-folder). If ``outpath`` is None
and ``data`` is a filepath, the outputs will be exported at the root of ``data``
(inside a *cwatm/* sub-folder).
**rio_kwargs : keyword args, optional
Argument passed to the ``xarray.Dataset.rio.reproject()`` function call.
**Note**: These arguments take priority over ``base_template`` attributes.
May contain:
- ``dst_crs`` : str
- ``resolution`` : float or tuple
- ``shape`` : tuple (int, int) of (height, width)
- ``transform`` : Affine
- ``nodata`` : float or None
- ``resampling`` :
- see ``help(rasterio.enums.Resampling)``
- most common are: ``5`` (average), ``13`` (sum), ``0`` (nearest),
``9`` (min), ``8`` (max), ``1`` (bilinear), ``2`` (cubic)...
- the functionality ``'std'`` (standard deviation) is also available
- see ``help(xarray.Dataset.rio.reproject)``
Returns
-------
Generates the landcover files required by CWatM:
- at the root (*cwatm/* folder):
- '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc':
a one-year netCDF with the ``cropgroupnumber`` values in each cell
- '<guideline_filename>_fracLandcover_<year>_<mask>_<resolution>.nc':
a raster file with the fraction of CWatM landcovers in each cell
- '<guideline_filename>_area_<year>_<mask>_<resolution>.nc':
raster file with, in each cell, the area of each landcover described
in the guideline file (not required for CWatM, but useful for other
inputs formatting)
- in landcover sub-folders (*grassland/*, *forest/*, *irrNonPaddy/*, *irrPaddy/*):
- '<guideline_filename>_cropCoefficient_<year>_<mask>_<resolution>.nc'
- '<guideline_filename>_cropgroupnumber_<year>_<mask>_<resolution>.nc'
- '<guideline_filename>_interceptCap_<year>_<mask>_<resolution>.nc'
- '<guideline_filename>_maxRootDepth_<year>_<mask>_<resolution>.nc'
"""
# ---- Initializations
landcover_list = ['forest', 'grassland', 'irrPaddy', 'irrNonPaddy', 'sealed', 'water']
guidelines = geo.load_any(data,
sep = sep,
decimal = decimal,
header = 0,
index_col = 0)
year_arg = year # backup of `year` argument
if year is None:
year = datetime.datetime.now().year # to have a consistent time axis
# ---- Folders
if isinstance(data, (str, Path)):
data = Path(data)
outpath = data.parent / 'cwatm'
outname = data.stem
else:
if outpath is None:
print("Err: `outpath` argument is required when `data` is not a file")
return
else:
outname = 'landcover'
for var in ['area', 'cropgroupnumber', 'fracLandcover']:
if not (outpath / var).exists():
os.makedirs(outpath / var)
for landcover in landcover_list:
if not (outpath / landcover).exists():
os.makedirs(outpath / landcover)
for var in ['cropCoefficient', 'cropgroupnumber', 'interceptCap', 'maxRootDepth']:
if not (outpath / landcover / var).exists():
os.makedirs(outpath / landcover / var)
# ---- Rasterize
area_ds = xr.Dataset()
load_dict = {}
#-------------------------
### 1} For each resolution
#-------------------------
for res in resolution:
if res is None:
print('Err: the `resolution` argument is mandatory')
return
n_mask = 0
#-------------------
### 2} For each mask
#-------------------
for m in mask:
n_mask += 1
areaLandcover = xr.Dataset({'forest': 0,
'grassland': 0,
'irrPaddy': 0,
'irrNonPaddy': 0,
'sealed': 0,
'water': 0})
cropgroupnumber = areaLandcover.copy(deep = True)
print(f"\nresolution {res} m | mask {n_mask} | area & fracLandcover")
#--------------------------------------------------
### 3} For each row (= rule) in `data` (guidelines)
#--------------------------------------------------
for idx in guidelines.index:
# to avoid loading the same file several times
if guidelines.loc[idx, 'polygon_file'] in load_dict:
# use the previously loaded file
vect_ds = load_dict[guidelines.loc[idx, 'polygon_file']]
else:
if polygon_file is None:
# load the file...
polyg_path = Path(guidelines.loc[idx, 'polygon_file'])
if not polyg_path.is_absolute(): polyg_path = (data.parent / polyg_path).resolve()
vect_ds = geo.load_any(polyg_path)
# ...and then store it, for potential later use
load_dict[guidelines.loc[idx, 'polygon_file']] = vect_ds.copy()
else:
vect_ds = geo.load_any(polygon_file)
load_dict[guidelines.loc[idx, 'polygon_file']] = vect_ds.copy()
if isinstance(vect_ds, pd.DataFrame):
# Interpreting the condition (building the selection)
cond = guidelines.loc[idx, 'column_name: code']
ini = True
if 'or' in cond:
cond_list = cond.split(' or ')
else:
cond_list = [cond]
for subcond in cond_list:
if ' in ' in subcond:
in_pattern = re.compile(r"(.*)( in )[\(\)\[\]](.*)[\(\)\[\]]")
col_name, _, val = in_pattern.findall(subcond)[0]
val = val.replace(' ', '').split(',')
else:
col_name, val = subcond.replace('=', ':').replace(' ', '').split(':')
val = [val]
# Correct the dtypes
if is_numeric_dtype(vect_ds[col_name]):
val = [float(v) for v in val]
# Extracting rows
subsel = vect_ds[vect_ds[col_name] == val[0]].index
# Safeguard for mixed dtypes
if len(subsel) == 0:
try: subsel = vect_ds[vect_ds[col_name] == float(val[0])].index
except (ValueError, TypeError): pass
if len(val) > 1:
for v in val:
add_subsel = vect_ds[vect_ds[col_name] == v].index
# Safeguard for mixed dtypes
if len(add_subsel) == 0:
try: add_subsel = vect_ds[vect_ds[col_name] == float(v)].index
except (ValueError, TypeError): pass
# Merge the indexes
subsel = subsel.union(add_subsel)
# Combining the subcond
if ini:
sel = subsel
ini = False
else:
sel = sel.union(subsel)
# Adding time filter
time_var = geo.main_time_dims(vect_ds)[0]
# if there is no time axis in `vect_ds` (polygon_file)
# or if `vect_ds` contains only one year, the year condition
# is ignored or useless, meaning that the index selection is
# kept as it is (all previously selected
# rows from `vect_ds` will be considered)
if time_var is None:
pass
elif len(vect_ds.indexes[time_var].year.unique()) <= 1:
# info
if year != vect_ds.indexes[time_var].year.unique()[0]:
print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the 'polygon_file'. Year condition is ignored.")
pass
# Otherwise, the year condition will be applied:
else:
# safeguard
if any(vect_ds[time_var].dt.year == year):
sel = sel.intersection(
vect_ds[vect_ds[time_var].dt.year == year].index)
else:
print(f"Warning: the specified `year` ({year_arg}) does not exist in 'polygon_file' {guidelines.loc[idx, 'polygon_file']}")
print("Which year from the 'polygon_file' should be used instead?")
replacement_year = int(input('- ' + '\n- '.join(vect_ds.indexes[time_var].year.astype(str).unique().values) + '\n'))
sel = sel.intersection(
vect_ds[vect_ds[time_var].dt.year == replacement_year].index)
# Apply selection
vect_cond_ds = vect_ds.loc[sel, ['geometry']]
# =============================================================================
# vect_cond_ds[idx] = 1 # dummy numeric column for rasterization
# =============================================================================
# Safeguard
if len(vect_cond_ds) > 0:
# Rasterize data
area = geo.rasterize(
vect_cond_ds,
mask = m,
bounds = bounds,
resolution = res,
x0 = x0, y0 = y0,
base_template = base_template,
rasterize_mode = 'coverage',
force_polygon = True,
**rio_kwargs,
)
# Add the time axis
area = area.expand_dims({'time': [pd.to_datetime(f"{year}-01-01", format = "%Y-%m-%d")]})
# Group all `area[idx]` [xr.DataArray] into the same `area_ds` [xr.Dataset]
area_ds[idx] = area[geo.main_vars(area)[0]]
elif isinstance(vect_ds, xr.Dataset):
area_ds = vect_ds
if not set(guidelines.index) == set(geo.main_vars(area_ds)):
print("Err: `guidelines` and raster area file do not contain the same variables")
return
# Adding time filter
time_var = geo.main_time_dims(area_ds)[0]
# if there is no time axis in `vect_ds` (polygon_file)
# or if `vect_ds` contains only one year, the year condition
# is ignored or useless, meaning that the index selection is
# kept as it is (all previously selected
# rows from `vect_ds` will be considered)
if time_var is None:
pass
elif len(area_ds.indexes[time_var].year.unique()) <= 1:
# info
if year != area_ds.indexes[time_var].year.unique()[0]:
print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the 'polygon_file'. Year condition is ignored.")
pass
# Otherwise, the year condition will be applied:
else:
# safeguard
if any(area_ds[time_var].dt.year == year):
area_ds = area_ds.loc[{time_var: slice(str(year), str(year))}]
else:
print(f"Warning: the specified `year` ({year_arg}) does not exist in 'polygon_file' {guidelines.loc[idx, 'polygon_file']}")
print("Which year from the 'polygon_file' should be used instead?")
replacement_year = input('- ' + '\n- '.join(area_ds.indexes[time_var].year.astype(str).unique().values) + '\n')
area_ds = area_ds.loc[{time_var: slice(str(replacement_year), str(replacement_year))}]
# Complete the landcover fractions
areaLandcover[guidelines.loc[idx, 'cwatm_class']] = areaLandcover[guidelines.loc[idx, 'cwatm_class']] + area_ds[idx]
# ---- Export base results
area_ds = geo.georef(area_ds, crs = area_ds.rio.crs) # area_ds = geo.georef(area_ds)
area_ds = geo.cropfill(area_ds, m)
geo.export(area_ds, outpath / 'area' / (outname + '_area' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
# ---- Export landcover fractions
areaLandcover = geo.georef(areaLandcover, crs = area_ds.rio.crs)
areaLandcover = geo.cropfill(areaLandcover, m)
fractionLandcover = areaLandcover.copy(deep = True)
for lc in fractionLandcover.data_vars:
fractionLandcover = fractionLandcover.rename({lc: 'frac' + lc})
fractionLandcover = fractionLandcover / fractionLandcover.to_array().sum("variable")
fractionLandcover = geo.georef(fractionLandcover, crs = area_ds.rio.crs)
geo.export(fractionLandcover, outpath / 'fracLandcover' / (outname + '_fracLandcover' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
# ---- Combine parameters
param_list = ['maxRootDepth', 'rootFraction1', 'cropgroupnumber',
'arnoBeta', 'cropCoefficient', 'interceptCap']
#-----------------------------------------------------
### 3'} For each CWatM landcover class with vegetation
#-----------------------------------------------------
# forest, grassland, irrPaddy, irrNonPaddy
for cwatm_class in set(guidelines.cwatm_class.unique()) - {'sealed', 'water'}:
# Extract guidelines for each class (forest, grassland, irrPaddy, irrNonPaddy, sealed, water)
class_indexes = guidelines[guidelines.cwatm_class == cwatm_class].index
# Average parameters values according to area coverage, and group them into `class_param` xr.Dataset
class_param = xr.Dataset()
class_param_dynamic = xr.Dataset()
#-----------------------------------
### 4'} For each landcover parameter
#-----------------------------------
for param in set(param_list).intersection(set(guidelines.columns)):
print(f"\nresolution {res} m | mask {n_mask} | {param}")
ini_idx = True # `ini_idx` is True for the first iteration
#------------------------------------------------
### 5'} For each row (= rule) in this CWatM class
#------------------------------------------------
for idx in class_indexes:
# Retrieve parameter values
raw_val = guidelines.loc[idx, param]
# Safeguard for columns with mixed dtypes (numeric and paths)
try: raw_val = float(raw_val)
except (ValueError, TypeError): pass
if isinstance(raw_val, (int, float)):
if np.isnan(raw_val): raw_val = 0
val = raw_val
elif isinstance(raw_val, (str, Path)):
# load the file...
param_path = Path(raw_val)
if not param_path.is_absolute(): param_path = (data.parent / param_path).resolve()
val = geo.load_any(param_path,
# next arguments are only used for CSV data
sep = sep,
decimal = decimal,
header = 0,
index_col = 0,
date_format = csv_date_format)
# Adding time filter
if isinstance(val, xr.Dataset):
# Convert xr.Dataset into xr.DataArray
val = val[geo.main_vars(val)[0]]
param_time_var = geo.main_time_dims(val)[0]
# if there is no time axis in `val` or if `val`
# contains only one year, the `year` condition
# is ignored or useless
if param_time_var is None:
pass
elif len(val.indexes[param_time_var].year.unique()) <= 1:
# info
if year != val.indexes[param_time_var].year.unique()[0]:
print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the parameter file {param_path.name}. Year condition is ignored.")
val[param_time_var] = pd.to_datetime(val[param_time_var]) + pd.DateOffset(years = year - val[param_time_var][0].dt.year.item())
# Otherwise, the year condition will be applied:
else:
# safeguard
if any(val[param_time_var].dt.year == year):
val = val.loc[{param_time_var: slice(f"{year}-01-01", f"{year}-12-31")}]
else:
print(f"Warning: the specified `year` ({year_arg}) does not exist in {param_path.name}")
print("Which year should be used instead?")
replacement_year = input('- ' + '\n- '.join(val.indexes[param_time_var].year.astype(str).unique().values) + '\n')
val = val.loc[{param_time_var: slice(f"{replacement_year}-01-01", f"{replacement_year}-12-31")}]
# Standardize the date to first January of `year`
if param not in ['cropCoefficient', 'interceptCap']: # yearly values
val = val[{param_time_var: [0]}] # Only a single date is kept (dimension preserved)
val[param_time_var] = [pd.to_datetime(f"{year}-01-01")] # this date is normalized to 1st January
elif isinstance(val, pd.DataFrame): # the time axis should be the index
# Format the time dimension
# Concatenate 3 arrays to be sure to have a full year starting from 1st January to 31st December
val = pd.concat([val, val.set_index(val.index + pd.DateOffset(years=1)), val.set_index(val.index + pd.DateOffset(years=2))])
ini_year = val.index[0].year
# Fill the gaps in the index
val = val.reindex(pd.date_range(start = val.index[0], end = val.index[-1], freq = '1D'))
val.index.name = 'time'
# Fill the gaps in the values, by interpolating according to the time axis
val = val.interpolate(method = 'time')
# Keep only a full year starting from 1st January to 31st December
full_year = val[(val.index.day == 1) & (val.index.month == 1)].index.year[0].item()
val = val.loc[f'{full_year}-01-01': f'{full_year}-12-31']
# Change first year to first year of initial data
val.index = val.index + pd.DateOffset(years = ini_year - val.index[0].year)
# Keep only one value "each 10 days" (more exactly, keep only the 1st, 11th and 21st of each month)
val = val[(val.index.day == 1) | (val.index.day == 11) | (val.index.day == 21)]
# Export (for info, not necessary for CWatM)
if not (param_path.parent / '10day').exists():
os.makedirs(param_path.parent / '10day' )
geo.export(val, param_path.parent / '10day' / param_path.name,
sep = sep,
decimal = decimal,
date_format = csv_date_format)
# Convert pd.DataFrame into pd.Series
val = val[geo.main_vars(val)[0]]
# safeguard
param_time_var = val.index.name
if param_time_var != 'time':
print("Info: {raw_val} may not contain time as index")
# if `val` contains only one year, the `year`
# condition is ignored or useless
if len(val.index.year.unique()) <= 1:
# info
if year != val.index.year.unique()[0]:
print(f"Warning: the specified `year` ({year_arg}) does not match the (only) year in the parameter file {param_path.name}. Year condition is ignored.")
val.index = val.index + pd.DateOffset(years = year - val.index[0].year)
# Otherwise, the year condition will be applied:
else:
# safeguard
if any(val.index.year == year):
val = val[f"{year}-01-01":f"{year}-12-31"]
else:
print(f"Warning: the specified `year` ({year_arg}) does not exist in {param_path.name}")
print("Which year should be used instead?")
replacement_year = input('- ' + '\n- '.join(val.index.year.astype(str).unique().values) + '\n')
val = val[f"{replacement_year}-01-01":f"{replacement_year}-12-31"]
# Standardize the date to first January of `year`
if param not in ['cropCoefficient', 'interceptCap']: # yearly values
val = val.iloc[[0]] # Only a single date is kept
val.index = [pd.to_datetime(f"{year}-01-01")] # this date is normalized to 1st January
# Convert to xr.DataArray
val = xr.DataArray.from_series(val)
else: # raw_val is NaN
pass
# Compute area-weighted averages
if ini_idx:
if param in ['cropCoefficient', 'interceptCap']: # daily values
class_param_dynamic[param] = area_ds[idx].loc[{'time': f"{year}-01-01"}] * val # note how time dimension is dropped here
else: # yearly values
class_param[param] = area_ds[idx].loc[{'time': [f"{year}-01-01"]}] * val
sum_area = area_ds[idx].rename('area')
else:
if param in ['cropCoefficient', 'interceptCap']: # daily values
class_param_dynamic[param] = class_param_dynamic[param] + area_ds[idx].loc[{'time': f"{year}-01-01"}] * val # note how time dimension is dropped here
else: # yearly values
class_param[param] = class_param[param] + area_ds[idx].loc[{'time': [f"{year}-01-01"]}] * val
sum_area = sum_area + area_ds[idx].rename('area')
ini_idx = False # after first iteration, `ini` becomes False
# `sum_area` is reinitialized and computed for each param, but it is the same for every param
# Ideally it should be computed only once
# Normalize by sum of area to get the weighted average by coverage
class_param = class_param / sum_area
# class_param = class_param.where(sum_area != 0, np.nan) # replace Inf with 0
class_param_dynamic = class_param_dynamic / sum_area[{'time': 0}]
# class_param_dynamic = class_param_dynamic.where(sum_area[{'time': 0}] != 0, np.nan) # replace Inf with 0
# Georeference
class_param = geo.georef(class_param, crs = area_ds.rio.crs) # geo.georef(class_param)
class_param = geo.cropfill(class_param, m) # fill previous nan with mean value
class_param_dynamic = geo.georef(class_param_dynamic, crs = area_ds.rio.crs) # geo.georef(class_param)
class_param_dynamic = geo.cropfill(class_param_dynamic, m) # fill nan with mean value
# Complete cropgroupnumber (for global result weighted by landcover)
for param in set(param_list).intersection(set(guidelines.columns)):
if param == 'cropgroupnumber':
cropgroupnumber[cwatm_class] = class_param[param]
# ---- Export parameters
if param in ['cropCoefficient', 'interceptCap']:
geo.export(class_param_dynamic[[param]], outpath / cwatm_class / param / (outname + f'_{param}' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
else: # 'cropgroupnumber', 'maxRootDepth'
geo.export(class_param[[param]], outpath / cwatm_class / param / (outname + f'_{param}' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
# ---- Compute cropgroupnumber
# Weight cropnumber of each class with corresponding area
cropgroupnumber = cropgroupnumber * areaLandcover
# Divide by the sum of the area of all classes, in order to retrieve a
# cropgroupnumber weighted by the area fraction of the 4 classes WITH VEGETATION
# (it is different from fractionLandcover that takes into account the 6 landcovers)
cropgroupnumber = cropgroupnumber / areaLandcover[['forest', 'grassland', 'irrPaddy', 'irrNonPaddy']].to_array().sum("variable")
cropgroupnumber_mean = xr.Dataset({'cropgroupnumber': cropgroupnumber.to_array().sum('variable')})
# Mask and georef
cropgroupnumber_mean = cropgroupnumber_mean.where(areaLandcover['forest'].notnull())
cropgroupnumber_mean = geo.georef(cropgroupnumber_mean, crs = area_ds.rio.crs)
geo.export(cropgroupnumber_mean, outpath / 'cropgroupnumber' / (outname + '_cropgroupnumber' + f'_{year_arg}' + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
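# Usage sketch (hypothetical paths):
# =============================================================================
# landcover(r"E:\Inputs\landcover\final.csv",
#           year = 2020,
#           resolution = [1000],
#           mask = r"E:\Inputs\masks\MyMask.shp",
#           dst_crs = 2154)
# =============================================================================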
# TODO: to remove?
# Obsolete...
@formating
def ocspaysbasque(data, *,
mask=None,
bounds=None,
resolution=None,
x0=None,
y0=None,
base_template=None,
**rio_kwargs,
):
# OCS SCOT Pays Basque
data_folder, filelist = geo.get_filelist(data, extension = '.nc')
# outpath = outpath[:-3] # to remove '.nc'
outpath = os.path.splitext(filelist[0])[0]
outpath = os.path.join(data_folder, outpath)
# Correct the data
ds = stzfr.ocspaysbasque([os.path.join(data_folder, f) for f in filelist])
# Georeference the data
ds = geo.georef(data = ds, data_type = "other", crs = 4326)
# Lossy compression (packing)
ds_comp = geo.pack(ds)
geo.export(ds_comp, outpath + '_pack' + '.nc')
# Reprojections and exports
# rio_kwargs['dst_crs'] = 2154
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
rio_kwargs['resolution'] = res
n_mask = 0
for m in mask:
n_mask += 1
ds_rprj = geo.reproject(data = ds, mask = m, bounds = bounds,
x0 = x0, y0 = y0, base_template = base_template,
**rio_kwargs)
geo.export(ds_rprj, outpath + res_suffix + f'_mask{n_mask}' + '.nc')
# exporter(ds_rprj, outpath + f'_{rundate}' + res_suffix + '.nc') # No rundate because the file names would be too long...
del ds_rprj
del ds
del ds_comp
#%%% Soil data
@formating
def soil(data, # vector_data,
tabular_data,
fractionLandcover_data,
landcover_folder = None,
maxRootDepth_forest = None,
maxRootDepth_grassland = None,
maxRootDepth_irrPaddy = None,
maxRootDepth_irrNonPaddy = None,
*, mask=None,
bounds=None,
resolution=None,
x0=None,
y0=None,
base_template=None,
outpath=None,
**rio_kwargs,
):
"""
Note: This function takes a lot of time (it can take 1h for a whole department).
Parameters
----------
data : TYPE
DESCRIPTION.
tabular_data : TYPE
DESCRIPTION.
fractionLandcover_data : TYPE
DESCRIPTION.
landcover_folder : TYPE, optional
DESCRIPTION. The default is None.
maxRootDepth_forest : TYPE, optional
DESCRIPTION. The default is None.
maxRootDepth_grassland : TYPE, optional
DESCRIPTION. The default is None.
maxRootDepth_irrPaddy : TYPE, optional
DESCRIPTION. The default is None.
maxRootDepth_irrNonPaddy : TYPE, optional
DESCRIPTION. The default is None.
* : TYPE
DESCRIPTION.
mask : TYPE, optional
DESCRIPTION. The default is None.
bounds : TYPE, optional
DESCRIPTION. The default is None.
resolution : TYPE, optional
DESCRIPTION. The default is None.
x0 : TYPE, optional
DESCRIPTION. The default is None.
y0 : TYPE, optional
DESCRIPTION. The default is None.
base_template : TYPE, optional
DESCRIPTION. The default is None.
outpath : TYPE, optional
DESCRIPTION. The default is None.
**rio_kwargs : TYPE
DESCRIPTION.
: TYPE
DESCRIPTION.
Returns
-------
None.
"""
# ---- Load data
if isinstance(data, (str, Path)):
data = Path(data)
outpath = data.parent / 'cwatm'
if not outpath.exists():
os.makedirs(outpath)
for lc in ['forest', 'general']:
if not (outpath / lc).exists():
os.makedirs(outpath / lc)
else:
if outpath is None:
print("Err: `outpath` argument is required when `data` is not a file")
return
vector_ds = geo.load_any(data)
tabular_ds = geo.load_any(tabular_data, sep = ';')
fractionLc_ds = geo.load_any(fractionLandcover_data)
x_var1, y_var1 = geo.main_space_dims(fractionLc_ds)[0]
#-------------------------
### 1} For each resolution
#-------------------------
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
n_mask = 0
#-------------------
### 2} For each mask
#-------------------
for m in mask:
n_mask += 1
vector_reprj = geo.reproject(vector_ds,
mask = m,
bounds = bounds,
resolution = res,
x0 = x0,
y0 = y0,
base_template = base_template,
**rio_kwargs)
# Loading maxRootDepth data, depending if user passed a folder or paths for each landcover
maxRootDepth_ds = xr.Dataset()
maxRootDepth_path_dict = {'forest': maxRootDepth_forest,
'grassland': maxRootDepth_grassland,
'irrNonPaddy': maxRootDepth_irrNonPaddy,
'irrPaddy': maxRootDepth_irrPaddy,
} # dict of file paths
## Case where user passes a folder
# folders have to have standard CWatM landcover names, and files inside
# should be named 'maxRootDepth.nc'
if landcover_folder is not None:
landcover_folder = Path(landcover_folder)
for lc in maxRootDepth_path_dict.keys():
if (landcover_folder / lc / 'maxRootDepth.nc').is_file():
temp = geo.load_any(landcover_folder / lc / 'maxRootDepth.nc')
main_var = geo.main_vars(temp)[0]
maxRootDepth_ds[lc] = temp[main_var]
## Case where user passes multiple file paths
else:
for lc in maxRootDepth_path_dict.keys():
if maxRootDepth_path_dict[lc] is not None:
if Path(maxRootDepth_path_dict[lc]).is_file():
temp = geo.load_any(maxRootDepth_path_dict[lc])
main_var = geo.main_vars(temp)[0]
maxRootDepth_ds[lc] = temp[main_var]
# Remove the time axis
t_var2 = geo.main_time_dims(maxRootDepth_ds)[0]
if t_var2 is not None:
maxRootDepth_ds = maxRootDepth_ds[{t_var2: 0}].drop_vars(t_var2)
# Compute the average over all landcovers that share common parameters:
# 'forest' stands aside; 'general' averages 'grassland', 'irrNonPaddy' and 'irrPaddy'
maxRootDepth_ds['general'] = maxRootDepth_ds[set(maxRootDepth_ds.data_vars) - {'forest'}].to_array().mean('variable')
# maxRootDepth_ds = maxRootDepth_ds.drop_vars(['irrNonPaddy', 'irrPaddy'])
maxRootDepth_ds = maxRootDepth_ds[['forest', 'general']]
# Clean cut of maxRootDepth
# =============================================================================
# maxRootDepth_ds = maxRootDepth_ds.where(geo.load_any(m))
# =============================================================================
x_var2, y_var2 = geo.main_space_dims(maxRootDepth_ds)[0]
# generate a raster dataset containing the area covered by soil UCS polygons on each cell
vector_area = vector_reprj[['geometry']].copy()
vector_area.loc[:, 'area'] = 1
raster_area = geo.reproject(vector_area,
mask = m,
bounds = bounds,
resolution = res,
x0 = x0,
y0 = y0,
base_template = base_template,
rasterize = True,
rasterize_mode = {'area': 'coverage'},
**rio_kwargs)
raster_area = raster_area.rename({'area = 1': 'area'})
# Columns to use for ROSETTA transfer functions
# (only columns present in data will be used)
col_to_extract = ['TAUX ARGILE_val_mod', # [g/kg] --> needs to divide by 10 to convert to %
'TAUX LIMON_val_mod', # [g/kg]
'TAUX SABLE_val_mod', # [g/kg]
'DENS_APPAR_val_mod', # [g/cm3]
'HUMID_PF42_val_mod', # [g/kg] water moisture at permanent wilting point (-1500 kPa)
# --> needs to be divided by 1000 to convert to cm3/cm3
'HUMID_PF2_val_mod', # [g/kg] water moisture at field capacity (-33 kPa)
]
var_list = list(set(col_to_extract).intersection(tabular_ds.columns))
# ---- Create a vector grid
shape = raster_area.rio.shape # height, width
x_res, _, x_min, _, y_res, y_max, _, _, _ = raster_area.rio.transform()
grid_gdf, xy_coords = geo.vector_grid(height = shape[0],
width = shape[1],
x_min = x_min,
y_max = y_max,
x_res = x_res,
y_res = y_res,
crs = vector_reprj.crs)
coords = [xy_coords['y_coords'], xy_coords['x_coords']]
dims = ['y', 'x']
# ---- Computations
results = {'forest': maxRootDepth_ds[['forest']].rename({'forest': 'soildepth1'}).copy(deep = True),
'general': maxRootDepth_ds[['general']].rename({'general': 'soildepth1'}).copy(deep = True)
}
for lc in ['forest', 'general']:
results[lc]['soildepth_total'] = results['forest']['soildepth1'] * 0 # initialization into a null array
for var in var_list:
for i in ['1', '2', '3']: # CWatM soil layer number
results[lc][var+i] = results['forest']['soildepth1'] * 0 # initialization into a null array
# =============================================================================
# lay_1 = xr.Dataset() # first soil layer in CWatM
# lay_2 = xr.Dataset() # second layer
# lay_3 = xr.Dataset() # third layer
# =============================================================================
for ucs in tabular_ds.no_ucs.unique():
print(f" . UCS {ucs}")
# Create a raster map of area covered by this specific UCS on each cell
# (only few cells will be non-null)
ucs_area = geo.rasterize_categorical(vector_reprj[vector_reprj.no_ucs == ucs],
grid_gdf,
is_polygon = True,
)
da_stacked = ucs_area.where(ucs_area > 0, drop = True).stack(c = [x_var2, y_var2])
loc = da_stacked[da_stacked.notnull()].c.values
# Selection for one UCS
sub_tabular_ds = tabular_ds[tabular_ds.no_ucs == ucs]
uts_list = sub_tabular_ds.id_uts.unique()
for uts in uts_list:
# Selection for one UTS
uts_ds = sub_tabular_ds[sub_tabular_ds.id_uts == uts].copy()
uts_ds.set_index('no_strate', inplace = True)
uts_ds.loc[0] = 0 # add a first null ligne
uts_ds.sort_values(by = 'prof_inf_moy', inplace = True)
# Safeguard for missing data
uts_ds.reset_index(inplace = True, drop = True, names = 'no_strate')
# n_strates = uts_ds.no_strate.max() # number of strates for this UTS
n_strates = uts_ds.index.max() # number of strates for this UTS
# =============================================================================
# # Create an xr.DataArray with strate values on the z-axis
# strate_ds = ucs_area.copy(deep = True)
# strate_ds = strate_ds.expand_dims(dim = {'strate': range(1, n_strates+1)}, axis = 0).copy()
# =============================================================================
# =============================================================================
# for no_st in strate_ds.strate.values:
# =============================================================================
# =============================================================================
# strate_ds.loc[{'strate': no_st}] = uts_ds[uts_ds.no_strate == no_st]['prof_inf_moy'].item()
# =============================================================================
# strate_ds = strate_ds.where(strate_ds.strate == st, uts_ds[uts_ds.no_strate == st]['prof_inf_moy'].item())
# =============================================================================
# for lc in landcover:
# for var in var_list:
# diff = maxRootDepth_ds[lc] - uts_ds.loc[no_st, 'prof_inf_moy']
# cond = diff >= 0
# lay_2[var] = lay_2[var] + xr.where(
# cond,
# uts_ds.loc[no_st, var] * uts_ds.loc[no_st, 'prof_inf_moy'],
# ...
# )
#
# lay_3[var] = lay_3[var] + xr.where(
# cond,
# -diff * uts_ds.loc[no_st, var]) +
# (uts_ds.loc[i, 'prof_inf_moy'] - uts_ds.loc[i-1, 'prof_inf_moy']) * uts_ds.loc[i, var] for i in range(no_st, strate_ds.strate.max())
# =============================================================================
# Compute soil depth
results['forest']['soildepth_total'] = results['forest']['soildepth_total'] + ucs_area * uts_ds['prof_inf_moy'].max()*uts_ds['pourcent'].max()/100
# average soil depth weighted by the coverage of each UTS
# Dividing by the total coverage (raster_area) will be done at the end,
# to do it at once, and to avoid nan (0 / 0).
for lc in maxRootDepth_ds:
# Compute texture
for var in var_list:
for cell in loc:
root_depth = maxRootDepth_ds[lc].loc[{x_var2: cell[0], y_var2: cell[1]}].item()
val1 = 0
# Soil strate number, not considering the first strate (layer 0 added by the script to the data)
no_st = 1 # uts_ds.index.min()+1
strate_depths = uts_ds.loc[:, 'prof_inf_moy'].values
# 5 cm, first soil layer of CWatM
while (0.05 > strate_depths[no_st]) \
& (no_st < uts_ds.index.max()):
val1 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
no_st += 1
if no_st >= uts_ds.index.max():
val1 += (0.05 - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
# val1 = val1 / strate_depths[no_st]
val1 = val1 / 0.05
val2 = val1
val3 = val1
else:
val1 += (0.05 - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
val2 = (strate_depths[no_st] - 0.05) * uts_ds.loc[no_st, var]
no_st += 1
while (root_depth > strate_depths[no_st]) \
& (no_st < uts_ds.index.max()):
val2 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
no_st += 1
if no_st >= uts_ds.index.max():
val1 = val1 / 0.05
val2 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
val2 = val2 / (strate_depths[no_st] - 0.05)
val3 = val2
else:
val2 += (root_depth - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
val3 = (strate_depths[no_st] - root_depth) * uts_ds.loc[no_st, var]
no_st += 1
while (strate_depths[no_st] < strate_depths.max()) \
& (no_st < uts_ds.index.max()):
# depth of the third soil layer = max depth
val3 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
no_st += 1
val3 += (strate_depths[no_st] - strate_depths[no_st-1]) * uts_ds.loc[no_st, var]
val1 = val1 / 0.05
val2 = val2 / (root_depth - 0.05)
val3 = val3 / (strate_depths.max() - root_depth)
for i, val in {1: val1, 2: val2, 3:val3}.items():
results[lc][var + str(i)].loc[{x_var2: cell[0], y_var2: cell[1]}] \
= results[lc][var + str(i)].loc[{x_var2: cell[0], y_var2: cell[1]}] \
+ ucs_area.loc[{x_var2: cell[0], y_var2: cell[1]}] * val * uts_ds['pourcent'].max()/100
# Finish computing soil depth
# So far, the total soil depth is computed the same way for 'forest' and 'general'
results['general']['soildepth_total'] = results['forest']['soildepth_total'] # same total soil depth
# ---- Normalize by total area
for lc in ['forest', 'general']:
# Soil depth
results[lc]['soildepth_total'] = results[lc]['soildepth_total'] / raster_area['area']
# Corrections when maxRootDepths deeper than max soil depth:
results[lc]['soildepth1'] = results[lc]['soildepth1'].where(
results[lc]['soildepth1'] < results[lc]['soildepth_total'], results[lc]['soildepth_total'])
results[lc]['soildepth2'] = results[lc]['soildepth_total'] - results[lc]['soildepth1']
# results[lc]['soildepth2'] = results[lc]['soildepth2'].where(results[lc]['soildepth2'] >= 0, 0) # replace negative values with 0
# Soil texture
for var in var_list:
for i in ['1', '2', '3']: # CWatM soil layer number
results[lc][var+i] = results[lc][var+i] / raster_area['area']
# ---- Compute soil parameters with transfer functions
for i in ['1', '2', '3']: # CWatM soil layer number
# soildata must be a list of lists like [[%sand, %silt, %clay, (bulk density, th33, th1500)], ...]
# see https://github.com/usda-ars-ussl/rosetta-soil
# Safeguard if percentages do not sum up to 100%
norm = results[lc]['TAUX SABLE_val_mod'+i].values.flatten(order = 'C') + results[lc]['TAUX LIMON_val_mod'+i].values.flatten(order = 'C') \
+ results[lc]['TAUX ARGILE_val_mod'+i].values.flatten(order = 'C')
soildata = SoilData.from_array(
np.array([
results[lc]['TAUX SABLE_val_mod'+i].values.flatten(order = 'C')/norm*100, # [g/kg] -->[-] --> [%]
results[lc]['TAUX LIMON_val_mod'+i].values.flatten(order = 'C')/norm*100,
results[lc]['TAUX ARGILE_val_mod'+i].values.flatten(order = 'C')/norm*100,
# results[lc]['DENS_APPAR_val_mod'+i].values.flatten(order = 'C'), # faulty in RRP Pays Basque
]).transpose().tolist()
)
mean, _, code = rosetta(3, soildata)
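# Assumption based on the rosetta-soil documentation: each row of `mean`
# holds the van Genuchten parameters [theta_r, theta_s, log10(alpha),
# log10(n), log10(Ksat)] for one sample, hence the transpose/indexing below.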
results[lc]['KSat'+i] = ([y_var2, x_var2], 10**mean.transpose()[4].reshape(results[lc]['soildepth_total'].shape)) # [cm/d]
results[lc]['alpha'+i] = ([y_var2, x_var2], 10**mean.transpose()[2].reshape(results[lc]['soildepth_total'].shape)) # [1/cm]
results[lc]['lambda'+i] = ([y_var2, x_var2], 10**mean.transpose()[3].reshape(results[lc]['soildepth_total'].shape) - 1) # [-]
results[lc]['thetas'+i] = ([y_var2, x_var2], mean.transpose()[1].reshape(results[lc]['soildepth_total'].shape)) # [-]
results[lc]['thetar'+i] = ([y_var2, x_var2], mean.transpose()[0].reshape(results[lc]['soildepth_total'].shape)) # [-]
# ---- Correction and export
# results[lc] = geo.georef(results[lc], crs = results[lc].rio.crs)
for i in ['3', '2', '1']:
for var in ['soildepth', 'soildepth_total', 'KSat', 'alpha',
'lambda', 'thetas', 'thetar',] + var_list:
if (var in ['soildepth', 'soildepth_total']) & (i == '3'): # does not exist
continue
# Safeguard for holes in results
if var != 'soildepth':
if i in ['1', '2']:
# fill with lower layer
results[lc][var+i] = results[lc][var+i].where(
results[lc][var+i].notnull() | results[lc]['soildepth1'].isnull(), results[lc][var+str(int(i)+1)])
# =============================================================================
# # Fill with average value
# results[lc][var+i] = results[lc][var+i].where(
# results[lc][var+i].notnull() | results[lc]['soildepth1'].isnull(), results[lc][var+i].mean())
# =============================================================================
# =============================================================================
# # Mask data
# results[lc][var+i] = results[lc][var+i].where(results[lc]['soildepth1'].notnull())
# =============================================================================
# Georef
results[lc][var+i] = geo.georef(results[lc][var+i])
# Crop to the mask and fill the remaining holes
results[lc][var+i] = geo.cropfill(results[lc][var+i], m)
# Export
geo.export(
results[lc][var+i],
outpath / lc / (var+i + f'_mask{n_mask}' + f'_{res}m' + '.nc'))
#%%% Water Use data
@formating
def withdrawal(data, *,
domes_efficiency,
indus_efficiency,
yearly_to_monthly = None,
mask = None,
bounds = None,
resolution = None,
x0 = None,
y0 = None,
base_template = None,
outpath = None,
**rio_kwargs,
):
# ---- Initialisations
# Load
gdf = geo.load_any(data)
gdf['volume'] = gdf['volume'] / 1e6 # convert m3 into Mm3
# TODO: make these column names fully configurable
gdf_dict = dict()
gdf_dict['surface'] = gdf[gdf['libelle_type_milieu'] == 'Surface continental']
gdf_dict['groundwater'] = gdf[gdf['libelle_type_milieu'] == 'Souterrain']
gdf_dict['domestic'] = gdf[gdf['libelle_usage'] == 'EAU POTABLE']
gdf_dict['industry'] = gdf[gdf['libelle_usage'] == 'INDUSTRIE et ACTIVITES ECONOMIQUES (hors irrigation, hors énergie)']
# Folders
if isinstance(data, (str, Path)):
data = Path(data)
outpath = data.parent / 'cwatm'
outname = data.stem
if not outpath.exists():
os.makedirs(outpath)
else:
if outpath is None:
print("Err: `outpath` argument is required when `data` is not a file")
return
else:
outpath = Path(outpath)
outname = 'withdrawal'
# Time
time_dim = geo.main_time_dims(gdf)[0]
year_start = int(gdf[time_dim].dt.year.min())
year_end = int(gdf[time_dim].dt.year.max())
# Gross/Netto coefficients
efficiency_dict = dict()
efficiency_dict['domestic'] = domes_efficiency
efficiency_dict['industry'] = indus_efficiency
# ---- Format yearly_to_monthly
if yearly_to_monthly is not None:
# Load files, and disqualify numerics
if isinstance(yearly_to_monthly, (int, float)):
print("Err: `yearly_to_monthly` argument should be an iterable or a xarray variable with a length of 12. "
f"As it is a {type(yearly_to_monthly)}, it is ignored.")
yearly_to_monthly = None
elif isinstance(yearly_to_monthly, (str, Path)):
yearly_to_monthly = geo.load_any(yearly_to_monthly)
if isinstance(yearly_to_monthly, xr.DataArray):
yearly_to_monthly = yearly_to_monthly.to_dataset(name = 'volume_coeff')
if yearly_to_monthly is not None:
if isinstance(yearly_to_monthly, xr.Dataset):
yearly_to_monthly = yearly_to_monthly.rename(
{geo.main_time_dims(yearly_to_monthly)[0]: 'time',
geo.main_vars(yearly_to_monthly)[0]: 'volume_coeff'})
if len(yearly_to_monthly.time) > 12:
print("Warning: `yearly_to_monthly` argument should have a length of 12. "
f"As its actual lenght is {len(yearly_to_monthly)}, it is truncated.")
yearly_to_monthly = yearly_to_monthly.isel(time = range(0, 12)).copy()
yearly_to_monthly = yearly_to_monthly.assign_coords(pd.to_datetime(
year = 2020,
month = yearly_to_monthly.time.dt.month,
day = yearly_to_monthly.time.dt.day)
).sortby('time')
elif len(yearly_to_monthly.time) < 12:
print(f"Err: `yearly_to_monthly` argument is ignored because it should have a length of 12, not {len(yearly_to_monthly.time)}")
yearly_to_monthly = None
elif isinstance(yearly_to_monthly, (list, tuple, np.ndarray)):
if len(yearly_to_monthly) > 12:
print("Warning: `yearly_to_monthly` argument should have a length of 12. "
f"As its actual length is {len(yearly_to_monthly)}, it is truncated.")
yearly_to_monthly = yearly_to_monthly[0:12]
if len(yearly_to_monthly) < 12:
print(f"Err: `yearly_to_monthly` argument is ignored because it should have a length of 12, not {len(yearly_to_monthly)}")
yearly_to_monthly = None
else:
# Convert list to Dataset (on a dummy year-2000 monthly time axis)
yearly_to_monthly = xr.DataArray(
coords = {'time': pd.date_range(
start = '2000-01-01',
end = '2000-12-01',
freq = 'MS')},
data = yearly_to_monthly,
name = 'volume_coeff'
).to_dataset()
elif isinstance(yearly_to_monthly, (pd.DataFrame, pd.Series)):
# Convert to numpy.array and correct values
if isinstance(yearly_to_monthly, pd.DataFrame):
# Convert pd.DataFrames to pd.Series
yearly_to_monthly = yearly_to_monthly[geo.main_vars(yearly_to_monthly)[0]]
if len(yearly_to_monthly) > 12:
print("Warning: `yearly_to_monthly` argument should have a length of 12. "
f"As its actual length is {len(yearly_to_monthly)}, it is truncated.")
yearly_to_monthly = yearly_to_monthly[0:12]
if len(yearly_to_monthly) < 12:
print(f"Err: `yearly_to_monthly` argument is ignored because it should have a length of 12, not {len(yearly_to_monthly)}")
yearly_to_monthly = None
if yearly_to_monthly is not None:
if isinstance(yearly_to_monthly.index, pd.DatetimeIndex):
# If the index is a DatetimeIndex, reorder it (just in case it is aligned
# with a hydrological or meteorological year instead of a calendar year)
yearly_to_monthly.index = pd.to_datetime({
'year': 2000,
'month': yearly_to_monthly.index.month,
'day': yearly_to_monthly.index.day})
yearly_to_monthly = yearly_to_monthly.sort_index()
# Convert to a Dataset holding the 'volume_coeff' variable
yearly_to_monthly.index.name = 'time'
yearly_to_monthly = yearly_to_monthly.rename('volume_coeff').to_xarray().to_dataset()
else:
print("Err: `yearly_to_monthly` is ingored as its type is not recognized. ")
yearly_to_monthly = None
# Compute values so that the distribution sums up to 1
if yearly_to_monthly is not None:
yearly_to_monthly = yearly_to_monthly / (yearly_to_monthly.sum(dim = 'time'))
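# =============================================================================
# # Example (hypothetical values): a 12-value monthly distribution (January to
# # December), which the line above normalizes so that it sums up to 1:
# # yearly_to_monthly = [0.8, 0.8, 0.9, 1.0, 1.1, 1.3, 1.4, 1.3, 1.1, 1.0, 0.9, 0.8]
# =============================================================================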
# For each mask
n_mask = 0
for m in mask:
n_mask += 1
# TODO
# ---- Cut json files on masks --> move to stzfr.bnpe()
# =============================================================================
# print(f"\nCut shapefile over mask {n_mask}/{len(mask)}:")
# reprj_gdf = geo.reproject(gdf,
# # main_vars = 1, # useless
# mask = m)
#
# geo.export(reprj_gdf,
# concat_folder / Path(f"mask{n_mask}") / f"{year_start}-{year_end}_clipped.json",
# encoding='utf-8')
# =============================================================================
# TODO
# ---- CSV summary --> move to stzfr.bnpe()
# =============================================================================
# # Reorder columns (optional)
# col_names_base = ['code_ouvrage', 'nom_commune', 'volume', 'code_usage', 'libelle_usage', 'prelevement_ecrasant'] + [time_dim]
#
# if ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' in gdf.columns):
# col_names = col_names_base + ['code_type_milieu', 'libelle_type_milieu']
# elif ('libelle_type_milieu' in gdf.columns) & ('code_type_milieu' not in gdf.columns):
# col_names = col_names_base + ['libelle_type_milieu']
# elif ('libelle_type_milieu' not in gdf.columns) & ('code_type_milieu' in gdf.columns):
# col_names = col_names_base + ['code_type_milieu']
# # =============================================================================
# # df = pd.DataFrame(reprj_gdf[col_names])
# # =============================================================================
#
# col_names = [time_dim] + col_names
# remaining_col = list(set(gdf.columns) - set(col_names))
# # =============================================================================
# # df = df[[time_dim] + col_names
# # =============================================================================
# df = pd.DataFrame(gdf[col_names + remaining_col])
#
# # Remove useless rows
# df = df[df.volume > 0]
#
# # Sort by nom_commune
# df.sort_values(by = 'nom_commune', inplace = True)
#
# # Export the csv summary file
# df.to_csv(concat_folder / "recap.csv", sep = ';', encoding = 'utf-8')
# =============================================================================
# For each resolution
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
rio_kwargs['resolution'] = res
# Rasterization
ds_dict = dict()
# ---- [Rasterization] Static map of ratio surface/groundwater abstraction
daily = False # Determine time step
monthly = False
yearly = False
time_step = (gdf_dict['surface'][time_dim][1] - gdf_dict['surface'][time_dim][0]).total_seconds()
if round(time_step / 86400) == 1: # 86400 s = 1 day
daily = True
elif round(time_step / 2592000) == 1: # 2592000 s = 30 days
monthly = True
elif round(time_step / 31557600) == 1: # 31557600 s = 365.25 days
yearly = True
for origin in ['surface', 'groundwater']:
ds_dict[origin] = geo.reproject(data = gdf_dict[origin], mask = m, bounds = bounds,
x0 = x0, y0 = y0,
base_template = base_template,
rasterize = True,
main_var_list = 'volume',
rasterize_mode = 'sum',
**rio_kwargs)
x_var, y_var = geo.main_space_dims(ds_dict[origin])[0]
ds_dict['swAbstractionFrac'] = ( ds_dict['surface'] / (ds_dict['surface'] + ds_dict['groundwater']) ).mean(dim = time_dim)
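# Cells where groundwater abstraction is always zero (but not missing) are
# considered fully surface-fed (swAbstractionFrac = 1)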
ds_dict['swAbstractionFrac'] = ds_dict['swAbstractionFrac'].where(
(ds_dict['groundwater'].mean(dim = time_dim) > 0) | (ds_dict['groundwater'].mean(dim = time_dim).isnull()), 1)
# ---- [Rasterization] Monthly data of domestic and industry withdrawals
for use in ['domestic', 'industry']:
gdf_dict[use].set_index(time_dim, inplace = True)
ds_dict[use] = geo.reproject(data = gdf_dict[use], mask = m, bounds = bounds,
x0 = x0, y0 = y0,
base_template = base_template,
rasterize = True,
main_var_list = 'volume',
rasterize_mode = 'sum',
**rio_kwargs)
# Annual mean
if yearly:
# yearly sum --> monthly average
monthly_ds = ds_dict[use].copy(deep = True)
monthly_ds = monthly_ds.reindex({time_dim: pd.date_range(
start = f'{year_start}-01-01',
end = f'{year_end}-12-01',
freq = 'MS')})
for date in ds_dict[use][time_dim]:
year = str(date.dt.year.item())
year_array = ds_dict[use]['volume'].loc[
{time_dim: slice(f'{year}-01-01', f'{year}-12-31')}]
if yearly_to_monthly is None:
# Without monthly coefficients: yearly sum --> monthly average
year_array = year_array / 12
for month in range(1, 13):
monthly_ds['volume'].loc[
{time_dim: pd.to_datetime([f'{year}-{month}-01'])}
] = year_array.assign_coords(
{time_dim: pd.to_datetime([f"{year}-{month}-01"])}
)
# ========== previous method: much slower =====================================
# monthly_list = list()
# for date in ds_dict[use][time_dim]:
# year = date.dt.year.item()
# year_array = ds_dict[use][['volume']].loc[
# {time_dim: slice(f'{year}-01-01', f'{year}-12-31')}] / 12
# for month in range(1, 13):
# monthly_list.append(
# year_array.assign_coords(
# {time_dim: pd.to_datetime([f"{year}-{month}-01"])}
# )
# )
# monthly_ds = geo.merge_data(monthly_list)
# =============================================================================
# Apply monthly variations, year by year (the coefficients sum up
# to 1, so each annual total is preserved)
if yearly_to_monthly is not None:
for year in range(year_start, year_end + 1):
monthly_ds['volume'].loc[
{time_dim: slice(str(year), str(year))}
] = monthly_ds['volume'].loc[
{time_dim: slice(str(year), str(year))}
] * yearly_to_monthly['volume_coeff'].assign_coords(
{time_dim: pd.to_datetime({'year': year,
'month': yearly_to_monthly.time.dt.month,
'day': yearly_to_monthly.time.dt.day
})
}
)
# Compute gross and netto demands
monthly_ds = monthly_ds.rename({'volume': f'{use}GrossDemand'})
monthly_ds[f'{use}NettoDemand'] = monthly_ds[f'{use}GrossDemand'] * efficiency_dict[use]
# Update ds_dict
ds_dict[use] = monthly_ds
# ---- Export
for var in ['domestic', 'industry', 'swAbstractionFrac']:
if 'dst_crs' in rio_kwargs:
ds_dict[var] = geo.georef(data = ds_dict[var],
crs = rio_kwargs['dst_crs'])
else:
ds_dict[var] = geo.georef(data = ds_dict[var])
geo.export(ds_dict[var],
outpath / (outname + f"_{var}" + f"_{year_start}-{year_end}" + res_suffix + f'_mask{n_mask}' + '.nc'),
)
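# A minimal usage sketch of `withdrawal` (hypothetical paths and values; the
# withdrawal data is assumed to be standardized beforehand, e.g. with stzfr):
# =============================================================================
# # withdrawal(
# #     r"D:/data/bnpe/prelevements.json",
# #     domes_efficiency = 0.8,
# #     indus_efficiency = 0.7,
# #     yearly_to_monthly = [1]*12,  # uniform monthly distribution
# #     mask = [r"D:/data/masks/bv_mask.shp"],
# #     resolution = 1000,
# #     dst_crs = 2154,
# # )
# =============================================================================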
@formating
def withdrawal_modflow(data, *,
mask = None,
bounds = None,
resolution = None,
x0 = None,
y0 = None,
base_template = None,
outpath = None,
**rio_kwargs,
):
# ---- Initialisations
# Load
gdf = geo.load_any(data)
# Folders
if isinstance(data, (str, Path)):
data = Path(data)
outpath = data.parent / 'cwatm'
outname = data.stem
else:
if outpath is None:
print("Err: `outpath` argument is required when `data` is not a file")
return
else:
outpath = Path(outpath)
outname = 'withdrawal'
# Time
time_dim = geo.main_time_dims(gdf)[0]
# Select groundwater withdrawals
gdf_groundwater = gdf[gdf['libelle_type_milieu'] == 'Souterrain']
# gdf_groundwater = gdf_groundwater.drop(time_dim, axis = 1) # (Does not improve much the computation time)
# For each mask
n_mask = 0
for m in mask:
n_mask += 1
# For each resolution
for res in resolution:
if res is None:
res_suffix = ''
if 'resolution' in rio_kwargs:
rio_kwargs.pop('resolution')
else:
res_suffix = f'_{res}m'
rio_kwargs['resolution'] = res
ds_reprj = geo.reproject(data = gdf_groundwater, mask = m, bounds = bounds,
x0 = x0, y0 = y0,
base_template = base_template,
rasterize = True,
main_var_list = 'volume',
rasterize_mode = 'count',
**rio_kwargs)
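# Boolean mask of the cells with at least one groundwater withdrawal over
# the whole period (True = at least one pumping well in the cell)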
ds = ds_reprj.fillna(0).sum(dim = time_dim)
ds = xr.where(
ds > 0, 1, 0, keep_attrs = True).astype(bool, keep_attrs = True)
# ds = ds.where(ds_reprj[{time_dim: 0}].notnull()) # (better to stick to the bool type, with no nan)
# ---- Export
if 'dst_crs' in rio_kwargs:
ds = geo.georef(data = ds,
crs = rio_kwargs['dst_crs'])
else:
ds = geo.georef(data = ds)
geo.export(ds,
outpath / (outname + "_wells_mask_modflow" + res_suffix + f'_mask{n_mask}' + '.nc'),
)
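# A minimal usage sketch of `withdrawal_modflow` (hypothetical paths):
# =============================================================================
# # withdrawal_modflow(
# #     r"D:/data/bnpe/prelevements.json",
# #     mask = [r"D:/data/masks/bv_mask.shp"],
# #     resolution = 100,
# #     dst_crs = 2154,
# # )
# =============================================================================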
#%% TOOLS
#%%% EXPAND INPUTS
def plot_annual_coeff(filepath,
*,
extension = '.csv',
date_format = "%d/%m",
to_file = False,
**kwargs):
"""
Convert sparse tabular timeseries data (a file or a set of files) into
10-day timeseries, and plot them.
Particularly handy to get a quick overview of *cropCoefficients* and
*interceptCap* annual timeseries.
Important: in this kind of sparse timeseries, it is assumed that each value
applies from its first occurrence onward and is propagated forward (``01/10 | 5,
20/10 | 2`` means that the value ``5`` applies from 01/10 to 19/10, followed by the value ``2``).
Parameters
----------
filepath : str or pathlib.Path
File (tabular data) or folder.
extension : str, optional, default '.csv'
Used only when ``filepath`` is a folder. Defines the file types that
will be taken into account.
date_format : str, optional, default "%d/%m"
Date format used to parse the dates when loading data (with ``pandas.read_csv()``).
to_file : bool or path (str or pathlib.Path), default False
If True and if ``filepath`` is a path (str or pathlib.Path), the resulting
dataset will be exported to the same location as ``filepath``, while appending '_prolonged' to its name.
If ``to_file`` is a path, the resulting dataset will be exported to this specified filepath.
**kwargs :
Keyword arguments for ``pandas.read_csv()``:
- ``sep``
- ``index_col``
- ``decimal``
- ...
Returns
-------
None. Creates 10-day timeseries files (in a *10day* folder) as well as an
html figure (in a *continuous* folder).
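Examples
--------
A minimal, hypothetical call, for csv files using ';' as separator and
sparse dates such as ``01/10`` in the first column::

    plot_annual_coeff(r"D:/cwatm/cropCoefficients", sep = ';', index_col = 0)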
"""
# ---- Initialization
if not isinstance(filepath, (str, Path)):
print("Err: Only file or folder are supported so far.")
return
filepath = Path(filepath)
if filepath.is_dir():
root, filelist = geo.get_filelist(filepath, extension = extension)
elif filepath.is_file():
root = filepath.parent
filelist = [filepath.name]
kwargs['date_format'] = date_format
df_all = pd.DataFrame()
if not (root / '10day').exists():
os.makedirs(root / '10day')
if not (root / 'continuous').exists():
os.makedirs(root / 'continuous')
for file in filelist:
print('\n----\n', file)
df = geo.load_any(
root / file,
**kwargs)
# Safeguard
if not isinstance(df, (pd.DataFrame, pd.Series)):
print("Err: file format not supported yet. Only tabular data is supported.")
return
varname = df.columns[0]
# Concatenate 3 copies (shifted by one and two years) to be sure to cover a full year from 1st January to 31st December
df = pd.concat([df, df.set_index(df.index + pd.DateOffset(years=1)), df.set_index(df.index + pd.DateOffset(years=2))])
# Fill the gaps in the index
df = df.reindex(pd.date_range(start = df.index[0], end = df.index[-1], freq = '1D'))
df.index.name = 'time'
# Fill the gaps in the values, by interpolating according to the time axis
df = df.interpolate(method = 'time')
# Keep only a full year starting from 1st January to 31st December
full_year = df[(df.index.day == 1) & (df.index.month == 1)].index.year[0].item()
df = df.loc[f'{full_year}-01-01': f'{full_year}-12-31']
# Change year to 2000 (for homogeneity with other CWatM inputs)
df.index = df.index + pd.DateOffset(years = 2000 - df.index[0].year)
# Keep only one value "every 10 days" (or more exactly, keep only the 1st, 11th and 21st of each month)
df = df[(df.index.day == 1) | (df.index.day == 11) | (df.index.day == 21)]
geo.export(df, root / '10day' / file)
# Rename col and concatenate
df_all.index = df.index
df_all[file] = df[varname] # df.iloc[:,0]
# df_all = geo.merge_data(root / 'continuous')
wong_cmap = cmg.discrete(sequence_name = 'wong', alpha = 0.8, black = False,
alternate = False, color_format = 'float')
cmap = np.vstack([wong_cmap, wong_cmap, wong_cmap])
_, _, figweb = ncp.plot_time_series(data = df_all,
labels = df_all.columns.values,
lstyles = ['-']*9 + ['--']*9 + ['dotted']*9,
lwidths = 1.5,
linecolors = cmap,
)
# ---- Layout
figweb.update_layout(#font_family = 'Open Sans',
title = {'font': {'size': 20},
'text': "cropCoefficients",
'xanchor': 'center',
'x': 0.45,
},
legend = {'title': {'text': 'Légende'},
'xanchor': 'right',
'y': 1.1,
'xref': 'container',
'yanchor': 'top',
'bgcolor': 'rgba(255, 255, 255, 0.2)',
},
plot_bgcolor = "white",
# legend = {'groupclick': 'togglegroup'},
width = 1700,
height = 800,
)
figweb.write_html(root / 'continuous' / f'{varname}.html')
#%%% STANDARDIZE OUTPUTS
# TODO: placeholder function that currently relies on geobricks.georef()
# It may be better to move this functionality from georef() to here
def output(data,
*, data_type = 'CWatM'):
# data_type can also be 'MODFLOW'
return geo.georef(data)
#%%% OTHER FUNCTIONS
def zones_basses(mnt_raster):
# Generate a raster flagging the pixels lying below sea level, from a
# digital terrain model raster
with xr.open_dataset(mnt_raster,
decode_times = False,
decode_coords = 'all') as mnt_ds:
mnt_ds.load()
# Pixels below sea level (elevation < 0 m) are set to True
mvar = geo.main_var(mnt_ds)[0]
low_ds = (mnt_ds[mvar] < 0).to_dataset(name = 'zones_basses')
return low_ds
def dummy_fractionLandcover(base, first_year = 2020, last_year = 2020):
frac0 = geo.dummy_input(base, 0)
frac1 = geo.dummy_input(base, 1)
mvar = geo.main_var(frac0)[0]
data_ds = frac0.copy()
data_ds = data_ds.rename({mvar: 'fracforest'})
data_ds['fracgrassland'] = frac1[mvar]
data_ds['fracirrNonPaddy'] = frac0[mvar]
data_ds['fracirrPaddy'] = frac0[mvar]
data_ds['fracsealed'] = frac0[mvar]
data_ds['fracwater'] = frac0[mvar]
data_ds = data_ds.expand_dims(dim = {'time': pd.date_range(start = f"{first_year}-01-01", periods = last_year - first_year + 1, freq = 'YE') }, axis = 0)
data_ds = geo.georef(data_ds, crs = data_ds.rio.crs)
data_ds, _ = geo.standard_fill_value(data_ds)
for var in geo.main_var(data_ds):
data_ds[var].attrs['long_name'] = var
return data_ds
def dummy_dzRel(base, out_path):
array = geo.dummy_input(base, 1)
mvar = geo.main_var(array)[0]
data_ds = xr.Dataset()
for n in [0, 1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]:
var = f'dzRel{n:04}'
data_ds[var] = array[mvar]
data_ds[var].attrs['long_name'] = var
data_ds = geo.georef(data_ds, crs = data_ds.rio.crs)
data_ds, _ = geo.standard_fill_value(data_ds)
geo.export(data_ds, Path(out_path)/"dzRel_dummy.nc")
return data_ds
def dummy_lakeres(base, out_path):
# array = geo.dummy_input(base, 1)
# mvar = geo.main_var(array)[0]
print("Warning: The input `base` should necessary be the mask")
data_ds = geo.dummy_input(base, 0)
mvar = geo.main_var(data_ds)[0]
xvar, yvar = geo.main_space_dims(data_ds)[0]
stack_da = data_ds[mvar].stack(xy = [xvar, yvar])
# Determine the coordinates of the first not-NaN cell
x, y = stack_da[stack_da.notnull()].xy[0].item()
data_ds[mvar].loc[{xvar: x, yvar: y}] = 1
data_ds = geo.georef(data_ds, crs = data_ds.rio.crs)
data_ds, _ = geo.standard_fill_value(data_ds)
geo.export(data_ds, Path(out_path)/"lakeres_dummy.nc")
return data_ds
def dummy_routing(base, out_path):
ds_dict = {}
ds_dict['chanMan'] = geo.dummy_input(base, 0.3)
ds_dict['chanGrad'] = geo.dummy_input(base, 0.02)
ds_dict['chanGradMin'] = geo.dummy_input(base, 0.0001)
ds_dict['chanLength'] = geo.dummy_input(base, 1400)
ds_dict['chanWidth'] = geo.dummy_input(base, 17)
ds_dict['chanDepth'] = geo.dummy_input(base, 1)
for var in ds_dict:
ds_dict[var] = geo.georef(ds_dict[var], crs = ds_dict[var].rio.crs)
ds_dict[var], _ = geo.standard_fill_value(ds_dict[var])
geo.export(ds_dict[var], Path(out_path)/f"{var}_dummy.nc")
return ds_dict
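# A minimal usage sketch (hypothetical paths): generate all the dummy CWatM
# routing inputs from any raster accepted by geo.dummy_input:
# =============================================================================
# # ds_dict = dummy_routing(r"D:/cwatm/mask.tif", out_path = r"D:/cwatm/inputs")
# =============================================================================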