import logging
from time import time
import numpy as num
from pyrocko import gf, trace
from pytensor import config as tconfig
from scipy.linalg import toeplitz
from scipy.spatial import KDTree
from beat import heart
from beat.utility import distances, ensure_cov_psd, list2string, running_window_rms
logger = logging.getLogger("covariance")
__all__ = [
"geodetic_cov_velocity_models",
"geodetic_cov_velocity_models_pscmp",
"seismic_cov_velocity_models",
"SeismicNoiseAnalyser",
]
def exponential_data_covariance(n, dt, tzero):
"""
Get exponential sub-covariance matrix without variance, toeplitz form.
Parameters
----------
n : int
length of trace/ samples of quadratic Covariance matrix
dt : float
time step of samples, sampling interval
tzero : float
shortest period of waves in trace
Returns
-------
:class:`numpy.ndarray`
Notes
-----
Cd(i,j) = (Variance of trace)*exp(-abs(ti-tj)/
(shortest period T0 of waves))
i,j are samples of the seismic trace
"""
return num.exp(
-num.abs(num.arange(n)[:, num.newaxis] - num.arange(n)[num.newaxis, :])
* (dt / tzero)
)
def identity_data_covariance(n, dt=None, tzero=None):
"""
Get identity covariance matrix.
Parameters
----------
n : int
length of trace/ samples of quadratic Covariance matrix
Returns
-------
:class:`numpy.ndarray`
"""
return num.eye(n, dtype=tconfig.floatX)
def ones_data_covariance(n, dt=None, tzero=None):
"""
Get ones covariance matrix. Dummy for importing.
Parameters
----------
n : int
length of trace/ samples of quadratic Covariance matrix
Returns
-------
:class:`numpy.ndarray`
"""
return num.ones((n, n), dtype=tconfig.floatX)
NoiseStructureCatalog = {
"variance": identity_data_covariance,
"exponential": exponential_data_covariance,
"import": ones_data_covariance,
"non-toeplitz": ones_data_covariance,
}
NoiseStructureCatalog2d = {
"import": ones_data_covariance,
"non-toeplitz": ones_data_covariance,
}
def available_noise_structures():
return list(NoiseStructureCatalog.keys())
def available_noise_structures_2d():
return list(NoiseStructureCatalog2d.keys())
def import_data_covariance(data_trace, arrival_taper, sample_rate, domain="time"):
"""
Use imported covariance matrixes and check size consistency with taper.
Cut or extend based on variance and taper size.
Parameters
----------
data_trace : :class:`heart.SeismicDataset`
with data covariance matrix in the covariance attribute
arrival_taper : :class: `heart.ArrivalTaper`
determines tapering around phase Arrival
sample_rate : float
sampling rate of data_traces and GreensFunction stores
Returns
-------
covariance matrix : :class:`numpy.ndarray`
with size of given arrival taper
"""
logger.info("No data-covariance estimation, using imported" " covariances...\n")
at = arrival_taper
n_samples = at.nsamples(sample_rate)
if domain == "spectrum":
n_samples = trace.nextpow2(n_samples)
n_samples = int(n_samples // 2) + 1
if data_trace.covariance is None:
logger.warn("No data covariance given/estimated! " "Setting default: eye")
return num.eye(n_samples)
else:
data_cov = data_trace.covariance.data
if data_cov.shape[0] != n_samples:
logger.warn(
"Imported covariance %i does not agree "
" with taper samples %i! Using Identity"
" matrix and mean of variance of imported"
" covariance matrix!" % (data_cov.shape[0], n_samples)
)
data_cov = num.eye(n_samples) * data_cov.diagonal().mean()
return data_cov
class GeodeticNoiseAnalyser(object):
"""
Geodetic noise analyser
Parameters
----------
structure : string
either, import, variance, non-toeplitz
events : list
of :class:`pyrocko.meta.Event` reference event(s) from catalog
"""
def __init__(
self,
config,
events=None,
):
avail = available_noise_structures_2d()
if config.structure not in avail:
raise AttributeError(
'Selected noise structure "%s" not supported! Implemented'
" noise structures: %s" % (config.structure, list2string(avail))
)
self.events = events
self.config = config
def get_structure(self, dataset):
return NoiseStructureCatalog2d[self.config.structure](dataset.ncoords)
def do_import(self, dataset):
if dataset.covariance.data is not None:
return dataset.covariance.data
else:
raise ValueError(
"Data covariance for dataset %s needs to be defined!" % dataset.id
)
def do_non_toeplitz(self, dataset, result):
if dataset.typ == "SAR":
dataset.update_local_coords(self.events[0])
coords = num.vstack([dataset.east_shifts, dataset.north_shifts]).T
scaling = non_toeplitz_covariance_2d(
coords, result.processed_res, max_dist_perc=self.config.max_dist_perc
)
else:
scaling = dataset.covariance.data
if num.isnan(scaling).any():
raise ValueError(
"Estimated Non-Toeplitz covariance matrix for dataset %s contains Nan! "
"Please increase 'max_dist_perc'!" % dataset.id
)
return scaling
def get_data_covariance(self, dataset, result=None):
"""
Estimated data covariances of seismic traces
Parameters
----------
datasets
results
Returns
-------
:class:`numpy.ndarray`
"""
covariance_structure = self.get_structure(dataset)
if self.config.structure == "import":
scaling = self.do_import(dataset)
elif self.config.structure == "non-toeplitz":
scaling = self.do_non_toeplitz(dataset, result)
return ensure_cov_psd(scaling * covariance_structure)
[docs]
class SeismicNoiseAnalyser(object):
"""
Seismic noise analyser
Parameters
----------
structure : string
either, variance, exponential, import, non-toeplitz
pre_arrival_time : float
in [s], time before P arrival until variance is estimated
engine : :class:`pyrocko.gf.seismosizer.LocalEngine`
processing object for synthetics calculation
events : list
of :class:`pyrocko.meta.Event`
reference event(s) from catalog
chop_bounds : list of len 2
of taper attributes a, b, c, or d
"""
def __init__(
self,
structure="variance",
pre_arrival_time=5.0,
engine=None,
events=None,
sources=None,
chop_bounds=["b", "c"],
):
avail = available_noise_structures()
if structure not in avail:
raise AttributeError(
'Selected noise structure "%s" not supported! Implemented'
" noise structures: %s" % (structure, list2string(avail))
)
self.events = events
self.engine = engine
self.sources = sources
self.pre_arrival_time = pre_arrival_time
self.structure = structure
self.chop_bounds = chop_bounds
def get_structure(self, wmap, chop_bounds=None):
if chop_bounds is None:
chop_bounds = self.chop_bounds
_, fmax = wmap.get_taper_frequencies()
tzero = 1.0 / fmax
if wmap.config.domain == "spectrum":
n = wmap.get_nsamples_spectrum(chop_bounds=chop_bounds, pad_to_pow2=True)
dsample = wmap.get_deltaf(chop_bounds=chop_bounds, pad_to_pow2=True)
else:
n = wmap.get_nsamples_time(chop_bounds)
dsample = wmap.deltat
return NoiseStructureCatalog[self.structure](n, dsample, tzero)
def do_import(self, wmap):
scalings = []
for tr, target in zip(wmap.datasets, wmap.targets):
scaling = import_data_covariance(
tr,
arrival_taper=wmap.config.arrival_taper,
sample_rate=1.0 / wmap.deltat,
domain=wmap.config.domain,
)
scalings.append(scaling)
return scalings
def do_non_toeplitz(self, wmap, results):
if results is None:
ValueError(
"Results need(s) to be given for non-toeplitz" " covariance estimates!"
)
else:
scalings = []
for result in results:
residual = result.processed_res.get_ydata()
window_size = residual.size // 5
if window_size == 0:
raise ValueError(
"Length of trace too short! Please widen taper in time"
" domain or frequency bands in spectral domain."
)
scaling = non_toeplitz_covariance(residual, window_size=window_size)
scalings.append(scaling)
return scalings
def do_variance_estimate(self, wmap, chop_bounds=None):
filterer = wmap.config.filterer
scalings = []
for i, (tr, target) in enumerate(zip(wmap.datasets, wmap.targets)):
wavename = None # None uses first tabulated phase
arrival_time = heart.get_phase_arrival_time(
engine=self.engine,
source=self.events[wmap.config.event_idx],
target=target,
wavename=wavename,
)
if arrival_time < tr.tmin:
logger.warning(
"no data for variance estimation on pre-P arrival"
" in wavemap %s, for trace %s!"
% (wmap._mapid, list2string(tr.nslc_id))
)
logger.info('Using reference arrival "%s" instead!' % wmap.name)
arrival_time = heart.get_phase_arrival_time(
engine=self.engine,
source=self.events[wmap.config.event_idx],
target=target,
wavename=wmap.name,
)
if filterer:
ctrace = tr.copy()
# apply all the filters
for filt in filterer:
filt.apply(ctrace)
ctrace = ctrace.chop(
tmin=tr.tmin, tmax=arrival_time - self.pre_arrival_time
)
nslc_id_str = list2string(ctrace.nslc_id)
if wmap.config.domain == "spectrum":
valid_spectrum_indices = wmap.get_valid_spectrum_indices(
chop_bounds=chop_bounds, pad_to_pow2=True
)
data = heart.fft_transforms(
[ctrace],
valid_spectrum_indices,
outmode="stacked_traces",
pad_to_pow2=True,
)[0].get_ydata()
else:
data = ctrace.get_ydata()
if data.size == 0:
raise ValueError(
"Trace %s contains no pre-P arrival data! Please either "
"remove/blacklist or make sure data contains times before"
" the P arrival time!" % nslc_id_str
)
scaling = num.nanvar(data)
if num.isfinite(scaling).all():
logger.info("Variance estimate of %s = %g" % (nslc_id_str, scaling))
scalings.append(scaling)
else:
raise ValueError(
"Pre P-trace of %s contains Inf or" " NaN!" % nslc_id_str
)
return scalings
[docs]
def get_data_covariances(self, wmap, sample_rate, results=None, chop_bounds=None):
"""
Estimated data covariances of seismic traces
Parameters
----------
wmap : :class:`beat.WaveformMapping`
results
sample_rate : float
sampling rate of data_traces and GreensFunction stores
Returns
-------
:class:`numpy.ndarray`
"""
covariance_structure = self.get_structure(wmap, chop_bounds)
if self.structure == "import":
scalings = self.do_import(wmap)
elif self.structure == "non-toeplitz":
scalings = self.do_non_toeplitz(wmap, results)
else:
scalings = self.do_variance_estimate(wmap, chop_bounds=chop_bounds)
cov_ds = []
for scaling in scalings:
cov_d = ensure_cov_psd(scaling * covariance_structure)
cov_ds.append(cov_d)
return cov_ds
def model_prediction_sensitivity(engine, *args, **kwargs):
"""
DEPRECATED!
Calculate the model prediction Covariance Sensitivity Kernel.
(numerical derivation with respect to the input source parameter(s))
Following Duputel et al. 2014
:Input:
:py:class:'engine'
source_parms = list of parameters with respect to which the kernel
is being calculated e.g. ['strike', 'dip', 'depth']
!!!
NEEDS to have seismosizer source object parameter variable name convention
!!!
(see seismosizer.source.keys())
calculate_model_prediction_sensitivity(request, source_params, **kwargs)
calculate_model_prediction_sensitivity(sources,
targets, source_params, **kwargs)
Returns traces in a list[parameter][targets] for each station and channel
as specified in the targets. The location code of each trace is placed to
show the respective source parameter.
"""
if len(args) not in (0, 1, 2, 3):
raise gf.BadRequest("invalid arguments")
if len(args) == 2:
kwargs["request"] = args[0]
kwargs["source_params"] = args[1]
elif len(args) == 3:
kwargs.update(gf.Request.args2kwargs(args[0:1]))
kwargs["source_params"] = args[2]
request = kwargs.pop("request", None)
nprocs = kwargs.pop("nprocs", 1)
source_params = kwargs.pop("source_params", None)
h = kwargs.pop("h", None)
if request is None:
request = gf.Request(**kwargs)
if h is None:
h = num.ones(len(source_params)) * 1e-1
# create results list
sensitivity_param_list = []
sensitivity_param_trcs = []
for i in range(len(source_params)):
sensitivity_param_list.append([0] * len(request.targets))
sensitivity_param_trcs.append([0] * len(request.targets))
for ref_source in request.sources:
par_count = 0
for param in source_params:
print(param, "with h = ", h[par_count])
calc_source_p2h = ref_source.clone()
calc_source_ph = ref_source.clone()
calc_source_mh = ref_source.clone()
calc_source_m2h = ref_source.clone()
setattr(calc_source_p2h, param, ref_source[param] + (2 * h[par_count]))
setattr(calc_source_ph, param, ref_source[param] + (h[par_count]))
setattr(calc_source_mh, param, ref_source[param] - (h[par_count]))
setattr(calc_source_m2h, param, ref_source[param] - (2 * h[par_count]))
calc_sources = [
calc_source_p2h,
calc_source_ph,
calc_source_mh,
calc_source_m2h,
]
response = engine.process(
sources=calc_sources, targets=request.targets, nprocs=nprocs
)
for i_k in range(len(request.targets)):
# zero padding if necessary
trc_lengths = num.array(
[
len(response.results_list[i][i_k].trace.data)
for i in range(len(response.results_list))
]
)
Id = num.where(trc_lengths != trc_lengths.max())
for i_l in Id[0]:
response.results_list[i_l][i_k].trace.data = num.concatenate(
(
response.results_list[i_l][i_k].trace.data,
num.zeros(trc_lengths.max() - trc_lengths[i_l]),
)
)
# calculate numerical partial derivative for
# each source and target
sensitivity_param_list[par_count][i_k] = sensitivity_param_list[
par_count
][i_k] + (
-response.results_list[0][i_k].trace.data
+ 8 * response.results_list[1][i_k].trace.data
- 8 * response.results_list[2][i_k].trace.data
+ response.results_list[3][i_k].trace.data
) / (12 * h[par_count])
par_count = par_count + 1
# form traces from sensitivities
par_count = 0
for param in source_params:
for k in range(len(request.targets)):
sensitivity_param_trcs[par_count][i_k] = trace.Trace(
network=request.targets[k].codes[0],
station=request.targets[k].codes[1],
ydata=sensitivity_param_list[par_count][k],
deltat=response.results_list[0][k].trace.deltat,
tmin=response.results_list[0][k].trace.tmin,
channel=request.targets[k].codes[3],
location=param,
)
par_count = par_count + 1
return sensitivity_param_trcs
[docs]
def seismic_cov_velocity_models(
engine,
sources,
targets,
arrival_taper,
arrival_time,
wavename,
filterer,
plot=False,
n_jobs=1,
chop_bounds=["b", "c"],
):
"""
Calculate model prediction uncertainty matrix with respect to uncertainties
in the velocity model for station and channel.
Parameters
----------
engine : :class:`pyrocko.gf.seismosizer.LocalEngine`
contains synthetics generation machine
sources : list
of :class:`pyrocko.gf.seismosizer.Source`
targets : list
of :class:`pyrocko.gf.seismosizer.Targets`
arrival_taper : :class: `heart.ArrivalTaper`
determines tapering around phase Arrival
arrival_time : None or :class:`numpy.NdArray` or float
of phase to apply taper, if None theoretic arrival of ray tracing used
filterer : list
of :class:`heart.Filter` determining the filtering corner frequencies
of various filters
plot : boolean
open snuffler and browse traces if True
n_jobs : int
number of processors to be used for calculation
Returns
-------
:class:`numpy.ndarray` with Covariance due to velocity model uncertainties
"""
arrival_times = num.ones(len(targets), dtype="float64") * arrival_time
t0 = time()
synths, _ = heart.seis_synthetics(
engine=engine,
sources=sources,
targets=targets,
arrival_taper=arrival_taper,
wavename=wavename,
filterer=filterer,
arrival_times=arrival_times,
pre_stack_cut=True,
plot=plot,
outmode="array",
chop_bounds=chop_bounds,
)
t1 = time()
logger.debug("Trace generation time %f" % (t1 - t0))
return num.cov(synths, rowvar=0)
[docs]
def geodetic_cov_velocity_models(
engine, sources, targets, dataset, plot=False, event=None, n_jobs=1
):
"""
Calculate model prediction uncertainty matrix with respect to uncertainties
in the velocity model for geodetic targets using fomosto GF stores.
Parameters
----------
engine : :class:`pyrocko.gf.seismosizer.LocalEngine`
contains synthetics generation machine
target : :class:`pyrocko.gf.targets.StaticTarget`
dataset and observation points to calculate covariance for
sources : list
of :py:class:`pyrocko.gf.seismosizer.Source` determines the covariance
matrix
plot : boolean
if set, a plot is produced and not covariance matrix is returned
Returns
-------
:class:`numpy.ndarray` with Covariance due to velocity model uncertainties
"""
t0 = time()
displacements = heart.geo_synthetics(
engine=engine, targets=targets, sources=sources, outmode="stacked_arrays"
)
t1 = time()
logger.debug("Synthetics generation time %f" % (t1 - t0))
synths = num.zeros((len(targets), dataset.samples))
for i, disp in enumerate(displacements):
synths[i, :] = (
disp[:, 0] * dataset.los_vector[:, 0]
+ disp[:, 1] * dataset.los_vector[:, 1]
+ disp[:, 2] * dataset.los_vector[:, 2]
) * dataset.odw
if plot:
from matplotlib import pyplot as plt
indexes = dataset.get_distances_to_event(event).argsort() # noqa
ax = plt.axes()
im = ax.matshow(synths) # [:, indexes])
plt.colorbar(im)
plt.show()
return num.cov(synths, rowvar=0)
[docs]
def geodetic_cov_velocity_models_pscmp(store_superdir, crust_inds, target, sources):
"""
Calculate model prediction uncertainty matrix with respect to uncertainties
in the velocity model for geodetic targets based on pscmp.
Deprecated!!!
Parameters
----------
store_superdir : str
Absolute path to the geodetic GreensFunction directory
crust_inds : list
of int of indices for respective GreensFunction store indexes
target : :class:`heart.GeodeticDataset`
dataset and observation points to calculate covariance for
sources : list
of :py:class:`pscmp.PsCmpRectangularSource` determines the covariance
matrix
Returns
-------
:class:`numpy.ndarray` with Covariance due to velocity model uncertainties
"""
synths = num.zeros((len(crust_inds), target.samples))
for crust_ind in crust_inds:
disp = heart.geo_layer_synthetics(
store_superdir,
crust_ind,
lons=target.lons,
lats=target.lats,
sources=sources,
)
synths[crust_ind, :] = (
disp[:, 0] * target.los_vector[:, 0]
+ disp[:, 1] * target.los_vector[:, 1]
+ disp[:, 2] * target.los_vector[:, 2]
) * target.odw
return num.cov(synths, rowvar=0)
def autocovariance(data):
"""
Calculate autocovariance of data.
Returns
-------
:class:`numpy.ndarray`
Notes
-----
Following Dettmer et al. 2007 JASA
"""
n = data.size
meand = data.mean()
autocov = num.zeros((n), tconfig.floatX)
for j in range(n):
for k in range(n - j):
autocov[j] += (data[j + k] - meand) * (data[k] - meand)
return autocov / n
def toeplitz_covariance(data, window_size):
"""
Get Toeplitz banded matrix for given data.
Returns
-------
toeplitz : :class:`numpy.ndarray` 1-d, (size data)
stds : :class:`numpy.ndarray` 1-d, size data
of running windows
"""
stds = running_window_rms(data, window_size=window_size, mode="same")
coeffs = autocovariance(data / stds)
return toeplitz(coeffs), stds
def non_toeplitz_covariance(data, window_size):
"""
Get scaled non- Toeplitz covariance matrix, which may be able to account
for non-stationary data-errors. For 1-d data.
Parameters
----------
data : :class:`numpy.ndarray`
of data to estimate non-stationary error matrix for
window_size : int
samples to take on running rmse estimation over data
Returns
-------
:class:`numpy.ndarray` (size data, size data)
"""
toeplitz, stds = toeplitz_covariance(data, window_size)
return toeplitz * stds[:, num.newaxis] * stds[num.newaxis, :]
def k_nearest_neighbor_rms(coords, data, k=None, max_dist_perc=0.2):
"""
Calculate running rms on irregular sampled 2d spatial data.
Parameters
----------
coords : :class:`numpy.ndarray` 2-d, (size data, n coords-dims)
containing spatial coordinates (east_shifts, north_shifts)
data : :class:`numpy.ndarray` 1-d, (size data)
containing values of physical quantity
k : int
taking k - nearest neighbors for std estimation
max_dist_perc : float
max distance [decimal percent] to select as nearest neighbors
"""
if k and max_dist_perc:
raise ValueError("Either k or max_dist_perc should be defined!")
kdtree = KDTree(coords, leafsize=1)
dists = distances(coords, coords)
r = dists.max() * max_dist_perc
logger.debug("Nearest neighbor distance is: %f", r)
stds = []
for point in coords:
if k is not None:
_, idxs = kdtree.query(point, k=k)
elif r is not None:
idxs = kdtree.query_ball_point(point, r=r)
else:
raise ValueError()
stds.append(num.std(data[idxs], ddof=1))
return num.array(stds)
def toeplitz_covariance_2d(coords, data, max_dist_perc=0.2):
"""
Get Toeplitz banded matrix for given 2d data.
Returns
-------
toeplitz : :class:`numpy.ndarray` 2-d, (size data, size data)
stds : :class:`numpy.ndarray` 1-d, size data
of running windows
max_dist_perc : float
max distance [decimal percent] to select as nearest neighbors
"""
stds = k_nearest_neighbor_rms(coords=coords, data=data, max_dist_perc=max_dist_perc)
coeffs = autocovariance(data / stds)
return toeplitz(coeffs), stds
def non_toeplitz_covariance_2d(coords, data, max_dist_perc):
"""
Get scaled non- Toeplitz covariance matrix, which may be able to account
for non-stationary data-errors. For 2-d geospatial data.
Parameters
----------
data : :class:`numpy.ndarray`
of data to estimate non-stationary error matrix for
max_dist_perc : float
max distance [decimal percent] to select as nearest neighbors
Returns
-------
:class:`numpy.ndarray` (size data, size data)
"""
toeplitz, stds = toeplitz_covariance_2d(coords, data, max_dist_perc)
return toeplitz * stds[:, num.newaxis] * stds[num.newaxis, :]
def init_proposal_covariance(bij, population):
"""
Create initial proposal covariance matrix based on random samples
from the solution space.
"""
test_point = population[0]
q = bij.map(test_point)
population_array = num.zeros((len(population), q.data.size))
for i, point in enumerate(population):
population_array[i, :] = bij.map(point).data
return num.diag(population_array.var(0))
def calc_sample_covariance(buffer, lij, bij, beta):
"""
Calculate trace covariance matrix based on given trace values.
Parameters
----------
lpoints : list
of list points (e.g. buffer of traces)
lij : `beat.utility.ListArrayOrdering`
that holds orderings of RVs
beta : float
tempering parameter of the trace
Returns
-------
cov : :class:`numpy.ndarray`
weighted covariances (NumPy > 1.10. required)
"""
n_points = len(buffer)
point = lij.l2d(buffer[0][0])
point_array = bij.map(point).data
like_idx = lij.ordering["like"].list_ind
weights = num.array([lpoint[like_idx] for lpoint, _ in buffer])
temp_weights = num.exp((weights - weights.max())).ravel()
norm_weights = temp_weights / num.sum(temp_weights)
population_array = num.zeros((n_points, point_array.size))
for i, (lpoint, _) in enumerate(buffer):
point = lij.l2d(lpoint)
population_array[i, :] = bij.map(point).data
cov = num.cov(population_array, aweights=norm_weights, bias=False, rowvar=0)
cov = ensure_cov_psd(cov)
if num.isnan(cov).any() or num.isinf(cov).any():
logger.warn(
"Proposal covariances contain Inf or NaN! "
"For chain with beta: %f "
"Buffer size maybe too small! Keeping previous proposal." % beta
)
cov = None
return cov