"""
This module provides a namespace for various functions:
coordinate transformations,
loading and storing objects,
book-keeping of indexes in arrays that relate to defined variable names,
manipulation of various pyrocko objects
and many more ...
"""
import collections
import copy
import logging
import os
import re
from functools import wraps
from timeit import Timer
import cloudpickle as pickle
import numpy as num
from pyrocko import catalog, orthodrome, util
from pyrocko.cake import LayeredModel, m2d, read_nd_model_str
from pyrocko.guts import Float, Int, Object
from pytensor import config as tconfig
logger = logging.getLogger("utility")
DataMap = collections.namedtuple("DataMap", "list_ind, slc, shp, dtype, name")
locationtypes = {
"east_shift",
"north_shift",
"depth",
"distance",
"delta_depth",
"delta_east_shift_bottom",
"delta_north_shift_bottom",
"depth_bottom",
}
dimensiontypes = {
"length",
"width",
"diameter",
"a_half_axis",
"b_half_axis",
"a_half_axis_bottom",
"b_half_axis_bottom",
}
mttypes = {"mnn", "mee", "mdd", "mne", "mnd", "med"}
degtypes = {"strike", "dip", "rake"}
nucleationtypes = {"nucleation_x", "nucleation_y"}
patch_anchor_points = {"center", "bottom_depth", "bottom_left"}
patypes = {"traction", "strike_traction", "dip_traction", "tensile_traction"}
kmtypes = set.union(locationtypes, dimensiontypes, patch_anchor_points)
grouped_vars = set.union(kmtypes, mttypes, degtypes, nucleationtypes)
unit_sets = {
"locationtypes": locationtypes,
"dimensiontypes": dimensiontypes,
"mttypes": mttypes,
"degtypes": degtypes,
"nucleationtypes": nucleationtypes,
}
seconds_str = "00:00:00"
sphr = 3600.0
hrpd = 24.0
d2r = num.pi / 180.0
km = 1000.0
def argsorted(seq, reverse=False):
# http://stackoverflow.com/questions/3382352/equivalent-of-numpy-argsort-in-basic-python/3382369#3382369
# by unutbu
return sorted(range(len(seq)), key=seq.__getitem__, reverse=reverse)
[docs]
class Counter(object):
"""
Counts calls of types with string_ids. Repeated calls with the same
string id increase the count.
"""
def __init__(self):
self.d = dict()
def __call__(self, string, multiplier=1):
if string not in self.d:
self.d[string] = 0
else:
self.d[string] += 1 * multiplier
return self.d[string]
def __getitem__(self, key):
try:
return self.d[key]
except ValueError:
raise KeyError(
'type "%s" is not listed in the counter!'
" Counted types are: %s" % (key, list2string(list(self.d.keys())))
)
def reset(self, string=None):
if string is None:
self.d = dict()
else:
self.d[string] = 0
[docs]
class ListArrayOrdering(object):
"""
An ordering for a list to an array space. Takes also non pytensor.tensors.
Modified from pymc blocking.
Parameters
----------
list_arrays : list
:class:`numpy.ndarray` or :class:`pytensor.tensor.Tensor`
intype : str
defining the input type 'tensor' or 'numpy'
"""
def __init__(self, list_arrays, intype="numpy"):
self.vmap = []
dim = 0
count = 0
for array in list_arrays:
if intype == "tensor":
name = array.name
array = array.get_test_value()
elif intype == "numpy":
name = "numpy"
slc = slice(dim, dim + array.size)
vm = DataMap(count, slc, array.shape, array.dtype, name)
self.vmap.append(vm)
dim += array.size
count += 1
self.size = dim
self._keys = None
def __getitem__(self, key):
try:
return self.vmap[self.variables.index(key)]
except ValueError:
raise KeyError(
'Variable "%s" is not in the mapping!'
" Mapped Variables: %s" % (key, list2string(self.variables))
)
def __iter__(self):
return iter(self.variables)
@property
def variables(self):
if self._keys is None:
self._keys = [vmap.name for vmap in self.vmap]
return self._keys
[docs]
class ListToArrayBijection(object):
"""
A mapping between a List of arrays and an array space
Parameters
----------
ordering : :class:`ListArrayOrdering`
list_arrays : list
of :class:`numpy.ndarray`
"""
def __init__(self, ordering, list_arrays, blacklist=[]):
self.ordering = ordering
self.list_arrays = list_arrays
self.dummy = -9.0e40
self.blacklist = blacklist
[docs]
def d2l(self, dpt):
"""
Maps values from dict space to List space
If variable expected from ordering is not in point
it is filled with a low dummy value -999999.
Parameters
----------
dpt : list
of :class:`numpy.ndarray`
Returns
-------
lpoint
"""
a_list = copy.copy(self.list_arrays)
for list_ind, _, shp, _, var in self.ordering.vmap:
try:
a_list[list_ind] = dpt[var].ravel()
except KeyError:
# Needed for initialisation of chain_l_point in Metropolis
a_list[list_ind] = num.atleast_1d(num.ones(shp) * self.dummy).ravel()
return a_list
[docs]
def l2d(self, a_list):
"""
Maps values from List space to dict space
Parameters
----------
list_arrays : list
of :class:`numpy.ndarray`
Returns
-------
:class:`pymc.model.Point`
"""
point = {}
for list_ind, _, _, _, var in self.ordering.vmap:
if var not in self.blacklist:
point[var] = a_list[list_ind].ravel()
return point
[docs]
def l2a(self, list_arrays):
"""
Maps values from List space to array space
Parameters
----------
list_arrays : list
of :class:`numpy.ndarray`
Returns
-------
array : :class:`numpy.ndarray`
single array comprising all the input arrays
"""
array = num.empty(self.ordering.size)
for list_ind, slc, _, _, _ in self.ordering.vmap:
array[slc] = list_arrays[list_ind].ravel()
return array
[docs]
def f3map(self, list_arrays):
"""
Maps values from List space to array space with 3 columns
Parameters
----------
list_arrays : list
of :class:`numpy.ndarray` with size: n x 3
Returns
-------
array : :class:`numpy.ndarray`
single array comprising all the input arrays
"""
array = num.empty((self.ordering.size, 3))
for list_ind, slc, _, _, _ in self.ordering.vmap:
array[slc, :] = list_arrays[list_ind]
return array
[docs]
def a2l(self, array):
"""
Maps value from array space to List space
Inverse operation of fmap.
Parameters
----------
array : :class:`numpy.ndarray`
Returns
-------
a_list : list
of :class:`numpy.ndarray`
"""
a_list = copy.copy(self.list_arrays)
for list_ind, slc, shp, dtype, _ in self.ordering.vmap:
try:
a_list[list_ind] = num.atleast_1d(array)[slc].reshape(shp).astype(dtype)
except ValueError: # variable does not exist in array use dummy
a_list[list_ind] = num.atleast_1d(num.ones(shp) * self.dummy).ravel()
return a_list
[docs]
def a_nd2l(self, array):
"""
Maps value from ndarray space (ndims, data) to List space
Inverse operation of fmap. Nd
Parameters
----------
array : :class:`numpy.ndarray`
Returns
-------
a_list : list
of :class:`numpy.ndarray`
"""
a_list = copy.copy(self.list_arrays)
nd = array.ndim
if nd != 2:
raise ValueError(
"Input array has wrong dimensions! Needed 2d array! Got %i" % nd
)
for list_ind, slc, shp, dtype, _ in self.ordering.vmap:
shpnd = (array.shape[0],) + shp
try:
a_list[list_ind] = (
num.atleast_2d(array)[:, slc].reshape(shpnd).astype(dtype)
)
except ValueError: # variable does not exist in array use dummy
a_list[list_ind] = num.atleast_2d(num.ones(shpnd) * self.dummy)
return a_list
[docs]
def srmap(self, tarray):
"""
Maps value from symbolic variable array space to List space
Parameters
----------
tarray : :class:`pytensor.tensor.Tensor`
Returns
-------
a_list : list
of :class:`pytensor.tensor.Tensor`
"""
a_list = copy.copy(self.list_arrays)
for list_ind, slc, shp, dtype, _ in self.ordering.vmap:
a_list[list_ind] = tarray[slc].reshape(shp).astype(dtype.name)
return a_list
[docs]
def apply_station_blacklist(stations, blacklist):
"""
Weed stations listed in the blacklist.
Parameters
----------
stations : list
:class:`pyrocko.model.Station`
blacklist : list
strings of station names
Returns
-------
stations : list of :class:`pyrocko.model.Station`
"""
outstations = []
for st in stations:
station_name = get_ns_id((st.network, st.station))
if station_name not in blacklist:
outstations.append(st)
return outstations
[docs]
def weed_data_traces(data_traces, stations):
"""
Throw out data traces belonging to stations that are not in the
stations list. Keeps list orders!
Parameters
----------
data_traces : list
of :class:`pyrocko.trace.Trace`
stations : list
of :class:`pyrocko.model.Station`
Returns
-------
weeded_data_traces : list
of :class:`pyrocko.trace.Trace`
"""
station_names = [
get_ns_id((station.network, station.station)) for station in stations
]
weeded_data_traces = []
for tr in data_traces:
trace_name = get_ns_id(tr.nslc_id)
if trace_name in station_names:
weeded_data_traces.append(tr)
return weeded_data_traces
[docs]
def weed_targets(targets, stations, discard_targets=[]):
"""
Throw out targets belonging to stations that are not in the
stations list. Keeps list orders and returns new list!
Parameters
----------
targets : list
of :class:`pyrocko.gf.targets.Target`
stations : list
of :class:`pyrocko.model.Station`
Returns
-------
weeded_targets : list
of :class:`pyrocko.gf.targets.Target`
"""
station_names = [
get_ns_id((station.network, station.station)) for station in stations
]
weeded_targets = []
for target in targets:
target_name = get_ns_id((target.codes[0], target.codes[1]))
if target_name in station_names:
if target in discard_targets:
pass
else:
weeded_targets.append(target)
return weeded_targets
[docs]
def downsample_trace(data_trace, deltat=None, snap=False):
"""
Downsample data_trace to given sampling interval 'deltat'.
Parameters
----------
data_trace : :class:`pyrocko.trace.Trace`
deltat : sampling interval [s] to which trace should be downsampled
Returns
-------
:class:`pyrocko.trace.Trace`
new instance
"""
tr = data_trace.copy()
if deltat is not None:
if num.abs(tr.deltat - deltat) > 1.0e-6:
try:
tr.downsample_to(deltat, snap=snap, allow_upsample_max=5, demean=False)
tr.deltat = deltat
if snap:
tr.snap()
except util.UnavailableDecimation as e:
logger.error("Cannot downsample %s.%s.%s.%s: %s" % (tr.nslc_id + (e,)))
elif snap:
if tr.tmin / tr.deltat > 1e-6 or tr.tmax / tr.deltat > 1e-6:
tr.snap()
else:
raise ValueError("Need to provide target sample rate!")
return tr
[docs]
def weed_stations(stations, event, distances=(30.0, 90.0), remove_duplicate=False):
"""
Weed stations, that are not within the given distance range(min, max) to
a reference event.
Parameters
----------
stations : list
of :class:`pyrocko.model.Station`
event
:class:`pyrocko.model.Event`
distances : tuple
of minimum and maximum distance [deg] for station-event pairs
Returns
-------
weeded_stations : list
of :class:`pyrocko.model.Station`
"""
weeded_stations = []
logger.debug("Valid distance range: [%f, %f]!" % (distances[0], distances[1]))
check_duplicate = []
for station in stations:
distance = orthodrome.distance_accurate50m(event, station) * m2d
logger.debug("Distance of station %s: %f [deg]" % (station.station, distance))
if distance >= distances[0] and distance <= distances[1]:
logger.debug("Inside defined distance range!")
ns_str = get_ns_id((station.network, station.station))
if ns_str in check_duplicate and remove_duplicate:
logger.warning(
"Station %s already in wavemap! Multiple "
"locations not supported yet! "
"Discarding duplicate ..." % ns_str
)
else:
weeded_stations.append(station)
check_duplicate.append(ns_str)
else:
logger.debug("Outside defined distance range!")
return weeded_stations
[docs]
def adjust_point_units(point):
"""
Transform variables with [km] units to [m]
Parameters
----------
point : dict
:func:`pymc.model.Point` of model parameter units as keys
Returns
-------
mpoint : dict
:func:`pymc.model.Point`
"""
mpoint = {}
for key, value in point.items():
if key in kmtypes:
mpoint[key] = value * km
elif key in patypes:
mpoint[key] = value * km * km
else:
mpoint[key] = value
return mpoint
[docs]
def split_point(point, mapping=None, n_sources_total=None, weed_params=False):
"""
Split point in solution space into List of dictionaries with source
parameters for each source.
Parameters
----------
point : dict
:func:`pymc.model.Point`
mapping : :class: `beat.config.DatatypeParameterMapping`
n_sources_total : int
total number of sources for each type in setup
weed_params: bool
if True only source related parameters are kept in the point
if False it may raise an error.
Returns
-------
source_points : list
of :func:`pymc.model.Point`
"""
if mapping is not None and n_sources_total is not None:
raise ValueError("Must provide either mapping or n_sources_total")
if mapping is None and n_sources_total is None:
raise ValueError("Must provide either mapping or n_sources_total")
if mapping is not None:
point_to_sources = mapping.point_to_sources_mapping()
n_sources_total = mapping.n_sources
else:
point_to_sources = None
if weed_params:
source_parameter_names = mapping.point_variable_names()
for param in list(point.keys()):
if param not in source_parameter_names:
point.pop(param)
source_points = [{} for i in range(n_sources_total)]
for param, values in point.items():
if point_to_sources:
source_idxs = point_to_sources[param]
else:
source_idxs = range(n_sources_total)
for value, idx in zip(values, source_idxs):
try:
source_points[idx][param] = float(value)
except IndexError:
raise IndexError(
"Tried to set index %i for parameter %s, but does not exist."
% (idx, param)
)
return source_points
[docs]
def join_points(ldicts):
"""
Join list of dicts into one dict with concatenating
values of keys that are present in multiple dicts.
"""
keys = set([k for d in ldicts for k in d.keys()])
jpoint = {}
for k in keys:
jvar = []
for d in ldicts:
jvar.append(d[k])
jpoint[k] = num.array(jvar)
return jpoint
[docs]
def check_point_keys(point, phrase):
"""
Searches point keys for a phrase, returns list of keys with the phrase.
"""
from fnmatch import fnmatch
keys = list(point.keys())
contains = False
contained_keys = []
for k in keys:
if fnmatch(k, phrase):
contains = True
contained_keys.append(k)
return contains, contained_keys
[docs]
def update_source(source, **point):
"""
Update source keeping stf and source params separate.
Modifies input source Object!
Parameters
----------
source : :class:`pyrocko.gf.seismosizer.Source`
point : dict
:func:`pymc.model.Point`
"""
for k, v in point.items():
if k not in source.keys():
if source.stf is not None:
try:
source.stf[k] = float(v)
except (KeyError, TypeError):
logger.warning("Not updating source with %s" % k)
else:
raise AttributeError(
"Please set a STF before updating its" " parameters."
)
else:
source[k] = float(v)
[docs]
def setup_logging(project_dir, levelname, logfilename="BEAT_log.txt"):
"""
Setup function for handling BEAT logging. The logfile 'BEAT_log.txt' is
saved in the 'project_dir'.
Parameters
----------
project_dir : str
absolute path to the output directory for the Log file
levelname : str
defining the level of logging
"""
levels = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
filename = os.path.join(project_dir, logfilename)
logger = logging.getLogger()
# remove existing handlers
for handler in logger.handlers:
logger.removeHandler(handler)
# setup file handler
fhandler = logging.FileHandler(filename=filename, mode="a")
fformatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
fhandler.setFormatter(fformatter)
fhandler.setLevel(levels[levelname])
logger.addHandler(fhandler)
# setup screen handler
console = logging.StreamHandler()
console.setLevel(levels[levelname])
cformatter = logging.Formatter("%(name)-12s - %(levelname)-8s %(message)s")
console.setFormatter(cformatter)
logger.addHandler(console)
logger.setLevel(levels[levelname])
[docs]
def search_catalog(date, min_magnitude, dayrange=1.0):
"""
Search the gcmt catalog for the specified date (+- 1 day), filtering the
events with given magnitude threshold.
Parameters
----------
date : str
'YYYY-MM-DD', date of the event
min_magnitude : float
approximate minimum Mw of the event
dayrange : float
temporal search interval [days] around date
Returns
-------
event : :class:`pyrocko.model.Event`
"""
gcmt = catalog.GlobalCMT()
time_s = util.stt(date + " " + seconds_str)
d1 = time_s - (dayrange * (sphr * hrpd))
d2 = time_s + (dayrange * (sphr * hrpd))
logger.info(
"Getting relevant events from the gCMT catalog for the dates:"
"%s - %s \n" % (util.tts(d1), util.tts(d2))
)
events = gcmt.get_events((d1, d2), magmin=min_magnitude)
if len(events) < 1:
logger.warn("Found no event information in the gCMT catalog.")
event = None
if len(events) > 1:
logger.info(
"More than one event from that date with specified magnitude "
"found! Please copy the relevant event information to the "
"configuration file!"
)
for event in events:
print(event)
event = events[0]
elif len(events) == 1:
event = events[0]
return event
[docs]
def RS_dipvector(source):
"""
Get 3 dimensional dip-vector of a planar fault.
Parameters
----------
source : RectangularSource
Returns
-------
:class:`numpy.ndarray`
"""
return num.array(
[
num.cos(source.dip * d2r) * num.cos(source.strike * d2r),
-num.cos(source.dip * d2r) * num.sin(source.strike * d2r),
num.sin(source.dip * d2r),
]
)
def strike_vector(strike, order="ENZ"):
if order == "ENZ":
return num.array([num.sin(strike * d2r), num.cos(strike * d2r), 0.0])
elif order == "NEZ":
return num.array([num.cos(strike * d2r), num.sin(strike * d2r), 0.0])
else:
raise Exception("Order %s not implemented!" % order)
[docs]
def RS_strikevector(source):
"""
Get 3 dimensional strike-vector of a planar fault.
Parameters
----------
source : RedctangularSource
Returns
-------
:class:`numpy.ndarray`
"""
return strike_vector(source.strike)
[docs]
def RS_center(source):
"""
Get 3d fault center coordinates. Depth attribute is top depth!
Parameters
----------
source : RedctangularSource
Returns
-------
:class:`numpy.ndarray` with x, y, z coordinates of the center of the
fault
"""
return num.array(
[source.east_shift, source.north_shift, source.depth]
) + 0.5 * source.width * RS_dipvector(source)
[docs]
def adjust_fault_reference(source, input_depth="top"):
"""
Adjusts source depth and east/north-shifts variables of fault according to
input_depth mode 'top/center'.
Parameters
----------
source : :class:`RectangularSource` or :class:`pscmp.RectangularSource` or
:class:`pyrocko.gf.seismosizer.RectangularSource`
input_depth : string
if 'top' the depth in the source is interpreted as top depth
if 'center' the depth in the source is interpreted as center depth
Returns
-------
Updated input source object
"""
if input_depth == "top":
center = RS_center(source)
elif input_depth == "center":
center = num.array([source.east_shift, source.north_shift, source.depth])
else:
raise Exception("input_depth %s not supported!" % input_depth)
source.east_shift = float(center[0])
source.north_shift = float(center[1])
source.depth = float(center[2])
[docs]
def dump_objects(outpath, outlist):
"""
Dump objects in outlist into pickle file.
Parameters
----------
outpath : str
absolute path and file name for the file to be stored
outlist : list
of objects to save pickle
"""
with open(outpath, "wb") as f:
pickle.dump(outlist, f, protocol=4)
[docs]
def load_objects(loadpath):
"""
Load (unpickle) saved (pickled) objects from specified loadpath.
Parameters
----------
loadpath : absolute path and file name to the file to be loaded
Returns
-------
objects : list
of saved objects
"""
try:
objects = pickle.load(open(loadpath, "rb"))
except UnicodeDecodeError:
objects = pickle.load(open(loadpath, "rb"), encoding="latin1")
except IOError:
raise Exception("File %s does not exist!" % loadpath)
return objects
[docs]
def ensure_cov_psd(cov):
"""
Ensure that the input covariance matrix is positive definite.
If not, find the nearest positive semi-definite matrix.
Parameters
----------
cov : :class:`numpy.ndarray`
symmetric covariance matrix
Returns
-------
cov : :class:`numpy.ndarray`
positive definite covariance matrix
"""
try:
num.linalg.cholesky(cov)
except num.linalg.LinAlgError:
logger.debug("Cov_pv not positive definite!" " Finding nearest psd matrix...")
cov = repair_covariance(cov)
return cov
[docs]
def near_psd(x, epsilon=num.finfo(num.float64).eps):
"""
Calculates the nearest positive semi-definite matrix for a correlation/
covariance matrix
Parameters
----------
x : :class:`numpy.ndarray`
Covariance/correlation matrix
epsilon : float
Eigenvalue limit
here set to accuracy of numbers in numpy, otherwise the resulting
matrix, likely is still not going to be positive definite
Returns
-------
near_cov : :class:`numpy.ndarray`
closest positive definite covariance/correlation matrix
Notes
-----
Numpy number precision not high enough to resolve this for low valued
covariance matrixes! The result will have very small negative eigvals!!!
See repair_covariance below for a simpler implementation that can resolve
the numbers!
Algorithm after Rebonato & Jaekel 1999
"""
if min(num.linalg.eigvals(x)) > epsilon:
return x
# Removing scaling factor of covariance matrix
n = x.shape[0]
scaling = num.sqrt(num.diag(x))
a, b = num.meshgrid(scaling, scaling)
y = x / (a * b)
# getting the nearest correlation matrix
eigval, eigvec = num.linalg.eigh(y)
val = num.maximum(eigval, epsilon)
vec = num.matrix(eigvec)
T = 1.0 / (num.multiply(vec, vec) * val.T)
T = num.matrix(num.sqrt(num.diag(num.array(T).reshape((n)))))
B = T * vec * num.diag(num.array(num.sqrt(val)).reshape((n)))
near_corr = num.array(B * B.T)
# returning the scaling factors
return near_corr * a * b
[docs]
def repair_covariance(x, epsilon=num.finfo(num.float64).eps):
"""
Make covariance input matrix A positive definite.
Setting eigenvalues that are lower than the of numpy floats to
at least that precision and backtransform.
Parameters
----------
x : :class:`numpy.ndarray`
Covariance/correlation matrix
epsilon : float
Eigenvalue limit
here set to accuracy of numbers in numpy, otherwise the resulting
matrix, likely is still not going to be positive definite
Returns
-------
near_cov : :class:`numpy.ndarray`
closest positive definite covariance/correlation matrix
Notes
-----
Algorithm after Gilbert Strange, 'Introduction to linear Algebra'
"""
eigval, eigvec = num.linalg.eigh(x)
val = num.maximum(eigval, epsilon)
return eigvec.dot(num.diag(val)).dot(eigvec.T)
[docs]
def running_window_rms(data, window_size, mode="valid"):
"""
Calculate the standard deviations of a running window over data.
Parameters
----------
data : :class:`numpy.ndarray` 1-d
containing data to calculate stds from
window_size : int
sample size of running window
mode : str
see numpy.convolve for modes
Returns
-------
:class:`numpy.ndarray` 1-d
with stds, size data.size - window_size + 1
"""
data2 = num.power(data, 2)
window = num.ones(window_size) / float(window_size)
return num.sqrt(num.convolve(data2, window, mode))
[docs]
def slice2string(slice_obj):
"""
Wrapper for better formatted string method for slices.
Returns
-------
str
"""
if isinstance(slice_obj, slice):
if slice_obj.step:
return "{}:{}:{}".format(slice_obj.start, slice_obj.stop, slice_obj.step)
else:
return "{}:{}".format(slice_obj.start, slice_obj.stop)
else:
return slice_obj
[docs]
def list2string(any_list, fill=", "):
"""
Convert list of string to single string.
Parameters
----------
l: list
of strings
"""
return fill.join("%s" % slice2string(listentry) for listentry in any_list)
[docs]
def string2slice(slice_string):
"""
Convert string of slice form to python slice object.
Parameters
----------
slice_string: str
of form "0:2" i.e. two integer numbers separated by colon
"""
return slice(*[int(idx) for idx in slice_string.split(":")])
[docs]
def unique_list(any_list):
"""
Find unique entries in list and return them in a list.
Keeps variable order.
Parameters
----------
l : list
Returns
-------
list with only unique elements
"""
used = []
return [x for x in any_list if x not in used and (used.append(x) or True)]
[docs]
def join_models(global_model, crustal_model):
"""
Replace the part of the 'global model' that is covered by 'crustal_model'.
Parameters
----------
global_model : :class:`pyrocko.cake.LayeredModel`
crustal_model : :class:`pyrocko.cake.LayeredModel`
Returns
-------
joined_model : cake.LayeredModel
"""
max_depth = crustal_model.max("z")
cut_model = global_model.extract(depth_min=max_depth)
joined_model = copy.deepcopy(crustal_model)
for element in cut_model.elements():
joined_model.append(element)
return joined_model
[docs]
def split_off_list(any_list, off_length):
"""
Split a list with length 'off_length' from the beginning of an input
list l.
Modifies input list!
Parameters
----------
l : list
of objects to be separated
off_length : int
number of elements from l to be split off
Returns
-------
list
"""
return [any_list.pop(0) for i in range(off_length)]
[docs]
def mod_i(i, cycle):
"""
Calculates modulus of a function and returns number of full cycles and the
rest.
Parameters
----------
i : int or float
Number to be cycled over
cycle : int o float
Cycle length
Returns
-------
fullc : int or float depending on input
rest : int or float depending on input
"""
fullc = i // cycle
rest = i % cycle
return fullc, rest
[docs]
def biggest_common_divisor(a, b):
"""
Find the biggest common divisor of two float numbers a and b.
Parameters
----------
a, b: float
Returns
-------
int
"""
while b > 0:
rest = a % b
a = b
b = rest
return int(a)
[docs]
def gather(any_list, key, sort=None, filter=None):
"""
Return dictionary of input l grouped by key.
"""
d = {}
for x in any_list:
if filter is not None and not filter(x):
continue
k = key(x)
if k not in d:
d[k] = []
d[k].append(x)
if sort is not None:
for v in d.values():
v.sort(key=sort)
return d
[docs]
def get_fit_indexes(llk):
"""
Find indexes of various likelihoods in a likelihood distribution.
Parameters
----------
llk : :class:`numpy.ndarray`
Returns
-------
dict with array indexes
"""
mean_idx = (num.abs(llk - llk.mean())).argmin()
min_idx = (num.abs(llk - llk.min())).argmin()
max_idx = (num.abs(llk - llk.max())).argmin()
posterior_idxs = {"mean": mean_idx, "min": min_idx, "max": max_idx}
return posterior_idxs
[docs]
def check_hyper_flag(problem):
"""
Check problem setup for type of model standard/hyperparameters.
Parameters
----------
:class:`models.Problem`
Returns
-------
flag : boolean
"""
if os.path.basename(problem.outfolder) == "hypers":
return True
else:
return False
[docs]
def error_not_whole(f, errstr=""):
"""
Test if float is a whole number, if not raise Error.
"""
if f.is_integer():
return int(f)
else:
raise ValueError("%s : %f is not a whole number!" % (errstr, f))
def scalar2floatX(a, floatX=tconfig.floatX):
if floatX == "float32":
return num.float32(a)
elif floatX == "float64":
return num.float64(a)
def scalar2int(a, floatX=tconfig.floatX):
if floatX == "float32":
return num.int16(a)
elif floatX == "float64":
return num.int64(a)
[docs]
def PsGrnArray2LayeredModel(psgrn_input_path):
"""
Read PsGrn Input file and return velocity model.
Parameters
----------
psgrn_input_path : str
Absolute path to the psgrn input file.
Returns
-------
:class:`LayeredModel`
"""
a = num.loadtxt(psgrn_input_path, skiprows=136)
b = a[:, 1:-1]
b[:, 3] /= 1000.0
return LayeredModel.from_scanlines(
read_nd_model_str(
re.sub(
"[\[\]]",
"",
num.array2string(
b, precision=4, formatter={"float_kind": lambda x: "%.3f" % x}
),
)
)
)
[docs]
def swap_columns(array, index1, index2):
"""
Swaps the column of the input array based on the given indexes.
"""
array[:, index1], array[:, index2] = array[:, index2], array[:, index1].copy()
return array
[docs]
def line_intersect(e1, e2, n1, n2):
"""
Get intersection point of n-lines.
Parameters
----------
end points of each line in (n x 2) arrays
e1 : :class:`numpy.array` (n x 2)
east coordinates of first line
e2 : :class:`numpy.array` (n x 2)
east coordinates of second line
n1 : :class:`numpy.array` (n x 2)
north coordinates of first line
n2 : :class:`numpy.array` (n x 2)
east coordinates of second line
Returns
-------
:class:`numpy.array` (n x 2) of intersection points (easts, norths)
"""
perp = num.array([[0, -1], [1, 0]])
de = num.atleast_2d(e2 - e1)
dn = num.atleast_2d(n2 - n1)
dp = num.atleast_2d(e1 - n1)
dep = num.dot(de, perp)
denom = num.sum(dep * dn, axis=1)
if denom == 0:
logger.warn("Lines are parallel! No intersection point!")
return None
tmp = num.sum(dep * dp, axis=1)
return num.atleast_2d(tmp / denom).T * dn + n1
[docs]
def get_rotation_matrix(axes=["x", "y", "z"]):
"""
Return a function for 3-d rotation matrix for a specified axis.
Parameters
----------
axes : str or list of str
x, y or z for the axis
Returns
-------
func that takes an angle [rad]
"""
ax_avail = ["x", "y", "z"]
for ax in axes:
if ax not in ax_avail:
raise TypeError(
"Rotation axis %s not supported!"
" Available axes: %s" % (ax, list2string(ax_avail))
)
def rotx(angle):
cos_angle = num.cos(angle)
sin_angle = num.sin(angle)
return num.array(
[[1, 0, 0], [0, cos_angle, -sin_angle], [0, sin_angle, cos_angle]],
dtype="float64",
)
def roty(angle):
cos_angle = num.cos(angle)
sin_angle = num.sin(angle)
return num.array(
[[cos_angle, 0, sin_angle], [0, 1, 0], [-sin_angle, 0, cos_angle]],
dtype="float64",
)
def rotz(angle):
cos_angle = num.cos(angle)
sin_angle = num.sin(angle)
return num.array(
[[cos_angle, -sin_angle, 0], [sin_angle, cos_angle, 0], [0, 0, 1]],
dtype="float64",
)
R = {"x": rotx, "y": roty, "z": rotz}
if isinstance(axes, list):
return R
elif isinstance(axes, str):
return R[axes]
else:
raise Exception("axis has to be either string or list of strings!")
[docs]
def positions2idxs(positions, cell_size, min_pos=0.0, backend=num, dtype="int16"):
"""
Return index to a grid with a given cell size.npatches
Parameters
----------
positions : :class:`numpy.NdArray` float
of positions [km]
cell_size : float
size of grid cells
backend : str
dtype : str
data type of returned array, default: int16
"""
return backend.round((positions - min_pos - (cell_size / 2.0)) / cell_size).astype(
dtype
)
def rotate_coords_plane_normal(coords, sf):
coords -= sf.bottom_left / km
rots = get_rotation_matrix()
rotz = coords.dot(rots["z"](d2r * -sf.strike))
roty = rotz.dot(rots["y"](d2r * -sf.dip))
roty[:, 0] *= -1.0
return roty
def get_ns_id(nslc_id):
return "{}.{}".format(nslc_id[0], nslc_id[1])
def time_method(loop=10000):
def timer_decorator(func):
@wraps(func)
def wrap_func(*args, **kwargs):
total_time = Timer(lambda: func(*args, **kwargs)).timeit(number=loop)
print(
"Method {name} run {loop} times".format(name=func.__name__, loop=loop)
)
print(
"It took: {time} s, Mean: {mean_time} s".format(
mean_time=total_time / loop, time=total_time
)
)
# return func(*args, **kwargs)
return wrap_func
return timer_decorator
def is_odd(value):
return (value & 1) == 1
def is_even(value):
return (value & 1) == 0
[docs]
def get_valid_spectrum_data(deltaf, taper_frequencies=[0, 1.0]):
"""extract valid frequency range of spectrum"""
lower_f, upper_f = taper_frequencies
lower_idx = int(num.floor(lower_f / deltaf))
upper_idx = int(num.ceil(upper_f / deltaf))
return lower_idx, upper_idx
[docs]
def get_data_radiant(data):
"""
Data needs to be [n, 2]
"""
return num.arctan2(
data[:, 1].max() - data[:, 1].min(), data[:, 0].max() - data[:, 0].min()
)
[docs]
def find_elbow(data, theta=None, rotate_left=False):
"""
Get point closest to turning point in data by rotating it by theta.
Adapted from:
https://datascience.stackexchange.com/questions/57122/in-elbow-curve-
how-to-find-the-point-from-where-the-curve-starts-to-rise
Parameters
----------
data : array like,
[n, 2]
theta : rotation angle
Returns
-------
Index : int
closest to elbow.
rotated_data : array-like [n, 2]
"""
if theta is None:
theta = get_data_radiant(data)
if rotate_left:
theta = 2 * num.pi - theta
# make rotation matrix
co = num.cos(theta)
si = num.sin(theta)
rotation_matrix = num.array(((co, -si), (si, co)))
# rotate data vector
rotated_data = data.dot(rotation_matrix)
return rotated_data[:, 1].argmin(), rotated_data
[docs]
class StencilOperator(Object):
h = Float.T(default=0.1, help="step size left and right of the reference value")
order = Int.T(default=3, help="number of points of central differences")
def __init__(self, **kwargs):
stencil_order = kwargs["order"]
if stencil_order not in [3, 5]:
raise ValueError(
"Only stencil orders 3 and 5 implemented."
" Requested: %i" % stencil_order
)
self._coeffs = {3: num.array([1.0, -1.0]), 5: num.array([1.0, 8.0, -8.0, -1.0])}
self._denominator = {3: 2.0, 5: 12.0}
self._hsteps = {3: num.array([-1, 1]), 5: num.array([-2, -1, 1, 2])}
Object.__init__(self, **kwargs)
@property
def coefficients(self):
coeffs = self._coeffs[self.order]
return coeffs.reshape((coeffs.size, 1, 1))
def __len__(self):
return self.coefficients.size
@property
def denominator(self):
return self._denominator[self.order] * self.h
@property
def hsteps(self):
return self._hsteps[self.order] * self.h
[docs]
def distances(points, ref_points):
"""
Calculate distances in Cartesian coordinates between points and reference
points in N-D.
Parameters
----------
points: :class:`numpy.Ndarray` (n points x n spatial dimensions)
ref_points: :class:`numpy.Ndarray` (m points x n spatial dimensions)
Returns
-------
ndarray (n_points x n_ref_points)
"""
nref_points = ref_points.shape[0]
ndim = points.shape[1]
ndim_ref = ref_points.shape[1]
if ndim != ndim_ref:
raise TypeError(
"Coordinates to calculate differences must have the same number "
"of dimensions! Given dimensions are {} and {}".format(ndim, ndim_ref)
)
points_rep = num.tile(points, nref_points).reshape(
points.shape[0], nref_points, ndim
)
distances = num.sqrt(num.power(points_rep - ref_points, 2).sum(axis=2))
return distances