Source code for grond.targets.waveform.target

# https://pyrocko.org/grond - GPLv3
#
# The Grond Developers, 21st Century
import logging
import math
import numpy as num

from pyrocko import gf, trace, weeding, util
from pyrocko.guts import (Object, String, Float, Bool, Int, StringChoice,
                          Timestamp, List, Dict)
from pyrocko.guts_array import Array
from pyrocko.gui.snuffler.marker import PhaseMarker

from grond.dataset import NotFound
from grond.meta import GrondError, store_t, nslcs_to_patterns

from ..base import (MisfitConfig, MisfitTarget, MisfitResult, TargetGroup)
from grond.meta import has_get_plot_classes

from pyrocko import crust2x2
from string import Template

guts_prefix = 'grond'
logger = logging.getLogger('grond.targets.waveform.target')

g_differentiation_response_1 = trace.DifferentiationResponse(1)
g_differentiation_response_2 = trace.DifferentiationResponse(2)


class StoreIDSelectorError(GrondError):
    pass


class StoreIDSelector(Object):
    '''
    Base class for GF store selectors.

    GF store selectors can be implemented to select different stores, based on
    station location, source location or other characteristics.
    '''

    pass


class Crust2StoreIDSelector(StoreIDSelector):
    '''
    Store ID selector picking CRUST 2.0 model based on event location.
    '''

    template = String.T(
        help="Template for the GF store ID. For example ``'crust2_${id}'`` "
             "where ``'${id}'`` will be replaced with the corresponding CRUST "
             "2.0 profile identifier for the source location.")

    def get_store_id(self, event, st, cha):
        s = Template(self.template)
        return s.substitute(id=(
            crust2x2.get_profile(event.lat, event.lon)._ident).lower())


class StationDictStoreIDSelector(StoreIDSelector):
    '''
    Store ID selector using a manual station to store ID mapping.
    '''

    mapping = Dict.T(
        String.T(), gf.StringID.T(),
        help='Dictionary with station to store ID pairs, keys are NET.STA. '
             "Add a fallback store ID under the key ``'others'``.")

    def get_store_id(self, event, st, cha):
        try:
            store_id = self.mapping['%s.%s' % (st.network, st.station)]
        except KeyError:
            try:
                store_id = self.mapping['others']
            except KeyError:
                raise StoreIDSelectorError(
                    'No store ID found for station "%s.%s".' % (
                        st.network, st.station))

        return store_id


class DepthRangeToStoreID(Object):
    depth_min = Float.T()
    depth_max = Float.T()
    store_id = gf.StringID.T()


class StationDepthStoreIDSelector(StoreIDSelector):
    '''
    Store ID selector using a mapping from station depth range to store ID.
    '''

    depth_ranges = List.T(DepthRangeToStoreID.T())

    def get_store_id(self, event, st, cha):
        for r in self.depth_ranges:
            if r.depth_min <= st.depth < r.depth_max:
                return r.store_id

        raise StoreIDSelectorError(
            'No store ID found for station "%s.%s" at %g m depth.' % (
                st.network, st.station, st.depth))


class DomainChoice(StringChoice):
    choices = [
        'time_domain',
        'frequency_domain',
        'log_frequency_domain',
        'envelope',
        'absolute',
        'cc_max_norm']


