"""
Diagnostic for fig 3.24 in Chapter 3 of IPCC AR6 WGI.
This diagnostic either calculates bias with respect to the reference
dataset or plots the data and the reference dataset.
The diagnostic accepts the groups of the 1D datasets (with exception
of the resolved mask).
The additional keywords for the diagnostic provided through the recipe:
bias: an option boolean flag showing if bias is calculated, default: false
mask: an optional dictionary if the data should be masked, default: false
mask is a dictionary and should contain keywords:
flag: flag showing if the data should be masked
type: type of mask, accepted values: 'simple' and 'resolved'
group: name of the variable group with resolved mask (needed only if
type='resolved')
statistics: a dictionary with the statistics which will be plotted. Needs
keywords 'best_guess': a string with the statistical operator, and
'borders' a list with the statistics to be the borders for teh shading.
mpl_style: name of the matplotlib style file (optional)
caption: figure caption (optional)
color_style: optional name of the color style, colors are defined for
variable groups.
The resolved mask can have as many dimensions as one needs.
Initial development by: Lee de Mora (PML)
ledm@pml.ac.uk
Revised and corrected (15.01.2021): Elizaveta Malinina (CCCma)
elizaveta.malinina-rieger@ec.gc.ca
Rewritten and adapted to main branch v2.12+ (08.03.2025):
Elizaveta Malinina (CCCma)
elizaveta.malinina-rieger@ec.gc.ca
"""
import logging
import os
import sys
import esmvalcore.preprocessor as eprep
import iris
import iris.plot
import matplotlib.pyplot as plt
import esmvaltool.diag_scripts.shared.plot as eplot
from esmvaltool.diag_scripts.shared import (
group_metadata,
run_diagnostic,
save_data,
save_figure,
select_metadata,
)
# This part sends debug statements to stdout
logger = logging.getLogger(os.path.basename(__file__))
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
[docs]
class Data4Analyis:
"""Data class which later will be used for plotting.
Attributes
----------
name: str
name of the data group for which class is created
bias: bool
flag, showing if bias should be calculated
mask: bool
flag if data should be masked
mask_type: str
type of the mask: 'simple' or 'resolved'
reference: str
name of the reference dataset
ref_cube: iris.cube.Cube
iris cube with the reference dataset data
data: iris.cube.CubeList
iris cubelist with the data
best_guess: iris.cube.Cube
iris cube with the best guess to be plotted
border1: iris.cube.Cube
iris cube with the bottom/top for the shading
border2: iris.cube.Cube
iris cube with the top/bottom for the shading
"""
def __init__(
self,
name: str,
group: list,
cfg: dict,
mask_meta: list[dict] | None,
):
"""Initialize the class.
Parameters
----------
name:
Name of the data class, comes from the name of the group
group:
List with the metadata of the variable group
cfg:
Config dictionary coming from ESMValCore
mask_type:
Type of the mask to be used, if no mask to be used set to False
mask_meta:
Metadata for the mask group if mask_type='resolved'
"""
self.name = name
self.bias = bool(cfg.get("bias"))
self.mask = cfg.get("mask").get("flag") if cfg.get("mask") else False
self.mask_type = cfg["mask"].get("type") if self.mask else False
self.determine_reference(group)
self.obtain_data(group, mask_meta)
stats = cfg.get("data_statistics")
if not stats:
raise ValueError(
"statistics dictionary should be provided in the recipe. "
"The keywords 'best_guess' and 'borders' should be provided."
)
self.calculate_statistics(stats, cfg)
[docs]
def determine_reference(self, group: list):
"""Determine the reference dataset from the data group.
Parameters
----------
group:
List with the metadata of the variable group
Raises
------
ValueError
if more than one or no reference datasets have been provided
"""
if self.bias or self.mask:
reference = list(group_metadata(group, "reference_dataset").keys())
if len(reference) > 1:
raise ValueError("More then one reference dataset was given")
self.reference = reference[0]
if reference:
ref_metadata = select_metadata(group, dataset=self.reference)[
0
]
self.ref_cube = iris.load_cube(ref_metadata["filename"])
group.remove(ref_metadata)
else:
raise ValueError("No reference dataset has been provided")
[docs]
def obtain_data(self, group: list, mask_meta: list[dict] | None):
"""Obtain data for further statistics calculation.
Parameters
----------
group:
List with the metadata of the variable group
mask_meta:
List with the mask_metadata in case mask_type='resolved'
"""
files = list(group_metadata(group, "filename", sort=True).keys())
data_cblst = iris.load(files)
self.data = data_cblst
if self.mask:
self.mask_data(mask_meta)
if self.bias:
self.data = eprep.bias(self.data, reference=self.ref_cube)
[docs]
def mask_data(self, mask_meta: list[dict] | None):
"""Masks data.
Parameters
----------
mask_meta:
List with the mask_metadata in case mask_type='resolved'
Raises
------
ValueError
if more than one datasets for resolved mask are provided
or if data cubes have more than one dimension or if an
unsupported type of mask is provided
"""
if self.mask_type == "simple":
mask = self.ref_cube.data.mask
elif self.mask_type == "resolved":
mask_files = list(
group_metadata(mask_meta, "filename", sort=True).keys()
)
if len(mask_files) > 1:
raise ValueError(
"More than one dataset for the resolved mask "
"has been provided. Only one is supported."
)
mask_cb = iris.load_cube(mask_files[0])
data_coord = self.data[0].dim_coords
if len(data_coord) > 1:
raise ValueError(
"The data cubes have more than one "
"cordinates. Only flat cubes are supported."
)
data_coord = data_coord[0]
# determine the number of dimension over which the data is calculated
dim = [c.name() for c in mask_cb.dim_coords].index(
data_coord.name()
)
mask = []
# if there is less than half of data over the dimension the cell
# is masked
for i in range(mask_cb.shape[dim]):
mask.append(
mask_cb.data[(slice(None),) * dim + (i,)].count()
<= 0.5 * len(mask_cb.data[(slice(None),) * dim + (i,)])
)
# masking the reference cube
self.ref_cube.data.mask = self.ref_cube.data.mask | mask
else:
raise ValueError(
f"Mask type {self.mask_type} is not supported. "
"Only 'simple' and 'resolved' are supported."
)
for cube in self.data:
cube.data.mask |= mask
[docs]
def calculate_statistics(self, stats: dict, cfg: dict):
"""Calculate statistics which later will be plotted.
Parameters
----------
stats:
dictionary with the statistics which will be calculated.
Dictionary should have keywords 'best_guess' and 'borders'.
cfg:
Config dictionary coming from ESMValCore
"""
prov_dic = create_provenance("")
f_name = f"{self.name}_{self.data[0].var_name}_"
f_name = f_name + "bias_" if self.bias else f_name
if len(self.data) > 1:
bg_dic = eprep.multi_model_statistics(
self.data,
span="full",
statistics=[stats["best_guess"]],
ignore_scalar_coords=True,
)
self.best_guess = bg_dic[list(bg_dic.keys())[0]]
bg_name = f_name + list(bg_dic.keys())[0]
bord_dic = eprep.multi_model_statistics(
self.data,
span="full",
statistics=stats["borders"],
ignore_scalar_coords=True,
)
# saving border data only if more than 1 cube provided
self.border1 = bord_dic[list(bord_dic.keys())[0]]
save_data(
f_name + list(bord_dic.keys())[0], prov_dic, cfg, self.border1
)
self.border2 = bord_dic[list(bord_dic.keys())[1]]
save_data(
f_name + list(bord_dic.keys())[1], prov_dic, cfg, self.border2
)
else:
# if just one dataset is in the group there is no need
# to calculate the borders
self.best_guess = self.data[0]
bg_name = f_name[:-1]
self.border1 = None
self.border2 = None
logger.info(
"There were no statistics calculated for %s"
" because only one cube was provided",
self.name,
)
# saving the best guess data
save_data(bg_name, prov_dic, cfg, self.best_guess)
[docs]
def create_provenance(caption: str):
"""Create provenance dictionary."""
provenance_dic = {
"authors": ["malinina_elizaveta", "demora_lee"],
"caption": caption,
"references": ["eyring21ipcc"],
}
return provenance_dic
[docs]
def plot_bias_plot(data_list: list[Data4Analyis], cfg: dict):
"""Plot the diagnostic figure.
Parameters
----------
data_list:
List with the data classes which will be plotted
cfg:
Config dictionary coming from ESMValCore
"""
caption = cfg.get("caption") if cfg.get("caption") else ""
prov_dic = create_provenance(caption=caption)
st_file = eplot.get_path_to_mpl_style(cfg.get("mpl_style"))
plt.style.use(st_file)
fig = plt.figure(figsize=(6, 2.5))
for data in data_list:
if data.best_guess.dim_coords[0].name() == "longitude":
# to make the Pacific and Atlantic continuous
data.best_guess = data.best_guess.intersection(
longitude=(20.0, 380.0)
)
data.ref_cube = data.ref_cube.intersection(longitude=(20.0, 380.0))
# check if there are borders
if data.border1:
data.border1 = data.border1.intersection(
longitude=(20.0, 380.0)
)
data.border2 = data.border2.intersection(
longitude=(20.0, 380.0)
)
data_col = eplot.get_dataset_style(data.name, cfg.get("color_style"))
iris.plot.plot(
data.best_guess, color=data_col["color"], label=data.name
)
# if there is more than one realization, there's border plotted
if data.border1:
iris.plot.fill_between(
data.best_guess.dim_coords[0],
data.border1,
data.border2,
alpha=0.2,
linewidth=0,
color=data_col["color"],
)
if cfg.get("bias"):
xlim = fig.axes[0].get_xlim()
plt.hlines(0, *xlim, colors="silver", linestyles="dashed", zorder=1)
plt.xlim(*xlim) # to make the xlims the same as before
else:
ref_col = eplot.get_dataset_style(
data_list[0].reference, cfg.get("color_style")
)
iris.plot.plot(data_list[0].ref_cube, c=ref_col["color"])
plt.legend()
plt.title(caption)
x_label = data_list[0].best_guess.dim_coords[0].name()
x_label += f", {data_list[0].best_guess.dim_coords[0].units}"
plt.xlabel(x_label)
y_label = data_list[0].best_guess.var_name
y_label = y_label + " bias" if cfg.get("bias") else y_label
y_label = y_label + f", {data_list[0].best_guess.units.origin}"
plt.ylabel(y_label)
plt.tight_layout()
fig_path = os.path.join(cfg["plot_dir"], "figure")
save_figure(fig_path, prov_dic, cfg, fig, close=True)
[docs]
def main(cfg: dict):
"""Diagnostic itself."""
input_data = cfg["input_data"]
groups = group_metadata(input_data.values(), "variable_group", sort=True)
mask_type = cfg.get("mask").get("type") if cfg.get("mask") else False
mask_group = (
cfg.get("mask").get("group") if mask_type == "resolved" else None
)
mask_meta = groups.pop(mask_group) if mask_group else None
data_list = []
for group, group_cont in groups.items():
group_data = Data4Analyis(
name=group,
group=group_cont,
cfg=cfg,
mask_meta=mask_meta,
)
data_list.append(group_data)
plot_bias_plot(data_list, cfg)
logger.info("Success")
if __name__ == "__main__":
with run_diagnostic() as config:
main(config)