Source code for sampler.smc

"""
Sequential Monte Carlo Sampler module;

Runs on any pymc model.
"""

import logging

import numpy as np
from pymc.blocking import RaveledVars
from pymc.model import modelcontext

from beat import backend, utility

from .base import choose_proposal, init_stage, iter_parallel_chains, update_last_samples
from .metropolis import Metropolis

__all__ = ["SMC", "smc_sample"]


logger = logging.getLogger("smc")

sample_factor_final_stage = 1


[docs] class SMC(Metropolis): """ Sequential Monte-Carlo sampler class. Parameters ---------- vars : list List of variables for sampler out_vars : list List of output variables for trace recording. If empty unobserved_RVs are taken. n_chains : int Number of chains per stage has to be a large number of number of n_jobs (processors to be used) on the machine. scaling : float Factor applied to the proposal distribution i.e. the step size of the Markov Chain likelihood_name : string name of the :class:`pymc.determinsitic` variable that contains the model likelihood - defaults to 'like' proposal_dist : :class:`pymc.metropolis.Proposal` Type of proposal distribution, see :mod:`pymc.step_methods.metropolis` for options tune : boolean Flag for adaptive scaling based on the acceptance rate coef_variation : scalar, float Coefficient of variation, determines the change of beta from stage to stage, i.e.indirectly the number of stages, low coef_variation --> slow beta change, results in many stages and vice verca (default: 1.) check_bound : boolean Check if current sample lies outside of variable definition speeds up computation as the forward model won't be executed default: True model : :class:`pymc.Model` Optional model for sampling step. Defaults to None (taken from context). backend : str type of backend to use for sample results storage, for alternatives see :class:`backend.backend:catalog` References ---------- .. [Ching2007] Ching, J. and Chen, Y. (2007). Transitional Markov Chain Monte Carlo Method for Bayesian Model Updating, Model Class Selection, and Model Averaging. J. Eng. Mech., 10.1061/(ASCE)0733-9399(2007)133:7(816), 816-832. `link <http://ascelibrary.org/doi/abs/10.1061/%28ASCE%290733-9399 %282007%29133:7%28816%29>`__ """ default_blocked = True def __init__( self, vars=None, out_vars=None, scale=1.0, n_chains=100, tune=True, tune_interval=100, model=None, check_bound=True, likelihood_name="like", proposal_name="MultivariateNormal", backend="csv", coef_variation=1.0, **kwargs, ): super(SMC, self).__init__( vars=vars, out_vars=out_vars, scale=scale, n_chains=n_chains, tune=tune, tune_interval=tune_interval, model=model, check_bound=check_bound, likelihood_name=likelihood_name, backend=backend, proposal_name=proposal_name, **kwargs, ) self.beta = 0 self.coef_variation = coef_variation self.likelihoods = np.zeros(n_chains) def _sampler_state_blacklist(self): """ Returns sampler attributes that are not saved. """ bl = [ "likelihoods", "check_bnd", "logp_forw_func", "bij", "lij", "lordering", "proposal_samples_array", "vars", "_BlockedStep__newargs", ] return bl
[docs] def calc_beta(self): """ Calculate next tempering beta and importance weights based on current beta and sample likelihoods. Returns ------- beta(m+1) : scalar, float tempering parameter of the next stage beta(m) : scalar, float tempering parameter of the current stage weights : :class:`numpy.ndarray` Importance weights (floats) """ low_beta = self.beta up_beta = 2.0 old_beta = self.beta while up_beta - low_beta > 1e-6: current_beta = (low_beta + up_beta) / 2.0 temp = np.exp( (current_beta - self.beta) * (self.likelihoods - self.likelihoods.max()) ) cov_temp = np.std(temp) / np.mean(temp) if cov_temp > self.coef_variation: up_beta = current_beta else: low_beta = current_beta beta = current_beta weights = temp / np.sum(temp) return beta, old_beta, weights
[docs] def calc_covariance(self): """ Calculate trace covariance matrix based on importance weights. Returns ------- cov : :class:`numpy.ndarray` weighted covariances (NumPy > 1.10. required) """ cov = np.cov( self.array_population, aweights=self.weights.ravel(), bias=False, rowvar=0 ) cov = utility.ensure_cov_psd(cov) if np.isnan(cov).any() or np.isinf(cov).any(): raise ValueError( "Sample covariances contains Inf or NaN! Please try reducing the" " upper and lower bounds of hyper parameters!" ) return cov
[docs] def select_end_points(self, mtrace): """ Read trace results (variables and model likelihood) and take end points for each chain and set as start population for the next stage. Parameters ---------- mtrace : :class:`pymc.backend.base.MultiTrace` Returns ------- population : list of :func:`pymc.Point` dictionaries array_population : :class:`numpy.ndarray` Array of trace end-points likelihoods : :class:`numpy.ndarray` Array of likelihoods of the trace end-points """ q = self.bij.map(self.test_point) array_population = np.zeros((self.n_chains, q.data.size)) n_steps = len(mtrace) # collect end points of each chain and put into array last_idx = 0 for var_name, shp, dtype in q.point_map_info: slc_population = mtrace.get_values( varname=var_name, burn=n_steps - 1, combine=True ) arr_len = np.prod(shp, dtype=int) slc = slice(last_idx, last_idx + arr_len) if len(shp) == 0: array_population[:, slc] = np.atleast_2d(slc_population).T else: array_population[:, slc] = slc_population last_idx += arr_len # get likelihoods likelihoods = mtrace.get_values( varname=self.likelihood_name, burn=n_steps - 1, combine=True ) # map end array_endpoints to dict points population = [] for i in range(self.n_chains): population.append( self.bij.rmap( RaveledVars(array_population[i, :], point_map_info=q.point_map_info) ) ) return population, array_population, likelihoods
[docs] def get_chain_previous_lpoint(self, mtrace): """ Read trace results and take end points for each chain and set as previous chain result for comparison of metropolis select. Parameters ---------- mtrace : :class:`pymc.backend.base.MultiTrace` Returns ------- chain_previous_lpoint : list all unobservedRV values, including dataset likelihoods """ array_population = np.zeros((self.n_chains, self.lordering.size)) n_steps = len(mtrace) for _, slc, shp, _, var in self.lordering.vmap: slc_population = mtrace.get_values( varname=var, burn=n_steps - 1, combine=True ) if len(shp) == 0: array_population[:, slc] = np.atleast_2d(slc_population).T else: array_population[:, slc] = slc_population chain_previous_lpoint = [] # map end array_endpoints to list lpoints and apply resampling for r_idx in self.resampling_indexes: chain_previous_lpoint.append(self.lij.a2l(array_population[r_idx, :])) return chain_previous_lpoint
[docs] def get_map_end_points(self): """ Calculate mean of the end-points and return point. Returns ------- Dictionary of trace variables """ idx = self.likelihoods.flatten().argmax() return self.bij.rmap(self.array_population[idx, :])
[docs] def resample(self): """ Resample pdf based on importance weights. based on Kitagawas deterministic resampling algorithm. Returns ------- outindex : :class:`numpy.ndarray` Array of resampled trace indexes """ parents = np.arange(self.n_chains) N_childs = np.zeros(self.n_chains, dtype=int) cum_dist = np.cumsum(self.weights) aux = np.random.rand(1) u = parents + aux u /= self.n_chains j = 0 for i in parents: while u[i] > cum_dist[j]: j += 1 N_childs[j] += 1 indx = 0 outindx = np.zeros(self.n_chains, dtype=int) for i in parents: if N_childs[i] > 0: for j in range(indx, (indx + N_childs[i])): outindx[j] = parents[i] indx += N_childs[i] return outindx
def __getstate__(self): return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
[docs] def smc_sample( n_steps, step=None, start=None, homepath=None, stage=0, n_jobs=1, progressbar=False, buffer_size=5000, buffer_thinning=1, model=None, update=None, random_seed=None, rm_flag=False, ): """ Sequential Monte Carlo samlping Samples the solution space with n_chains of Metropolis chains, where each chain has n_steps iterations. Once finished, the sampled traces are evaluated: (1) Based on the likelihoods of the final samples, chains are weighted (2) the weighted covariance of the ensemble is calculated and set as new proposal distribution (3) the variation in the ensemble is calculated and the next tempering parameter (beta) calculated (4) New n_chains Metropolis chains are seeded on the traces with high weight for n_steps iterations (5) Repeat until beta > 1. Parameters ---------- n_steps : int The number of samples to draw for each Markov-chain per stage step : :class:`SMC` SMC initialisation object start : List of dictionaries with length of (n_chains) Starting points in parameter space (or partial point) Defaults to random draws from variables (defaults to empty dict) stage : int Stage where to start or continue the calculation. It is possible to continue after completed stages (stage should be the number of the completed stage + 1). If None the start will be at stage = 0. n_jobs : int The number of cores to be used in parallel. Be aware that pytensor has internal parallelisation. Sometimes this is more efficient especially for simple models. step.n_chains / n_jobs has to be an integer number! homepath : string Result_folder for storing stages, will be created if not existing. progressbar : bool Flag for displaying a progress bar buffer_size : int this is the number of samples after which the buffer is written to disk or if the chain end is reached buffer_thinning : int every nth sample of the buffer is written to disk default: 1 (no thinning) model : :class:`pymc.Model` (optional if in `with` context) has to contain deterministic variable name defined under step.likelihood_name' that contains the model likelihood update : :py:class:`models.Problem` Problem object that contains all the observed data and (if applicable) covariances to be updated each transition step. rm_flag : bool If True existing stage result folders are being deleted prior to sampling. References ---------- .. [Minson2013] Minson, S. E. and Simons, M. and Beck, J. L., (2013), Bayesian inversion for finite fault earthquake source models I- Theory and algorithm. Geophysical Journal International, 2013, 194(3), pp.1701-1726, `link <https://gji.oxfordjournals.org/content/194/3/1701.full>`__ """ model = modelcontext(model) step.n_steps = int(n_steps) if n_steps < 1: raise TypeError("Argument `n_steps` should be above 0.", exc_info=1) if step is None: raise TypeError("Argument `step` has to be a SMC step object.") if homepath is None: raise TypeError("Argument `homepath` should be path to result_directory.") if n_jobs > 1: if not (step.n_chains / float(n_jobs)).is_integer(): raise ValueError("n_chains / n_jobs has to be a whole number!") if start is not None: if len(start) != step.n_chains: raise TypeError( "Argument `start` should have dicts equal the " "number of chains (step.N-chains)" ) else: step.population = start if not any(step.likelihood_name in var.name for var in model.deterministics): raise TypeError( "Model (deterministic) variables need to contain " "a variable %s " "as defined in `step`." % step.likelihood_name ) stage_handler = backend.SampleStage(homepath, backend=step.backend) chains, step, update = init_stage( stage_handler=stage_handler, step=step, stage=stage, progressbar=progressbar, buffer_thinning=buffer_thinning, update=update, model=model, rm_flag=rm_flag, ) with model: while step.beta < 1.0: if step.stage == 0: # Initial stage logger.info("Sample initial stage: ...") draws = 1 else: draws = n_steps logger.info("Beta: %f Stage: %i" % (step.beta, step.stage)) # Metropolis sampling intermediate stages chains = stage_handler.clean_directory(step.stage, chains, rm_flag) sample_args = { "draws": draws, "step": step, "stage_path": stage_handler.stage_path(step.stage), "progressbar": progressbar, "model": model, "n_jobs": n_jobs, "chains": chains, "buffer_size": buffer_size, "buffer_thinning": buffer_thinning, } mtrace = iter_parallel_chains(**sample_args) ( step.population, step.array_population, step.likelihoods, ) = step.select_end_points(mtrace) if update is not None: logger.info("Updating Covariances ...") map_pt = step.get_map_end_points() update.update_weights(map_pt, n_jobs=n_jobs) mtrace = update_last_samples( homepath, step, progressbar, model, n_jobs, rm_flag ) ( step.population, step.array_population, step.likelihoods, ) = step.select_end_points(mtrace) step.beta, step.old_beta, step.weights = step.calc_beta() if step.beta > 1.0: logger.info("Beta > 1.: %f" % step.beta) step.beta = 1.0 save_sampler_state(step, update, stage_handler) chains = stage_handler.clean_directory(-1, chains, rm_flag) else: step.covariance = step.calc_covariance() step.proposal_dist = choose_proposal( step.proposal_name, scale=step.covariance ) step.resampling_indexes = step.resample() step.chain_previous_lpoint = step.get_chain_previous_lpoint(mtrace) save_sampler_state(step, update, stage_handler) step.stage += 1 del mtrace # Metropolis sampling final stage draws = n_steps * sample_factor_final_stage logger.info("Sample final stage with n_steps %i " % draws) step.stage = -1 temp = np.exp((1 - step.old_beta) * (step.likelihoods - step.likelihoods.max())) step.weights = temp / np.sum(temp) step.covariance = step.calc_covariance() step.proposal_dist = choose_proposal(step.proposal_name, scale=step.covariance) step.resampling_indexes = step.resample() step.chain_previous_lpoint = step.get_chain_previous_lpoint(mtrace) sample_args["draws"] = draws sample_args["step"] = step sample_args["stage_path"] = stage_handler.stage_path(step.stage) sample_args["chains"] = chains iter_parallel_chains(**sample_args) save_sampler_state(step, update, stage_handler) logger.info("Finished sampling!")
def save_sampler_state(step, update, stage_handler): logger.info("Saving sampler state ...") if update is not None: weights = update.get_weights() else: weights = None outparam_list = [step.get_sampler_state(), weights] stage_handler.dump_smc_params(step.stage, outparam_list) def tune(acc_rate): """ Tune adaptively based on the acceptance rate. Parameters ---------- acc_rate: scalar, float Acceptance rate of the Metropolis sampling Returns ------- scaling: scalar float """ # a and b after Muto & Beck 2008 . a = 1.0 / 9 b = 8.0 / 9 return np.power((a + (b * acc_rate)), 2) return np.power((a + (b * acc_rate)), 2)