Source code for models.problems

import copy
import os
import time
from logging import getLogger

import numpy as num
import pytensor.tensor as tt
from pymc import Deterministic, Model, Potential, draw
from pyrocko import util
from pyrocko.model import get_effective_latlon
from pytensor import config as tconfig

from beat import config as bconfig
from beat.backend import ListArrayOrdering, ListToArrayBijection
from beat.models import geodetic, laplacian, polarity, seismic
from beat.models.base import init_uniform_random
from beat.utility import list2string, transform_sources, weed_input_rvs

# disable pytensor rounding warning
tconfig.compute_test_value = "pdb"
tconfig.warn__round = False

km = 1000.0

logger = getLogger("models")


__all__ = ["GeometryOptimizer", "DistributionOptimizer", "load_model"]


class InconsistentNumberHyperparametersError(Exception):
    context = (
        "Configuration file has to be updated!"
        + " Hyperparameters have to be re-estimated. \n"
        + ' Please run "beat update <project_dir>'
        + ' --parameters=hypers,hierarchicals"'
    )

    def __init__(self, errmess=""):
        self.errmess = errmess

    def __str__(self):
        return "\n%s\n%s" % (self.errmess, self.context)


geometry_composite_catalog = {
    "polarity": polarity.PolarityComposite,
    "seismic": seismic.SeismicGeometryComposite,
    "geodetic": geodetic.GeodeticGeometryComposite,
}

bem_composite_catalog = {
    "geodetic": geodetic.GeodeticBEMComposite,
}

distributer_composite_catalog = {
    "seismic": seismic.SeismicDistributerComposite,
    "geodetic": geodetic.GeodeticDistributerComposite,
    "laplacian": laplacian.LaplacianDistributerComposite,
}


