Source code for fragmentmnp.fragmentmnp

from typing import Tuple
import numpy as np
import numpy.typing as npt
from scipy.integrate import solve_ivp
from scipy import interpolate
from schema import SchemaError
from . import validation
from .output import FMNPOutput
from ._errors import FMNPNumericalError, FMNPDistributionValueError


class FragmentMNP():
    """
    The class that controls usage of the FRAGMENT-MNP model

    Parameters
    ----------
    config : dict
        Model configuration options
    data : dict
        Model input data
    validate : bool, default=True
        Should config and data be validated? It is strongly recommended to
        use validation, but this option is provided if you are *certain* your
        config and data are correct and wish to speed up model initialisation
    """

    def __init__(self,
                 config: dict,
                 data: dict,
                 validate: bool = True) -> None:
        """
        Initialise the model
        """
        # Validate the config and data (if we are meant to)
        if validate:
            config, data = self._validate_inputs(config, data)
        # If we passed validation (or aren't validating), save attributes
        self.config = config
        self.data = data
        # Set the number of particle size classes and timesteps, and
        # the times at which to store the computed solution
        self.n_size_classes = self.config['n_size_classes']
        self.n_timesteps = self.config['n_timesteps']
        self.dt = self.config['dt']
        self.t_grid = np.arange(0.5*self.dt,
                                self.n_timesteps*self.dt + 0.5*self.dt,
                                self.dt)
        self.t_eval = (self.t_grid
                       if self.config['solver_t_eval'] == 'timesteps'
                       else self.config['solver_t_eval'])
        # Initial concentrations
        self.initial_concs = np.array(data['initial_concs'])
        self.initial_concs_diss = data['initial_concs_diss']
        # Set the particle phys-chem properties
        self.psd = self._set_psd()
        self.surface_areas = self.surface_area(self.psd)
        self.fsd = self.set_fsd(self.n_size_classes,
                                self.psd,
                                self.data['fsd_beta'])
        self.density = data['density']
        # Stop Pylance complaining about k_frag, k_diss and k_min not being
        # present (or being the wrong type) by declaring them as NumPy arrays
        self.k_frag = np.empty((self.n_size_classes, self.n_timesteps))
        self.k_diss = np.empty((self.n_size_classes, self.n_timesteps))
        self.k_min = np.empty((self.n_timesteps,))
        # Calculate the rate constant distributions. If we've been given a
        # dict in data, use the contained params to create the distribution.
        # Else, presume that we've been given a scalar (validation will make
        # sure this is so) and use that as the average.
        for k in ['k_frag', 'k_diss', 'k_min']:
            if isinstance(data[k], dict):
                k_f = data[k]['k_f']
                k_0 = data[k]['k_0']
                is_compound = data[k]['is_compound']
                # Get the params from the dict, excluding the scaling and
                # baseline factors
                params = {n: p for n, p in data[k].items()
                          if n not in ['k_f', 'k_0']}
            else:
                k_f = data[k]
                k_0 = 0.0
                is_compound = True
                params = {}
            # If k_frag or k_diss, we want to calculate a 2D (s, t)
            # distribution, and if k_min, we just want a 1D (t) distribution
            if k in ['k_frag', 'k_diss']:
                k_dist = self.set_k_distribution(
                    dims={'s': self.surface_areas, 't': self.t_grid},
                    k_f=k_f, k_0=k_0, params=params,
                    is_compound=is_compound
                )
                # If the rate constant is k_frag, then no fragmentation is
                # allowed from the smallest size class and therefore we
                # manually set this to zero
                if k == 'k_frag':
                    k_dist[0, :] = 0.0
            else:
                k_dist = self.set_k_distribution(
                    dims={'t': self.t_grid},
                    k_f=k_f, k_0=k_0, params=params,
                    is_compound=is_compound
                )
            # Check no values are less than zero
            if np.any(k_dist < 0.0):
                msg = (f'Value for {k} distribution calculated from input '
                       'data resulted in negative values. Ensure '
                       f'distribution params are such that all {k} values '
                       'are positive.')
                raise FMNPDistributionValueError(msg)
            # Set self.k_[frag|diss|min] as this distribution
            setattr(self, k, k_dist)
    def run(self) -> FMNPOutput:
        r"""
        Run the model with the config and data provided at initialisation.

        Returns
        -------
        :class:`fragmentmnp.output.FMNPOutput` object containing model output

        Notes
        -----
        The model numerically solves the following set of differential
        equations to give a time series of mass concentrations of particles
        `c`, dissolved polymer `c_diss` and mineralised polymer `c_min`.
        `k` is the current size class, `i` are the daughter size classes.

        .. math::
            \frac{dc_k}{dt} = -k_{\text{frag},k} c_k +
            \sum_i f_{i,k} k_{\text{frag},i} c_i - k_{\text{diss},k} c_k

        .. math::
            \frac{dc_\text{diss}}{dt} = \sum_k k_{\text{diss},k} c_k -
            k_\text{min} c_\text{diss}

        .. math::
            \frac{dc_\text{min}}{dt} = k_\text{min} c_\text{diss}

        Here, :math:`k_{\text{frag},k}` is the fragmentation rate of size
        class `k`, :math:`f_{i,k}` is the fraction of daughter fragments
        produced from a fragmenting particle of size `i` that are of size
        `k`, :math:`k_{\text{diss},k}` is the dissolution rate from size
        class `k` and :math:`k_\text{min}` is the mineralisation rate from
        the dissolved pool. Mass concentrations are converted to particle
        number concentrations by assuming spherical particles with the
        density given in the input data.
        """

        def f(t, c):
            """
            The initial value problem for SciPy to solve. This must satisfy
            c'(t) = f(t, c) with initial values given in data, with N+2
            solutions:
                c[:N] = mass concentration in each of the N size classes
                c[N] = total dissolved mass concentration
                c[N+1] = mineralised mass concentration
            """
            # Get the number of size classes
            N = self.n_size_classes
            # Unpack the solutions
            c_particles = c[:N]
            c_dissolved = c[N]
            c_min = c[N + 1]  # Not used as a source in any ODE (only grows)
            # Interpolate the time-dependent parameters to the specific
            # timestep given (which will be a float, rather than an integer
            # index)
            f_frag = interpolate.interp1d(self.t_grid, self.k_frag, axis=1,
                                          fill_value='extrapolate')
            f_diss = interpolate.interp1d(self.t_grid, self.k_diss, axis=1,
                                          fill_value='extrapolate')
            f_min = interpolate.interp1d(self.t_grid, self.k_min, axis=0,
                                         fill_value='extrapolate')
            k_frag = f_frag(t)
            k_diss = f_diss(t)
            k_min = f_min(t)
            # Build array of d/dt
            dcdt = np.empty(N + 2)
            # Particle ODEs, for each size class:
            #   Gains: fragmentation from bigger size classes
            #   Loss: dissolution and fragmentation to smaller size classes
            for k in range(N):
                dcdt[k] = (
                    - k_frag[k] * c_particles[k]
                    + np.sum(self.fsd[:, k] * k_frag * c_particles[:N])
                    - k_diss[k] * c_particles[k]
                )
            # Dissolved mass ODE:
            #   Gains: sum of k_diss[k] * c_particles[k]
            #   Loss: k_min * c_dissolved
            dcdt_dissolved = (np.sum(k_diss * c_particles)
                              - k_min * c_dissolved)
            # Assign to the second-to-last entry
            dcdt[N] = dcdt_dissolved
            # Mineralised ODE:
            #   Gains: k_min * c_dissolved
            #   No loss, so it's purely accumulative
            dcdt_min = k_min * c_dissolved
            dcdt[N + 1] = dcdt_min
            # Final differential to return
            return dcdt

        # Build the new N+2 initial conditions: N particulate states,
        # +1 dissolved, +1 mineralised
        y0 = np.concatenate([
            self.initial_concs,           # microplastic mass per size class
            [self.initial_concs_diss],    # dissolved mass
            [0.0]                         # mineralised (initially zero)
        ])
        # Solve the ODE
        soln = solve_ivp(
            fun=f,
            method=self.config['solver_method'],
            t_span=(self.t_grid.min(), self.t_grid.max()),
            y0=y0,
            t_eval=self.t_eval,
            rtol=self.config['solver_rtol'],
            atol=self.config['solver_atol'],
            max_step=self.config['solver_max_step']
        )
        if not soln.success:
            raise FMNPNumericalError('Model solution could not be found: '
                                     f'{soln.message}')
        # Extract solution
        c_part_sol = soln.y[:self.n_size_classes, :]
        c_diss_sol = soln.y[self.n_size_classes, :]
        c_min_sol = soln.y[self.n_size_classes + 1, :]
        # Convert microparticle mass to particle number
        n_part_sol = self.mass_to_particle_number(c_part_sol)
        # Build the FMNPOutput object from the solution
        return FMNPOutput(
            t=soln.t,
            c=c_part_sol,
            n=n_part_sol,
            c_diss=c_diss_sol,
            c_min=c_min_sol,
            soln=soln,
            psd=self.psd,
        )
    def mass_to_particle_number(self, mass):
        """
        Convert mass (concentration) to particle number (concentration).
        """
        return mass / (self.density * self.volume(self.psd))[:, np.newaxis]
    def _set_psd(self) -> npt.NDArray[np.float64]:
        """
        Calculate the particle size distribution based on the config
        options passed in `self.config`
        """
        if 'particle_size_classes' in self.config:
            # If a particle size distribution has been specified, use that
            psd = np.array(self.config['particle_size_classes'])
        elif 'particle_size_range' in self.config:
            # Else, construct the size distribution from the given size range
            psd = np.logspace(*self.config['particle_size_range'],
                              self.n_size_classes)
        else:
            # TODO move this check into validation
            raise ValueError('particle_size_classes or particle_size_range '
                             'must be present in the model config, but '
                             'neither was found.')
        return psd
    @staticmethod
    def set_k_distribution(dims: dict, k_f: float, k_0: float = 0.0,
                           params: dict = {},
                           is_compound: bool = True
                           ) -> npt.NDArray[np.float64]:
        r"""
        Create a distribution based on the rate constant scaling factor
        ``k_f`` and baseline adjustment factor ``k_0``. The distribution
        will be a compound or additive combination of power law /
        polynomial, exponential, logarithmic and logistic regressions,
        encapsulated in the function :math:`X(x)`, and have dimensions
        given by `dims`. For a distribution with `D` dimensions:

        .. math::
            k(\mathbf{x}) = k_f \prod_{d=1}^D X(x_d) + k_0

        or

        .. math::
            k(\mathbf{x}) = k_f \sum_{d=1}^D X(x_d) + k_0

        :math:`X(x)` is then given either by:

        .. math::
            X(x) = A_x \hat{x}^{\alpha_x} \cdot B_x e^{-\beta_x \hat{x}}
            \cdot C_x \ln (\gamma_x \hat{x}) \cdot
            \frac{D_x}{1 + e^{-\delta_{x,1}(\hat{x} - \delta_{x,2})}}

        or the user can specify a polynomial instead of the power law term:

        .. math::
            X(x) = \sum_{n=1}^N A_{x,n} \hat{x}^n \cdot
            B_x e^{-\beta_x \hat{x}} \cdot C_x \ln (\gamma_x \hat{x}) \cdot
            \frac{D_x}{1 + e^{-\delta_{x,1}(\hat{x} - \delta_{x,2})}}

        In the above, the dimension value :math:`\hat{x}` is normalised
        such that the median value is equal to 1:
        :math:`\hat{x} = x/\tilde{x}`.

        Parameters
        ----------
        dims : dict
            A dictionary that maps dimension names to their grids, e.g. to
            create a distribution of time `t` and particle surface area
            `s`, `dims` would equal `{'t': t, 's': s}`, where `t` and `s`
            are the timesteps and particle surface area bins over which to
            create this distribution. The dimension names must correspond
            to the subscripts used in `params`. The values are normalised
            such that the median of each dimension is 1.
        k_f : float
            Rate constant scaling factor
        k_0 : float, default=0
            Rate constant baseline adjustment factor
        params : dict, default={}
            A dictionary of values to parameterise the distribution with.
            See the notes below.
        is_compound : bool, default=True
            Whether the regressions for each dimension are combined by
            multiplying (compound) or adding.

        Returns
        -------
        k : np.ndarray
            Distribution array over the dims provided

        Notes
        -----
        `k` is modelled as a function of the dims provided, and the model
        builds this distribution as a combination of power law /
        polynomial, exponential, logarithmic and logistic regressions,
        enabling a broad range of dependencies to be accounted for. This
        distribution is intended to be applied to rate constants used in
        the model, such as `k_frag` and `k_diss`.

        The `params` dict gives the parameters used to construct this
        distribution using the equation above. That is, :math:`A_x`
        (where `x` is the dimension), :math:`\alpha_x` etc are given in
        the `params` dict as e.g. `A_t`, `alpha_t`, where the subscript
        (`t` in this case) is the name of the dimension corresponding to
        the `dims` dict.

        This function does not require any parameters to be present in
        `params`. Non-present values are defaulted to values that remove
        the influence of that particular expression, and letting all
        parameters default results in a constant `k` distribution. More
        specifically, the params that can be specified are:

        A_x : array-like or float, default=1
            Power law coefficient(s) for dim `x` (e.g. ``A_t`` for dim
            `t`). If a scalar is provided, this is used as the coefficient
            for a power law expression with ``alpha_x`` as the exponent.
            If a list is provided, these are used as coefficients in a
            polynomial expression, where the order of the polynomial is
            given by the length of the list. For example, if a length-2
            list ``A_t=[2, 3]`` is given, then the resulting polynomial
            will be :math:`3t^2 + 2t` (note the list is in *ascending*
            order of polynomial terms).
        alpha_x : float, default=0
            If `A_x` is a scalar, `alpha_x` is the exponent for this power
            law expression. For example, if ``A_t=2`` and ``alpha_t=0.5``,
            the resulting power law will be :math:`2t^{0.5}`.
        B_x : float, default=1
            Exponential coefficient.
        beta_x : float, default=0
            Exponential scaling factor.
        C_x : float or None, default=None
            If a scalar is given, this is the coefficient for the
            logarithmic expression. If ``None`` is given, the logarithmic
            expression is set to 1 (i.e. it is ignored).
        gamma_x : float, default=1
            Logarithmic scaling factor.
        D_x : float or None, default=None
            If a scalar is given, this is the coefficient for the logistic
            expression. If `None` is given, the logistic expression is set
            to 1.
        delta1_x : float, default=1
            Logistic growth rate (steepness of the logistic curve).
        delta2_x : float or None, default=None
            Midpoint of the logistic curve, which denotes the `x` value
            where the logistic curve is at its midpoint. If `None` is
            given, the midpoint is assumed to be at the midpoint of the
            `x` range. For example, for the time dimension `t`, if the
            model timesteps go from 1 to 100, then the default is
            ``delta2_t=50``.

        If any dimension values are equal to 0, the logarithmic term
        returns 0 rather than being undefined.

        .. warning:: The parameters used for calculating distributions
            such as `k_frag` and `k_diss` have changed from previous
            versions, which only allowed for a power law relationship.
            This causes breaking changes after v0.1.0.
        """
        if dims == {}:
            raise Exception('Trying to create k distribution but `dims` '
                            'dict is empty. You must provide at least one '
                            'dimension.')
        else:
            # Create a grid out of our dimensions (and preserve the matrix
            # indexing order with 'ij')
            grid = np.meshgrid(*[x for x in dims.values()], indexing='ij')
            # List of the regressions for each dimension, which will be
            # populated when we loop over the dimensions
            X = []
            # Loop over the dimensions
            for i, x in enumerate(grid):
                # Normalise the values
                x_norm = x / np.median(x)
                # Get the name of this dim
                name = list(dims.keys())[i]
                # Pull out the params for convenience
                A = params.get(f'A_{name}', 1.0)
                alpha = float(params.get(f'alpha_{name}', 0.0))
                B = float(params.get(f'B_{name}', 1.0))
                beta = float(params.get(f'beta_{name}', 0.0))
                C = params.get(f'C_{name}', None)
                gamma = float(params.get(f'gamma_{name}', 1.0))
                D = params.get(f'D_{name}', None)
                delta_1 = float(params.get(f'delta1_{name}', 1.0))
                delta_2 = params.get(f'delta2_{name}', None)
                # Let users specify a polynomial by listing A coefficients,
                # presuming the exponents will be 1, 2, 3 etc, corresponding
                # to the list elements in A. Otherwise, use A and alpha as a
                # power law A*x**alpha
                if isinstance(A, (list, tuple, np.ndarray)):
                    powers = np.arange(1, len(A) + 1)
                    power_x = 0.0
                    for j, power in enumerate(powers):
                        power_x = power_x + A[j] * x_norm**power
                else:
                    power_x = float(A) * x_norm**alpha
                # Exponential term
                exp_x = B * np.exp(-beta*x_norm)
                # Only calculate the ln term if C is not None, and if x=0,
                # then set ln(x) to 0
                if C is not None:
                    arg = gamma * x_norm
                    ln_term = np.log(arg,
                                     out=np.zeros_like(arg,
                                                       dtype=np.float64),
                                     where=(arg != 0))
                    ln_x = float(C) * ln_term
                else:
                    ln_x = 1.0
                # If the logistic delta_2 term (the x value at the midpoint
                # along the k axis) isn't specified, then calculate it as
                # halfway along the x axis
                if (delta_2 is None) and (D is not None):
                    delta_2 = (x_norm.max() - x_norm.min()) / 2
                # Calculate the logistic contribution, only if D is not None
                if D is not None:
                    logit_x = float(D) / (1 + np.exp(-delta_1
                                                     * (x_norm
                                                        - float(delta_2))))
                else:
                    logit_x = 1.0
                # Multiply all the expressions together for this dimension
                X.append(power_x * exp_x * ln_x * logit_x)
            # Calculate the final distribution by combining X across the
            # dimensions (multiplying if compound, summing otherwise)
            if is_compound:
                k = k_f * np.prod(X, axis=0) + k_0
            else:
                k = k_f * np.sum(X, axis=0) + k_0
            return k
    @staticmethod
    def set_fsd(n: int,
                psd: npt.NDArray[np.float64],
                beta: float) -> npt.NDArray[np.float64]:
        r"""
        Set the fragment size distribution matrix, assuming that
        fragmentation events result in a split in mass between daughter
        fragments that scales proportionally to :math:`d^\beta`, where
        :math:`d` is the particle diameter and :math:`\beta` is an
        empirical fragment size distribution parameter. For example, if
        :math:`\beta` is negative, then a larger proportion of the
        fragmenting mass goes to smaller size classes than larger. For an
        equal split between daughter size classes, set :math:`\beta` to 0.

        Parameters
        ----------
        n : int
            Number of particle size classes
        psd : np.ndarray
            Particle size distribution
        beta : float
            Fragment size distribution empirical parameter

        Returns
        -------
        np.ndarray
            Matrix of fragment size distributions for all size classes
        """
        # Start with a zero-filled array of shape (N,N)
        fsd = np.zeros((n, n))
        # Fill with the split to daughter size classes scaled
        # proportionally to d^beta
        for i in np.arange(1, n):
            fsd[i, :-(n-i)] = (psd[:-(n-i)] ** beta
                               / np.sum(psd[:-(n-i)] ** beta))
        return fsd
    @staticmethod
    def surface_area(psd: npt.NDArray[np.float64]
                     ) -> npt.NDArray[np.float64]:
        """
        Return the surface area of the particles, presuming they are
        spheres. This function can be overloaded to account for different
        shaped particles.
        """
        return 4.0 * np.pi * (psd / 2.0) ** 2
    @staticmethod
    def volume(psd: npt.NDArray[np.float64]
               ) -> npt.NDArray[np.float64]:
        """
        Return the volume of the particles, presuming they are spheres.
        This function can be overloaded to account for different shaped
        particles.
        """
        return (4.0/3.0) * np.pi * (psd / 2.0) ** 3
    @staticmethod
    def _f_surface_area(psd: npt.NDArray[np.float64],
                        gamma: float = 1.0) -> npt.NDArray[np.float64]:
        r"""
        Calculate the scaling factor for surface area, which is defined as
        the ratio of the surface area to volume ratio of the polymer for
        each size class to the median size class, such that ``f`` is 1 for
        the median size class, larger for the smaller size classes
        (because there are more particles per unit volume), and smaller
        for larger size classes. An empirical parameter ``gamma`` scales
        the factor as :math:`f^\gamma`.

        Parameters
        ----------
        psd : np.ndarray
            The particle size distribution
        gamma : float
            Empirical scaling factor that scales ``f`` as :math:`s^\gamma`,
            where ``s`` is the surface area to volume ratio of each size
            class. Therefore, if ``gamma`` is 1, then ``k_diss`` scales
            directly with ``s``.

        Returns
        -------
        np.ndarray
            Surface area scaling factor

        Notes
        -----
        By assuming spherical particles, calculating their volumes and
        surface areas and simplifying the algebra, ``f`` can be defined as

        .. math::
            f_\text{s} = \left(\frac{s}{\hat{s}}\right)^\gamma

        where :math:`s` is the surface area to volume ratio, and
        :math:`\hat{s}` is the median of :math:`s`:

        .. math::
            s = \frac{4 \pi r_\text{max}}{\textbf{r}}

        Here, :math:`r_\text{max}` is the radius of the largest particle
        size class, and :math:`\textbf{r}` is an array of the particle
        size class radii.
        """
        # Calculate ratio of surface area to the largest volume and scale
        # to the median (so f = 1 for the median size class)
        surface_area_volume_ratio = (4 * np.pi * (psd.max() / 2) ** 3) / psd
        f = surface_area_volume_ratio / np.median(surface_area_volume_ratio)
        return f ** gamma

    @staticmethod
    def _validate_inputs(config: dict, data: dict) -> Tuple[dict, dict]:
        """
        Validate the config and data dicts passed to the model

        Parameters
        ----------
        config : dict
            Model config dict
        data : dict
            Model input data dict

        Returns
        -------
        Tuple[dict, dict]
            Config and data dicts validated and filled with defaults
        """
        # Try and validate the config
        try:
            # Returns the config dict with defaults filled
            config = validation.validate_config(config)
        except SchemaError as err:
            raise SchemaError('Model config did not pass validation!') \
                from err
        # Try and validate data
        try:
            # Returns the data dict with defaults filled
            data = validation.validate_data(data, config)
        except SchemaError as err:
            raise SchemaError('Input data did not pass validation!') from err
        # Return the config and data with filled defaults
        return config, data
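
# ----------------------------------------------------------------------------
# Illustrative usage sketch (not part of the library itself). The config and
# data dicts below are hypothetical and only contain the keys that this module
# reads directly; the real schema enforced by validation.validate_config and
# validation.validate_data may require or default additional keys, which is
# why validation is skipped here with validate=False. Values and units are
# purely illustrative.
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    # A rate constant distribution can be built on its own with the static
    # set_k_distribution method, e.g. a k_frag that decays exponentially in
    # (median-normalised) time via the beta_t parameter
    t = np.arange(0.5, 100.5, 1.0)
    k_frag_t = FragmentMNP.set_k_distribution(dims={'t': t},
                                              k_f=0.01,
                                              params={'beta_t': 0.5})
    print(k_frag_t.shape)                       # (100,)

    # Running the full model with scalar rate constants, which give
    # distributions that are constant in time and particle size
    example_config = {
        'n_size_classes': 7,                    # number of size classes
        'particle_size_range': (-9, -3),        # log10 diameters for logspace
        'n_timesteps': 100,
        'dt': 1,
        'solver_method': 'RK45',                # passed through to solve_ivp
        'solver_t_eval': 'timesteps',           # store the solution on t_grid
        'solver_rtol': 1e-3,
        'solver_atol': 1e-6,
        'solver_max_step': np.inf,
    }
    example_data = {
        'initial_concs': np.full(7, 42.0),      # mass conc per size class
        'initial_concs_diss': 0.0,              # no dissolved mass initially
        'density': 1380.0,                      # assumed polymer density
        'fsd_beta': 0.0,                        # equal split to daughters
        'k_frag': 0.01,                         # scalars -> constant dists
        'k_diss': 0.001,
        'k_min': 0.001,
    }
    fmnp = FragmentMNP(example_config, example_data, validate=False)
    output = fmnp.run()
    # See fragmentmnp.output.FMNPOutput for the attributes exposed by the
    # returned object
    print(output)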