import copy
import os
from collections import OrderedDict
from logging import getLogger
from time import time
import numpy as num
import pytensor.tensor as tt
from pymc import Deterministic
from pyrocko.gf import LocalEngine
from pyrocko.trace import Trace
from pytensor import config as tconfig
from pytensor import shared
from beat import config as bconfig
from beat import covariance as cov
from beat import heart, pytensorf, utility
from beat.ffi import get_gf_prefix, load_gf_library
from beat.models.base import (
Composite,
ConfigInconsistentError,
FaultGeometryNotFoundError,
get_hypervalue_from_point,
init_uniform_random,
)
from beat.models.distributions import multivariate_normal_chol
logger = getLogger("seismic")
__all__ = ["SeismicGeometryComposite", "SeismicDistributerComposite"]
class SeismicComposite(Composite):
"""
    Defines how to solve the non-linear seismic forward model.
Parameters
----------
sc : :class:`config.SeismicConfig`
configuration object containing seismic setup parameters
    events : list
of :class:`pyrocko.model.Event`
project_dir : str
directory of the model project, where to find the data
hypers : boolean
        if True, initialise the object for hyper-parameter optimization
"""
_datasets = None
_weights = None
_targets = None
_hierarchicalnames = None
def __init__(self, sc, events, project_dir, hypers=False):
super(SeismicComposite, self).__init__(events)
logger.debug("Setting up seismic structure ...\n")
self.name = "seismic"
self._like_name = "seis_like"
self.correction_name = "time_shift"
self.engine = LocalEngine(store_superdirs=[sc.gf_config.store_superdir])
if sc.responses_path is not None:
responses_path = os.path.join(sc.responses_path, bconfig.response_file_name)
else:
responses_path = sc.responses_path
# load data
self.datahandlers = []
for i in range(self.nevents):
seismic_data_path = os.path.join(
project_dir, bconfig.multi_event_seismic_data_name(i)
)
if os.path.exists(seismic_data_path):
logger.info(
"Loading seismic data for event %i"
" from: %s " % (i, seismic_data_path)
)
self.datahandlers.append(
heart.init_datahandler(
seismic_config=sc,
seismic_data_path=seismic_data_path,
responses_path=responses_path,
)
)
else:
logger.info("Did not find seismic data for event %i." % i)
self.noise_analyser = cov.SeismicNoiseAnalyser(
structure=sc.noise_estimator.structure,
pre_arrival_time=sc.noise_estimator.pre_arrival_time,
engine=self.engine,
events=self.events,
chop_bounds=["b", "c"],
)
self.wavemaps = []
for i, wc in enumerate(sc.waveforms):
logger.info('Initialising seismic wavemap for "%s" ...' % wc.name)
if wc.include:
try:
wmap = heart.init_wavemap(
waveformfit_config=wc,
datahandler=self.datahandlers[wc.event_idx],
event=self.events[wc.event_idx],
mapnumber=i,
)
except IndexError:
                    raise IndexError(
                        "Did not find seismic data for event %i! "
                        "Data for sub-events follows the naming scheme: "
                        "seismic_data_subevent_1.pkl, "
                        "seismic_data_subevent_2.pkl, etc." % wc.event_idx
                    )
self.wavemaps.append(wmap)
else:
logger.info(
'The waveform defined in "%s %i" config is not '
"included in the optimization!" % (wc.name, i)
)
if hypers:
self._llks = []
for t in range(self.n_t):
self._llks.append(
shared(num.array([1.0]), name="seis_llk_%i" % t, borrow=True)
)
def _hyper2wavemap(self, hypername):
dummy = "_".join(hypername.split("_")[1:-1])
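        # e.g. a hypername like "h_any_P_0_Z" (hypothetical name) reduces to
        # "any_P_0", which is matched against each wavemap's _mapid below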
for wmap in self.wavemaps:
if wmap._mapid == dummy:
return wmap
raise ValueError("No waveform mapping found for hyperparameter! %s" % hypername)
def get_hypersize(self, hp_name):
"""
        Return the size of the hyperparameter.
        Parameters
        ----------
        hp_name : str
            name of the hyperparameter
Returns
-------
int
"""
if self.config.dataset_specific_residual_noise_estimation:
wmap = self._hyper2wavemap(hp_name)
return wmap.hypersize
else:
return 1
def __getstate__(self):
self.engine.close_cashed_stores()
return self.__dict__.copy()
def analyse_noise(self, tpoint=None, chop_bounds=["b", "c"]):
"""
        Analyse seismic noise in datatraces and set
        data-covariance matrices accordingly.
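
        Notes
        -----
        For the "non-toeplitz" structure the residuals at the given point are
        needed, so synthetics are assembled first; for the other structures
        the covariances are estimated from the data traces alone.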
"""
if self.config.noise_estimator.structure == "non-toeplitz":
results = self.assemble_results(
tpoint, order="wmap", chop_bounds=chop_bounds
)
else:
results = [None] * len(self.wavemaps)
for wmap, wmap_results in zip(self.wavemaps, results):
logger.info(
'Retrieving seismic data-covariances with structure "%s" '
"for %s ..." % (self.config.noise_estimator.structure, wmap._mapid)
)
cov_ds_seismic = self.noise_analyser.get_data_covariances(
wmap=wmap,
results=wmap_results,
sample_rate=self.config.gf_config.sample_rate,
chop_bounds=chop_bounds,
)
for j, trc in enumerate(wmap.datasets):
if trc.covariance is None:
trc.covariance = heart.Covariance(data=cov_ds_seismic[j])
else:
trc.covariance.data = cov_ds_seismic[j]
if int(trc.covariance.data.sum()) == trc.data_len():
                    logger.warning(
                        "Data covariance is an identity matrix! Please double check!"
                    )
def init_hierarchicals(self, problem_config):
"""
Initialise random variables for temporal station corrections.
"""
hierarchicals = problem_config.hierarchicals
self._hierarchicalnames = []
nwmaps = len(self.wavemaps)
nspecwmaps = num.sum(
[1 for wmap in self.wavemaps if wmap.config.domain == "spectrum"]
)
if (
not self.config.station_corrections
and self.correction_name in hierarchicals
):
raise ConfigInconsistentError(
"Station corrections disabled, but they are defined"
" in the problem configuration!"
)
if (
self.config.station_corrections
and self.correction_name not in hierarchicals
):
raise ConfigInconsistentError(
"Station corrections enabled, but they are not defined"
" in the problem configuration!"
)
if (
self.config.station_corrections
and self.correction_name in hierarchicals
and nwmaps == nspecwmaps
):
            raise ConfigInconsistentError(
                "Station corrections enabled and defined in the problem"
                " configuration, but they are not required as all wavemaps"
                " contain only spectra!"
            )
if self.correction_name in hierarchicals:
logger.info("Estimating time shift for each station and waveform map...")
for wmap in self.wavemaps:
hierarchical_name = wmap.time_shifts_id
nhierarchs = len(wmap.get_station_names())
if wmap.config.domain == "spectrum":
                    logger.info(
                        '%s is fixed at "0.0" for spectra' % hierarchical_name
                    )
                    self.hierarchicals[hierarchical_name] = num.zeros(
                        nhierarchs, dtype="int16"
                    )
else:
logger.info(
"For %s with %i shifts" % (hierarchical_name, nhierarchs)
)
if hierarchical_name in hierarchicals:
                        logger.info(
                            "Using wavemap-specific imported: %s" % hierarchical_name
                        )
param = hierarchicals[hierarchical_name]
else:
logger.info("Using global %s" % self.correction_name)
param = copy.deepcopy(
problem_config.hierarchicals[self.correction_name]
)
param.lower = num.repeat(param.lower, nhierarchs)
param.upper = num.repeat(param.upper, nhierarchs)
param.testvalue = num.repeat(param.testvalue, nhierarchs)
if hierarchical_name not in self.hierarchicals:
if not num.array_equal(param.lower, param.upper):
kwargs = dict(
name=hierarchical_name,
shape=param.dimension,
lower=param.lower,
upper=param.upper,
initval=param.testvalue,
default_transform=None,
dtype=tconfig.floatX,
)
self.hierarchicals[hierarchical_name] = init_uniform_random(
kwargs
)
self._hierarchicalnames.append(hierarchical_name)
else:
logger.info(
"not solving for %s, got fixed at %s"
% (
param.name,
utility.list2string(param.lower.flatten()),
)
)
self.hierarchicals[hierarchical_name] = param.lower
def export(
self,
point,
results_path,
stage_number,
fix_output=False,
force=False,
update=False,
chop_bounds=["b", "c"],
):
"""
        Save results for the given point to the results path.
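
        Examples
        --------
        A minimal sketch; the path and stage number are hypothetical::

            composite.export(
                point, results_path="results", stage_number=-1, update=True
            )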
"""
def save_covs(wmap, cov_mat="pred_v"):
"""
            Save covariance matrices of the given attribute
"""
covs = {
dataset.nslcd_id_str: getattr(dataset.covariance, cov_mat)
for dataset in wmap.datasets
}
outname = os.path.join(
results_path, "%s_C_%s_%s" % ("seismic", cov_mat, wmap._mapid)
)
logger.info('"%s" to: %s' % (wmap._mapid, outname))
num.savez(outname, **covs)
from pyrocko import io
# synthetics and data
results = self.assemble_results(point, chop_bounds=chop_bounds)
for traces, attribute in heart.results_for_export(
results=results, datatype="seismic"
):
filename = "%s_%i.mseed" % (attribute, stage_number)
outpath = os.path.join(results_path, filename)
try:
io.save(traces, outpath, overwrite=force)
except io.mseed.CodeTooLong:
if fix_output:
for tr in traces:
                        tr.set_station(tr.station[-5:])
tr.set_location(str(self.config.gf_config.reference_model_idx))
io.save(traces, outpath, overwrite=force)
else:
raise ValueError(
"Some station codes are too long! "
"(the --fix_output option will truncate to "
"last 5 characters!)"
)
        # analyse noise at the given point (basis for standardized residuals)
self.analyse_noise(point, chop_bounds=chop_bounds)
if update:
logger.info("Saving velocity model covariance matrixes...")
self.update_weights(point, chop_bounds=chop_bounds)
for wmap in self.wavemaps:
save_covs(wmap, "pred_v")
logger.info("Saving data covariance matrixes...")
for wmap in self.wavemaps:
save_covs(wmap, "data")
def init_weights(self):
"""
Initialise shared weights in wavemaps.
"""
logger.info("Initialising weights ...")
for wmap in self.wavemaps:
weights = []
for j, trc in enumerate(wmap.datasets):
icov = trc.covariance.chol_inverse
weights.append(
shared(
icov, name="seis_%s_weight_%i" % (wmap._mapid, j), borrow=True
)
)
wmap.add_weights(weights=weights)
def get_all_station_names(self):
"""
Returns list of station names in the order of wavemaps.
"""
us = []
for wmap in self.wavemaps:
us.extend(wmap.get_station_names())
return us
def get_unique_time_shifts_ids(self):
"""
Return unique time_shifts ids from wavemaps, which are keys to
hierarchical RVs of station corrections
"""
ts = []
for wmap in self.wavemaps:
ts.append(wmap.time_shifts_id)
return utility.unique_list(ts)
def get_unique_station_names(self):
"""
Return unique station names from all wavemaps
"""
return utility.unique_list(self.get_all_station_names())
@property
def n_t(self):
return sum(wmap.n_t for wmap in self.wavemaps)
@property
def datasets(self):
if self._datasets is None:
ds = []
for wmap in self.wavemaps:
ds.extend(wmap.datasets)
self._datasets = ds
return self._datasets
@property
def weights(self):
if self._weights is None or len(self._weights) == 0:
ws = []
for wmap in self.wavemaps:
if wmap.weights:
ws.extend(wmap.weights)
self._weights = ws
return self._weights
@property
def targets(self):
if self._targets is None:
ts = []
for wmap in self.wavemaps:
ts.extend(wmap.targets)
self._targets = ts
return self._targets
def assemble_results(
self,
point,
chop_bounds=["a", "d"],
order="list",
outmode="stacked_traces",
force=True,
):
"""
Assemble seismic traces for given point in solution space.
Parameters
----------
point : :func:`pymc.Point`
Dictionary with model parameters
        force : bool
            if True, force preparation of the data with the input parameters;
            otherwise cached data is used
Returns
-------
List with :class:`heart.SeismicResult`
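
        Examples
        --------
        A minimal sketch, assuming a configured composite and a sampled
        ``point``::

            results = composite.assemble_results(point, chop_bounds=["b", "c"])
            obs_trace = results[0].processed_obs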
"""
if point is None:
raise ValueError("A point has to be provided!")
logger.debug("Assembling seismic waveforms ...")
syn_proc_traces, obs_proc_traces = self.get_synthetics(
point, outmode=outmode, chop_bounds=chop_bounds, order="wmap", force=force
)
results = []
for i, wmap in enumerate(self.wavemaps):
wc = wmap.config
at = wc.arrival_taper
wmap_results = []
for j, obs_tr in enumerate(obs_proc_traces[i]):
taper = at.get_pyrocko_taper(float(obs_tr.tmin - at.a))
if outmode != "tapered_data":
source_contributions = [syn_proc_traces[i][j]]
else:
source_contributions = syn_proc_traces[i][j]
wmap_results.append(
heart.SeismicResult(
point=point,
processed_obs=obs_tr,
source_contributions=source_contributions,
taper=taper,
filterer=wmap.config.filterer,
domain=wc.domain,
)
)
if order == "list":
results.extend(wmap_results)
elif order == "wmap":
results.append(wmap_results)
else:
raise ValueError('Order "%s" is not supported' % order)
return results
def update_llks(self, point):
"""
Update posterior likelihoods of the composite with respect to one point
in the solution space.
Parameters
----------
point : dict
with numpy array-like items and variable name keys
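
        Notes
        -----
        For each dataset ``k`` with processed residual ``r_k``, the stored
        likelihood value is ``llk_k = || L_k^{-1} r_k ||^2``, where
        ``L_k^{-1}`` is the inverse Cholesky factor of the data covariance
        (``covariance.chol_inverse``).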
"""
results = self.assemble_results(point, chop_bounds=["b", "c"])
for k, result in enumerate(results):
choli = self.datasets[k].covariance.chol_inverse
tmp = choli.dot(result.processed_res.ydata)
_llk = num.asarray([num.dot(tmp, tmp)])
self._llks[k].set_value(_llk)
def get_standardized_residuals(
self, point, chop_bounds=["b", "c"], results=None, weights=None
):
"""
Parameters
----------
point : dict
            parameters of the point in the solution space for which to
            calculate the standardized residuals
Returns
-------
dict of arrays of standardized residuals,
keys are nslc_ids
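
        Notes
        -----
        Residuals are standardized as ``chol(C * exp(2 * h))^{-1} r``, with
        data covariance ``C``, noise scaling hyperparameter ``h`` and
        processed residual ``r``.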
"""
if results is None:
results = self.assemble_results(
point, order="list", chop_bounds=chop_bounds
)
if weights is None:
self.update_weights(point, chop_bounds=chop_bounds)
counter = utility.Counter()
hp_specific = self.config.dataset_specific_residual_noise_estimation
stdz_residuals = OrderedDict()
for dataset, result, target in zip(self.datasets, results, self.targets):
hp = get_hypervalue_from_point(
point, dataset, counter, hp_specific=hp_specific
)
ydata = result.processed_res.get_ydata()
choli = num.linalg.inv(dataset.covariance.chol(num.exp(hp * 2.0)))
stdz_residuals[target.nslcd_id_str] = choli.dot(ydata)
return stdz_residuals
def get_variance_reductions(
self, point, results=None, weights=None, chop_bounds=["a", "d"]
):
"""
Parameters
----------
point : dict
            parameters of the point in the solution space for which to
            calculate the variance reductions
Returns
-------
dict of floats,
keys are nslc_ids
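
        Notes
        -----
        For each dataset the variance reduction is computed as::

            VR = 1 - (r^T C^{-1} r) / (d^T C^{-1} d)

        with processed residual ``r``, processed data ``d`` and data
        covariance ``C`` scaled by ``exp(2 * h)`` for the respective noise
        scaling hyperparameter ``h``.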
"""
if results is None:
results = self.assemble_results(
point, order="list", chop_bounds=chop_bounds
)
ndatasets = len(self.datasets)
assert len(results) == ndatasets
if weights is None:
self.analyse_noise(point, chop_bounds=chop_bounds)
self.update_weights(point, chop_bounds=chop_bounds)
weights = self.weights
nweights = len(weights)
assert nweights == ndatasets
logger.debug("n weights %i , n datasets %i" % (nweights, ndatasets))
logger.debug("Calculating variance reduction for solution ...")
counter = utility.Counter()
hp_specific = self.config.dataset_specific_residual_noise_estimation
var_reds = OrderedDict()
for result, tr in zip(results, self.datasets):
nslcd_id_str = result.processed_obs.nslcd_id_str
hp = get_hypervalue_from_point(point, tr, counter, hp_specific=hp_specific)
icov = tr.covariance.inverse(num.exp(hp * 2.0))
data = result.processed_obs.get_ydata()
residual = result.processed_res.get_ydata()
nom = residual.T.dot(icov).dot(residual)
denom = data.T.dot(icov).dot(data)
logger.debug("nom %f, denom %f" % (float(nom), float(denom)))
var_reds[nslcd_id_str] = float(1 - (nom / denom))
logger.debug(
"Variance reduction for %s is %f"
% (nslcd_id_str, var_reds[nslcd_id_str])
)
return var_reds
class SeismicGeometryComposite(SeismicComposite):
"""
    Defines how to solve the non-linear seismic forward model.
Parameters
----------
sc : :class:`config.SeismicConfig`
configuration object containing seismic setup parameters
project_dir : str
directory of the model project, where to find the data
sources : list
of :class:`pyrocko.gf.seismosizer.Source`
mapping : list
of dict of varnames and their sizes
events : list
of :class:`pyrocko.model.Event`
contains information of reference event(s), coordinates of reference
point(s) and source time(s)
hypers : boolean
        if True, initialise the object for hyper-parameter optimization
"""
def __init__(self, sc, project_dir, sources, mapping, events, hypers=False):
super(SeismicGeometryComposite, self).__init__(
sc, events, project_dir, hypers=hypers
)
self._mode = "geometry"
self.synthesizers = {}
self.choppers = {}
self.sources = sources
self.mapping = mapping
self.correction_name = "time_shift"
self.config = sc
@property
def n_sources_total(self):
return len(self.sources)
def point2sources(self, point):
"""
Updates the composite source(s) (in place) with the point values.
Parameters
----------
point : dict
with random variables from solution space
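
        Examples
        --------
        A minimal sketch; ``composite`` denotes an initialised instance and
        sources are updated in place::

            composite.point2sources(point)
            print(composite.sources[0])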
"""
tpoint = copy.deepcopy(point)
tpoint.update(self.fixed_rvs)
tpoint = utility.adjust_point_units(tpoint)
# update source times
if "time" in tpoint:
if self.nevents == 1:
tpoint["time"] += self.event.time # single event
else:
                # careful! for setups with multi-source geodetic and joint
                # geodetic + seismic data, the joint source needs to come
                # first; TODO: clean up
times = tpoint["time"]
for i in range(times.size): # multi event
times[i] += self.events[i].time
tpoint["time"] = times
source_points = utility.split_point(
tpoint,
mapping=self.mapping,
weed_params=True,
)
for i, source in enumerate(self.sources):
utility.update_source(source, **source_points[i])
def get_pyrocko_events(self, point=None):
"""
Transform sources to pyrocko events.
Returns
-------
events : list
of :class:`pyrocko.model.Event`
"""
if point is not None:
self.point2sources(point)
events = []
target = self.targets[0]
store = self.engine.get_store(target.store_id)
for source in self.sources:
events.append(source.pyrocko_event(store=store, target=target))
return events
def get_synthetics(self, point, **kwargs):
"""
Get synthetics for given point in solution space.
Parameters
----------
point : :func:`pymc.Point`
Dictionary with model parameters
        kwargs : dict
            especially to change the output of the seismic forward model;
            outmode = 'traces' / 'array' / 'data'
Returns
-------
default: array of synthetics for all targets
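
        Examples
        --------
        A minimal sketch, assuming a configured composite and a sampled
        ``point``::

            synths, obs = composite.get_synthetics(
                point, outmode="stacked_traces", chop_bounds=["b", "c"]
            )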
"""
outmode = kwargs.pop("outmode", "stacked_traces")
chop_bounds = kwargs.pop("chop_bounds", ["a", "d"])
order = kwargs.pop("order", "list")
nprocs = kwargs.pop("nprocs", 4)
force = kwargs.pop("force", False)
self.point2sources(point)
sc = self.config
synths = []
obs = []
for i, wmap in enumerate(self.wavemaps):
wc = wmap.config
if not wmap.is_prepared or force:
wmap.prepare_data(
source=self.events[wc.event_idx],
engine=self.engine,
outmode="stacked_traces", # no source individual contribs
chop_bounds=chop_bounds,
)
arrival_times = copy.deepcopy(wmap._arrival_times)
if self.config.station_corrections and wc.domain == "time":
try:
arrival_times += point[wmap.time_shifts_id][
wmap.station_correction_idxs
]
except KeyError: # got reference point from config
if self.correction_name in point:
arrival_times += float(point[self.correction_name]) * num.ones(
wmap.n_t
)
else: # fixed individual station corrections
arrival_times += self.hierarchicals[wmap.time_shifts_id][
wmap.station_correction_idxs
]
if self.nevents == 1:
logger.debug("Using all sources for each wavemap!")
sources = self.sources
else:
            logger.debug(
                "Using individual sources based on event index for each wavemap!"
            )
sources = [self.sources[wc.event_idx]]
synthetics, _ = heart.seis_synthetics(
engine=self.engine,
sources=sources,
targets=wmap.targets,
arrival_taper=wc.arrival_taper,
wavename=wmap.name,
filterer=wc.filterer,
pre_stack_cut=sc.pre_stack_cut,
arrival_times=arrival_times,
outmode=outmode,
chop_bounds=chop_bounds,
nprocs=nprocs,
# plot=True,
**kwargs,
)
if self.config.station_corrections and wc.domain == "time":
# set tmin to data tmin
for tr, dtr in zip(synthetics, wmap._prepared_data):
if isinstance(tr, list):
for t in tr:
t.tmin = dtr.tmin
t.tmax = dtr.tmax
else:
tr.tmin = dtr.tmin
tr.tmax = dtr.tmax
if wc.domain == "spectrum":
valid_spectrum_indices = wmap.get_valid_spectrum_indices(
chop_bounds=chop_bounds, pad_to_pow2=True
)
synthetics = heart.fft_transforms(
synthetics,
valid_spectrum_indices=valid_spectrum_indices,
outmode=outmode,
pad_to_pow2=True,
)
if order == "list":
synths.extend(synthetics)
obs.extend(wmap._prepared_data)
elif order == "wmap":
synths.append(synthetics)
obs.append(wmap._prepared_data)
else:
raise ValueError('Order "%s" is not supported' % order)
return synths, obs
def update_weights(self, point, n_jobs=1, plot=False, chop_bounds=["b", "c"]):
"""
        Updates weighting matrices (in place) with respect to the point in the
solution space.
Parameters
----------
point : dict
with numpy array-like items and variable name keys
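
        Notes
        -----
        Weights are the inverse Cholesky factors of the dataset covariances.
        Velocity-model prediction covariances are only re-estimated if the
        number of configured model variations exceeds the internal threshold
        of 5.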
"""
if not self.weights:
self.init_weights()
sc = self.config
self.point2sources(point)
        # update data covariances in case of model-dependent non-toeplitz structure
if self.config.noise_estimator.structure == "non-toeplitz":
logger.info("Updating data-covariances ...")
self.analyse_noise(point, chop_bounds=chop_bounds)
crust_inds = range(*sc.gf_config.n_variations)
thresh = 5
if len(crust_inds) > thresh:
logger.info("Updating seismic velocity model-covariances ...")
if self.config.noise_estimator.structure == "non-toeplitz":
logger.warning(
"Non-toeplitz estimation in combination with model "
"prediction covariances is still EXPERIMENTAL and results"
" should be interpreted with care!!"
)
for wmap in self.wavemaps:
wc = wmap.config
            # deepcopy to avoid shifting the cached arrival times in place
            arrival_times = copy.deepcopy(wmap._arrival_times)
if self.config.station_corrections:
arrival_times += point[wmap.time_shifts_id][
wmap.station_correction_idxs
]
for channel in wmap.channels:
tidxs = wmap.get_target_idxs([channel])
for station, tidx in zip(wmap.stations, tidxs):
logger.debug(
"Channel %s of Station %s " % (channel, station.station)
)
crust_targets = heart.init_seismic_targets(
stations=[station],
earth_model_name=sc.gf_config.earth_model_name,
channels=channel,
sample_rate=sc.gf_config.sample_rate,
crust_inds=crust_inds,
reference_location=sc.gf_config.reference_location,
)
t0 = time()
cov_pv = cov.seismic_cov_velocity_models(
engine=self.engine,
sources=self.sources,
targets=crust_targets,
wavename=wmap.name,
arrival_taper=wc.arrival_taper,
arrival_time=arrival_times[tidx],
filterer=wc.filterer,
chop_bounds=chop_bounds,
plot=plot,
n_jobs=n_jobs,
)
t1 = time()
logger.debug(
"%s: Calculate weight time %f"
% (station.station, (t1 - t0))
)
cov_pv = utility.ensure_cov_psd(cov_pv)
self.engine.close_cashed_stores()
dataset = wmap.datasets[tidx]
dataset.covariance.pred_v = cov_pv
else:
logger.info(
"Not updating seismic velocity model-covariances because "
"number of model variations is too low! < %i" % thresh
)
for wmap in self.wavemaps:
logger.info("Updating weights of wavemap %s" % wmap._mapid)
for i, dataset in enumerate(wmap.datasets):
choli = dataset.covariance.chol_inverse
# update shared variables
dataset.covariance.update_slog_pdet()
wmap.weights[i].set_value(choli)
class SeismicDistributerComposite(SeismicComposite):
"""
    Defines how to solve the seismic (kinematic) linear forward model
    for distributed slip.
"""
def __init__(self, sc, project_dir, events, hypers=False):
super(SeismicDistributerComposite, self).__init__(
sc, events, project_dir, hypers=hypers
)
self.gfs = {}
self.gf_names = {}
self.choppers = {}
self.sweep_implementation = "c"
self._mode = "ffi"
self.gfpath = os.path.join(project_dir, self._mode, bconfig.linear_gf_dir_name)
self.config = sc
dgc = sc.gf_config.discretization_config
for pw, pl in zip(dgc.patch_widths, dgc.patch_lengths):
if pw != pl:
                raise ValueError(
                    "So far only square patches are supported in the kinematic"
                    " model! (fast_sweeping constraint)"
                )
if len(sc.gf_config.reference_sources) > 1:
logger.warning(
"So far only rupture propagation on each subfault individually"
)
self.fault = self.load_fault_geometry()
logger.info(
"Fault(s) discretized to %s [km]"
" patches." % utility.list2string(dgc.patch_lengths)
)
if not hypers:
self.sweepers = []
for idx in range(self.fault.nsubfaults):
n_p_dip, n_p_strike = self.fault.ordering.get_subfault_discretization(
idx
)
self.sweepers.append(
pytensorf.Sweeper(
dgc.patch_lengths[idx],
n_p_dip,
n_p_strike,
self.sweep_implementation,
)
)
def load_fault_geometry(self):
"""
Load fault-geometry, i.e. discretized patches.
Returns
-------
:class:`heart.FaultGeometry`
"""
try:
return utility.load_objects(
os.path.join(self.gfpath, bconfig.fault_geometry_name)
)[0]
except Exception:
raise FaultGeometryNotFoundError()
def point2sources(self, point):
"""
        Returns the fault source patch(es) with the point values updated.
Parameters
----------
point : dict
with random variables from solution space
"""
tpoint = copy.deepcopy(point)
tpoint.update(self.fixed_rvs)
if self.nevents == 1:
events = [self.event] # single event
else:
events = self.events # multi event
return self.fault.point2sources(tpoint, events=events)
def get_gflibrary_key(self, crust_ind, wavename, component):
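        # e.g. crust_ind=0, wavename="any_P_0", component="uparr"
        # (hypothetical names) yields the key "0_any_P_0_uparr"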
return "%i_%s_%s" % (crust_ind, wavename, component)
def load_gfs(self, crust_inds=None, make_shared=True):
"""
        Load Green's Function matrices for each variable to be inverted for.
Updates gfs and gf_names attributes.
Parameters
----------
        crust_inds : list
            of int, indexes of the Green's Functions to load
make_shared : bool
if True transforms gfs to :class:`pytensor.shared` variables
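
        Examples
        --------
        A minimal sketch, loading the reference-model GF library without
        shared variables (as done in ``get_formula``)::

            composite.load_gfs(
                crust_inds=[composite.config.gf_config.reference_model_idx],
                make_shared=False,
            )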
"""
        if crust_inds is None:
            # default to all configured earth-model variations
            crust_inds = list(range(*self.config.gf_config.n_variations))
        if not isinstance(crust_inds, list):
            raise TypeError("crust_inds needs to be a list!")
for wmap in self.wavemaps:
for crust_ind in crust_inds:
for var in self.slip_varnames:
gflib_name = get_gf_prefix(
datatype=self.name,
component=var,
wavename=wmap._mapid,
crust_ind=crust_ind,
)
gfpath = os.path.join(self.gfpath, gflib_name + ".yaml")
if not os.path.exists(gfpath):
filename = get_gf_prefix(
datatype=self.name,
component=var,
wavename=wmap.config.name,
crust_ind=crust_ind,
)
logger.warning(
"Seismic GFLibrary %s does not exist, "
"trying to load with old naming: %s"
% (gflib_name, filename)
)
gfpath = os.path.join(self.gfpath, filename + ".yaml")
else:
logger.info("Loading SeismicGFLibrary %s " % gflib_name)
filename = gflib_name
gfs = load_gf_library(directory=self.gfpath, filename=filename)
if make_shared:
gfs.init_optimization()
key = self.get_gflibrary_key(
crust_ind=crust_ind, wavename=wmap._mapid, component=var
)
self.gf_names[key] = gfpath
self.gfs[key] = gfs
def get_formula(self, input_rvs, fixed_rvs, hyperparams, problem_config):
        # no "a", "d" taper bounds, as the GF library is saved between "b" and "c"
chop_bounds = ["b", "c"]
logger.info("Loading %s Green's Functions" % self.name)
self.load_gfs(
crust_inds=[self.config.gf_config.reference_model_idx], make_shared=False
)
hp_specific = self.config.dataset_specific_residual_noise_estimation
tpoint = problem_config.get_test_point()
self.input_rvs = input_rvs
self.fixed_rvs = fixed_rvs
        logger.info(
            "Seismic optimization on: \n %s" % ", ".join(self.input_rvs.keys())
        )
self.analyse_noise(tpoint, chop_bounds=chop_bounds)
for gfs in self.gfs.values():
gfs.init_optimization()
self.init_weights()
if self.config.station_corrections:
logger.info(
"Initialized %i hierarchical parameters for "
"station corrections." % len(self.get_all_station_names())
)
self.input_rvs.update(fixed_rvs)
ref_idx = self.config.gf_config.reference_model_idx
nuc_strike = input_rvs["nucleation_strike"]
nuc_dip = input_rvs["nucleation_dip"]
t2 = time()
# convert velocities to rupture onset
logger.debug("Fast sweeping ...")
starttimes0 = tt.zeros((self.fault.npatches), dtype=tconfig.floatX)
for index in range(self.fault.nsubfaults):
nuc_dip_idx, nuc_strike_idx = self.fault.fault_locations2idxs(
index=index,
positions_dip=nuc_dip[index],
positions_strike=nuc_strike[index],
backend="pytensor",
)
sf_patch_indexs = self.fault.cum_subfault_npatches[index : index + 2]
starttimes_tmp = self.sweepers[index](
(1.0 / self.fault.vector2subfault(index, input_rvs["velocities"])),
nuc_dip_idx,
nuc_strike_idx,
)
starttimes_tmp += input_rvs["time"][index]
starttimes0 = tt.set_subtensor(
starttimes0[sf_patch_indexs[0] : sf_patch_indexs[1]], starttimes_tmp
)
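        # starttimes0 now holds the rupture onset of every patch: the
        # fast-sweeping traveltime from the nucleation point plus the
        # respective subfault rupture time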
wlogpts = []
for wmap in self.wavemaps:
wc = wmap.config
if wc.domain == "spectrum":
raise TypeError("FFI is currently only supported for time-domain!")
# station corrections
if len(self.hierarchicals) > 0:
logger.info("Applying station corrections ...")
starttimes = (
tt.tile(starttimes0, wmap.n_t)
- tt.repeat(
self.hierarchicals[wmap.time_shifts_id][
wmap.station_correction_idxs
],
self.fault.npatches,
)
).reshape((wmap.n_t, self.fault.npatches))
else:
logger.info("No station corrections ...")
starttimes = tt.tile(starttimes0, wmap.n_t).reshape(
(wmap.n_t, self.fault.npatches)
)
targetidxs = shared(num.atleast_2d(num.arange(wmap.n_t)).T, borrow=True)
logger.debug("Stacking %s phase ..." % wc.name)
synthetics = tt.zeros(
(
wmap.n_t,
wc.arrival_taper.nsamples(self.config.gf_config.sample_rate),
),
dtype=tconfig.floatX,
)
        # make sure data is initialised as an array (the non-toeplitz noise
        # analysis above worked on traces)
wmap.prepare_data(
source=self.events[wc.event_idx],
engine=self.engine,
outmode="array",
chop_bounds=chop_bounds,
)
for var in self.slip_varnames:
logger.debug("Stacking %s variable" % var)
key = self.get_gflibrary_key(
crust_ind=ref_idx, wavename=wmap._mapid, component=var
)
logger.debug("GF Library key %s" % key)
synthetics += self.gfs[key].stack_all(
targetidxs=targetidxs,
starttimes=starttimes,
durations=input_rvs["durations"],
slips=input_rvs[var],
interpolation=wc.interpolation,
)
residuals = wmap.shared_data_array - synthetics
logger.debug("Calculating likelihoods ...")
logpts = multivariate_normal_chol(
wmap.datasets,
wmap.weights,
hyperparams,
residuals,
hp_specific=hp_specific,
)
wlogpts.append(logpts)
t3 = time()
logger.debug("Seismic formula on test model takes: %f" % (t3 - t2))
        llk = Deterministic(self._like_name, tt.concatenate(wlogpts))
return llk.sum()
def get_synthetics(self, point, **kwargs):
"""
Get synthetics for given point in solution space.
Parameters
----------
point : :func:`pymc.Point`
Dictionary with model parameters
        kwargs : dict
            especially to change the output of the forward model;
            outmode = 'stacked_traces' / 'tapered_data' / 'array'
Returns
-------
list with :class:`heart.SeismicDataset` synthetics for each target
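
        Examples
        --------
        A minimal sketch, assuming a sampled ``point`` of the FFI problem::

            synths, obs = composite.get_synthetics(point, outmode="stacked_traces")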
"""
outmode = kwargs.pop("outmode", "stacked_traces")
patchidxs = kwargs.pop("patchidxs", None)
if patchidxs is None:
patchidxs = num.arange(self.fault.npatches, dtype="int")
        # GF library is cut between [b, c]; [a, d] is not possible
chop_bounds = ["b", "c"]
order = kwargs.pop("order", "list")
ref_idx = self.config.gf_config.reference_model_idx
if len(self.gfs) == 0:
self.load_gfs(crust_inds=[ref_idx], make_shared=False)
for gfs in self.gfs.values():
gfs.set_stack_mode("numpy")
tpoint = copy.deepcopy(point)
hps = self.config.get_hypernames()
for hyper in hps:
if hyper in tpoint:
tpoint.pop(hyper)
starttimes0 = num.zeros((self.fault.npatches), dtype=tconfig.floatX)
for index in range(self.fault.nsubfaults):
starttimes_tmp = self.fault.point2starttimes(tpoint, index=index).ravel()
sf_patch_indexs = self.fault.cum_subfault_npatches[index : index + 2]
starttimes0[sf_patch_indexs[0] : sf_patch_indexs[1]] = starttimes_tmp
synth_traces = []
obs_traces = []
for wmap in self.wavemaps:
wc = wmap.config
starttimes = num.tile(starttimes0, wmap.n_t).reshape(
wmap.n_t, self.fault.npatches
)
# station corrections
if self.config.station_corrections:
            logger.debug(
                "Applying station corrections for wmap {}".format(wmap._mapid)
            )
try:
corrections = point[wmap.time_shifts_id]
except KeyError: # got reference point from config
if self.correction_name in point:
corrections = float(point[self.correction_name]) * num.ones(
wmap.n_t
)
else: # fixed individual station corrections
corrections = self.hierarchicals[wmap.time_shifts_id][
wmap.station_correction_idxs
]
starttimes -= num.repeat(
corrections[wmap.station_correction_idxs], self.fault.npatches
).reshape(wmap.n_t, self.fault.npatches)
# TODO check targetidxs if station blacklisted!?
targetidxs = num.atleast_2d(num.arange(wmap.n_t)).T
synthetics = num.zeros(
(wmap.n_t, wc.arrival_taper.nsamples(self.config.gf_config.sample_rate))
)
for var in self.slip_varnames:
key = self.get_gflibrary_key(
crust_ind=ref_idx, wavename=wmap._mapid, component=var
)
try:
logger.debug("Accessing GF Library key %s" % key)
gflibrary = self.gfs[key]
except KeyError:
raise KeyError(
"GF library %s not loaded! Loaded GFs:"
" %s" % (key, utility.list2string(self.gfs.keys()))
)
gflibrary.set_stack_mode("numpy")
t0 = time()
synthetics += gflibrary.stack_all(
targetidxs=targetidxs,
starttimes=starttimes[:, patchidxs],
durations=tpoint["durations"][patchidxs],
slips=tpoint[var][patchidxs],
patchidxs=patchidxs,
interpolation=wc.interpolation,
)
t1 = time()
logger.debug("{} seconds to stack {}".format((t1 - t0), wmap._mapid))
wmap_synthetics = []
if outmode != "array":
for i, target in enumerate(wmap.targets):
tr = Trace(
ydata=synthetics[i, :],
tmin=float(gflibrary.reference_times[i]),
deltat=gflibrary.deltat,
)
tr.set_codes(*target.codes)
if outmode == "tapered_data":
# TODO subfault individual synthetics (use patchidxs arg)
tr = [tr]
wmap_synthetics.append(tr)
elif outmode == "array":
wmap_synthetics.extend(synthetics)
else:
raise ValueError(
"Supported outmodes: stacked_traces, tapered_data, array! "
"Given outmode: %s !" % outmode
)
if not wmap.is_prepared:
wmap.prepare_data(
source=self.events[wc.event_idx],
engine=self.engine,
outmode=outmode,
chop_bounds=chop_bounds,
)
if order == "list":
synth_traces.extend(wmap_synthetics)
obs_traces.extend(wmap._prepared_data)
elif order == "wmap":
synth_traces.append(wmap_synthetics)
obs_traces.append(wmap._prepared_data)
else:
raise ValueError('Order "%s" is not supported' % order)
return synth_traces, obs_traces
def update_weights(self, point, n_jobs=1, plot=False, chop_bounds=["b", "c"]):
"""
        Updates weighting matrices (in place) with respect to the point in the
solution space.
Parameters
----------
point : dict
with numpy array-like items and variable name keys
"""
if not self.weights:
self.init_weights()
        # update data covariances in case of model-dependent non-toeplitz structure
if self.config.noise_estimator.structure == "non-toeplitz":
logger.info("Updating data-covariances ...")
self.analyse_noise(point, chop_bounds=chop_bounds)
for wmap in self.wavemaps:
logger.info("Updating weights of wavemap %s" % wmap._mapid)
for i, dataset in enumerate(wmap.datasets):
choli = dataset.covariance.chol_inverse
# update shared variables
dataset.covariance.update_slog_pdet()
wmap.weights[i].set_value(choli)