class Problem(object):
    """
    Overarching class for the optimization problems to be solved.

    Parameters
    ----------
    config : :class:`beat.BEATConfig`
        Configuration object that contains the problem definition.
    """

    _varnames = None
    _hypernames = None
    _hierarchicalnames = None

    def __init__(self, config, hypers=False):
        self.model = None

        self._like_name = "like"

        self.fixed_params = {}
        self.composites = {}
        self.hyperparams = {}

        logger.info("Analysing problem ...")
        logger.info("---------------------\n")

        # Load events
        if config.event is None:
            logger.warning("Found no event information!")
            raise AttributeError("Problem config has no event information!")
        else:
            self.event = config.event
            nsubevents = len(config.subevents)
            self.subevents = config.subevents

            if nsubevents > 0:
                logger.info("Found %i subevents." % nsubevents)

        self.config = config

        mode = self.config.problem_config.mode

        outfolder = os.path.join(self.config.project_dir, mode)

        if hypers:
            outfolder = os.path.join(outfolder, "hypers")

        self.outfolder = outfolder
        util.ensuredir(self.outfolder)

    @property
    def events(self):
        return [self.event] + self.subevents

    @property
    def nevents(self):
        return len(self.events)

    def init_sampler(self, hypers=False):
        """
        Initialise the Sampling algorithm as defined in the configuration file.
        """
        from beat import sampler

        if hypers:
            sc = self.config.hyper_sampler_config
        else:
            sc = self.config.sampler_config

        logger.info('Using "%s" backend to store samples!' % sc.backend)

        if self.model is None:
            raise Exception("Model has to be built before initialising the sampler.")

        with self.model:
            if sc.name == "Metropolis":
                logger.info(
                    "... Initiate Metropolis ... \n"
                    " proposal_distribution: %s, tune_interval=%i,"
                    " n_jobs=%i \n"
                    % (
                        sc.parameters.proposal_dist,
                        sc.parameters.tune_interval,
                        sc.parameters.n_jobs,
                    )
                )

                t1 = time.time()
                step = sampler.Metropolis(
                    n_chains=sc.parameters.n_chains,
                    likelihood_name=self._like_name,
                    tune_interval=sc.parameters.tune_interval,
                    proposal_name=sc.parameters.proposal_dist,
                    backend=sc.backend,
                )
                t2 = time.time()
                logger.info("Compilation time: %f \n" % (t2 - t1))

            elif sc.name == "SMC":
                logger.info(
                    "... Initiate Sequential Monte Carlo ... \n"
                    " n_chains=%i, tune_interval=%i, n_jobs=%i,"
                    " proposal_distribution: %s, \n"
                    % (
                        sc.parameters.n_chains,
                        sc.parameters.tune_interval,
                        sc.parameters.n_jobs,
                        sc.parameters.proposal_dist,
                    )
                )

                t1 = time.time()
                step = sampler.SMC(
                    n_chains=sc.parameters.n_chains,
                    tune_interval=sc.parameters.tune_interval,
                    coef_variation=sc.parameters.coef_variation,
                    proposal_name=sc.parameters.proposal_dist,
                    likelihood_name=self._like_name,
                    backend=sc.backend,
                )
                t2 = time.time()
                logger.info("Compilation time: %f \n" % (t2 - t1))

            elif sc.name == "PT":
                logger.info(
                    "... Initiate Metropolis for Parallel Tempering... \n"
                    " proposal_distribution: %s, tune_interval=%i,"
                    " n_chains=%i \n"
                    % (
                        sc.parameters.proposal_dist,
                        sc.parameters.tune_interval,
                        sc.parameters.n_chains,
                    )
                )
                step = sampler.Metropolis(
                    n_chains=sc.parameters.n_chains + 1,  # plus master
                    likelihood_name=self._like_name,
                    tune_interval=sc.parameters.tune_interval,
                    proposal_name=sc.parameters.proposal_dist,
                    backend=sc.backend,
                )

            else:
                raise ValueError(
                    'Sampler "%s" not supported! ' " Typo in config file?" % sc.name
                )

        return step

    def built_model(self):
        """
        Initialise :class:`pymc.Model` depending on problem composites,
        geodetic and/or seismic data are included. Composites also determine
        the problem to be solved.
        """

        logger.info("... Building model ...\n")

        pc = self.config.problem_config

        with Model() as self.model:
            self.init_random_variables()
            self.init_hyperparams()
            self.init_hierarchicals()

            total_llk = tt.zeros((1, 1), tconfig.floatX)

            for datatype, composite in self.composites.items():
                if datatype in bconfig.modes_catalog[pc.mode].keys():
                    input_rvs = weed_input_rvs(self.rvs, pc.mode, datatype=datatype)
                    fixed_rvs = weed_input_rvs(
                        self.fixed_params, pc.mode, datatype=datatype
                    )

                else:
                    input_rvs = self.rvs
                    fixed_rvs = self.fixed_params

                total_llk += composite.get_formula(
                    input_rvs, fixed_rvs, self.hyperparams, pc
                )

            # deterministic RV to write out llks to file
            like = Potential("dummy", total_llk.sum())
            llk = Deterministic(self._like_name, like)  # noqa: F841
            logger.info("Model building was successful! \n")

    def plant_lijection(self):
        """
        Add list to array bijection to model object by monkey-patching.
        """
        if self.model is not None:
            lordering = ListArrayOrdering(self.model.unobserved_RVs, intype="tensor")
            lpoint = [var.get_test_value() for var in self.model.unobserved_RVs]
            self.model.lijection = ListToArrayBijection(lordering, lpoint)
        else:
            raise AttributeError("Model needs to be built!")

    def built_hyper_model(self):
        """
        Initialise :class:`pymc.Model` depending on configuration file,
        geodetic and/or seismic data are included. Estimates initial parameter
        bounds for hyperparameters.
        """

        logger.info("... Building Hyper model ...\n")

        pc = self.config.problem_config

        if len(self.hierarchicals) == 0:
            self.init_hierarchicals()

        point = self.get_random_point(include=["hierarchicals", "priors"])

        if self.config.problem_config.mode == bconfig.geometry_mode_str:
            for param in pc.priors.values():
                point[param.name] = param.testvalue

        with Model() as self.model:
            self.init_hyperparams()

            total_llk = tt.zeros((1, 1), tconfig.floatX)

            for composite in self.composites.values():
                if hasattr(composite, "analyse_noise"):
                    composite.analyse_noise(point)
                    composite.init_weights()

                composite.update_llks(point)

                total_llk += composite.get_hyper_formula(self.hyperparams)

            like = Potential("dummy", total_llk.sum())
            llk = Deterministic(self._like_name, like)  # noqa: F841
            logger.info("Hyper model building was successful!")

    def get_random_point(self, include=["priors", "hierarchicals", "hypers"]):
        """
        Get random point in solution space.
        """
        pc = self.config.problem_config

        point = {}
        if "hierarchicals" in include:
            for name, param in self.hierarchicals.items():
                if not isinstance(param, num.ndarray):
                    point[name] = draw(param)

        if "priors" in include:
            for param in pc.priors.values():
                size = pc.get_parameter_size(param)
                point[param.name] = param.random(size)

        if "hypers" in include:
            if len(self.hyperparams) == 0:
                self.init_hyperparams()

            hps = {
                hp_name: draw(param)
                for hp_name, param in self.hyperparams.items()
                if not isinstance(param, num.ndarray)
            }

            point.update(hps)

        return point

    @property
    def varnames(self):
        """
        Sampled random variable names.

        Returns
        -------
        list of strings
        """
        if self._varnames is None:
            self._varnames = list(
                self.config.problem_config.get_random_variables()[0].keys()
            )
        return self._varnames

    @property
    def hypernames(self):
        """
        Sampled random variable names.

        Returns
        -------
        list of strings
        """
        if self._hypernames is None:
            self.init_hyperparams()
        return self._hypernames

    @property
    def hierarchicalnames(self):
        """
        Sampled random variable names.

        Returns
        -------
        list of strings
        """
        if self._hierarchicalnames is None:
            self.init_hierarchicals()
        return self._hierarchicalnames

    def init_random_variables(self):
        """
        Evaluate problem setup and initialize random variables and
        fixed variables dictionaries.
        """
        (
            rvs_kwargs,
            self.fixed_params,
        ) = self.config.problem_config.get_random_variables()

        self.rvs = {}
        for varname, kwargs in rvs_kwargs.items():
            self.rvs[varname] = init_uniform_random(kwargs)

    def init_hyperparams(self):
        """
        Evaluate problem setup and initialize hyperparameter dictionary.
        """
        pc = self.config.problem_config
        hyperparameters = copy.deepcopy(pc.hyperparameters)

        hyperparams = {}
        n_hyp = 0
        self._hypernames = []
        for datatype, composite in self.composites.items():
            hypernames = composite.get_hypernames()

            for hp_name in hypernames:
                if hp_name in hyperparameters.keys():
                    hyperpar = hyperparameters.pop(hp_name)
                    ndata = composite.get_hypersize(hp_name)
                else:
                    raise InconsistentNumberHyperparametersError(
                        "Datasets and -types require additional "
                        " hyperparameter(s): %s!" % hp_name
                    )

                if not num.array_equal(hyperpar.lower, hyperpar.upper):
                    dimension = hyperpar.dimension * ndata

                    kwargs = dict(
                        name=hyperpar.name,
                        shape=dimension,
                        lower=num.repeat(hyperpar.lower, ndata),
                        upper=num.repeat(hyperpar.upper, ndata),
                        initval=num.repeat(hyperpar.testvalue, ndata),
                        dtype=tconfig.floatX,
                        transform=None,
                    )

                    hyperparams[hp_name] = init_uniform_random(kwargs)

                    n_hyp += dimension
                    self._hypernames.append(hyperpar.name)
                    logger.info(
                        "Initialised hyperparameter %s with "
                        "size %i " % (hp_name, ndata)
                    )
                else:
                    logger.info(
                        "not solving for %s, got fixed at %s"
                        % (hyperpar.name, list2string(hyperpar.lower.flatten()))
                    )
                    hyperparams[hyperpar.name] = hyperpar.lower

        if len(hyperparameters) > 0:
            print(hyperparameters)
            raise InconsistentNumberHyperparametersError(
                "There are hyperparameters in config file, which are not"
                " covered by datasets/datatypes."
            )

        logger.info("Initialized %i hyperparameters in total!", n_hyp)

        self.hyperparams = hyperparams

    def update_llks(self, point):
        """
        Update posterior likelihoods of each composite of the problem with
        respect to one point in the solution space.

        Parameters
        ----------
        point : dict
            with numpy array-like items and variable name keys
        """
        for composite in self.composites.values():
            composite.update_llks(point)

    def apply(self, weights_dict):
        """
        Update composites in problem object with given composites.
        """
        for comp_name, weights in weights_dict.items():
            if comp_name in self.composites:
                self.composites[comp_name].apply(weights)

    def get_variance_reductions(self, point):
        """
        Get composite variance reductions (VRs) with values from given point.

        Parameters
        ----------
        point : :func:`pymc.Point`
            Dictionary with model parameters, for which the VRs are calculated
        """
        vrs = {}
        pconfig = self.config.problem_config
        for composite in self.composites.values():
            logger.info("Calculating variance reductions for %s" % composite.name)
            kwargs = {}
            if composite.name == "seismic":
                if pconfig.mode == bconfig.ffi_mode_str:
                    chop_bounds = ["b", "c"]
                elif pconfig.mode == bconfig.geometry_mode_str:
                    chop_bounds = ["a", "d"]
                else:
                    raise ValueError("Invalid problem_config mode! %s" % pconfig.mode)
                kwargs["chop_bounds"] = chop_bounds

            vr = composite.get_variance_reductions(point, **kwargs)
            vrs.update(vr)
        return vrs

    def point2sources(self, point):
        """
        Update composite sources(in place) with values from given point.

        Parameters
        ----------
        point : :func:`pymc.Point`
            Dictionary with model parameters, for which the sources are
            updated
        """
        for composite in self.composites.values():
            self.composites[composite.name].point2sources(point)

    def get_pyrocko_events(self, point=None):
        """
        Transform sources to pyrocko events.

        Parameters
        ----------
        point : :func:`pymc.Point`
            Dictionary with model parameters, for which the events are
            returned

        Returns
        -------
        events : list
            of :class:`pyrocko.model.Event`
        """

        for composite in self.composites.values():
            if hasattr(composite, "get_pyrocko_events"):
                events = composite.get_pyrocko_events(point)
            else:
                events = []

            if events:
                for event in events:
                    elat, elon = get_effective_latlon(event)
                    event.set_origin(elat, elon)

                return events

    def update_weights(self, point, n_jobs=1, plot=False):
        """
        Calculate and update model prediction uncertainty covariances of
        composites due to uncertainty in the velocity model with respect to
        one point in the solution space. Shared variables are updated in place.

        Parameters
        ----------
        point : :func:`pymc.Point`
            Dictionary with model parameters, for which the covariance matrixes
            with respect to velocity model uncertainties are calculated
        n_jobs : int
            Number of processors to use for calculation of seismic covariances
        plot : boolean
            Flag for opening the seismic waveforms in the snuffler
        """
        for composite in self.composites.values():
            if hasattr(composite, "update_weights"):
                composite.update_weights(point, n_jobs=n_jobs)

    def get_weights(self):
        """
        Assemble weights of problem composites in dict for saving.
        """
        outd = {}
        for datatype in self.config.problem_config.datatypes:
            if datatype in self.composites.keys():
                outd[datatype] = self.composites[datatype].weights

        return outd

    def get_synthetics(self, point, **kwargs):
        """
        Get synthetics for given point in solution space.

        Parameters
        ----------
        point : :func:`pymc.Point`
            Dictionary with model parameters
        kwargs especially to change output of seismic forward model
            outmode = 'traces'/ 'array' / 'data'

        Returns
        -------
        Dictionary with keys according to composites containing the synthetics
        as lists.
        """

        d = dict()

        for composite in self.composites.values():
            d[composite.name] = composite.get_synthetics(point, outmode="data")

        return d

    def init_hierarchicals(self):
        """
        Initialise hierarchical random variables of all composites.
        """
        self._hierarchicalnames = []
        for composite in self.composites.values():
            try:
                composite.init_hierarchicals(self.config.problem_config)
                self._hierarchicalnames.extend(composite._hierarchicalnames)
            except AttributeError:
                pass

    @property
    def hierarchicals(self):
        """
        Return dictionary of all hierarchical variables of the problem.
        """
        d = {}
        for composite in self.composites.values():
            if composite.hierarchicals is not None:
                d.update(composite.hierarchicals)

        return d


