import copy
import os
from collections import OrderedDict
from logging import getLogger
from time import time
import numpy as num
import pytensor.tensor as tt
from pymc import Deterministic
from pyrocko.gf import LocalEngine
from pytensor import config as tconfig
from pytensor import shared
from beat import config as bconfig
from beat import covariance as cov
from beat import heart, pytensorf, utility
from beat.ffi import get_gf_prefix, load_gf_library
from beat.models.base import (
Composite,
ConfigInconsistentError,
FaultGeometryNotFoundError,
get_hypervalue_from_point,
init_uniform_random,
)
from beat.models.distributions import multivariate_normal_chol
logger = getLogger("geodetic")
km = 1000.0
__all__ = [
"GeodeticBEMComposite",
"GeodeticGeometryComposite",
"GeodeticDistributerComposite",
]
class GeodeticComposite(Composite):
"""
Comprises data structure of the geodetic composite.
Parameters
----------
gc : :class:`config.GeodeticConfig`
        configuration object containing geodetic setup parameters
project_dir : str
directory of the model project, where to find the data
events : list
of :class:`pyrocko.model.Event`
contains information of reference event(s), coordinates of reference
point(s) and source time(s)
    hypers : boolean
        if True, initialise the object for hyperparameter optimization
"""
_hierarchicalnames = None
weights = None
def __init__(self, gc, project_dir, events, hypers=False):
super(GeodeticComposite, self).__init__(events)
logger.debug("Setting up geodetic structure ...\n")
self.name = "geodetic"
self._like_name = "geo_like"
geodetic_data_path = os.path.join(project_dir, bconfig.geodetic_data_name)
self.datasets = utility.load_objects(geodetic_data_path)
logger.info("Number of geodetic datasets: %i " % self.n_t)
# initialise local coordinate system and corrections
if gc.corrections_config.has_enabled_corrections:
correction_configs = gc.corrections_config.iter_corrections()
logger.info("Initialising corrections ...")
for data in self.datasets:
data.setup_corrections(
event=self.event, correction_configs=correction_configs
)
else:
for data in self.datasets:
data.update_local_coords(self.event)
# init geodetic targets
self.targets = heart.init_geodetic_targets(
datasets=self.datasets,
event=self.event,
earth_model_name=gc.gf_config.earth_model_name,
interpolation=gc.interpolation,
crust_inds=[gc.gf_config.reference_model_idx],
sample_rate=gc.gf_config.sample_rate,
)
# merge geodetic data to calculate residuals on single array
datasets, los_vectors, odws, self.Bij = heart.concatenate_datasets(
self.datasets
)
logger.info("Number of geodetic data points: %i " % self.Bij.ordering.size)
self.sdata = shared(datasets, name="geodetic_data", borrow=True)
self.slos_vectors = shared(los_vectors, name="los_vecs", borrow=True)
self.sodws = shared(odws, name="odws", borrow=True)
self.noise_analyser = cov.GeodeticNoiseAnalyser(
config=gc.noise_estimator, events=self.events
)
self.config = gc
if hypers:
self._llks = []
self._llks.extend(
shared(num.array([1.0]), name="geo_llk_%i" % t, borrow=True)
for t in range(self.n_t)
)
def init_weights(self):
self.weights = []
for i, data in enumerate(self.datasets):
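            # heuristic identity check: the entries of an identity matrix sum
            # to ncoords, indicating a covariance that was never estimated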
if int(data.covariance.data.sum()) == data.ncoords:
                logger.warning(
                    "Data covariance is the identity matrix! Please double-check!"
                )
choli = data.covariance.chol_inverse
self.weights.append(shared(choli, name="geo_weight_%i" % i, borrow=True))
data.covariance.update_slog_pdet()
@property
def n_t(self):
return len(self.datasets)
def get_all_dataset_ids(self, hp_name):
"""
Return unique GNSS stations and radar acquisitions.
"""
return [dataset.id for dataset in self.datasets]
def analyse_noise(self, tpoint=None):
"""
        Analyse geodetic noise in datasets and set the
        data-covariance matrices accordingly.
"""
if self.config.noise_estimator.structure == "non-toeplitz":
results = self.assemble_results(tpoint)
else:
results = [None] * len(self.datasets)
if len(self.datasets) != len(results):
raise ValueError("Number of datasets and results need to be equal!")
for dataset, result in zip(self.datasets, results):
logger.info(
'Retrieving geodetic data-covariances with structure "%s" '
"for %s ..." % (self.config.noise_estimator.structure, dataset.id)
)
cov_d_geodetic = self.noise_analyser.get_data_covariance(
dataset, result=result
)
if dataset.covariance is None:
dataset.covariance = heart.Covariance(data=cov_d_geodetic)
else:
dataset.covariance.data = cov_d_geodetic
if int(dataset.covariance.data.sum()) == dataset.ncoords:
                logger.warning(
                    "Data covariance is the identity matrix! Please double-check!"
                )
def get_hypersize(self, hp_name=""):
"""
        Return the size of the hyperparameter.
        Parameters
        ----------
        hp_name : str
            name of the hyperparameter
Returns
-------
int
"""
n_datasets = len(self.get_all_dataset_ids(hp_name))
if n_datasets == 0:
raise ConfigInconsistentError(
                'Found no data for hyperparameter "%s". Please either load'
                " the data or remove it from the types dictionary!" % hp_name,
params="hypers",
)
elif self.config.dataset_specific_residual_noise_estimation:
return n_datasets
else:
return 1
def assemble_results(self, point, **kwargs):
"""
Assemble geodetic data for given point in solution space.
Parameters
----------
point : :func:`pymc.Point`
Dictionary with model parameters
Returns
-------
List with :class:`heart.GeodeticResult`
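        Examples
        --------
        Minimal usage sketch, assuming a configured composite and a valid
        test point (instance names here are hypothetical)::

            point = problem_config.get_test_point()
            results = composite.assemble_results(point)
            residual = results[0].processed_res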
"""
logger.debug("Assembling geodetic data ...")
processed_synts = self.get_synthetics(point)
results = []
for i, data in enumerate(self.datasets):
res = data.displacement - processed_synts[i]
results.append(
heart.GeodeticResult(
point=point,
processed_obs=data.displacement,
processed_syn=processed_synts[i],
processed_res=res,
)
)
return results
def export(
self,
point,
results_path,
stage_number,
fix_output=False,
force=False,
update=False,
):
from pyrocko.guts import dump
from beat.plotting import map_displacement_grid
def save_covs(datasets, cov_mat="pred_v"):
"""
            Save covariance matrices of the given attribute.
"""
covs = {
dataset.name: getattr(dataset.covariance, cov_mat)
for dataset in datasets
}
outname = os.path.join(results_path, f"geodetic_C_{cov_mat}")
logger.info('"geodetic covariances" to: %s', outname)
num.savez(outname, **covs)
gc = self.config
results = self.assemble_results(point)
def get_filename(attr, ending="csv"):
return os.path.join(
results_path,
"{}_{}_{}.{}".format(
os.path.splitext(dataset.id)[0], attr, stage_number, ending
),
)
# export for gnss
for typ, config in gc.types.items():
if "GNSS" == typ:
from pyrocko.model import gnss
logger.info("Exporting GNSS data ...")
campaigns = config.load_data(campaign=True)
for campaign in campaigns:
model_camp = gnss.GNSSCampaign(
stations=copy.deepcopy(campaign.stations),
name=f"{campaign.name}_model",
)
dataset_to_result = {}
for dataset, result in zip(self.datasets, results):
if dataset.typ == "GNSS":
dataset_to_result[dataset] = result
for dataset, result in dataset_to_result.items():
for ista, sta in enumerate(model_camp.stations):
comp = getattr(sta, dataset.component)
comp.shift = result.processed_syn[ista]
comp.sigma = 0.0
outname = os.path.join(
results_path, "gnss_synths_%i.yaml" % stage_number
)
dump(model_camp, filename=outname)
elif "SAR" == typ:
from kite.scene import Scene, UserIOWarning
logger.info("Exporting SAR data ...")
for dataset, result in zip(self.datasets, results):
if dataset.typ == "SAR":
try:
scene_path = os.path.join(config.datadir, dataset.name)
logger.info(
f"Loading full resolution kite scene: {scene_path}"
)
scene = Scene.load(scene_path)
except UserIOWarning:
logger.warning(
"Full resolution data could not be"
" loaded! Skipping ..."
)
continue
for attr in ["processed_obs", "processed_syn", "processed_res"]:
filename = get_filename(attr, ending="csv")
displacements = getattr(result, attr)
dataset.export_to_csv(filename, displacements)
logger.info(f"Stored CSV file to: {filename}")
filename = get_filename(attr, ending="yml")
vals = map_displacement_grid(displacements, scene)
scene.displacement = vals
scene.save(filename)
logger.info(f"Stored kite scene to: {filename}")
# export stdz residuals
self.analyse_noise(point)
if update:
logger.info("Saving velocity model covariance matrixes...")
self.update_weights(point)
save_covs(self.datasets, "pred_v")
logger.info("Saving data covariance matrixes...")
save_covs(self.datasets, "data")
def init_hierarchicals(self, problem_config):
"""
        Initialize hierarchical parameters: ramp estimation in azimuth and
        range direction of a radar scene and/or rotation of GNSS stations
        around an Euler pole.
"""
self._hierarchicalnames = []
hierarchicals = problem_config.hierarchicals
for number, corr in enumerate(
self.config.corrections_config.iter_corrections()
):
logger.info(
f"Evaluating config for {corr.feature} corrections for datasets..."
)
if corr.enabled:
for data in self.datasets:
if data.name in corr.dataset_names:
hierarchical_names = corr.get_hierarchical_names(
name=data.name, number=number
)
else:
hierarchical_names = []
for hierarchical_name in hierarchical_names:
                        # corr.enabled holds here and hierarchical_names is
                        # only non-empty for datasets in corr.dataset_names,
                        # so the correction parameters must be defined in the
                        # problem configuration
                        if hierarchical_name not in hierarchicals:
                            raise ConfigInconsistentError(
                                f"{corr.feature} corrections for {data.name} "
                                "are enabled, but they are not defined in the "
                                "problem configuration (hierarchicals)!"
                            )
if hierarchical_name not in self.hierarchicals:
param = hierarchicals[hierarchical_name]
if not num.array_equal(param.lower, param.upper):
kwargs = dict(
name=param.name,
shape=param.dimension,
lower=param.lower,
upper=param.upper,
initval=param.testvalue,
default_transform=None,
dtype=tconfig.floatX,
)
self.hierarchicals[
hierarchical_name
] = init_uniform_random(kwargs)
self._hierarchicalnames.append(hierarchical_name)
else:
                                logger.info(
                                    f"Not solving for {param.name}, fixed at "
                                    f"{utility.list2string(param.lower.flatten())}"
                                )
self.hierarchicals[hierarchical_name] = param.lower
else:
logger.info(f"No {corr.feature} correction!")
logger.info("Initialized %i hierarchical parameters." % len(self.hierarchicals))
def apply_corrections(self, residuals, point=None, operation="-"):
"""
        Apply all configured correction terms, e.g. SAR orbital ramps,
        GNSS Euler pole rotations, etc.
"""
for i, dataset in enumerate(self.datasets):
if dataset.has_correction:
for corr in dataset.corrections:
correction = corr.get_displacements(self.hierarchicals, point=point)
# correction = Print('corr')(correction)
if operation == "-":
residuals[i] -= correction # needs explicit assignment!
elif operation == "+":
residuals[i] += correction
return residuals
def update_llks(self, point):
"""
Update posterior likelihoods (in place) of the composite w.r.t.
one point in the solution space.
Parameters
----------
point : dict
with numpy array-like items and variable name keys
"""
results = self.assemble_results(point)
for i_l, result in enumerate(results):
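            # chi-squared misfit via the Cholesky factor:
            # r^T C^-1 r = ||L^-1 r||^2, with C = L L^T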
choli = self.datasets[i_l].covariance.chol_inverse
tmp = choli.dot(result.processed_res)
_llk = num.asarray([num.dot(tmp, tmp)])
self._llks[i_l].set_value(_llk)
def get_variance_reductions(self, point, results=None, weights=None):
"""
Parameters
----------
point : dict
with parameters to point in solution space to calculate
variance reductions
Returns
-------
dict of floats,
keys are nslc_ids
"""
if results is None:
results = self.assemble_results(point)
ndatasets = len(self.datasets)
assert len(results) == ndatasets
if weights is None:
self.analyse_noise(point)
self.update_weights(point)
weights = self.weights
        nweights = len(weights)
        logger.debug("n weights %i , n datasets %i" % (nweights, ndatasets))
        assert nweights == ndatasets
logger.debug("Calculating variance reduction for solution ...")
counter = utility.Counter()
hp_specific = self.config.dataset_specific_residual_noise_estimation
var_reds = OrderedDict()
for dataset, weight, result in zip(self.datasets, weights, results):
hp = get_hypervalue_from_point(
point, dataset, counter, hp_specific=hp_specific
)
icov = dataset.covariance.inverse(num.exp(hp * 2.0))
data = result.processed_obs
residual = result.processed_res
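            # variance reduction: VR = 1 - (r^T C^-1 r) / (d^T C^-1 d), with
            # the covariance scaled by the noise hyperparameter exp(2 * hp)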
nom = residual.T.dot(icov).dot(residual)
denom = data.T.dot(icov).dot(data)
logger.debug("nom %f, denom %f" % (float(nom), float(denom)))
var_red = 1 - (nom / denom)
logger.debug("Variance reduction for %s is %f" % (dataset.id, var_red))
            if 0:  # debug: visualize the data covariance matrix
                from matplotlib import pyplot as plt
fig, ax = plt.subplots(1, 1)
im = ax.imshow(dataset.covariance.data)
plt.colorbar(im)
plt.show()
var_reds[dataset.id] = var_red
return var_reds
def get_standardized_residuals(self, point, results=None, weights=None):
"""
Parameters
----------
point : dict
with parameters to point in solution space to calculate
standardized residuals
Returns
-------
dict of arrays of standardized residuals,
keys are nslc_ids
"""
if results is None:
results = self.assemble_results(point)
if weights is None:
self.update_weights(point)
counter = utility.Counter()
hp_specific = self.config.dataset_specific_residual_noise_estimation
stdz_residuals = OrderedDict()
for dataset, result in zip(self.datasets, results):
hp = get_hypervalue_from_point(
point, dataset, counter, hp_specific=hp_specific
)
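            # whiten the residuals with the inverse Cholesky factor of the
            # hyperparameter-scaled data covariance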
choli = num.linalg.inv(dataset.covariance.chol(num.exp(hp * 2.0)))
stdz_residuals[dataset.id] = choli.dot(result.processed_res)
return stdz_residuals
class GeodeticSourceComposite(GeodeticComposite):
"""
    Comprises the non-linear geodetic forward model and how to solve it.
    Parameters
    ----------
    gc : :class:`config.GeodeticConfig`
        configuration object containing geodetic setup parameters
project_dir : str
directory of the model project, where to find the data
sources : list
of :class:`pyrocko.gf.seismosizer.Source`
mapping : list
of dict of varnames and their sizes
events : list
of :class:`pyrocko.model.Event`
contains information of reference event, coordinates of reference
point and source time
    hypers : boolean
        if True, initialise the object for hyperparameter optimization
"""
def __init__(self, gc, project_dir, sources, mapping, events, hypers=False):
super(GeodeticSourceComposite, self).__init__(
gc, project_dir, events, hypers=hypers
)
if isinstance(gc.gf_config, bconfig.GeodeticGFConfig):
self.engine = LocalEngine(store_superdirs=[gc.gf_config.store_superdir])
elif isinstance(gc.gf_config, bconfig.BEMConfig):
from beat.bem import BEMEngine
self.engine = BEMEngine(gc.gf_config)
self.sources = sources
self.mapping = mapping
@property
def n_sources_total(self):
return len(self.sources)
def point2sources(self, point):
"""
Updates the composite source(s) (in place) with the point values.
"""
tpoint = copy.deepcopy(point)
tpoint.update(self.fixed_rvs)
tpoint = utility.adjust_point_units(tpoint)
source_points = utility.split_point(
tpoint,
mapping=self.mapping,
weed_params=True,
)
for i, source in enumerate(self.sources):
utility.update_source(source, **source_points[i])
            # reset source time; a nonzero time may otherwise cause a store error
source.time = 0.0
def get_formula(self, input_rvs, fixed_rvs, hyperparams, problem_config):
"""
        Get the geodetic likelihood formula for the built model. Has to be
        called within a pymc model context, as it is part of the pymc model.
Parameters
----------
input_rvs : dict
of :class:`pymc.distribution.Distribution`
fixed_rvs : dict
of :class:`numpy.array`
hyperparams : dict
of :class:`pymc.distribution.Distribution`
problem_config : :class:`config.ProblemConfig`
Returns
-------
posterior_llk : :class:`pytensor.tensor.Tensor`
"""
hp_specific = self.config.dataset_specific_residual_noise_estimation
tpoint = problem_config.get_test_point()
self.input_rvs = input_rvs
self.fixed_rvs = fixed_rvs
logger.info(
"Geodetic optimization on: \n " "%s" % ", ".join(self.input_rvs.keys())
)
self.input_rvs.update(fixed_rvs)
t0 = time()
disp = self.get_synths(self.input_rvs)
t1 = time()
logger.debug("Geodetic forward model on test model takes: %f" % (t1 - t0))
los_disp = (disp * self.slos_vectors).sum(axis=1)
residuals = self.Bij.srmap(
tt.cast((self.sdata - los_disp) * self.sodws, tconfig.floatX)
)
self.analyse_noise(tpoint)
self.init_weights()
if self.config.corrections_config.has_enabled_corrections:
logger.info("Applying corrections! ...")
residuals = self.apply_corrections(residuals, operation="-")
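        # per-dataset Gaussian log-likelihoods using the Cholesky-factored
        # data covariances held in self.weights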
logpts = multivariate_normal_chol(
self.datasets, self.weights, hyperparams, residuals, hp_specific=hp_specific
)
llk = Deterministic(self._like_name, logpts)
return llk.sum()
def get_pyrocko_events(self, point=None):
"""
Transform sources to pyrocko events.
Returns
-------
events : list
of :class:`pyrocko.model.Event`
"""
if point is not None:
self.point2sources(point)
target = self.targets[0]
store = self.engine.get_store(target.store_id)
return [
source.pyrocko_event(store=store, target=target) for source in self.sources
]
class GeodeticGeometryComposite(GeodeticSourceComposite):
def __init__(self, gc, project_dir, sources, mapping, events, hypers=False):
super(GeodeticGeometryComposite, self).__init__(
gc, project_dir, sources, mapping, events, hypers=hypers
)
logger.info("Initialising geometry geodetic composite ...")
if not hypers:
# synthetics generation
logger.debug("Initialising synthetics functions ... \n")
self.get_synths = pytensorf.GeoSynthesizer(
engine=self.engine,
sources=self.sources,
targets=self.targets,
mapping=mapping,
)
def __getstate__(self):
self.engine.close_cashed_stores()
return self.__dict__.copy()
def get_synthetics(self, point):
"""
Get synthetics for given point in solution space.
Parameters
----------
point : :func:`pymc.Point`
Dictionary with model parameters
Returns
-------
list with :class:`numpy.ndarray` synthetics for each target
"""
self.point2sources(point)
displacements = heart.geo_synthetics(
engine=self.engine,
targets=self.targets,
sources=self.sources,
outmode="stacked_arrays",
)
synths = []
for disp, data in zip(displacements, self.datasets):
los_d = (disp * data.los_vector).sum(axis=1)
synths.append(los_d)
if self.config.corrections_config.has_enabled_corrections:
synths = self.apply_corrections(synths, point=point, operation="+")
return synths
def update_weights(self, point, n_jobs=1, plot=False):
"""
        Updates weighting matrices (in place) with respect to the point in the
        solution space.
Parameters
----------
point : dict
with numpy array-like items and variable name keys
"""
gc = self.config
if not self.weights:
self.init_weights()
self.point2sources(point)
# update data covariances in case model dependent non-toeplitz
if self.config.noise_estimator.structure == "non-toeplitz":
logger.info("Updating data-covariances ...")
self.analyse_noise(point)
crust_inds = range(*gc.gf_config.n_variations)
thresh = 5
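        # model-prediction covariances are only estimated if enough
        # velocity-model variations are available to form an ensemble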
if len(crust_inds) > thresh:
logger.info("Updating geodetic velocity model-covariances ...")
if self.config.noise_estimator.structure == "non-toeplitz":
logger.warning(
"Non-toeplitz estimation in combination with model "
"prediction covariances is still EXPERIMENTAL and results"
" should be interpreted with care!!"
)
for i, data in enumerate(self.datasets):
crust_targets = heart.init_geodetic_targets(
datasets=[data],
event=self.event,
earth_model_name=gc.gf_config.earth_model_name,
interpolation=gc.interpolation,
crust_inds=crust_inds,
sample_rate=gc.gf_config.sample_rate,
)
logger.debug(f"Track {data.name}")
cov_pv = cov.geodetic_cov_velocity_models(
engine=self.engine,
sources=self.sources,
targets=crust_targets,
dataset=data,
plot=plot,
event=self.event,
n_jobs=1,
)
cov_pv = utility.ensure_cov_psd(cov_pv)
data.covariance.pred_v = cov_pv
else:
logger.info(
"Not updating geodetic velocity model-covariances because "
"number of model variations is too low! < %i" % thresh
)
# update shared weights from covariance matrices
for i, data in enumerate(self.datasets):
choli = data.covariance.chol_inverse
self.weights[i].set_value(choli)
data.covariance.update_slog_pdet()
class GeodeticBEMComposite(GeodeticSourceComposite):
def __init__(self, gc, project_dir, sources, mapping, events, hypers=False):
super(GeodeticBEMComposite, self).__init__(
gc, project_dir, sources, mapping, events, hypers=hypers
)
logger.info("Initialising BEM geodetic composite ...")
if not hypers:
# synthetics generation
logger.debug("Initialising synthetics functions ... \n")
self.get_synths = pytensorf.GeoSynthesizer(
engine=self.engine,
sources=self.sources,
targets=self.targets,
mapping=mapping,
)
def get_synthetics(self, point):
"""
Get synthetics for given point in solution space.
Parameters
----------
point : :func:`pymc.Point`
Dictionary with model parameters
Returns
-------
list with :class:`numpy.ndarray` synthetics for each target
"""
self.point2sources(point)
displacements = heart.geo_synthetics(
engine=self.engine,
targets=self.targets,
sources=self.sources,
outmode="arrays",
)
synths = []
for disp, data in zip(displacements, self.datasets):
los_d = (disp * data.los_vector).sum(axis=1)
synths.append(los_d)
if self.config.corrections_config.has_enabled_corrections:
synths = self.apply_corrections(synths, point=point, operation="+")
return synths
def update_weights(self, point, n_jobs=1, plot=False):
"""
        Updates weighting matrices (in place) with respect to the point in the
        solution space.
Parameters
----------
point : dict
with numpy array-like items and variable name keys
"""
gc = self.config
if not self.weights:
self.init_weights()
self.point2sources(point)
# update data covariances in case model dependent non-toeplitz
if self.config.noise_estimator.structure == "non-toeplitz":
logger.info("Updating data-covariances ...")
self.analyse_noise(point)
crust_inds = range(*gc.gf_config.n_variations)
thresh = 5
if len(crust_inds) > thresh:
raise NotImplementedError(
"Needs updating for this composite to vary elastic parameters."
)
logger.info("Updating geodetic velocity model-covariances ...")
if self.config.noise_estimator.structure == "non-toeplitz":
logger.warning(
"Non-toeplitz estimation in combination with model "
"prediction covariances is still EXPERIMENTAL and results"
" should be interpreted with care!!"
)
for i, data in enumerate(self.datasets):
crust_targets = heart.init_geodetic_targets(
datasets=[data],
event=self.event,
earth_model_name=gc.gf_config.earth_model_name,
interpolation=gc.interpolation,
crust_inds=crust_inds,
sample_rate=gc.gf_config.sample_rate,
)
logger.debug("Track %s" % data.name)
cov_pv = cov.geodetic_cov_velocity_models(
engine=self.engine,
sources=self.sources,
targets=crust_targets,
dataset=data,
plot=plot,
event=self.event,
n_jobs=1,
)
cov_pv = utility.ensure_cov_psd(cov_pv)
data.covariance.pred_v = cov_pv
else:
logger.info(
"Not updating geodetic velocity model-covariances because "
"number of model variations is too low! < %i" % thresh
)
# update shared weights from covariance matrices
for i, data in enumerate(self.datasets):
choli = data.covariance.chol_inverse
self.weights[i].set_value(choli)
data.covariance.update_slog_pdet()
class GeodeticDistributerComposite(GeodeticComposite):
"""
    Comprises how to solve the geodetic (static) linear forward model for
    distributed slip.
"""
def __init__(self, gc, project_dir, events, hypers=False):
super(GeodeticDistributerComposite, self).__init__(
gc, project_dir, events, hypers=hypers
)
self.gfs = {}
self.gf_names = {}
self._mode = "ffi"
self.gfpath = os.path.join(project_dir, self._mode, bconfig.linear_gf_dir_name)
self.fault = None
def get_gflibrary_key(self, crust_ind, wavename, component):
return "%i_%s_%s" % (crust_ind, wavename, component)
def load_gfs(self, crust_inds=None, make_shared=True):
"""
        Load Green's function matrices for each variable to be inverted for.
        Updates the gfs and gf_names attributes.
Parameters
----------
        crust_inds : list
            of int, indices of the Green's Functions
make_shared : bool
if True transforms gfs to :class:`pytensor.shared` variables
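        Examples
        --------
        Minimal sketch, assuming the linear GF libraries have been computed
        beforehand (instance name hypothetical)::

            composite.load_gfs(crust_inds=[0], make_shared=False)
            print(list(composite.gfs.keys()))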
"""
        if crust_inds is None:
            crust_inds = list(range(*self.config.gf_config.n_variations))
        if not isinstance(crust_inds, list):
            raise TypeError("crust_inds need to be a list!")
        for crust_ind in crust_inds:
for var in self.slip_varnames:
gflib_name = get_gf_prefix(
datatype=self.name,
component=var,
wavename="static",
crust_ind=crust_ind,
)
gfpath = os.path.join(self.gfpath, gflib_name)
gfs = load_gf_library(directory=self.gfpath, filename=gflib_name)
if make_shared:
gfs.init_optimization()
key = self.get_gflibrary_key(
crust_ind=crust_ind, wavename="static", component=var
)
self.gf_names[key] = gfpath
self.gfs[key] = gfs
def load_fault_geometry(self):
"""
Load fault-geometry, i.e. discretized patches.
Returns
-------
:class:`heart.FaultGeometry`
"""
try:
return utility.load_objects(
os.path.join(self.gfpath, bconfig.fault_geometry_name)
)[0]
except Exception:
raise FaultGeometryNotFoundError()
def point2sources(self, point):
"""
        Returns the fault source patch(es) with the point values updated.
Parameters
----------
point : dict
with random variables from solution space
"""
tpoint = copy.deepcopy(point)
        # tpoint.update(self.fixed_rvs)  # if vars are fixed, GFs are not calculated
if self.nevents == 1:
events = [self.event] # single event
else:
events = self.events # multi event
if self.fault is None:
self.fault = self.load_fault_geometry()
return self.fault.point2sources(tpoint, events=events)
def get_synthetics(self, point):
"""
Get synthetics for given point in solution space.
Parameters
----------
point : :func:`pymc.Point`
Dictionary with model parameters
Returns
-------
list with :class:`numpy.ndarray` synthetics for each target
"""
ref_idx = self.config.gf_config.reference_model_idx
if len(self.gfs) == 0:
self.load_gfs(crust_inds=[ref_idx], make_shared=False)
for gfs in self.gfs.values():
gfs.set_stack_mode("numpy")
tpoint = copy.deepcopy(point)
hps = self.config.get_hypernames()
for hyper in hps:
if hyper in tpoint:
tpoint.pop(hyper)
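        # linear forward model: superpose the unit-slip Green's function
        # responses of all fault patches, scaled by the slip values in point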
mu = num.zeros((self.Bij.ordering.size))
for var in self.slip_varnames:
key = self.get_gflibrary_key(
crust_ind=ref_idx, wavename="static", component=var
)
mu += self.gfs[key].stack_all(slips=point[var])
synths = self.Bij.a2l(mu)
if self.config.corrections_config.has_enabled_corrections:
synths = self.apply_corrections(synths, point=point, operation="+")
return synths
def update_weights(self, point, n_jobs=1, plot=False):
"""
        Updates weighting matrices (in place) with respect to the point in the
        solution space.
Parameters
----------
point : dict
with numpy array-like items and variable name keys
"""
gc = self.config
if not self.weights:
self.init_weights()
# update data covariances in case model dependent non-toeplitz
if self.config.noise_estimator.structure == "non-toeplitz":
logger.info("Updating data-covariances ...")
self.analyse_noise(point)
crust_inds = range(*gc.gf_config.n_variations)
thresh = 5
if len(crust_inds) > thresh:
logger.info("Updating geodetic velocity model-covariances ...")
if self.config.noise_estimator.structure == "non-toeplitz":
logger.warning(
"Non-toeplitz estimation in combination with model "
"prediction covariances is still EXPERIMENTAL and results"
" should be interpreted with care!!"
)
crust_inds = list(range(*self.config.gf_config.n_variations))
n_variations = len(crust_inds)
if len(self.gfs) != n_variations:
logger.info("Loading geodetic linear GF matrixes ...")
self.load_gfs(crust_inds=crust_inds, make_shared=False)
crust_displacements = num.zeros((n_variations, self.Bij.ordering.size))
for i, crust_ind in enumerate(crust_inds):
mu = num.zeros((self.Bij.ordering.size))
for var in self.slip_varnames:
key = self.get_gflibrary_key(
crust_ind=crust_ind, wavename="static", component=var
)
mu += self.gfs[key].stack_all(slips=point[var])
crust_displacements[i, :] = mu
crust_synths = self.Bij.a_nd2l(crust_displacements)
if len(crust_synths) != self.n_t:
raise ValueError(
"Number of datasets %i and number of synthetics %i "
"inconsistent!" % (self.n_t, len(crust_synths))
)
for i, data in enumerate(self.datasets):
logger.debug("Track %s" % data.name)
cov_pv = num.cov(crust_synths[i], rowvar=0)
cov_pv = utility.ensure_cov_psd(cov_pv)
data.covariance.pred_v = cov_pv
else:
logger.info(
"Not updating geodetic velocity model-covariances because "
"number of model variations is too low! < %i" % thresh
)
# update shared weights from covariance matrices
for i, data in enumerate(self.datasets):
choli = data.covariance.chol_inverse
self.weights[i].set_value(choli)
data.covariance.update_slog_pdet()