Source code for theanof

"""
Package for wrapping various functions into Theano-Ops to be able to include
them into theano graphs as is needed by the pymc3 models.

Far future:
    include a 'def grad:' -method to each Op in order to enable the use of
    gradient based optimization algorithms
"""
import copy
import logging
from collections import OrderedDict

import numpy as num
import theano
import theano.tensor as tt
from pymc3.model import FreeRV
from pyrocko.trace import nextpow2

from beat import heart, interseismic, utility
from beat.fast_sweeping import fast_sweep

km = 1000.0
logger = logging.getLogger("theanof")


[docs]class GeoSynthesizer(theano.Op): """ Theano wrapper for a geodetic forward model with synthetic displacements. Uses pyrocko engine and fomosto GF stores. Input order does not matter anymore! Did in previous version. Parameters ---------- engine : :class:`pyrocko.gf.seismosizer.LocalEngine` sources : List containing :class:`pyrocko.gf.seismosizer.Source` Objects targets : List containing :class:`pyrocko.gf.targets.StaticTarget` Objects """ __props__ = ("engine", "sources", "targets") def __init__(self, engine, sources, targets): self.engine = engine self.sources = tuple(sources) self.targets = tuple(targets) self.nobs = sum([target.lats.size for target in self.targets]) def __getstate__(self): self.engine.close_cashed_stores() return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
[docs] def make_node(self, inputs): """ Transforms theano tensors to node and allocates variables accordingly. Parameters ---------- inputs : dict keys being strings of source attributes of the :class:`pscmp.RectangularSource` that was used to initialise the Operator values are :class:`theano.tensor.Tensor` """ inlist = [] self.varnames = list(inputs.keys()) for i in inputs.values(): inlist.append(tt.as_tensor_variable(i)) outm = tt.as_tensor_variable(num.zeros((2, 2))) outlist = [outm.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): """ Perform method of the Operator to calculate synthetic displacements. Parameters ---------- inputs : list of :class:`numpy.ndarray` output : list 1) of synthetic waveforms of :class:`numpy.ndarray` (n x nsamples) 2) of start times of the first waveform samples :class:`numpy.ndarray` (n x 1) """ synths = output[0] point = {vname: i for vname, i in zip(self.varnames, inputs)} mpoint = utility.adjust_point_units(point) source_points = utility.split_point(mpoint) for i, source in enumerate(self.sources): utility.update_source(source, **source_points[i]) # reset source time may result in store error otherwise source.time = 0.0 synths[0] = heart.geo_synthetics( engine=self.engine, targets=self.targets, sources=self.sources, outmode="stacked_array", )
def infer_shape(self, node, input_shapes): return [(self.nobs, 3)]
[docs]class GeoLayerSynthesizerPsCmp(theano.Op): """ Theano wrapper for a geodetic forward model for static observation points. Direct call to PsCmp, needs PsGrn Greens Function store! Deprecated, currently not used in composites. Parameters ---------- lats : n x 1 :class:`numpy.ndarray` with latitudes of observation points lons : n x 1 :class:`numpy.ndarray` with longitudes of observation points store_superdir : str with absolute path to the GF store super directory crust_ind : int with the index to the GF store sources : :class:`pscmp.RectangularSource` to be used in generating the synthetic displacements """ __props__ = ("lats", "lons", "store_superdir", "crust_ind", "sources") def __init__(self, lats, lons, store_superdir, crust_ind, sources): self.lats = tuple(lats) self.lons = tuple(lons) self.store_superdir = store_superdir self.crust_ind = crust_ind self.sources = tuple(sources) def __getstate__(self): return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
[docs] def make_node(self, inputs): """ Transforms theano tensors to node and allocates variables accordingly. Parameters ---------- inputs : dict keys being strings of source attributes of the :class:`pscmp.RectangularSource` that was used to initialise the Operator values are :class:`theano.tensor.Tensor` """ inlist = [] self.varnames = list(inputs.keys()) for i in inputs.values(): inlist.append(tt.as_tensor_variable(i)) out = tt.as_tensor_variable(num.zeros((2, 2))) outlist = [out.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): """ Perform method of the Operator to calculate synthetic displacements. Parameters ---------- inputs : list of :class:`numpy.ndarray` output : list of synthetic displacements of :class:`numpy.ndarray` (n x 1) """ z = output[0] point = {vname: i for vname, i in zip(self.varnames, inputs)} point = utility.adjust_point_units(point) source_points = utility.split_point(point) for i, source in enumerate(self.sources): source.update(**source_points[i]) z[0] = heart.geo_layer_synthetics_pscmp( store_superdir=self.store_superdir, crust_ind=self.crust_ind, lons=self.lons, lats=self.lats, sources=self.sources, )
def infer_shape(self, node, input_shapes): return [(len(self.lats), 3)]
[docs]class GeoInterseismicSynthesizer(theano.Op): """ Theano wrapper to transform the parameters of block model to parameters of a fault. """ __props__ = ("lats", "lons", "engine", "targets", "sources", "reference") def __init__(self, lats, lons, engine, targets, sources, reference): self.lats = tuple(lats) self.lons = tuple(lons) self.engine = engine self.targets = tuple(targets) self.sources = tuple(sources) self.reference = reference def __getstate__(self): return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
[docs] def make_node(self, inputs): """ Transforms theano tensors to node and allocates variables accordingly. Parameters ---------- inputs : dict keys being strings of source attributes of the :class:`pyrocko.gf.seismosizer.RectangularSource` that was used to initialise the Operator. values are :class:`theano.tensor.Tensor` """ inlist = [] self.fixed_values = {} self.varnames = [] for k, v in inputs.items(): if isinstance(v, FreeRV): self.varnames.append(k) inlist.append(tt.as_tensor_variable(v)) else: self.fixed_values[k] = v out = tt.as_tensor_variable(num.zeros((2, 2))) outlist = [out.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): """ Perform method of the Operator to calculate synthetic displacements. Parameters ---------- inputs : list of :class:`numpy.ndarray` output : list of synthetic displacements of :class:`numpy.ndarray` (n x 3) """ z = output[0] point = {vname: i for vname, i in zip(self.varnames, inputs)} point.update(self.fixed_values) point = utility.adjust_point_units(point) spoint, bpoint = interseismic.seperate_point(point) source_points = utility.split_point(spoint) for i, source_point in enumerate(source_points): self.sources[i].update(**source_point) z[0] = interseismic.geo_backslip_synthetics( engine=self.engine, targets=self.targets, sources=self.sources, lons=num.array(self.lons), lats=num.array(self.lats), reference=self.reference, **bpoint ) def infer_shape(self, node, input_shapes): return [(len(self.lats), 3)]
[docs]class SeisSynthesizer(theano.Op): """ Theano wrapper for a seismic forward model with synthetic waveforms. Input order does not matter anymore! Did in previous version. Parameters ---------- engine : :class:`pyrocko.gf.seismosizer.LocalEngine` sources : List containing :class:`pyrocko.gf.seismosizer.Source` Objects targets : List containing :class:`pyrocko.gf.seismosizer.Target` Objects arrival_taper : :class:`heart.ArrivalTaper` arrival_times : :class:`ǹumpy.NdArray` with synthetic arrival times wrt reference event filterer : :class:`heart.Filterer` """ __props__ = ( "engine", "sources", "targets", "event", "arrival_taper", "arrival_times", "wavename", "filterer", "pre_stack_cut", "station_corrections", "domain", ) def __init__( self, engine, sources, targets, event, arrival_taper, arrival_times, wavename, filterer, pre_stack_cut, station_corrections, domain, ): self.engine = engine self.sources = tuple(sources) self.targets = tuple(targets) self.event = event self.arrival_taper = arrival_taper self.arrival_times = tuple(arrival_times.tolist()) self.wavename = wavename self.filterer = tuple(filterer) self.pre_stack_cut = pre_stack_cut self.station_corrections = station_corrections self.domain = domain self.sample_rate = self.engine.get_store( self.targets[0].store_id ).config.sample_rate if self.domain == "spectrum": nsamples = nextpow2(self.arrival_taper.nsamples(self.sample_rate)) taper_frequencies = heart.filterer_minmax(filterer) deltaf = self.sample_rate / nsamples self.valid_spectrum_indices = utility.get_valid_spectrum_data( deltaf, taper_frequencies ) def __getstate__(self): self.engine.close_cashed_stores() return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
[docs] def make_node(self, inputs): """ Transforms theano tensors to node and allocates variables accordingly. Parameters ---------- inputs : dict keys being strings of source attributes of the :class:`pscmp.RectangularSource` that was used to initialise the Operator values are :class:`theano.tensor.Tensor` """ inlist = [] self.varnames = list(inputs.keys()) for i in inputs.values(): inlist.append(tt.as_tensor_variable(i)) outm = tt.as_tensor_variable(num.zeros((2, 2))) outv = tt.as_tensor_variable(num.zeros((2))) outlist = [outm.type(), outv.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): """ Perform method of the Operator to calculate synthetic displacements. Parameters ---------- inputs : list of :class:`numpy.ndarray` output : list 1) of synthetic waveforms of :class:`numpy.ndarray` (n x nsamples) 2) of start times of the first waveform samples :class:`numpy.ndarray` (n x 1) """ synths = output[0] tmins = output[1] point = {vname: i for vname, i in zip(self.varnames, inputs)} mpoint = utility.adjust_point_units(point) if self.station_corrections: time_shifts = mpoint.pop("time_shift").ravel() arrival_times = num.array(self.arrival_times) + time_shifts else: arrival_times = num.array(self.arrival_times) source_points = utility.split_point(mpoint) for i, source in enumerate(self.sources): utility.update_source(source, **source_points[i]) source.time += self.event.time synthetics, tmins[0] = heart.seis_synthetics( engine=self.engine, sources=self.sources, targets=self.targets, arrival_taper=self.arrival_taper, wavename=self.wavename, filterer=self.filterer, pre_stack_cut=self.pre_stack_cut, arrival_times=arrival_times, ) if self.domain == "time": synths[0] = synthetics elif self.domain == "spectrum": synths[0] = heart.fft_transforms( time_domain_signals=synthetics, valid_spectrum_indices=self.valid_spectrum_indices, outmode="array", pad_to_pow2=True, ) else: ValueError('Domain "%" not supported!' % self.domain)
def infer_shape(self, node, input_shapes): nrow = len(self.targets) if self.domain == "time": ncol = self.arrival_taper.nsamples(self.sample_rate) elif self.domain == "spectrum": ncol = self.valid_spectrum_indices[1] - self.valid_spectrum_indices[0] return [(nrow, ncol), (nrow,)]
[docs]class PolaritySynthesizer(theano.Op): __props__ = ("engine", "source", "pmap", "is_location_fixed", "always_raytrace") def __init__(self, engine, source, pmap, is_location_fixed, always_raytrace): self.engine = engine self.source = source self.pmap = pmap self.is_location_fixed = is_location_fixed self.always_raytrace = always_raytrace def __getstate__(self): self.engine.close_cashed_stores() return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
[docs] def make_node(self, inputs): inlist = [] self.varnames = list(inputs.keys()) for i in inputs.values(): inlist.append(tt.as_tensor_variable(i)) out = tt.as_tensor_variable(num.zeros((2))) outlist = [out.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): synths = output[0] point = {vname: i for vname, i in zip(self.varnames, inputs)} mpoint = utility.adjust_point_units(point) source_points = utility.split_point(mpoint) utility.update_source(self.source, **source_points[self.pmap.config.event_idx]) if not self.is_location_fixed: self.pmap.update_targets( self.engine, self.source, always_raytrace=self.always_raytrace, check=False, ) self.pmap.update_radiation_weights() synths[0] = heart.pol_synthetics( self.source, radiation_weights=self.pmap.get_radiation_weights() )
def infer_shape(self, node, input_shapes): return [(self.pmap.n_t,)]
[docs]class SeisDataChopper(theano.Op): """ Deprecated! """ __props__ = ("sample_rate", "traces", "arrival_taper", "filterer") def __init__(self, sample_rate, traces, arrival_taper, filterer): self.sample_rate = sample_rate self.traces = tuple(traces) self.arrival_taper = arrival_taper self.filterer = tuple(filterer)
[docs] def make_node(self, *inputs): inlist = [] for i in inputs: inlist.append(tt.as_tensor_variable(i)) outm = tt.as_tensor_variable(num.zeros((2, 2))) outlist = [outm.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): tmins = inputs[0] z = output[0] z[0] = heart.taper_filter_traces( self.traces, self.arrival_taper, self.filterer, tmins, outmode="array" )
def infer_shape(self, node, input_shapes): nrow = len(self.traces) ncol = self.arrival_taper.nsamples(self.sample_rate) return [(nrow, ncol)]
[docs]class Sweeper(theano.Op): """ Theano Op for C implementation of the fast sweep algorithm. Parameters ---------- patch_size : float size of fault patches [km] n_patch_strike : int number of patches in strike direction n_patch_dip : int number of patches in dip-direction """ __props__ = ("patch_size", "n_patch_dip", "n_patch_strike", "implementation") def __init__(self, patch_size, n_patch_dip, n_patch_strike, implementation): self.patch_size = num.float64(patch_size) self.n_patch_dip = n_patch_dip self.n_patch_strike = n_patch_strike self.implementation = implementation
[docs] def make_node(self, *inputs): inlist = [] for i in inputs: inlist.append(tt.as_tensor_variable(i)) outv = tt.as_tensor_variable(num.zeros((2))) outlist = [outv.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): """ Return start-times of rupturing patches with respect to given hypocenter. Parameters ---------- slownesses : float, vector inverse of the rupture velocity across each patch nuc_dip : int, scalar rupture nucleation point on the fault in dip-direction, index to patch nuc_strike : int, scalar rupture nucleation point on the fault in strike-direction, index to patch Returns ------- starttimes : float, vector Notes ----- Here we call the C-implementation on purpose with swapped strike and dip directions, because we need the fault dipping in row directions of the array. The C-implementation has it along columns!!! """ slownesses, nuc_dip, nuc_strike = inputs z = output[0] logger.debug("Fast sweeping ..%s." % self.implementation) if self.implementation == "c": # z[0] = fast_sweep.fast_sweep_ext.fast_sweep( slownesses, self.patch_size, int(nuc_dip), int(nuc_strike), self.n_patch_dip, self.n_patch_strike, ) elif self.implementation == "numpy": z[0] = fast_sweep.get_rupture_times_numpy( slownesses.reshape((self.n_patch_dip, self.n_patch_strike)), self.patch_size, self.n_patch_strike, self.n_patch_dip, nuc_strike, nuc_dip, ).flatten() else: raise NotImplementedError( "Fast sweeping for implementation %s not" " implemented!" % self.implementation ) logger.debug("Done sweeping!")
def infer_shape(self, node, input_shapes): return [(self.n_patch_dip * self.n_patch_strike,)]
[docs]class EulerPole(theano.Op): """ Theano Op for rotation of geodetic observations around Euler Pole. Parameters ---------- lats : float, vector of Latitudes [deg] of points to be rotated lons : float, vector of Longitudes [deg] of points to be rotated data_mask : bool, vector of indexes to points to be masked """ __props__ = ("lats", "lons", "data_mask") def __init__(self, lats, lons, data_mask): self.lats = tuple(lats) self.lons = tuple(lons) self.data_mask = tuple(data_mask)
[docs] def make_node(self, inputs): inlist = [] self.fixed_values = OrderedDict() self.varnames = [] for k, v in inputs.items(): varname = k.split("_")[-1] # split of dataset naming if isinstance(v, FreeRV): self.varnames.append(varname) inlist.append(tt.as_tensor_variable(v)) else: self.fixed_values[varname] = v outv = tt.as_tensor_variable(num.zeros((2, 2))) outlist = [outv.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): z = output[0] point = {vname: i for vname, i in zip(self.varnames, inputs)} point.update(self.fixed_values) pole_lat = point["lat"] # pole parameters pole_lon = point["lon"] omega = point["omega"] velocities = heart.velocities_from_pole( num.array(self.lats), num.array(self.lons), pole_lat, pole_lon, omega ) if self.data_mask: velocities[num.array(self.data_mask), :] = 0.0 z[0] = velocities
def infer_shape(self, node, input_shapes): return [(len(self.lats), 3)]
[docs]class StrainRateTensor(theano.Op): """ TheanoOp for internal block deformation through 2d area strain rate tensor. Parameters ---------- lats : float, vector of Latitudes [deg] of points to be strained lons : float, vector of Longitudes [deg] of points to be strained data_mask : bool, vector of indexes to points to be masked """ __props__ = ("lats", "lons", "data_mask") def __init__(self, lats, lons, data_mask): self.lats = tuple(lats) self.lons = tuple(lons) self.data_mask = tuple(data_mask) self.ndata = len(self.lats) station_idxs = [ station_idx for station_idx in range(self.ndata) if station_idx not in data_mask ] self.station_idxs = tuple(station_idxs)
[docs] def make_node(self, inputs): inlist = [] self.fixed_values = OrderedDict() self.varnames = [] for k, v in inputs.items(): varname = k.split("_")[-1] # split of dataset naming if isinstance(v, FreeRV): self.varnames.append(varname) inlist.append(tt.as_tensor_variable(v)) else: self.fixed_values[varname] = v outv = tt.as_tensor_variable(num.zeros((2, 2))) outlist = [outv.type()] return theano.Apply(self, inlist, outlist)
[docs] def perform(self, node, inputs, output): z = output[0] point = {vname: i for vname, i in zip(self.varnames, inputs)} point.update(self.fixed_values) exx = point["exx"] # tensor params eyy = point["eyy"] exy = point["exy"] rotation = point["rotation"] valid = num.array(self.station_idxs) v_xyz = heart.velocities_from_strain_rate_tensor( num.array(self.lats)[valid], num.array(self.lons)[valid], exx=exx, eyy=eyy, exy=exy, rotation=rotation, ) v_xyz_all = num.zeros((self.ndata, 3)) v_xyz_all[valid, :] = v_xyz z[0] = v_xyz_all
def infer_shape(self, node, input_shapes): return [(self.ndata, 3)]