class SourceOptimizer(Problem):
    """
    Defines the base-class setup involving non-linear fault geometry.

    Parameters
    ----------
    config : :class:'config.BEATconfig'
        Contains all the information about the model setup and optimization
        boundaries, as well as the sampler parameters.
    """

    def __init__(self, config, hypers=False):
        super(SourceOptimizer, self).__init__(config, hypers)

        pc = config.problem_config
        n_sources_total = sum(pc.n_sources)

        if self.nevents != n_sources_total and self.nevents != 1:
            raise ValueError(
                "Number of events and sources have to be equal or only one "
                "event has to be used! Number if events %i and number of "
                "sources: %i!" % (self.nevents, n_sources_total)
            )

        # Init sources
        self.sources = []
        running_idx = 0
        for source_type, n_source in zip(pc.source_types, pc.n_sources):
            for _ in range(n_source):
                if self.nevents > 1:
                    event = self.events[running_idx]
                else:
                    event = self.event

                logger.info(
                    "Using %s for %i sources for event %s",
                    source_type,
                    n_source,
                    event.__str__(),
                )

                source = bconfig.source_catalog[source_type].from_pyrocko_event(event)
                source.stf = bconfig.stf_catalog[pc.stf_type](duration=event.duration)

                # hardcoded inversion for hypocentral time
                if source.stf is not None:
                    source.stf.anchor = -1.0

                running_idx += 1
                self.sources.append(source)