[docs] class WaveformMisfitConfig(MisfitConfig): quantity = gf.QuantityType.T(default='displacement') fmin = Float.T(default=0.0, help='minimum frequency of bandpass filter') fmax = Float.T(help='maximum frequency of bandpass filter') ffactor = Float.T(default=1.5) tmin = gf.Timing.T( optional=True, help='Start of main time window used for waveform fitting.') tmax = gf.Timing.T( optional=True, help='End of main time window used for waveform fitting.') tfade = Float.T( optional=True, help='Decay time of taper prepended and appended to main time window ' 'used for waveform fitting [s].') pick_synthetic_traveltime = gf.Timing.T( optional=True, help='Synthetic phase arrival definition for alignment of observed ' 'and synthetic traces.') pick_phasename = String.T( optional=True, help='Name of picked phase for alignment of observed and synthetic ' 'traces.') domain = DomainChoice.T( default='time_domain', help='Type of data characteristic to be fitted.\n\nAvailable choices ' 'are: %s' % ', '.join("``'%s'``" % s for s in DomainChoice.choices)) norm_exponent = Int.T( default=2, help='Exponent to use in norm (1: L1-norm, 2: L2-norm)') tautoshift_max = Float.T( default=0.0, help='If non-zero, allow synthetic and observed traces to be shifted ' 'against each other by up to +/- the given value [s].') autoshift_penalty_max = Float.T( default=0.0, help='If non-zero, a penalty misfit is added for non-zero shift ' 'values.\n\nThe penalty value is computed as ' '``autoshift_penalty_max * normalization_factor * tautoshift**2 ' '/ tautoshift_max**2``') ranges = {} def get_full_frequency_range(self): return self.fmin / self.ffactor, self.fmax * self.ffactor
def log_exclude(target, reason): logger.debug('Excluding potential target %s: %s' % ( target.string_id(), reason))
[docs] class WaveformTargetGroup(TargetGroup): '''Handles seismogram targets or other targets of dynamic ground motion. ''' distance_min = Float.T( optional=True, help='excludes targets nearer to source, along a great circle') distance_max = Float.T( optional=True, help='excludes targets farther from source, along a great circle') distance_3d_min = Float.T( optional=True, help='excludes targets nearer from source (direct distance)') distance_3d_max = Float.T( optional=True, help='excludes targets farther from source (direct distance)') depth_min = Float.T( optional=True, help='excludes targets with smaller depths') depth_max = Float.T( optional=True, help='excludes targets with larger depths') include = List.T( String.T(), optional=True, help='If not None, list of stations/components to include according ' 'to their STA, NET.STA, NET.STA.LOC, or NET.STA.LOC.CHA codes.') exclude = List.T( String.T(), help='Stations/components to be excluded according to their STA, ' 'NET.STA, NET.STA.LOC, or NET.STA.LOC.CHA codes.') only_use_stations_with_picks = Bool.T( optional=True, default=False, help='Choose if targets without arrival picks should be excluded ' '(True) or should be used without time shifts (False).') limit = Int.T(optional=True) channels = List.T( String.T(), optional=True, help="set channels to include, e.g. ['Z', 'T']") misfit_config = WaveformMisfitConfig.T() store_id_selector = StoreIDSelector.T( optional=True, help='select GF store based on event-station geometry.') def get_targets(self, ds, event, default_path='none'): logger.debug('Selecting waveform targets...') origin = event targets = [] stations = ds.get_stations() if len(stations) == 0: logger.warning( 'No stations found to create waveform target group.') for st in ds.get_stations(): logger.debug('Selecting waveforms for station %s.%s.%s' % st.nsl()) for cha in self.channels: nslc = st.nsl() + (cha,) logger.debug('Selecting waveforms for %s.%s.%s.%s' % nslc) if self.store_id_selector: store_id = self.store_id_selector.get_store_id( event, st, cha) else: store_id = self.store_id logger.debug('Selecting waveforms for %s.%s.%s.%s' % nslc) if self.misfit_config.quantity in [ 'displacement', 'velocity', 'acceleration']: quantity = 'displacement' elif self.misfit_config.quantity in [ 'rotation_displacement', 'rotation_velocity', 'rotation_acceleration']: quantity = 'rotation_displacement' else: quantity = self.misfit_config.quantity target = WaveformMisfitTarget( quantity=quantity, codes=nslc, lat=st.lat, lon=st.lon, north_shift=st.north_shift, east_shift=st.east_shift, depth=st.depth, interpolation=self.interpolation, store_id=store_id, misfit_config=self.misfit_config, manual_weight=self.weight, normalisation_family=self.normalisation_family, path=self.path or default_path, only_use_stations_with_picks=self.only_use_stations_with_picks) # noqa if ds.is_excluded(nslc): log_exclude(target, 'excluded by dataset') continue if util.match_nslc( nslcs_to_patterns(self.exclude), nslc): log_exclude(target, 'excluded by target group') continue if self.include is not None and not util.match_nslc( nslcs_to_patterns(self.include), nslc): log_exclude(target, 'excluded by target group') continue if self.distance_min is not None and \ target.distance_to(origin) < self.distance_min: log_exclude(target, 'distance < distance_min') continue if self.distance_max is not None and \ target.distance_to(origin) > self.distance_max: log_exclude(target, 'distance > distance_max') continue if self.distance_3d_min is not None and \ target.distance_3d_to(origin) < self.distance_3d_min: log_exclude(target, 'distance_3d < distance_3d_min') continue if self.distance_3d_max is not None and \ target.distance_3d_to(origin) > self.distance_3d_max: log_exclude(target, 'distance_3d > distance_3d_max') continue if self.depth_min is not None and \ target.depth < self.depth_min: log_exclude(target, 'depth < depth_min') continue if self.depth_max is not None and \ target.depth > self.depth_max: log_exclude(target, 'depth > depth_max') continue if self.only_use_stations_with_picks: all_picks_nsl_ids = set( p.nslc_ids[0][:3] for p in ds.pick_markers if isinstance(p, PhaseMarker)) if st.nsl() not in all_picks_nsl_ids: log_exclude(target, 'no pick for nsl') continue azi, _ = target.azibazi_to(origin) if cha == 'R': target.azimuth = azi - 180. target.dip = 0. elif cha == 'T': target.azimuth = azi - 90. target.dip = 0. elif cha == 'Z': target.azimuth = 0. target.dip = -90. target.set_dataset(ds) targets.append(target) if self.limit: return weed(origin, targets, self.limit)[0] else: return targets def is_gf_store_appropriate(self, store, depth_range): ok = TargetGroup.is_gf_store_appropriate( self, store, depth_range) ok &= self._is_gf_store_appropriate_check_extent( store, depth_range) ok &= self._is_gf_store_appropriate_check_sample_rate( store, depth_range) return ok
class TraceSpectrum(Object): network = String.T() station = String.T() location = String.T() channel = String.T() deltaf = Float.T(default=1.0) fmin = Float.T(default=0.0) ydata = Array.T( shape=(None,), dtype=num.complex128, serialize_dtype='<c16', serialize_as='base64+meta') def get_ydata(self): return self.ydata def get_xdata(self): return self.fmin + num.arange(self.ydata.size) * self.deltaf class WaveformPiggybackSubtarget(Object): piggy_id = Int.T() _next_piggy_id = 0 @classmethod def new_piggy_id(cls): piggy_id = WaveformPiggybackSubtarget._next_piggy_id WaveformPiggybackSubtarget._next_piggy_id += 1 return piggy_id def __init__(self, piggy_id=None, **kwargs): if piggy_id is None: piggy_id = self.new_piggy_id() Object.__init__(self, piggy_id=piggy_id, **kwargs) def evaluate( self, tr_proc_obs, trspec_proc_obs, tr_proc_syn, trspec_proc_syn): raise NotImplementedError() class WaveformPiggybackSubresult(Object): piggy_id = Int.T()
[docs] class WaveformMisfitResult(gf.Result, MisfitResult): '''Carries the observations for a target and corresponding synthetics. A number of different waveform or phase representations are possible. ''' processed_obs = trace.Trace.T(optional=True) processed_syn = trace.Trace.T(optional=True) filtered_obs = trace.Trace.T(optional=True) filtered_syn = trace.Trace.T(optional=True) spectrum_obs = TraceSpectrum.T(optional=True) spectrum_syn = TraceSpectrum.T(optional=True) taper = trace.Taper.T(optional=True) tobs_shift = Float.T(optional=True) tsyn_pick = Timestamp.T(optional=True) tshift = Float.T(optional=True) cc = trace.Trace.T(optional=True) piggyback_subresults = List.T(WaveformPiggybackSubresult.T())
[docs] @has_get_plot_classes class WaveformMisfitTarget(gf.Target, MisfitTarget): flip_norm = Bool.T(default=False) misfit_config = WaveformMisfitConfig.T() only_use_stations_with_picks = Bool.T(default=False, optional=True) can_bootstrap_weights = True def __init__(self, **kwargs): gf.Target.__init__(self, **kwargs) MisfitTarget.__init__(self, **kwargs) self._piggyback_subtargets = [] def string_id(self): return '.'.join(x for x in (self.path,) + self.codes) @classmethod def get_plot_classes(cls): from . import plot plots = super(WaveformMisfitTarget, cls).get_plot_classes() plots.extend(plot.get_plot_classes()) return plots def get_combined_weight(self): if self._combined_weight is None: w = self.manual_weight for analyser in self.analyser_results.values(): w *= analyser.weight self._combined_weight = num.array([w], dtype=float) return self._combined_weight def get_taper_params(self, engine, source): store = engine.get_store(self.store_id) config = self.misfit_config tmin_fit = source.time + store_t(store, config.tmin, source, self) tmax_fit = source.time + store_t(store, config.tmax, source, self) if config.fmin > 0.0: tfade = 1.0/config.fmin else: tfade = 1.0/config.fmax if config.tfade is None: tfade_taper = tfade else: tfade_taper = config.tfade return tmin_fit, tmax_fit, tfade, tfade_taper def get_backazimuth_for_waveform(self): return backazimuth_for_waveform(self.azimuth, self.codes) @property def backazimuth(self): return self.azimuth - 180. def get_freqlimits(self): config = self.misfit_config return ( config.fmin/config.ffactor, config.fmin, config.fmax, config.fmax*config.ffactor) def get_pick_shift(self, engine, source): config = self.misfit_config tobs = None tsyn = None ds = self.get_dataset() if config.pick_synthetic_traveltime and config.pick_phasename: store = engine.get_store(self.store_id) tsyn = source.time + store.t( config.pick_synthetic_traveltime, source, self) marker = ds.get_pick( source.name, self.codes[:3], config.pick_phasename) if marker: tobs = marker.tmin return tobs, tsyn def get_cutout_timespan(self, tmin, tmax, tfade): if self.misfit_config.fmin > 0: tinc_obs = 1.0 / self.misfit_config.fmin else: tinc_obs = 10.0 / self.misfit_config.fmax tmin_obs = (math.floor( (tmin - tfade) / tinc_obs) - 1.0) * tinc_obs tmax_obs = (math.ceil( (tmax + tfade) / tinc_obs) + 1.0) * tinc_obs return tmin_obs, tmax_obs def post_process(self, engine, source, tr_syn): tr_syn = tr_syn.pyrocko_trace() nslc = self.codes config = self.misfit_config tmin_fit, tmax_fit, tfade, tfade_taper = \ self.get_taper_params(engine, source) ds = self.get_dataset() tobs, tsyn = self.get_pick_shift(engine, source) if None not in (tobs, tsyn): tobs_shift = tobs - tsyn else: if self.only_use_stations_with_picks: err = 'No pick available for target: %s' % self.path logger.debug(err) raise gf.SeismosizerError(err) else: tobs_shift = 0.0 tr_syn.extend( tmin_fit - tfade * 2.0, tmax_fit + tfade * 2.0, fillmethod='repeat') freqlimits = self.get_freqlimits() if config.quantity in ('displacement', 'rotation_displacement'): syn_resp = None elif config.quantity in ('velocity', 'rotation_velocity'): syn_resp = g_differentiation_response_1 elif config.quantity in ('acceleration', 'rotation_acceleration'): syn_resp = g_differentiation_response_2 else: syn_resp = None tr_syn = tr_syn.transfer( freqlimits=freqlimits, tfade=tfade, transfer_function=syn_resp) tr_syn.chop(tmin_fit - 2*tfade, tmax_fit + 2*tfade) tmin_obs, tmax_obs = self.get_cutout_timespan( tmin_fit+tobs_shift, tmax_fit+tobs_shift, tfade) try: tr_obs = ds.get_waveform( nslc, quantity=config.quantity, tinc_cache=1.0/(config.fmin or 0.1*config.fmax), tmin=tmin_fit+tobs_shift-tfade, tmax=tmax_fit+tobs_shift+tfade, tfade=tfade, freqlimits=freqlimits, deltat=tr_syn.deltat, cache=True, backazimuth=self.get_backazimuth_for_waveform()) if tobs_shift != 0.0: tr_obs = tr_obs.copy() tr_obs.shift(-tobs_shift) mr = misfit( tr_obs, tr_syn, taper=trace.CosTaper( tmin_fit - tfade_taper, tmin_fit, tmax_fit, tmax_fit + tfade_taper), domain=config.domain, exponent=config.norm_exponent, flip=self.flip_norm, result_mode=self._result_mode, tautoshift_max=config.tautoshift_max, autoshift_penalty_max=config.autoshift_penalty_max, subtargets=self._piggyback_subtargets) self._piggyback_subtargets = [] mr.tobs_shift = float(tobs_shift) mr.tsyn_pick = float_or_none(tsyn) return mr except (NotFound, util.UnavailableDecimation) as e: logger.debug(str(e)) raise gf.SeismosizerError('No waveform data: %s' % str(e)) def get_plain_targets(self, engine, source): d = dict( (k, getattr(self, k)) for k in gf.Target.T.propnames) return [gf.Target(**d)] def add_piggyback_subtarget(self, subtarget): self._piggyback_subtargets.append(subtarget)
def misfit( tr_obs, tr_syn, taper, domain, exponent, tautoshift_max, autoshift_penalty_max, flip, result_mode='sparse', subtargets=[]): ''' Calculate misfit between observed and synthetic trace. :param tr_obs: observed trace as :py:class:`pyrocko.trace.Trace` :param tr_syn: synthetic trace as :py:class:`pyrocko.trace.Trace` :param taper: taper applied in timedomain as :py:class:`pyrocko.trace.Taper` :param domain: how to calculate difference, see :py:class:`DomainChoice` :param exponent: exponent of Lx type norms :param tautoshift_max: if non-zero, return lowest misfit when traces are allowed to shift against each other by up to +/- ``tautoshift_max`` :param autoshift_penalty_max: if non-zero, a penalty misfit is added for for non-zero shift values. The penalty value is ``autoshift_penalty_max * normalization_factor * \ tautoshift**2 / tautoshift_max**2`` :param flip: ``bool``, if set to ``True``, normalization factor is computed against *tr_syn* rather than *tr_obs* :param result_mode: ``'full'``, include traces and spectra or ``'sparse'``, include only misfit and normalization factor in result :returns: object of type :py:class:`WaveformMisfitResult` ''' trace.assert_same_sampling_rate(tr_obs, tr_syn) deltat = tr_obs.deltat tmin, tmax = taper.time_span() tr_proc_obs, trspec_proc_obs = _process(tr_obs, tmin, tmax, taper, domain) tr_proc_syn, trspec_proc_syn = _process(tr_syn, tmin, tmax, taper, domain) piggyback_results = [] for subtarget in subtargets: piggyback_results.append( subtarget.evaluate( tr_proc_obs, trspec_proc_obs, tr_proc_syn, trspec_proc_syn)) tshift = None ctr = None deltat = tr_proc_obs.deltat if domain in ('time_domain', 'envelope', 'absolute'): a, b = tr_proc_syn.ydata, tr_proc_obs.ydata if flip: b, a = a, b nshift_max = max(0, min(a.size-1, int(math.floor(tautoshift_max / deltat)))) if nshift_max == 0: m, n = trace.Lx_norm(a, b, norm=exponent) else: mns = [] for ishift in range(-nshift_max, nshift_max+1): if ishift < 0: a_cut = a[-ishift:] b_cut = b[:ishift] elif ishift == 0: a_cut = a b_cut = b elif ishift > 0: a_cut = a[:-ishift] b_cut = b[ishift:] mns.append(trace.Lx_norm(a_cut, b_cut, norm=exponent)) ms, ns = num.array(mns).T iarg = num.argmin(ms) tshift = (iarg-nshift_max)*deltat m, n = ms[iarg], ns[iarg] m += autoshift_penalty_max * n * tshift**2 / tautoshift_max**2 elif domain == 'cc_max_norm': ctr = trace.correlate( tr_proc_syn, tr_proc_obs, mode='same', normalization='normal') tshift, cc_max = ctr.max() m = 0.5 - 0.5 * cc_max n = 0.5 elif domain == 'frequency_domain': a, b = trspec_proc_syn.ydata, trspec_proc_obs.ydata if flip: b, a = a, b m, n = trace.Lx_norm(num.abs(a), num.abs(b), norm=exponent) elif domain == 'log_frequency_domain': a, b = trspec_proc_syn.ydata, trspec_proc_obs.ydata if flip: b, a = a, b a = num.abs(a) b = num.abs(b) eps = (num.mean(a) + num.mean(b)) * 1e-7 if eps == 0.0: eps = 1e-7 a = num.log(a + eps) b = num.log(b + eps) m, n = trace.Lx_norm(a, b, norm=exponent) if result_mode == 'full': result = WaveformMisfitResult( misfits=num.array([[m, n]], dtype=float), processed_obs=tr_proc_obs, processed_syn=tr_proc_syn, filtered_obs=tr_obs.copy(), filtered_syn=tr_syn, spectrum_obs=trspec_proc_obs, spectrum_syn=trspec_proc_syn, taper=taper, tshift=tshift, cc=ctr) elif result_mode == 'sparse': result = WaveformMisfitResult( misfits=num.array([[m, n]], dtype=float)) else: assert False result.piggyback_subresults = piggyback_results return result def _extend_extract(tr, tmin, tmax): deltat = tr.deltat itmin_frame = int(math.floor(tmin/deltat)) itmax_frame = int(math.ceil(tmax/deltat)) nframe = itmax_frame - itmin_frame + 1 n = tr.data_len() a = num.empty(nframe, dtype=float) itmin_tr = int(round(tr.tmin / deltat)) itmax_tr = itmin_tr + n icut1 = min(max(0, itmin_tr - itmin_frame), nframe) icut2 = min(max(0, itmax_tr - itmin_frame), nframe) icut1_tr = min(max(0, icut1 + itmin_frame - itmin_tr), n) icut2_tr = min(max(0, icut2 + itmin_frame - itmin_tr), n) a[:icut1] = tr.ydata[0] a[icut1:icut2] = tr.ydata[icut1_tr:icut2_tr] a[icut2:] = tr.ydata[-1] tr = tr.copy(data=False) tr.tmin = itmin_frame * deltat tr.set_ydata(a) return tr def _process(tr, tmin, tmax, taper, domain): tr_proc = _extend_extract(tr, tmin, tmax) tr_proc.taper(taper) df = None trspec_proc = None if domain == 'envelope': tr_proc = tr_proc.envelope(inplace=False) tr_proc.set_ydata(num.abs(tr_proc.get_ydata())) elif domain == 'absolute': tr_proc.set_ydata(num.abs(tr_proc.get_ydata())) elif domain in ('frequency_domain', 'log_frequency_domain'): ndata = tr_proc.ydata.size nfft = trace.nextpow2(ndata) padded = num.zeros(nfft, dtype=float) padded[:ndata] = tr_proc.ydata spectrum = num.fft.rfft(padded) df = 1.0 / (tr_proc.deltat * nfft) trspec_proc = TraceSpectrum( network=tr_proc.network, station=tr_proc.station, location=tr_proc.location, channel=tr_proc.channel, deltaf=df, fmin=0.0, ydata=spectrum) return tr_proc, trspec_proc def backazimuth_for_waveform(azimuth, nslc): if nslc[-1] == 'R': backazimuth = azimuth + 180. elif nslc[-1] == 'T': backazimuth = azimuth + 90. else: backazimuth = None return backazimuth def float_or_none(x): if x is None: return x else: return float(x) def weed(origin, targets, limit, neighborhood=3): azimuths = num.zeros(len(targets)) dists = num.zeros(len(targets)) for i, target in enumerate(targets): _, azimuths[i] = target.azibazi_to(origin) dists[i] = target.distance_to(origin) badnesses = num.ones(len(targets), dtype=float) deleted, meandists_kept = weeding.weed( azimuths, dists, badnesses, nwanted=limit, neighborhood=neighborhood) targets_weeded = [ target for (delete, target) in zip(deleted, targets) if not delete] return targets_weeded, meandists_kept, deleted __all__ = ''' StoreIDSelectorError StoreIDSelector Crust2StoreIDSelector StationDictStoreIDSelector DepthRangeToStoreID StationDepthStoreIDSelector WaveformTargetGroup WaveformMisfitConfig WaveformMisfitTarget WaveformMisfitResult WaveformPiggybackSubtarget WaveformPiggybackSubresult '''.split()