"""
Package for wrapping various functions into pytensor-Ops to be able to include
them into pytensor graphs as is needed by the pymc models.
Far future:
include a 'def grad:' -method to each Op in order to enable the use of
gradient based optimization algorithms
"""
import logging
from collections import OrderedDict
import numpy as num
import pytensor.tensor as tt
from pyrocko.gf import LocalEngine
from pyrocko.trace import nextpow2
from pytensor.graph import Apply
from beat import heart, utility
from beat.fast_sweeping import fast_sweep
km = 1000.0
logger = logging.getLogger("pytensorf")
class GeoSynthesizer(tt.Op):
"""
pytensor wrapper for a geodetic forward model with synthetic displacements.
Uses pyrocko engine and fomosto GF stores.
The order of the inputs does not matter (it did in previous versions).
Parameters
----------
engine : :class:`pyrocko.gf.seismosizer.LocalEngine`
sources : List
containing :class:`pyrocko.gf.seismosizer.Source` Objects
targets : List
containing :class:`pyrocko.gf.targets.StaticTarget` Objects
mapping : Dict
of variable names and lists of integers defining how they map to the source objects
"""
__props__ = ("engine", "sources", "targets", "mapping")
def __init__(self, engine, sources, targets, mapping):
if isinstance(engine, LocalEngine):
self.outmode = "stacked_array"
else:
self.outmode = "array"
self.engine = engine
self.sources = tuple(sources)
self.targets = tuple(targets)
self.nobs = sum([target.lats.size for target in self.targets])
self.mapping = mapping
def __getstate__(self):
if isinstance(self.engine, LocalEngine):
self.engine.close_cashed_stores()
return self.__dict__
def __setstate__(self, state):
self.__dict__.update(state)
def make_node(self, inputs):
"""
Transforms pytensor tensors to node and allocates variables accordingly.
Parameters
----------
inputs : dict
keys being strings of source attributes of the
:class:`pscmp.RectangularSource` that was used to initialise
the Operator
values are :class:`pytensor.tensor.Tensor`
"""
inlist = []
self.varnames = list(inputs.keys())
for i in inputs.values():
inlist.append(tt.as_tensor_variable(i))
outm_shape = self.infer_shape()[0]
outm = tt.as_tensor_variable(num.zeros(outm_shape))
outlist = [outm.type()]
return Apply(self, inlist, outlist)
def infer_shape(self, fgraph=None, node=None, input_shapes=None):
return [(self.nobs, 3)]
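# Usage sketch (illustrative only, not part of the original module): the Op is
# applied to a dict of pytensor variables keyed by source attribute names, as
# described in `make_node`. The engine, sources, targets and mapping objects
# are assumed to be configured elsewhere; names below are hypothetical.
def _example_geo_synthesizer_usage(engine, sources, targets, mapping, input_rvs):
    """
    input_rvs : dict of str to :class:`pytensor.tensor.TensorVariable`,
        e.g. random variables of a pymc model keyed by source attribute names.
    """
    geo_op = GeoSynthesizer(engine=engine, sources=sources, targets=targets, mapping=mapping)
    # calling the Op triggers `make_node`; the single output is a matrix of
    # synthetic displacement components with shape (nobs, 3), cf. `infer_shape`
    displacements = geo_op(input_rvs)
    return displacements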
class SeisSynthesizer(tt.Op):
"""
pytensor wrapper for a seismic forward model with synthetic waveforms.
The order of the inputs does not matter (it did in previous versions).
Parameters
----------
engine : :class:`pyrocko.gf.seismosizer.LocalEngine`
sources : List
containing :class:`pyrocko.gf.seismosizer.Source` Objects
targets : List
containing :class:`pyrocko.gf.seismosizer.Target` Objects
arrival_taper : :class:`heart.ArrivalTaper`
arrival_times : :class:`numpy.ndarray`
with synthetic arrival times wrt the reference event
filterer : :class:`heart.Filterer`
"""
__props__ = (
"engine",
"sources",
"mapping",
"targets",
"event",
"arrival_taper",
"arrival_times",
"wavename",
"filterer",
"pre_stack_cut",
"station_corrections",
"domain",
)
def __init__(
self,
engine,
sources,
mapping,
targets,
event,
arrival_taper,
arrival_times,
wavename,
filterer,
pre_stack_cut,
station_corrections,
domain,
):
self.engine = engine
self.sources = tuple(sources)
self.targets = tuple(targets)
self.event = event
self.arrival_taper = arrival_taper
self.arrival_times = tuple(arrival_times.tolist())
self.wavename = wavename
self.filterer = tuple(filterer)
self.pre_stack_cut = pre_stack_cut
self.station_corrections = station_corrections
self.domain = domain
self.sample_rate = self.engine.get_store(
self.targets[0].store_id
).config.sample_rate
self.mapping = mapping
if self.domain == "spectrum":
nsamples = nextpow2(self.arrival_taper.nsamples(self.sample_rate))
taper_frequencies = heart.filterer_minmax(filterer)
deltaf = self.sample_rate / nsamples
self.valid_spectrum_indices = utility.get_valid_spectrum_data(
deltaf, taper_frequencies
)
def __getstate__(self):
self.engine.close_cashed_stores()
return self.__dict__
def __setstate__(self, state):
self.__dict__.update(state)
def make_node(self, inputs):
"""
Transforms pytensor tensors to node and allocates variables accordingly.
Parameters
----------
inputs : dict
keys being strings of source attributes of the
:class:`pscmp.RectangularSource` that was used to initialise
the Operator
values are :class:`pytensor.tensor.Tensor`
"""
inlist = []
self.varnames = list(inputs.keys())
for i in inputs.values():
inlist.append(tt.as_tensor_variable(i))
outm_shape, outv_shape = self.infer_shape()
outm = tt.as_tensor_variable(num.zeros(outm_shape))
outv = tt.as_tensor_variable(num.zeros(outv_shape))
outlist = [outm.type(), outv.type()]
return Apply(self, inlist, outlist)
def infer_shape(self, fgraph=None, node=None, input_shapes=None):
nrow = len(self.targets)
if self.domain == "time":
ncol = self.arrival_taper.nsamples(self.sample_rate)
elif self.domain == "spectrum":
ncol = self.valid_spectrum_indices[1] - self.valid_spectrum_indices[0]
return [(nrow, ncol), (nrow,)]
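# Usage sketch (illustrative only, not part of the original module): shows how
# the two outputs declared in `make_node` are unpacked. The constructor
# arguments are assumed to be prepared elsewhere; names below are hypothetical.
def _example_seis_synthesizer_usage(op_kwargs, input_rvs):
    """
    op_kwargs : dict with the keyword arguments of :class:`SeisSynthesizer`
    input_rvs : dict of str to :class:`pytensor.tensor.TensorVariable`
    """
    seis_op = SeisSynthesizer(**op_kwargs)
    # the Op returns a (ntargets, nsamples) matrix of synthetics and a vector
    # of length ntargets, cf. `infer_shape`
    synths, out_vector = seis_op(input_rvs)
    return synths, out_vector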
class PolaritySynthesizer(tt.Op):
__props__ = ("engine", "source", "pmap", "is_location_fixed", "always_raytrace")
def __init__(self, engine, source, pmap, is_location_fixed, always_raytrace):
self.engine = engine
self.source = source
self.pmap = pmap
self.is_location_fixed = is_location_fixed
self.always_raytrace = always_raytrace
# TODO: check that the multi-source-type refactoring did not break anything
def __getstate__(self):
self.engine.close_cashed_stores()
return self.__dict__
def __setstate__(self, state):
self.__dict__.update(state)
def make_node(self, inputs):
inlist = []
self.varnames = list(inputs.keys())
for i in inputs.values():
inlist.append(tt.as_tensor_variable(i))
outv_shape = self.infer_shape()[0]
outv = tt.as_tensor_variable(num.zeros(outv_shape))
outlist = [outv.type()]
return Apply(self, inlist, outlist)
def infer_shape(self, fgraph=None, node=None, input_shapes=None):
return [(self.pmap.n_t,)]
class SeisDataChopper(tt.Op):
"""
Deprecated!
"""
__props__ = ("sample_rate", "traces", "arrival_taper", "filterer")
def __init__(self, sample_rate, traces, arrival_taper, filterer):
self.sample_rate = sample_rate
self.traces = tuple(traces)
self.arrival_taper = arrival_taper
self.filterer = tuple(filterer)
def make_node(self, *inputs):
inlist = []
for i in inputs:
inlist.append(tt.as_tensor_variable(i))
outm_shape = self.infer_shape()[0]
outm = tt.as_tensor_variable(num.zeros(outm_shape))
outlist = [outm.type()]
return Apply(self, inlist, outlist)
def infer_shape(self, fgraph=None, node=None, input_shapes=None):
nrow = len(self.traces)
ncol = self.arrival_taper.nsamples(self.sample_rate)
return [(nrow, ncol)]
class Sweeper(tt.Op):
"""
pytensor Op for C implementation of the fast sweep algorithm.
Parameters
----------
patch_size : float
size of fault patches [km]
n_patch_strike : int
number of patches in strike direction
n_patch_dip : int
number of patches in dip-direction
"""
__props__ = ("patch_size", "n_patch_dip", "n_patch_strike", "implementation")
def __init__(self, patch_size, n_patch_dip, n_patch_strike, implementation):
self.patch_size = num.float64(patch_size)
self.n_patch_dip = n_patch_dip
self.n_patch_strike = n_patch_strike
self.implementation = implementation
def make_node(self, *inputs):
inlist = []
for i in inputs:
inlist.append(tt.as_tensor_variable(i))
outv_shape = self.infer_shape()[0]
outv = tt.as_tensor_variable(num.zeros(outv_shape))
outlist = [outv.type()]
return Apply(self, inlist, outlist)
def infer_shape(self, fgraph=None, node=None, input_shapes=None):
return [(self.n_patch_dip * self.n_patch_strike,)]
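# Usage sketch (illustrative only, not part of the original module): tensor
# inputs are passed positionally via `make_node(*inputs)`. The three inputs
# below (per-patch slownesses and nucleation indices) are assumptions based on
# the fast-sweeping use case and may differ from the actual `perform` signature.
def _example_sweeper_usage(
    patch_size, n_patch_dip, n_patch_strike, implementation, slownesses, nuc_dip, nuc_strike
):
    sweeper = Sweeper(patch_size, n_patch_dip, n_patch_strike, implementation)
    # the output is a vector with one entry per fault patch (presumably rupture
    # onset times), shape (n_patch_dip * n_patch_strike,), cf. `infer_shape`
    onset_times = sweeper(slownesses, nuc_dip, nuc_strike)
    return onset_times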
class EulerPole(tt.Op):
"""
pytensor Op for rotation of geodetic observations around Euler Pole.
Parameters
----------
lats : float, vector
of Latitudes [deg] of points to be rotated
lons : float, vector
of Longitudes [deg] of points to be rotated
data_mask : bool, vector
of indices of the points to be masked
"""
__props__ = ("lats", "lons", "data_mask")
def __init__(self, lats, lons, data_mask):
self.lats = tuple(lats)
self.lons = tuple(lons)
self.data_mask = tuple(data_mask)
def make_node(self, inputs):
inlist = []
self.fixed_values = OrderedDict()
self.varnames = []
for k, v in inputs.items():
varname = k.split("_")[-1]  # split off the dataset name prefix
if isinstance(v, tt.TensorVariable):
self.varnames.append(varname)
inlist.append(tt.as_tensor_variable(v))
else:
self.fixed_values[varname] = v
outm_shape = self.infer_shape()[0]
outm = tt.as_tensor_variable(num.zeros(outm_shape))
outlist = [outm.type()]
return Apply(self, inlist, outlist)
def infer_shape(self, fgraph=None, node=None, input_shapes=None):
return [(len(self.lats), 3)]
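# Usage sketch (illustrative only, not part of the original module): `make_node`
# accepts a mixture of pytensor variables and plain numbers; non-tensor entries
# are stored in `fixed_values` and only the tensors become graph inputs.
def _example_euler_pole_usage(lats, lons, data_mask, pole_inputs):
    """
    pole_inputs : dict mapping (dataset-prefixed) variable names to either
        :class:`pytensor.tensor.TensorVariable` objects or fixed scalar values
    """
    euler_op = EulerPole(lats=lats, lons=lons, data_mask=data_mask)
    # the output is a (npoints, 3) matrix of predicted rotation contributions
    # at the given coordinates, cf. `infer_shape`
    velocities = euler_op(pole_inputs)
    return velocities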
class StrainRateTensor(tt.Op):
"""
pytensor Op for internal block deformation through 2d area strain rate tensor.
Parameters
----------
lats : float, vector
of Latitudes [deg] of points to be strained
lons : float, vector
of Longitudes [deg] of points to be strained
data_mask : bool, vector
of indices of the points to be masked
"""
__props__ = ("lats", "lons", "data_mask")
def __init__(self, lats, lons, data_mask):
self.lats = tuple(lats)
self.lons = tuple(lons)
self.data_mask = tuple(data_mask)
self.ndata = len(self.lats)
def make_node(self, inputs):
inlist = []
self.fixed_values = OrderedDict()
self.varnames = []
for k, v in inputs.items():
varname = k.split("_")[-1]  # split off the dataset name prefix
if isinstance(v, tt.TensorVariable):
self.varnames.append(varname)
inlist.append(tt.as_tensor_variable(v))
else:
self.fixed_values[varname] = v
outm_shape = self.infer_shape()[0]
outm = tt.as_tensor_variable(num.zeros(outm_shape))
outlist = [outm.type()]
return Apply(self, inlist, outlist)
def infer_shape(self, fgraph=None, node=None, input_shapes=None):
return [(self.ndata, 3)]
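# Usage sketch (illustrative only, not part of the original module): mirrors the
# EulerPole example above; strain rate components may be given either as
# pytensor variables or as fixed scalars. Names below are hypothetical.
def _example_strain_rate_usage(lats, lons, data_mask, strain_inputs):
    strain_op = StrainRateTensor(lats=lats, lons=lons, data_mask=data_mask)
    # the output is a (ndata, 3) matrix of strain-induced contributions per
    # point, cf. `infer_shape`
    strain_contribution = strain_op(strain_inputs)
    return strain_contribution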