[docs] class GeometryOptimizer(SourceOptimizer): """ Defines the model setup to solve for the non-linear fault geometry. Parameters ---------- config : :class:'config.BEATconfig' Contains all the information about the model setup and optimization boundaries, as well as the sampler parameters. """ def __init__(self, config, hypers=False): logger.info("... Initialising Geometry Optimizer ... \n") super(GeometryOptimizer, self).__init__(config, hypers) pc = config.problem_config if pc.mode == "geometry": composite_catalog = geometry_composite_catalog elif pc.mode == "bem": composite_catalog = bem_composite_catalog dsources = transform_sources(self.sources, pc.datatypes, pc.decimation_factors) mappings = pc.get_variables_mapping() for datatype in pc.datatypes: self.composites[datatype] = composite_catalog[datatype]( config[datatype + "_config"], config.project_dir, dsources[datatype], mappings[datatype], self.events, hypers, ) self.config = config # updating source objects with test-value in bounds tpoint = pc.get_test_point() self.point2sources(tpoint)
[docs] class DistributionOptimizer(Problem): """ Defines the model setup to solve the linear slip-distribution and returns the model object. Parameters ---------- config : :class:'config.BEATconfig' Contains all the information about the model setup and optimization boundaries, as well as the sampler parameters. """ def __init__(self, config, hypers=False): logger.info("... Initialising Distribution Optimizer ... \n") super(DistributionOptimizer, self).__init__(config, hypers) for datatype in config.problem_config.datatypes: data_config = config[datatype + "_config"] composite = distributer_composite_catalog[datatype]( data_config, config.project_dir, self.events, hypers ) composite.set_slip_varnames(self.varnames) self.composites[datatype] = composite regularization = config.problem_config.mode_config.regularization try: composite = distributer_composite_catalog[regularization]( config.problem_config.mode_config.regularization_config, config.project_dir, self.events, hypers, ) composite.set_slip_varnames(self.varnames) self.composites[regularization] = composite except KeyError: logger.info('Using "%s" regularization ...' % regularization) self.config = config
[docs] def lsq_solution(self, point, plot=False): """ Returns non-negtive least-squares solution for given input point. Parameters ---------- point : dict in solution space Returns ------- point with least-squares solution """ from scipy.optimize import nnls if self.config.problem_config.mode_config.regularization != "laplacian": raise ValueError( "Least-squares- solution for distributed slip is only " "available with laplacian regularization!" ) lc = self.composites["laplacian"] slip_varnames_candidates = ["uparr", "utens"] slip_varnames = [] for var in slip_varnames_candidates: if var in self.varnames: slip_varnames.append(var) if len(slip_varnames) == 0.0: raise ValueError( "LSQ distributed slip solution is only available for %s," " which were fixed in the setup!" % list2string(slip_varnames_candidates) ) npatches = point[slip_varnames[0]].size dzero = num.zeros(npatches, dtype=tconfig.floatX) # set perp slip variables to zero if "uperp" in self.varnames: point["uperp"] = dzero # set slip variables that are inverted for to one for inv_var in slip_varnames: point[inv_var] = num.ones(npatches, dtype=tconfig.floatX) Gs = [] ds = [] for datatype, composite in self.composites.items(): if datatype == "geodetic": crust_ind = composite.config.gf_config.reference_model_idx keys = [ composite.get_gflibrary_key( crust_ind=crust_ind, wavename="static", component=var ) for var in slip_varnames ] Gs.extend([composite.gfs[key]._gfmatrix.T for key in keys]) # removing hierarchicals from data displacements = [] for dataset in composite.datasets: displacements.append(copy.deepcopy(dataset.displacement)) displacements = composite.apply_corrections( displacements, point=point, operation="-" ) ds.extend(displacements) elif datatype == "seismic": targets_gfs = [[] for i in range(composite.n_t)] for pidx in range(npatches): Gseis, dseis = composite.get_synthetics( point, outmode="array", patchidxs=num.array([pidx], dtype="int") ) for i, gseis in enumerate(Gseis): targets_gfs[i].append(num.atleast_2d(gseis).T) else: # concatenate all the patch gfs target wise gseis_p = list(map(num.hstack, targets_gfs)) Gs.extend(gseis_p) ds.extend(dseis) if len(Gs) == 0: raise ValueError( "No Greens Function matrix available!" " (needs geodetic datatype!)" ) G = num.vstack(Gs) D = ( num.vstack([lc.smoothing_op for sv in slip_varnames]) * point[bconfig.hyper_name_laplacian] ** 2.0 ) A = num.vstack([G, D]) d = num.hstack(ds + [dzero for var in slip_varnames]) # m, rmse, rankA, singularsA = num.linalg.lstsq(A.T, d, rcond=None) m, res = nnls(A, d) npatches = self.config.problem_config.mode_config.npatches for i, var in enumerate(slip_varnames): point[var] = m[i * npatches : (i + 1) * npatches] if plot: from beat.plotting import source_geometry datatype = self.config.problem_config.datatypes[0] gc = self.composites[datatype] fault = gc.load_fault_geometry() source_geometry( fault, list(fault.iter_subfaults()), event=gc.event, values=point[slip_varnames[0]], title="slip [m]", ) # datasets=gc.datasets return point
problem_catalog = { bconfig.geometry_mode_str: GeometryOptimizer, bconfig.ffi_mode_str: DistributionOptimizer, bconfig.bem_mode_str: GeometryOptimizer, }
[docs] def load_model(project_dir, mode, hypers=False, build=True): """ Load config from project directory and return BEAT problem including model. Parameters ---------- project_dir : string path to beat model directory mode : string problem name to be loaded hypers : boolean flag to return hyper parameter estimation model instead of main model. build : boolean flag to build models Returns ------- problem : :class:`Problem` """ config = bconfig.load_config(project_dir, mode) pc = config.problem_config if hypers and len(pc.hyperparameters) == 0: raise ValueError( "No hyperparameters specified!" " option --hypers not applicable" ) if pc.mode in problem_catalog.keys(): problem = problem_catalog[pc.mode](config, hypers) else: logger.error("Modeling problem %s not supported" % pc.mode) raise ValueError("Model not supported") if build: if hypers: problem.built_hyper_model() else: problem.built_model() return problem