Source code for pyrocko.trace

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''This module provides basic signal processing for seismic traces.'''
from __future__ import division, absolute_import

from builtins import zip
from builtins import map
from builtins import range
from builtins import str as newstr

import time
import math
import copy
import logging

import numpy as num
from scipy import signal

from . import util, evalresp, orthodrome, pchain, model
from .util import reuse, hpfloat, UnavailableDecimation
from .guts import Object, Float, Int, String, Complex, Tuple, List, \
    StringChoice
from .guts_array import Array


UnavailableDecimation  # noqa

guts_prefix = 'pf'

logger = logging.getLogger('pyrocko.trace')


[docs]class Trace(object): ''' Create new trace object. A ``Trace`` object represents a single continuous strip of evenly sampled time series data. It is built from a 1D NumPy array containing the data samples and some attributes describing its beginning and ending time, its sampling rate and four string identifiers (its network, station, location and channel code). :param network: network code :param station: station code :param location: location code :param channel: channel code :param tmin: system time of first sample in [s] :param tmax: system time of last sample in [s] (if set to ``None`` it is computed from length of ``ydata``) :param deltat: sampling interval in [s] :param ydata: 1D numpy array with data samples (can be ``None`` when ``tmax`` is not ``None``) :param mtime: optional modification time :param meta: additional meta information (not used, but maintained by the library) The length of the network, station, location and channel codes is not resricted by this software, but data formats like SAC, Mini-SEED or GSE have different limits on the lengths of these codes. The codes set here are silently truncated when the trace is stored ''' cached_frequencies = {} def __init__(self, network='', station='STA', location='', channel='', tmin=0., tmax=None, deltat=1., ydata=None, mtime=None, meta=None): self._growbuffer = None if deltat < 0.001: tmin = hpfloat(tmin) if tmax is not None: tmax = hpfloat(tmax) if mtime is None: mtime = time.time() self.network, self.station, self.location, self.channel = [ reuse(x) for x in (network, station, location, channel)] self.tmin = tmin self.deltat = deltat if tmax is None: if ydata is not None: self.tmax = self.tmin + (ydata.size-1)*self.deltat else: raise Exception( 'fixme: trace must be created with tmax or ydata') else: n = int(round((tmax - self.tmin) / self.deltat)) + 1 self.tmax = self.tmin + (n - 1) * self.deltat self.meta = meta self.ydata = ydata self.mtime = mtime self._update_ids() self.file = None self._pchain = None def __str__(self): fmt = min(9, max(0, -int(math.floor(math.log10(self.deltat))))) s = 'Trace (%s, %s, %s, %s)\n' % self.nslc_id s += ' timerange: %s - %s\n' % ( util.time_to_str(self.tmin, format=fmt), util.time_to_str(self.tmax, format=fmt)) s += ' delta t: %g\n' % self.deltat if self.meta: for k in sorted(self.meta.keys()): s += ' %s: %s\n' % (k, self.meta[k]) return s def __getstate__(self): return (self.network, self.station, self.location, self.channel, self.tmin, self.tmax, self.deltat, self.mtime, self.ydata, self.meta) def __setstate__(self, state): if len(state) == 10: self.network, self.station, self.location, self.channel, \ self.tmin, self.tmax, self.deltat, self.mtime, \ self.ydata, self.meta = state else: # backward compatibility with old behaviour self.network, self.station, self.location, self.channel, \ self.tmin, self.tmax, self.deltat, self.mtime = state self.ydata = None self.meta = None self._growbuffer = None self._update_ids()
[docs] def name(self): ''' Get a short string description. ''' s = '%s.%s.%s.%s, %s, %s' % (self.nslc_id + ( util.time_to_str(self.tmin), util.time_to_str(self.tmax))) return s
def __eq__(self, other): return ( self.network == other.network and self.station == other.station and self.location == other.location and self.channel == other.channel and (abs(self.deltat - other.deltat) < (self.deltat + other.deltat)*1e-6) and abs(self.tmin-other.tmin) < self.deltat*0.01 and abs(self.tmax-other.tmax) < self.deltat*0.01 and num.all(self.ydata == other.ydata)) def almost_equal(self, other, rtol=1e-5, atol=0.0): return ( self.network == other.network and self.station == other.station and self.location == other.location and self.channel == other.channel and (abs(self.deltat - other.deltat) < (self.deltat + other.deltat)*1e-6) and abs(self.tmin-other.tmin) < self.deltat*0.01 and abs(self.tmax-other.tmax) < self.deltat*0.01 and num.allclose(self.ydata, other.ydata, rtol=rtol, atol=atol)) def assert_almost_equal(self, other, rtol=1e-5, atol=0.0): assert self.network == other.network, \ 'network codes differ: %s, %s' % (self.network, other.network) assert self.station == other.station, \ 'station codes differ: %s, %s' % (self.station, other.station) assert self.location == other.location, \ 'location codes differ: %s, %s' % (self.location, other.location) assert self.channel == other.channel, 'channel codes differ' assert (abs(self.deltat - other.deltat) < (self.deltat + other.deltat)*1e-6), \ 'sampling intervals differ %g, %g' % (self.deltat, other.delta) assert abs(self.tmin-other.tmin) < self.deltat*0.01, \ 'start times differ: %s, %s' % ( util.time_to_str(self.tmin), util.time_to_str(other.tmin)) assert abs(self.tmax-other.tmax) < self.deltat*0.01, \ 'end times differ: %s, %s' % ( util.time_to_str(self.tmax), util.time_to_str(other.tmax)) assert num.allclose(self.ydata, other.ydata, rtol=rtol, atol=atol), \ 'trace values differ' def __hash__(self): return id(self) def __call__(self, t, clip=False, snap=round): it = int(snap((t-self.tmin)/self.deltat)) if clip: it = max(0, min(it, self.ydata.size-1)) else: if it < 0 or self.ydata.size <= it: raise IndexError() return self.tmin+it*self.deltat, self.ydata[it]
[docs] def interpolate(self, t, clip=False): ''' Value of trace between supporting points through linear interpolation. :param t: time instant :param clip: whether to clip indices to trace ends ''' t0, y0 = self(t, clip=clip, snap=math.floor) t1, y1 = self(t, clip=clip, snap=math.ceil) if t0 == t1: return y0 else: return y0+(t-t0)/(t1-t0)*(y1-y0)
[docs] def index_clip(self, i): ''' Clip index to valid range. ''' return min(max(0, i), self.ydata.size)
[docs] def add(self, other, interpolate=True, left=0., right=0.): ''' Add values of other trace (self += other). Add values of ``other`` trace to the values of ``self``, where it intersects with ``other``. This method does not change the extent of ``self``. If ``interpolate`` is ``True`` (the default), the values of ``other`` to be added are interpolated at sampling instants of ``self``. Linear interpolation is performed. In this case the sampling rate of ``other`` must be equal to or lower than that of ``self``. If ``interpolate`` is ``False``, the sampling rates of the two traces must match. ''' if interpolate: assert self.deltat <= other.deltat \ or same_sampling_rate(self, other) other_xdata = other.get_xdata() xdata = self.get_xdata() self.ydata += num.interp( xdata, other_xdata, other.ydata, left=left, right=left) else: assert self.deltat == other.deltat ioff = int(round((other.tmin-self.tmin)/self.deltat)) ibeg = max(0, ioff) iend = min(self.data_len(), ioff+other.data_len()) self.ydata[ibeg:iend] += other.ydata[ibeg-ioff:iend-ioff]
[docs] def mult(self, other, interpolate=True): ''' Muliply with values of other trace (self *= other). Multiply values of ``other`` trace to the values of ``self``, where it intersects with ``other``. This method does not change the extent of ``self``. If ``interpolate`` is ``True`` (the default), the values of ``other`` to be multiplied are interpolated at sampling instants of ``self``. Linear interpolation is performed. In this case the sampling rate of ``other`` must be equal to or lower than that of ``self``. If ``interpolate`` is ``False``, the sampling rates of the two traces must match. ''' if interpolate: assert self.deltat <= other.deltat or \ same_sampling_rate(self, other) other_xdata = other.get_xdata() xdata = self.get_xdata() self.ydata *= num.interp( xdata, other_xdata, other.ydata, left=0., right=0.) else: assert self.deltat == other.deltat ibeg1 = int(round((other.tmin-self.tmin)/self.deltat)) ibeg2 = int(round((self.tmin-other.tmin)/self.deltat)) iend1 = int(round((other.tmax-self.tmin)/self.deltat))+1 iend2 = int(round((self.tmax-other.tmin)/self.deltat))+1 ibeg1 = self.index_clip(ibeg1) iend1 = self.index_clip(iend1) ibeg2 = self.index_clip(ibeg2) iend2 = self.index_clip(iend2) self.ydata[ibeg1:iend1] *= other.ydata[ibeg2:iend2]
[docs] def max(self): ''' Get time and value of data maximum. ''' i = num.argmax(self.ydata) return self.tmin + i*self.deltat, self.ydata[i]
[docs] def min(self): ''' Get time and value of data minimum. ''' i = num.argmin(self.ydata) return self.tmin + i*self.deltat, self.ydata[i]
[docs] def absmax(self): ''' Get time and value of maximum of the absolute of data. ''' tmi, mi = self.min() tma, ma = self.max() if abs(mi) > abs(ma): return tmi, abs(mi) else: return tma, abs(ma)
[docs] def set_codes( self, network=None, station=None, location=None, channel=None): ''' Set network, station, location, and channel codes. ''' if network is not None: self.network = network if station is not None: self.station = station if location is not None: self.location = location if channel is not None: self.channel = channel self._update_ids()
def set_network(self, network): self.network = network self._update_ids() def set_station(self, station): self.station = station self._update_ids() def set_location(self, location): self.location = location self._update_ids() def set_channel(self, channel): self.channel = channel self._update_ids()
[docs] def overlaps(self, tmin, tmax): ''' Check if trace has overlap with a given time span. ''' return not (tmax < self.tmin or self.tmax < tmin)
[docs] def is_relevant(self, tmin, tmax, selector=None): ''' Check if trace has overlap with a given time span and matches a condition callback. (internal use) ''' return not (tmax <= self.tmin or self.tmax < tmin) \ and (selector is None or selector(self))
def _update_ids(self): ''' Update dependent ids. ''' self.full_id = ( self.network, self.station, self.location, self.channel, self.tmin) self.nslc_id = reuse( (self.network, self.station, self.location, self.channel)) def prune_from_reuse_cache(self): util.deuse(self.nslc_id) util.deuse(self.network) util.deuse(self.station) util.deuse(self.location) util.deuse(self.channel)
[docs] def set_mtime(self, mtime): ''' Set modification time of the trace. ''' self.mtime = mtime
[docs] def get_xdata(self): ''' Create array for time axis. ''' if self.ydata is None: raise NoData() return self.tmin \ + num.arange(len(self.ydata), dtype=num.float64) * self.deltat
[docs] def get_ydata(self): ''' Get data array. ''' if self.ydata is None: raise NoData() return self.ydata
[docs] def set_ydata(self, new_ydata): ''' Replace data array. ''' self.drop_growbuffer() self.ydata = new_ydata self.tmax = self.tmin+(len(self.ydata)-1)*self.deltat
def data_len(self): if self.ydata is not None: return self.ydata.size else: return int(round((self.tmax-self.tmin)/self.deltat)) + 1
[docs] def drop_data(self): ''' Forget data, make dataless trace. ''' self.drop_growbuffer() self.ydata = None
[docs] def drop_growbuffer(self): ''' Detach the traces grow buffer. ''' self._growbuffer = None self._pchain = None
[docs] def copy(self, data=True): ''' Make a deep copy of the trace. ''' tracecopy = copy.copy(self) tracecopy.drop_growbuffer() if data: tracecopy.ydata = self.ydata.copy() tracecopy.meta = copy.deepcopy(self.meta) return tracecopy
[docs] def crop_zeros(self): ''' Remove any zeros at beginning and end. ''' indices = num.where(self.ydata != 0.0)[0] if indices.size == 0: raise NoData() ibeg = indices[0] iend = indices[-1]+1 if ibeg == 0 and iend == self.ydata.size-1: return self.drop_growbuffer() self.ydata = self.ydata[ibeg:iend].copy() self.tmin = self.tmin+ibeg*self.deltat self.tmax = self.tmin+(len(self.ydata)-1)*self.deltat self._update_ids()
[docs] def append(self, data): ''' Append data to the end of the trace. To make this method efficient when successively very few or even single samples are appended, a larger grow buffer is allocated upon first invocation. The traces data is then changed to be a view into the currently filled portion of the grow buffer array. ''' assert self.ydata.dtype == data.dtype newlen = data.size + self.ydata.size if self._growbuffer is None or self._growbuffer.size < newlen: self._growbuffer = num.empty(newlen*2, dtype=self.ydata.dtype) self._growbuffer[:self.ydata.size] = self.ydata self._growbuffer[self.ydata.size:newlen] = data self.ydata = self._growbuffer[:newlen] self.tmax = self.tmin + (newlen-1)*self.deltat
[docs] def chop( self, tmin, tmax, inplace=True, include_last=False, snap=(round, round), want_incomplete=True): ''' Cut the trace to given time span. If the ``inplace`` argument is True (the default) the trace is cut in place, otherwise a new trace with the cut part is returned. By default, the indices where to start and end the trace data array are determined by rounding of ``tmin`` and ``tmax`` to sampling instances using Python's :py:func:`round` function. This behaviour can be changed with the ``snap`` argument, which takes a tuple of two functions (one for the lower and one for the upper end) to be used instead of :py:func:`round`. The last sample is by default not included unless ``include_last`` is set to True. If the given time span exceeds the available time span of the trace, the available part is returned, unless ``want_incomplete`` is set to False - in that case, a :py:exc:`NoData` exception is raised. This exception is always raised, when the requested time span does dot overlap with the trace's time span. ''' if want_incomplete: if tmax <= self.tmin-self.deltat or self.tmax+self.deltat < tmin: raise NoData() else: if tmin < self.tmin or self.tmax < tmax: raise NoData() ibeg = max(0, t2ind(tmin-self.tmin, self.deltat, snap[0])) iplus = 0 if include_last: iplus = 1 iend = min( self.data_len(), t2ind(tmax-self.tmin, self.deltat, snap[1])+iplus) if ibeg >= iend: raise NoData() obj = self if not inplace: obj = self.copy(data=False) self.drop_growbuffer() if self.ydata is not None: obj.ydata = self.ydata[ibeg:iend].copy() else: obj.ydata = None obj.tmin = obj.tmin+ibeg*obj.deltat obj.tmax = obj.tmin+((iend-ibeg)-1)*obj.deltat obj._update_ids() return obj
[docs] def downsample(self, ndecimate, snap=False, initials=None, demean=False): ''' Downsample trace by a given integer factor. :param ndecimate: decimation factor, avoid values larger than 8 :param snap: whether to put the new sampling instances closest to multiples of the sampling rate. :param initials: ``None``, ``True``, or initial conditions for the anti-aliasing filter, obtained from a previous run. In the latter two cases the final state of the filter is returned instead of ``None``. :param demean: whether to demean the signal before filtering. ''' newdeltat = self.deltat*ndecimate if snap: ilag = int(round( (math.ceil(self.tmin / newdeltat) * newdeltat - self.tmin) / self.deltat)) else: ilag = 0 if snap and ilag > 0 and ilag < self.ydata.size: data = self.ydata.astype(num.float64) self.tmin += ilag*self.deltat else: data = self.ydata.astype(num.float64) if demean: data -= num.mean(data) if data.size != 0: result = util.decimate( data, ndecimate, ftype='fir', zi=initials, ioff=ilag) else: result = data if initials is None: self.ydata, finals = result, None else: self.ydata, finals = result self.deltat = reuse(self.deltat*ndecimate) self.tmax = self.tmin+(len(self.ydata)-1)*self.deltat self._update_ids() return finals
[docs] def downsample_to(self, deltat, snap=False, allow_upsample_max=1, initials=None, demean=False): ''' Downsample to given sampling rate. Tries to downsample the trace to a target sampling interval of ``deltat``. This runs the :py:meth:`Trace.downsample` one or several times. If allow_upsample_max is set to a value larger than 1, intermediate upsampling steps are allowed, in order to increase the number of possible downsampling ratios. If the requested ratio is not supported, an exception of type :py:exc:`pyrocko.util.UnavailableDecimation` is raised. ''' ratio = deltat/self.deltat rratio = round(ratio) ok = False for upsratio in range(1, allow_upsample_max+1): dratio = (upsratio/self.deltat) / (1./deltat) if abs(dratio - round(dratio)) / dratio < 0.0001 and \ util.decitab(int(round(dratio))): ok = True break if not ok: raise util.UnavailableDecimation('ratio = %g' % ratio) if upsratio > 1: self.drop_growbuffer() ydata = self.ydata self.ydata = num.zeros( ydata.size*upsratio-(upsratio-1), ydata.dtype) self.ydata[::upsratio] = ydata for i in range(1, upsratio): self.ydata[i::upsratio] = \ float(i)/upsratio * ydata[:-1] \ + float(upsratio-i)/upsratio * ydata[1:] self.deltat = self.deltat/upsratio ratio = deltat/self.deltat rratio = round(ratio) deci_seq = util.decitab(int(rratio)) finals = [] for i, ndecimate in enumerate(deci_seq): if ndecimate != 1: xinitials = None if initials is not None: xinitials = initials[i] finals.append(self.downsample( ndecimate, snap=snap, initials=xinitials, demean=demean)) if initials is not None: return finals
[docs] def resample(self, deltat): ''' Resample to given sampling rate ``deltat``. Resampling is performed in the frequency domain. ''' ndata = self.ydata.size ntrans = nextpow2(ndata) fntrans2 = ntrans * self.deltat/deltat ntrans2 = int(round(fntrans2)) deltat2 = self.deltat * float(ntrans)/float(ntrans2) ndata2 = int(round(ndata*self.deltat/deltat2)) if abs(fntrans2 - ntrans2) > 1e-7: logger.warning( 'resample: requested deltat %g could not be matched exactly: ' '%g' % (deltat, deltat2)) data = self.ydata data_pad = num.zeros(ntrans, dtype=num.float) data_pad[:ndata] = data fdata = num.fft.rfft(data_pad) fdata2 = num.zeros((ntrans2+1)//2, dtype=fdata.dtype) n = min(fdata.size, fdata2.size) fdata2[:n] = fdata[:n] data2 = num.fft.irfft(fdata2) data2 = data2[:ndata2] data2 *= float(ntrans2) / float(ntrans) self.deltat = deltat2 self.set_ydata(data2)
def resample_simple(self, deltat): tyear = 3600*24*365. if deltat == self.deltat: return if abs(self.deltat - deltat) * tyear/deltat < deltat: logger.warning( 'resample_simple: less than one sample would have to be ' 'inserted/deleted per year. Doing nothing.') return ninterval = int(round(deltat / (self.deltat - deltat))) if abs(ninterval) < 20: logger.error( 'resample_simple: sample insertion/deletion interval less ' 'than 20. results would be erroneous.') raise ResamplingFailed() delete = False if ninterval < 0: ninterval = - ninterval delete = True tyearbegin = util.year_start(self.tmin) nmin = int(round((self.tmin - tyearbegin)/deltat)) ibegin = (((nmin-1)//ninterval)+1) * ninterval - nmin nindices = (len(self.ydata) - ibegin - 1) / ninterval + 1 if nindices > 0: indices = ibegin + num.arange(nindices) * ninterval data_split = num.split(self.ydata, indices) data = [] for ln, h in zip(data_split[:-1], data_split[1:]): if delete: ln = ln[:-1] data.append(ln) if not delete: if ln.size == 0: v = h[0] else: v = 0.5*(ln[-1] + h[0]) data.append(num.array([v], dtype=ln.dtype)) data.append(data_split[-1]) ydata_new = num.concatenate(data) self.tmin = tyearbegin + nmin * deltat self.deltat = deltat self.set_ydata(ydata_new)
[docs] def stretch(self, tmin_new, tmax_new): ''' Stretch signal while preserving sample rate using sinc interpolation. :param tmin_new: new time of first sample :param tmax_new: new time of last sample This method can be used to correct for a small linear time drift or to introduce sub-sample time shifts. The amount of stretching is limited to 10% by the implementation and is expected to be much smaller than that by the approximations used. ''' from pyrocko import signal_ext i_control = num.array([0, self.ydata.size-1], dtype=num.int64) t_control = num.array([tmin_new, tmax_new], dtype=num.float) r = (tmax_new - tmin_new) / self.deltat + 1.0 n_new = int(round(r)) if abs(n_new - r) > 0.001: n_new = int(math.floor(r)) assert n_new >= 2 tmax_new = tmin_new + (n_new-1) * self.deltat ydata_new = num.empty(n_new, dtype=num.float) signal_ext.antidrift(i_control, t_control, self.ydata.astype(num.float), tmin_new, self.deltat, ydata_new) self.tmin = tmin_new self.set_ydata(ydata_new) self._update_ids()
[docs] def nyquist_check(self, frequency, intro='Corner frequency', warn=True, raise_exception=False): ''' Check if a given frequency is above the Nyquist frequency of the trace. :param intro: string used to introduce the warning/error message :param warn: whether to emit a warning :param raise_exception: whether to raise an :py:exc:`AboveNyquist` exception. ''' if frequency >= 0.5/self.deltat: message = '%s (%g Hz) is equal to or higher than nyquist ' \ 'frequency (%g Hz). (Trace %s)' \ % (intro, frequency, 0.5/self.deltat, self.name()) if warn: logger.warning(message) if raise_exception: raise AboveNyquist(message)
[docs] def lowpass(self, order, corner, nyquist_warn=True, nyquist_exception=False, demean=True): ''' Apply Butterworth lowpass to the trace. :param order: order of the filter :param corner: corner frequency of the filter Mean is removed before filtering. ''' self.nyquist_check( corner, 'Corner frequency of lowpass', nyquist_warn, nyquist_exception) (b, a) = _get_cached_filter_coefs( order, [corner*2.0*self.deltat], btype='low') if len(a) != order+1 or len(b) != order+1: logger.warning( 'Erroneous filter coefficients returned by ' 'scipy.signal.butter(). You may need to downsample the ' 'signal before filtering.') data = self.ydata.astype(num.float64) if demean: data -= num.mean(data) self.drop_growbuffer() self.ydata = signal.lfilter(b, a, data)
[docs] def highpass(self, order, corner, nyquist_warn=True, nyquist_exception=False, demean=True): ''' Apply butterworth highpass to the trace. :param order: order of the filter :param corner: corner frequency of the filter Mean is removed before filtering. ''' self.nyquist_check( corner, 'Corner frequency of highpass', nyquist_warn, nyquist_exception) (b, a) = _get_cached_filter_coefs( order, [corner*2.0*self.deltat], btype='high') data = self.ydata.astype(num.float64) if len(a) != order+1 or len(b) != order+1: logger.warning( 'Erroneous filter coefficients returned by ' 'scipy.signal.butter(). You may need to downsample the ' 'signal before filtering.') if demean: data -= num.mean(data) self.drop_growbuffer() self.ydata = signal.lfilter(b, a, data)
[docs] def bandpass(self, order, corner_hp, corner_lp, demean=True): ''' Apply butterworth bandpass to the trace. :param order: order of the filter :param corner_hp: lower corner frequency of the filter :param corner_lp: upper corner frequency of the filter Mean is removed before filtering. ''' self.nyquist_check(corner_hp, 'Lower corner frequency of bandpass') self.nyquist_check(corner_lp, 'Higher corner frequency of bandpass') (b, a) = _get_cached_filter_coefs( order, [corner*2.0*self.deltat for corner in (corner_hp, corner_lp)], btype='band') data = self.ydata.astype(num.float64) if demean: data -= num.mean(data) self.drop_growbuffer() self.ydata = signal.lfilter(b, a, data)
def abshilbert(self): self.drop_growbuffer() self.ydata = num.abs(hilbert(self.ydata))
[docs] def envelope(self, inplace=True): ''' Calculate the envelope of the trace. :param inplace: calculate envelope in place The calculation follows: .. math:: Y' = \\sqrt{Y^2+H(Y)^2} where H is the Hilbert-Transform of the signal Y. ''' if inplace: self.drop_growbuffer() self.ydata = num.sqrt(self.ydata**2 + hilbert(self.ydata)**2) else: tr = self.copy(data=False) tr.ydata = num.sqrt(self.ydata**2 + hilbert(self.ydata)**2) return tr
[docs] def taper(self, taperer, inplace=True, chop=False): ''' Apply a :py:class:`Taper` to the trace. :param taperer: instance of :py:class:`Taper` subclass :param inplace: apply taper inplace :param chop: if ``True``: exclude tapered parts from the resulting trace ''' if not inplace: tr = self.copy() else: tr = self if chop: i, n = taperer.span(tr.ydata, tr.tmin, tr.deltat) tr.shift(i*tr.deltat) tr.set_ydata(tr.ydata[i:i+n]) taperer(tr.ydata, tr.tmin, tr.deltat) if not inplace: return tr
[docs] def whiten(self, order=6): ''' Whiten signal in time domain using autoregression and recursive filter. :param order: order of the autoregression process ''' b, a = self.whitening_coefficients(order) self.drop_growbuffer() self.ydata = signal.lfilter(b, a, self.ydata)
def whitening_coefficients(self, order=6): ar = yulewalker(self.ydata, order) b, a = [1.] + ar.tolist(), [1.] return b, a
[docs] def ampspec_whiten( self, width, td_taper='auto', fd_taper='auto', pad_to_pow2=True, demean=True): ''' Whiten signal via frequency domain using moving average on amplitude spectra. :param width: width of smoothing kernel [Hz] :param td_taper: time domain taper, object of type :py:class:`Taper` or ``None`` or ``'auto'``. :param fd_taper: frequency domain taper, object of type :py:class:`Taper` or ``None`` or ``'auto'``. :param pad_to_pow2: whether to pad the signal with zeros up to a length of 2^n :param demean: whether to demean the signal before tapering The signal is first demeaned and then tapered using ``td_taper``. Then, the spectrum is calculated and inversely weighted with a smoothed version of its amplitude spectrum. A moving average is used for the smoothing. The smoothed spectrum is then tapered using ``fd_taper``. Finally, the smoothed and tapered spectrum is back-transformed into the time domain. If ``td_taper`` is set to ``'auto'``, ``CosFader(1.0/width)`` is used. If ``fd_taper`` is set to ``'auto'``, ``CosFader(width)`` is used. ''' ndata = self.data_len() if pad_to_pow2: ntrans = nextpow2(ndata) else: ntrans = ndata df = 1./(ntrans*self.deltat) nw = int(round(width/df)) if ndata//2+1 <= nw: raise TraceTooShort( 'Samples in trace: %s, samples needed: %s' % (ndata, nw)) if td_taper == 'auto': td_taper = CosFader(1./width) if fd_taper == 'auto': fd_taper = CosFader(width) if td_taper: self.taper(td_taper) ydata = self.get_ydata().astype(num.float) if demean: ydata -= ydata.mean() spec = num.fft.rfft(ydata, ntrans) amp = num.abs(spec) nspec = amp.size csamp = num.cumsum(amp) amp_smoothed = num.empty(nspec, dtype=csamp.dtype) n1, n2 = nw//2, nw//2 + nspec - nw amp_smoothed[n1:n2] = (csamp[nw:] - csamp[:-nw]) / nw amp_smoothed[:n1] = amp_smoothed[n1] amp_smoothed[n2:] = amp_smoothed[n2-1] denom = amp_smoothed * amp numer = amp eps = num.mean(denom) * 1e-9 if eps == 0.0: eps = 1e-9 numer += eps denom += eps spec *= numer/denom if fd_taper: fd_taper(spec, 0., df) ydata = num.fft.irfft(spec) self.set_ydata(ydata[:ndata])
def _get_cached_freqs(self, nf, deltaf): ck = (nf, deltaf) if ck not in Trace.cached_frequencies: Trace.cached_frequencies[ck] = deltaf * num.arange( nf, dtype=num.float) return Trace.cached_frequencies[ck]
[docs] def bandpass_fft(self, corner_hp, corner_lp): ''' Apply boxcar bandbpass to trace (in spectral domain). ''' n = len(self.ydata) n2 = nextpow2(n) data = num.zeros(n2, dtype=num.float64) data[:n] = self.ydata fdata = num.fft.rfft(data) freqs = self._get_cached_freqs(len(fdata), 1./(self.deltat*n2)) fdata[0] = 0.0 fdata *= num.logical_and(corner_hp < freqs, freqs < corner_lp) data = num.fft.irfft(fdata) self.drop_growbuffer() self.ydata = data[:n]
[docs] def shift(self, tshift): ''' Time shift the trace. ''' self.tmin += tshift self.tmax += tshift self._update_ids()
[docs] def snap(self, inplace=True, interpolate=False): ''' Shift trace samples to nearest even multiples of the sampling rate. :param inplace: (boolean) snap traces inplace If ``inplace`` is ``False`` and the difference of tmin and tmax of both, the snapped and the original trace is smaller than 0.01 x deltat, :py:func:`snap` returns the unsnapped instance of the original trace. ''' tmin = round(self.tmin/self.deltat)*self.deltat tmax = tmin + (self.ydata.size-1)*self.deltat if inplace: xself = self else: if abs(self.tmin - tmin) < 1e-2*self.deltat and \ abs(self.tmax - tmax) < 1e-2*self.deltat: return self xself = self.copy() if interpolate: from pyrocko import signal_ext n = xself.data_len() ydata_new = num.empty(n, dtype=num.float) i_control = num.array([0, n-1], dtype=num.int64) t_control = num.array([xself.tmin, xself.tmax]) signal_ext.antidrift(i_control, t_control, xself.ydata.astype(num.float), tmin, xself.deltat, ydata_new) xself.ydata = ydata_new xself.tmin = tmin xself.tmax = tmax xself._update_ids() return xself
[docs] def fix_deltat_rounding_errors(self): ''' Try to undo sampling rate rounding errors. See :py:func:`fix_deltat_rounding_errors`. ''' self.deltat = fix_deltat_rounding_errors(self.deltat) self.tmax = self.tmin + (self.data_len() - 1) * self.deltat
[docs] def sta_lta_centered(self, tshort, tlong, quad=True, scalingmethod=1): ''' Run special STA/LTA filter where the short time window is centered on the long time window. :param tshort: length of short time window in [s] :param tlong: length of long time window in [s] :param quad: whether to square the data prior to applying the STA/LTA filter :param scalingmethod: integer key to select how output values are scaled / normalized (``1``, ``2``, or ``3``) ============= ====================================== =========== Scalingmethod Implementation Range ============= ====================================== =========== ``1`` As/Al* Tl/Ts [0,1] ``2`` (As/Al - 1) / (Tl/Ts - 1) [-Ts/Tl,1] ``3`` Like ``2`` but clipping range at zero [0,1] ============= ====================================== =========== ''' nshort = int(round(tshort/self.deltat)) nlong = int(round(tlong/self.deltat)) assert nshort < nlong if nlong > len(self.ydata): raise TraceTooShort( 'Samples in trace: %s, samples needed: %s' % (len(self.ydata), nlong)) if quad: sqrdata = self.ydata**2 else: sqrdata = self.ydata mavg_short = moving_avg(sqrdata, nshort) mavg_long = moving_avg(sqrdata, nlong) self.drop_growbuffer() if scalingmethod not in (1, 2, 3): raise Exception('Invalid argument to scalingrange argument.') if scalingmethod == 1: self.ydata = mavg_short/mavg_long * float(nshort)/float(nlong) elif scalingmethod in (2, 3): self.ydata = (mavg_short/mavg_long - 1.) \ / ((float(nlong)/float(nshort)) - 1) if scalingmethod == 3: self.ydata = num.maximum(self.ydata, 0.)
[docs] def sta_lta_right(self, tshort, tlong, quad=True, scalingmethod=1): ''' Run special STA/LTA filter where the short time window is overlapping with the last part of the long time window. :param tshort: length of short time window in [s] :param tlong: length of long time window in [s] :param quad: whether to square the data prior to applying the STA/LTA filter :param scalingmethod: integer key to select how output values are scaled / normalized (``1``, ``2``, or ``3``) ============= ====================================== =========== Scalingmethod Implementation Range ============= ====================================== =========== ``1`` As/Al* Tl/Ts [0,1] ``2`` (As/Al - 1) / (Tl/Ts - 1) [-Ts/Tl,1] ``3`` Like ``2`` but clipping range at zero [0,1] ============= ====================================== =========== With ``scalingmethod=1``, the values produced by this variant of the STA/LTA are equivalent to .. math:: s_i = \\frac{s}{l} \\frac{\\frac{1}{s}\\sum_{j=i}{i+s-1} f_j} {\\frac{1}{l}\\sum_{j=i+s-l}^{i+s-1} f_j} where :math:`f_j` are the input samples, :math:`s` are the number of samples in the short time window and :math:`l` are the number of samples in the long time window. ''' n = self.data_len() tmin = self.tmin nshort = max(1, int(round(tshort/self.deltat))) nlong = max(1, int(round(tlong/self.deltat))) assert nshort < nlong if nlong > len(self.ydata): raise TraceTooShort( 'Samples in trace: %s, samples needed: %s' % (len(self.ydata), nlong)) if quad: sqrdata = self.ydata**2 else: sqrdata = self.ydata nshift = int(math.floor(0.5 * (nlong - nshort))) if nlong % 2 != 0 and nshort % 2 == 0: nshift += 1 mavg_short = moving_avg(sqrdata, nshort)[nshift:] mavg_long = moving_avg(sqrdata, nlong)[:sqrdata.size-nshift] self.drop_growbuffer() if scalingmethod not in (1, 2, 3): raise Exception('Invalid argument to scalingrange argument.') if scalingmethod == 1: ydata = mavg_short/mavg_long * nshort/nlong elif scalingmethod in (2, 3): ydata = (mavg_short/mavg_long - 1.) \ / ((float(nlong)/float(nshort)) - 1) if scalingmethod == 3: ydata = num.maximum(self.ydata, 0.) self.set_ydata(ydata) self.shift((math.ceil(0.5*nlong) - nshort + 1) * self.deltat) self.chop( tmin + (nlong - nshort) * self.deltat, tmin + (n - nshort) * self.deltat)
[docs] def peaks(self, threshold, tsearch, deadtime=False, nblock_duration_detection=100): ''' Detect peaks above a given threshold (method 1). From every instant, where the signal rises above ``threshold``, a time length of ``tsearch`` seconds is searched for a maximum. A list with tuples (time, value) for each detected peak is returned. The ``deadtime`` argument turns on a special deadtime duration detection algorithm useful in combination with recursive STA/LTA filters. ''' y = self.ydata above = num.where(y > threshold, 1, 0) deriv = num.zeros(y.size, dtype=num.int8) deriv[1:] = above[1:]-above[:-1] itrig_positions = num.nonzero(deriv > 0)[0] tpeaks = [] apeaks = [] tzeros = [] tzero = self.tmin for itrig_pos in itrig_positions: ibeg = itrig_pos iend = min( len(self.ydata), itrig_pos + int(math.ceil(tsearch/self.deltat))) ipeak = num.argmax(y[ibeg:iend]) tpeak = self.tmin + (ipeak+ibeg)*self.deltat apeak = y[ibeg+ipeak] if tpeak < tzero: continue if deadtime: ibeg = itrig_pos iblock = 0 nblock = nblock_duration_detection totalsum = 0. while True: if ibeg+iblock*nblock >= len(y): tzero = self.tmin + (len(y)-1) * self.deltat break logy = num.log( y[ibeg+iblock*nblock:ibeg+(iblock+1)*nblock]) logy[0] += totalsum ysum = num.cumsum(logy) totalsum = ysum[-1] below = num.where(ysum <= 0., 1, 0) deriv = num.zeros(ysum.size, dtype=num.int8) deriv[1:] = below[1:]-below[:-1] izero_positions = num.nonzero(deriv > 0)[0] + iblock*nblock if len(izero_positions) > 0: tzero = self.tmin + self.deltat * ( ibeg + izero_positions[0]) break iblock += 1 else: tzero = ibeg*self.deltat + self.tmin + tsearch tpeaks.append(tpeak) apeaks.append(apeak) tzeros.append(tzero) if deadtime: return tpeaks, apeaks, tzeros else: return tpeaks, apeaks
[docs] def peaks2(self, threshold, tsearch): ''' Detect peaks above a given threshold (method 2). This variant of peak detection is a bit more robust (and slower) than the one implemented in :py:meth:`Trace.peaks`. First all samples with ``a[i-1] < a[i] > a[i+1]`` are masked as potential peaks. From these, iteratively the one with the maximum amplitude ``a[j]`` and time ``t[j]`` is choosen and potential peaks within ``t[j] - tsearch, t[j] + tsearch`` are discarded. The algorithm stops, when ``a[j] < threshold`` or when no more potential peaks are left. ''' a = num.copy(self.ydata) amin = num.min(a) a[0] = amin a[1: -1][num.diff(a, 2) <= 0.] = amin a[-1] = amin data = [] while True: imax = num.argmax(a) amax = a[imax] if amax < threshold or amax == amin: break data.append((self.tmin + imax * self.deltat, amax)) ntsearch = int(round(tsearch / self.deltat)) a[max(imax-ntsearch//2, 0):min(imax+ntsearch//2, a.size)] = amin if data: data.sort() tpeaks, apeaks = list(zip(*data)) else: tpeaks, apeaks = [], [] return tpeaks, apeaks
[docs] def extend(self, tmin=None, tmax=None, fillmethod='zeros'): ''' Extend trace to given span. :param tmin: begin time of new span :param tmax: end time of new span :param fillmethod: ``'zeros'``, ``'repeat'``, ``'mean'``, or ``'median'`` ''' nold = self.ydata.size if tmin is not None: nl = min(0, int(round((tmin-self.tmin)/self.deltat))) else: nl = 0 if tmax is not None: nh = max(nold - 1, int(round((tmax-self.tmin)/self.deltat))) else: nh = nold - 1 n = nh - nl + 1 data = num.zeros(n, dtype=self.ydata.dtype) data[-nl:-nl + nold] = self.ydata if self.ydata.size >= 1: if fillmethod == 'repeat': data[:-nl] = self.ydata[0] data[-nl + nold:] = self.ydata[-1] elif fillmethod == 'median': v = num.median(self.ydata) data[:-nl] = v data[-nl + nold:] = v elif fillmethod == 'mean': v = num.mean(self.ydata) data[:-nl] = v data[-nl + nold:] = v self.drop_growbuffer() self.ydata = data self.tmin += nl * self.deltat self.tmax = self.tmin + (self.ydata.size - 1) * self.deltat self._update_ids()
[docs] def transfer(self, tfade=0., freqlimits=None, transfer_function=None, cut_off_fading=True, invert=False): ''' Return new trace with transfer function applied (convolution). :param tfade: rise/fall time in seconds of taper applied in timedomain at both ends of trace. :param freqlimits: 4-tuple with corner frequencies in Hz. :param transfer_function: FrequencyResponse object; must provide a method 'evaluate(freqs)', which returns the transfer function coefficients at the frequencies 'freqs'. :param cut_off_fading: whether to cut off rise/fall interval in output trace. :param invert: set to True to do a deconvolution ''' if transfer_function is None: transfer_function = FrequencyResponse() if self.tmax - self.tmin <= tfade*2.: raise TraceTooShort( 'Trace %s.%s.%s.%s too short for fading length setting. ' 'trace length = %g, fading length = %g' % (self.nslc_id + (self.tmax-self.tmin, tfade))) ndata = self.ydata.size ntrans = nextpow2(ndata*1.2) coefs = self._get_tapered_coefs( ntrans, freqlimits, transfer_function, invert=invert) data = self.ydata data_pad = num.zeros(ntrans, dtype=num.float) data_pad[:ndata] = data - data.mean() if tfade != 0.0: data_pad[:ndata] *= costaper( 0., tfade, self.deltat*(ndata-1)-tfade, self.deltat*ndata, ndata, self.deltat) fdata = num.fft.rfft(data_pad) fdata *= coefs ddata = num.fft.irfft(fdata) output = self.copy() output.ydata = ddata[:ndata] if cut_off_fading and tfade != 0.0: try: output.chop(output.tmin+tfade, output.tmax-tfade, inplace=True) except NoData: raise TraceTooShort( 'Trace %s.%s.%s.%s too short for fading length setting. ' 'trace length = %g, fading length = %g' % (self.nslc_id + (self.tmax-self.tmin, tfade))) else: output.ydata = output.ydata.copy() return output
def drop_chain_cache(self): if self._pchain: self._pchain.clear() def init_chain(self): self._pchain = pchain.Chain( do_downsample, do_extend, do_pre_taper, do_fft, do_filter, do_ifft) def run_chain(self, tmin, tmax, deltat, setup, nocache): if setup.domain == 'frequency_domain': _, _, data = self._pchain( (self, deltat), (tmin, tmax), (setup.taper,), (setup.filter,), (setup.filter,), nocache=nocache) return num.abs(data), num.abs(data) else: processed = self._pchain( (self, deltat), (tmin, tmax), (setup.taper,), (setup.filter,), (setup.filter,), (), nocache=nocache) if setup.domain == 'time_domain': data = processed.get_ydata() elif setup.domain == 'envelope': processed = processed.envelope(inplace=False) elif setup.domain == 'absolute': processed.set_ydata(num.abs(processed.get_ydata())) return processed.get_ydata(), processed
[docs] def misfit(self, candidate, setup, nocache=False, debug=False): """ Calculate misfit and normalization factor against candidate trace. :param candidate: :py:class:`Trace` object :param setup: :py:class:`MisfitSetup` object :returns: tuple ``(m, n)``, where m is the misfit value and n is the normalization divisor If the sampling rates of ``self`` and ``candidate`` differ, the trace with the higher sampling rate will be downsampled. """ a = self b = candidate for tr in (a, b): if not tr._pchain: tr.init_chain() deltat = max(a.deltat, b.deltat) tmin = min(a.tmin, b.tmin) - deltat tmax = max(a.tmax, b.tmax) + deltat adata, aproc = a.run_chain(tmin, tmax, deltat, setup, nocache) bdata, bproc = b.run_chain(tmin, tmax, deltat, setup, nocache) if setup.domain != 'cc_max_norm': m, n = Lx_norm(bdata, adata, norm=setup.norm) else: ctr = correlate(aproc, bproc, mode='full', normalization='normal') ccmax = ctr.max()[1] m = 0.5 - 0.5 * ccmax n = 0.5 if debug: return m, n, aproc, bproc else: return m, n
[docs] def spectrum(self, pad_to_pow2=False, tfade=None): ''' Get FFT spectrum of trace. :param pad_to_pow2: whether to zero-pad the data to next larger power-of-two length :param tfade: ``None`` or a time length in seconds, to apply cosine shaped tapers to both :returns: a tuple with (frequencies, values) ''' ndata = self.ydata.size if pad_to_pow2: ntrans = nextpow2(ndata) else: ntrans = ndata if tfade is None: ydata = self.ydata else: ydata = self.ydata * costaper( 0., tfade, self.deltat*(ndata-1)-tfade, self.deltat*ndata, ndata, self.deltat) fydata = num.fft.rfft(ydata, ntrans) df = 1./(ntrans*self.deltat) fxdata = num.arange(len(fydata))*df return fxdata, fydata
def multi_filter(self, filter_freqs, bandwidth): class Gauss(FrequencyResponse): def __init__(self, f0, a=1.0): self._omega0 = 2.*math.pi*f0 self._a = a def evaluate(self, freqs): omega = 2.*math.pi*freqs return num.exp(-((omega-self._omega0) / (self._a*self._omega0))**2) freqs, coefs = self.spectrum() n = self.data_len() nfilt = len(filter_freqs) signal_tf = num.zeros((nfilt, n)) centroid_freqs = num.zeros(nfilt) for ifilt, f0 in enumerate(filter_freqs): taper = Gauss(f0, a=bandwidth) weights = taper.evaluate(freqs) nhalf = freqs.size analytic_spec = num.zeros(n, dtype=num.complex) analytic_spec[:nhalf] = coefs*weights enorm = num.abs(analytic_spec[:nhalf])**2 enorm /= num.sum(enorm) if n % 2 == 0: analytic_spec[1:nhalf-1] *= 2. else: analytic_spec[1:nhalf] *= 2. analytic = num.fft.ifft(analytic_spec) signal_tf[ifilt, :] = num.abs(analytic) enorm = num.abs(analytic_spec[:nhalf])**2 enorm /= num.sum(enorm) centroid_freqs[ifilt] = num.sum(freqs*enorm) return centroid_freqs, signal_tf def _get_tapered_coefs( self, ntrans, freqlimits, transfer_function, invert=False): deltaf = 1./(self.deltat*ntrans) nfreqs = ntrans//2 + 1 transfer = num.ones(nfreqs, dtype=num.complex) hi = snapper(nfreqs, deltaf) if freqlimits is not None: a, b, c, d = freqlimits freqs = num.arange(hi(d)-hi(a), dtype=num.float)*deltaf \ + hi(a)*deltaf if invert: coeffs = transfer_function.evaluate(freqs) if num.any(coeffs == 0.0): raise InfiniteResponse('%s.%s.%s.%s' % self.nslc_id) transfer[hi(a):hi(d)] = 1.0 / transfer_function.evaluate(freqs) else: transfer[hi(a):hi(d)] = transfer_function.evaluate(freqs) tapered_transfer = costaper(a, b, c, d, nfreqs, deltaf)*transfer else: if invert: raise Exception( 'transfer: `freqlimits` must be given when `invert` is ' 'set to `True`') freqs = num.arange(nfreqs) * deltaf tapered_transfer = transfer_function.evaluate(freqs) tapered_transfer[0] = 0.0 # don't introduce static offsets return tapered_transfer
[docs] def fill_template(self, template, **additional): ''' Fill string template with trace metadata. Uses normal python '%(placeholder)s' string templates. The following placeholders are considered: ``network``, ``station``, ``location``, ``channel``, ``tmin`` (time of first sample), ``tmax`` (time of last sample), ``tmin_ms``, ``tmax_ms``, ``tmin_us``, ``tmax_us``, ``tmin_year``, ``tmax_year``, ``julianday``. The variants ending with ``'_ms'`` include milliseconds, those with ``'_us'`` include microseconds, those with ``'_year'`` contain only the year. ''' template = template.replace('%n', '%(network)s')\ .replace('%s', '%(station)s')\ .replace('%l', '%(location)s')\ .replace('%c', '%(channel)s')\ .replace('%b', '%(tmin)s')\ .replace('%e', '%(tmax)s')\ .replace('%j', '%(julianday)s') params = dict( zip(('network', 'station', 'location', 'channel'), self.nslc_id)) params['tmin'] = util.time_to_str( self.tmin, format='%Y-%m-%d_%H-%M-%S') params['tmax'] = util.time_to_str( self.tmax, format='%Y-%m-%d_%H-%M-%S') params['tmin_ms'] = util.time_to_str( self.tmin, format='%Y-%m-%d_%H-%M-%S.3FRAC') params['tmax_ms'] = util.time_to_str( self.tmax, format='%Y-%m-%d_%H-%M-%S.3FRAC') params['tmin_us'] = util.time_to_str( self.tmin, format='%Y-%m-%d_%H-%M-%S.6FRAC') params['tmax_us'] = util.time_to_str( self.tmax, format='%Y-%m-%d_%H-%M-%S.6FRAC') params['tmin_year'] = util.time_to_str( self.tmin, format='%Y') params['tmax_year'] = util.time_to_str( self.tmax, format='%Y') params['julianday'] = util.julian_day_of_year(self.tmin) params.update(additional) return template % params
[docs] def plot(self): ''' Show trace with matplotlib. See also: :py:meth:`Trace.snuffle`. ''' import pylab pylab.plot(self.get_xdata(), self.get_ydata()) name = '%s %s %s - %s' % ( self.channel, self.station, time.strftime("%d-%m-%y %H:%M:%S", time.gmtime(self.tmin)), time.strftime("%d-%m-%y %H:%M:%S", time.gmtime(self.tmax))) pylab.title(name) pylab.show()
[docs] def snuffle(self, **kwargs): ''' Show trace in a snuffler window. :param stations: list of `pyrocko.model.Station` objects or ``None`` :param events: list of `pyrocko.model.Event` objects or ``None`` :param markers: list of `pyrocko.gui.util.Marker` objects or ``None`` :param ntracks: float, number of tracks to be shown initially (default: 12) :param follow: time interval (in seconds) for real time follow mode or ``None`` :param controls: bool, whether to show the main controls (default: ``True``) :param opengl: bool, whether to use opengl (default: ``False``) ''' return snuffle([self], **kwargs)
[docs]def snuffle(traces, **kwargs): ''' Show traces in a snuffler window. :param stations: list of `pyrocko.model.Station` objects or ``None`` :param events: list of `pyrocko.model.Event` objects or ``None`` :param markers: list of `pyrocko.gui.util.Marker` objects or ``None`` :param ntracks: float, number of tracks to be shown initially (default: 12) :param follow: time interval (in seconds) for real time follow mode or ``None`` :param controls: bool, whether to show the main controls (default: ``True``) :param opengl: bool, whether to use opengl (default: ``False``) ''' from pyrocko import pile from pyrocko.gui import snuffler p = pile.Pile() if traces: trf = pile.MemTracesFile(None, traces) p.add_file(trf) return snuffler.snuffle(p, **kwargs)
[docs]class InfiniteResponse(Exception): ''' This exception is raised by :py:class:`Trace` operations when deconvolution of a frequency response (instrument response transfer function) would result in a division by zero. '''
[docs]class MisalignedTraces(Exception): ''' This exception is raised by some :py:class:`Trace` operations when tmin, tmax or number of samples do not match. ''' pass
[docs]class NoData(Exception): ''' This exception is raised by some :py:class:`Trace` operations when no or not enough data is available. ''' pass
[docs]class AboveNyquist(Exception): ''' This exception is raised by some :py:class:`Trace` operations when given frequencies are above the Nyquist frequency. ''' pass
[docs]class TraceTooShort(Exception): ''' This exception is raised by some :py:class:`Trace` operations when the trace is too short. ''' pass
[docs]class ResamplingFailed(Exception): pass
[docs]def minmax(traces, key=None, mode='minmax'): ''' Get data range given traces grouped by selected pattern. :param key: a callable which takes as single argument a trace and returns a key for the grouping of the results. If this is ``None``, the default, ``lambda tr: (tr.network, tr.station, tr.location, tr.channel)`` is used. :param mode: 'minmax' or floating point number. If this is 'minmax', minimum and maximum of the traces are used, if it is a number, mean +- standard deviation times ``mode`` is used. :returns: a dict with the combined data ranges. Examples:: ranges = minmax(traces, lambda tr: tr.channel) print ranges['N'] # print min & max of all traces with channel == 'N' print ranges['E'] # print min & max of all traces with channel == 'E' ranges = minmax(traces, lambda tr: (tr.network, tr.station)) print ranges['GR', 'HAM3'] # print min & max of all traces with # network == 'GR' and station == 'HAM3' ranges = minmax(traces, lambda tr: None) print ranges[None] # prints min & max of all traces ''' if key is None: key = _default_key ranges = {} for trace in traces: if isinstance(mode, str) and mode == 'minmax': mi, ma = trace.ydata.min(), trace.ydata.max() else: mean = trace.ydata.mean() std = trace.ydata.std() mi, ma = mean-std*mode, mean+std*mode k = key(trace) if k not in ranges: ranges[k] = mi, ma else: tmi, tma = ranges[k] ranges[k] = min(tmi, mi), max(tma, ma) return ranges
[docs]def minmaxtime(traces, key=None): ''' Get time range given traces grouped by selected pattern. :param key: a callable which takes as single argument a trace and returns a key for the grouping of the results. If this is ``None``, the default, ``lambda tr: (tr.network, tr.station, tr.location, tr.channel)`` is used. :returns: a dict with the combined data ranges. ''' if key is None: key = _default_key ranges = {} for trace in traces: mi, ma = trace.tmin, trace.tmax k = key(trace) if k not in ranges: ranges[k] = mi, ma else: tmi, tma = ranges[k] ranges[k] = min(tmi, mi), max(tma, ma) return ranges
[docs]def degapper( traces, maxgap=5, fillmethod='interpolate', deoverlap='use_second', maxlap=None): ''' Try to connect traces and remove gaps. This method will combine adjacent traces, which match in their network, station, location and channel attributes. Overlapping parts are handled according to the ``deoverlap`` argument. :param traces: input traces, must be sorted by their full_id attribute. :param maxgap: maximum number of samples to interpolate. :param fillmethod: what to put into the gaps: 'interpolate' or 'zeros'. :param deoverlap: how to handle overlaps: 'use_second' to use data from second trace (default), 'use_first' to use data from first trace, 'crossfade_cos' to crossfade with cosine taper, 'add' to add amplitude values. :param maxlap: maximum number of samples of overlap which are removed :returns: list of traces ''' in_traces = traces out_traces = [] if not in_traces: return out_traces out_traces.append(in_traces.pop(0)) while in_traces: a = out_traces[-1] b = in_traces.pop(0) avirt, bvirt = a.ydata is None, b.ydata is None assert avirt == bvirt, \ 'traces given to degapper() must either all have data or have ' \ 'no data.' virtual = avirt and bvirt if (a.nslc_id == b.nslc_id and a.deltat == b.deltat and a.data_len() >= 1 and b.data_len() >= 1 and (virtual or a.ydata.dtype == b.ydata.dtype)): dist = (b.tmin-(a.tmin+(a.data_len()-1)*a.deltat))/a.deltat idist = int(round(dist)) if abs(dist - idist) > 0.05 and idist <= maxgap: # logger.warning('Cannot degap traces with displaced sampling ' # '(%s, %s, %s, %s)' % a.nslc_id) pass else: if 1 < idist <= maxgap: if not virtual: if fillmethod == 'interpolate': filler = a.ydata[-1] + ( ((1.0 + num.arange(idist-1, dtype=num.float)) / idist) * (b.ydata[0]-a.ydata[-1]) ).astype(a.ydata.dtype) elif fillmethod == 'zeros': filler = num.zeros(idist-1, dtype=a.ydist.dtype) a.ydata = num.concatenate((a.ydata, filler, b.ydata)) a.tmax = b.tmax if a.mtime and b.mtime: a.mtime = max(a.mtime, b.mtime) continue elif idist == 1: if not virtual: a.ydata = num.concatenate((a.ydata, b.ydata)) a.tmax = b.tmax if a.mtime and b.mtime: a.mtime = max(a.mtime, b.mtime) continue elif idist <= 0 and (maxlap is None or -maxlap < idist): if b.tmax > a.tmax: if not virtual: na = a.ydata.size n = -idist+1 if deoverlap == 'use_second': a.ydata = num.concatenate( (a.ydata[:-n], b.ydata)) elif deoverlap in ('use_first', 'crossfade_cos'): a.ydata = num.concatenate( (a.ydata, b.ydata[n:])) elif deoverlap == 'add': a.ydata[-n:] += b.ydata[:n] a.ydata = num.concatenate( (a.ydata, b.ydata[n:])) else: assert False, 'unknown deoverlap method' if deoverlap == 'crossfade_cos': n = -idist+1 taper = 0.5-0.5*num.cos( (1.+num.arange(n))/(1.+n)*num.pi) a.ydata[na-n:na] *= 1.-taper a.ydata[na-n:na] += b.ydata[:n] * taper a.tmax = b.tmax if a.mtime and b.mtime: a.mtime = max(a.mtime, b.mtime) continue else: # make short second trace vanish continue if b.data_len() >= 1: out_traces.append(b) for tr in out_traces: tr._update_ids() return out_traces
[docs]def rotate(traces, azimuth, in_channels, out_channels): ''' 2D rotation of traces. :param traces: list of input traces :param azimuth: difference of the azimuths of the component directions (azimuth of out_channels[0]) - (azimuth of in_channels[0]) :param in_channels: names of the input channels (e.g. 'N', 'E') :param out_channels: names of the output channels (e.g. 'R', 'T') :returns: list of rotated traces ''' phi = azimuth/180.*math.pi cphi = math.cos(phi) sphi = math.sin(phi) rotated = [] in_channels = tuple(_channels_to_names(in_channels)) out_channels = tuple(_channels_to_names(out_channels)) for a in traces: for b in traces: if ((a.channel, b.channel) == in_channels and a.nslc_id[:3] == b.nslc_id[:3] and abs(a.deltat-b.deltat) < a.deltat*0.001): tmin = max(a.tmin, b.tmin) tmax = min(a.tmax, b.tmax) if tmin < tmax: ac = a.chop(tmin, tmax, inplace=False, include_last=True) bc = b.chop(tmin, tmax, inplace=False, include_last=True) if abs(ac.tmin - bc.tmin) > ac.deltat*0.01: logger.warning( 'Cannot rotate traces with displaced sampling ' '(%s, %s, %s, %s)' % a.nslc_id) continue acydata = ac.get_ydata()*cphi+bc.get_ydata()*sphi bcydata = -ac.get_ydata()*sphi+bc.get_ydata()*cphi ac.set_ydata(acydata) bc.set_ydata(bcydata) ac.set_codes(channel=out_channels[0]) bc.set_codes(channel=out_channels[1]) rotated.append(ac) rotated.append(bc) return rotated
def rotate_to_rt(n, e, source, receiver, out_channels=('R', 'T')): azimuth = orthodrome.azimuth(receiver, source) + 180. in_channels = n.channel, e.channel out = rotate( [n, e], azimuth, in_channels=in_channels, out_channels=out_channels) assert len(out) == 2 for tr in out: if tr.channel == 'R': r = tr elif tr.channel == 'T': t = tr return r, t
[docs]def rotate_to_lqt(traces, backazimuth, incidence, in_channels, out_channels=('L', 'Q', 'T')): '''Rotate traces from ZNE to LQT system. :param traces: list of traces in arbitrary order :param backazimuth: backazimuth in degrees clockwise from north :param incidence: incidence angle in degrees from vertical :param in_channels: input channel names :param out_channels: output channel names (default: ('L', 'Q', 'T')) :returns: list of transformed traces ''' i = incidence/180.*num.pi b = backazimuth/180.*num.pi ci = num.cos(i) cb = num.cos(b) si = num.sin(i) sb = num.sin(b) rotmat = num.array( [[ci, -cb*si, -sb*si], [si, cb*ci, sb*ci], [0., sb, -cb]]) return project(traces, rotmat, in_channels, out_channels)
def _decompose(a): ''' Decompose matrix into independent submatrices. ''' def depends(iout, a): row = a[iout, :] return set(num.arange(row.size).compress(row != 0.0)) def provides(iin, a): col = a[:, iin] return set(num.arange(col.size).compress(col != 0.0)) a = num.asarray(a) outs = set(range(a.shape[0])) systems = [] while outs: iout = outs.pop() gout = set() for iin in depends(iout, a): gout.update(provides(iin, a)) if not gout: continue gin = set() for iout2 in gout: gin.update(depends(iout2, a)) if not gin: continue for iout2 in gout: if iout2 in outs: outs.remove(iout2) gin = list(gin) gin.sort() gout = list(gout) gout.sort() systems.append((gin, gout, a[gout, :][:, gin])) return systems def _channels_to_names(channels): names = [] for ch in channels: if isinstance(ch, model.Channel): names.append(ch.name) else: names.append(ch) return names
[docs]def project(traces, matrix, in_channels, out_channels): ''' Affine transform of three-component traces. Compute matrix-vector product of three-component traces, to e.g. rotate traces into a different basis. The traces are distinguished and ordered by their channel attribute. The tranform is applied to overlapping parts of any appropriate combinations of the input traces. This should allow this function to be robust with data gaps. It also tries to apply the tranformation to subsets of the channels, if this is possible, so that, if for example a vertical compontent is missing, horizontal components can still be rotated. :param traces: list of traces in arbitrary order :param matrix: tranformation matrix :param in_channels: input channel names :param out_channels: output channel names :returns: list of transformed traces ''' in_channels = tuple(_channels_to_names(in_channels)) out_channels = tuple(_channels_to_names(out_channels)) systems = _decompose(matrix) # fallback to full matrix if some are not quadratic for iins, iouts, submatrix in systems: if submatrix.shape[0] != submatrix.shape[1]: return _project3(traces, matrix, in_channels, out_channels) projected = [] for iins, iouts, submatrix in systems: in_cha = tuple([in_channels[iin] for iin in iins]) out_cha = tuple([out_channels[iout] for iout in iouts]) if submatrix.shape[0] == 1: projected.extend(_project1(traces, submatrix, in_cha, out_cha)) elif submatrix.shape[1] == 2: projected.extend(_project2(traces, submatrix, in_cha, out_cha)) else: projected.extend(_project3(traces, submatrix, in_cha, out_cha)) return projected
[docs]def project_dependencies(matrix, in_channels, out_channels): ''' Figure out what dependencies project() would produce. ''' in_channels = tuple(_channels_to_names(in_channels)) out_channels = tuple(_channels_to_names(out_channels)) systems = _decompose(matrix) subpro = [] for iins, iouts, submatrix in systems: if submatrix.shape[0] != submatrix.shape[1]: subpro.append((matrix, in_channels, out_channels)) if not subpro: for iins, iouts, submatrix in systems: in_cha = tuple([in_channels[iin] for iin in iins]) out_cha = tuple([out_channels[iout] for iout in iouts]) subpro.append((submatrix, in_cha, out_cha)) deps = {} for mat, in_cha, out_cha in subpro: for oc in out_cha: if oc not in deps: deps[oc] = [] for ic in in_cha: deps[oc].append(ic) return deps
def _project1(traces, matrix, in_channels, out_channels): assert len(in_channels) == 1 assert len(out_channels) == 1 assert matrix.shape == (1, 1) projected = [] for a in traces: if not (a.channel,) == in_channels: continue ac = a.copy() ac.set_ydata(matrix[0, 0]*a.get_ydata()) ac.set_codes(channel=out_channels[0]) projected.append(ac) return projected def _project2(traces, matrix, in_channels, out_channels): assert len(in_channels) == 2 assert len(out_channels) == 2 assert matrix.shape == (2, 2) projected = [] for a in traces: for b in traces: if not ((a.channel, b.channel) == in_channels and a.nslc_id[:3] == b.nslc_id[:3] and abs(a.deltat-b.deltat) < a.deltat*0.001): continue tmin = max(a.tmin, b.tmin) tmax = min(a.tmax, b.tmax) if tmin > tmax: continue ac = a.chop(tmin, tmax, inplace=False, include_last=True) bc = b.chop(tmin, tmax, inplace=False, include_last=True) if abs(ac.tmin - bc.tmin) > ac.deltat*0.01: logger.warning( 'Cannot project traces with displaced sampling ' '(%s, %s, %s, %s)' % a.nslc_id) continue acydata = num.dot(matrix[0], (ac.get_ydata(), bc.get_ydata())) bcydata = num.dot(matrix[1], (ac.get_ydata(), bc.get_ydata())) ac.set_ydata(acydata) bc.set_ydata(bcydata) ac.set_codes(channel=out_channels[0]) bc.set_codes(channel=out_channels[1]) projected.append(ac) projected.append(bc) return projected def _project3(traces, matrix, in_channels, out_channels): assert len(in_channels) == 3 assert len(out_channels) == 3 assert matrix.shape == (3, 3) projected = [] for a in traces: for b in traces: for c in traces: if not ((a.channel, b.channel, c.channel) == in_channels and a.nslc_id[:3] == b.nslc_id[:3] and b.nslc_id[:3] == c.nslc_id[:3] and abs(a.deltat-b.deltat) < a.deltat*0.001 and abs(b.deltat-c.deltat) < b.deltat*0.001): continue tmin = max(a.tmin, b.tmin, c.tmin) tmax = min(a.tmax, b.tmax, c.tmax) if tmin >= tmax: continue ac = a.chop(tmin, tmax, inplace=False, include_last=True) bc = b.chop(tmin, tmax, inplace=False, include_last=True) cc = c.chop(tmin, tmax, inplace=False, include_last=True) if (abs(ac.tmin - bc.tmin) > ac.deltat*0.01 or abs(bc.tmin - cc.tmin) > bc.deltat*0.01): logger.warning( 'Cannot project traces with displaced sampling ' '(%s, %s, %s, %s)' % a.nslc_id) continue acydata = num.dot( matrix[0], (ac.get_ydata(), bc.get_ydata(), cc.get_ydata())) bcydata = num.dot( matrix[1], (ac.get_ydata(), bc.get_ydata(), cc.get_ydata())) ccydata = num.dot( matrix[2], (ac.get_ydata(), bc.get_ydata(), cc.get_ydata())) ac.set_ydata(acydata) bc.set_ydata(bcydata) cc.set_ydata(ccydata) ac.set_codes(channel=out_channels[0]) bc.set_codes(channel=out_channels[1]) cc.set_codes(channel=out_channels[2]) projected.append(ac) projected.append(bc) projected.append(cc) return projected
[docs]def correlate(a, b, mode='valid', normalization=None, use_fft=False): ''' Cross correlation of two traces. :param a,b: input traces :param mode: ``'valid'``, ``'full'``, or ``'same'`` :param normalization: ``'normal'``, ``'gliding'``, or ``None`` :param use_fft: bool, whether to do cross correlation in spectral domain :returns: trace containing cross correlation coefficients This function computes the cross correlation between two traces. It evaluates the discrete equivalent of .. math:: c(t) = \\int_{-\\infty}^{\\infty} a^{\\ast}(\\tau) b(t+\\tau) d\\tau where the star denotes complex conjugate. Note, that the arguments here are swapped when compared with the :py:func:`numpy.correlate` function, which is internally called. This function should be safe even with older versions of NumPy, where the correlate function has some problems. A trace containing the cross correlation coefficients is returned. The time information of the output trace is set so that the returned cross correlation can be viewed directly as a function of time lag. Example:: # align two traces a and b containing a time shifted similar signal: c = pyrocko.trace.correlate(a,b) t, coef = c.max() # get time and value of maximum b.shift(-t) # align b with a ''' assert_same_sampling_rate(a, b) ya, yb = a.ydata, b.ydata # need reversed order here: yc = numpy_correlate_fixed(yb, ya, mode=mode, use_fft=use_fft) kmin, kmax = numpy_correlate_lag_range(yb, ya, mode=mode, use_fft=use_fft) if normalization == 'normal': normfac = num.sqrt(num.sum(ya**2))*num.sqrt(num.sum(yb**2)) yc = yc/normfac elif normalization == 'gliding': if mode != 'valid': assert False, 'gliding normalization currently only available ' \ 'with "valid" mode.' if ya.size < yb.size: yshort, ylong = ya, yb else: yshort, ylong = yb, ya epsilon = 0.00001 normfac_short = num.sqrt(num.sum(yshort**2)) normfac = normfac_short * num.sqrt( moving_sum(ylong**2, yshort.size, mode='valid')) \ + normfac_short*epsilon if yb.size <= ya.size: normfac = normfac[::-1] yc /= normfac c = a.copy() c.set_ydata(yc) c.set_codes(*merge_codes(a, b, '~')) c.shift(-c.tmin + b.tmin-a.tmin + kmin * c.deltat) return c
def deconvolve( a, b, waterlevel, tshift=0., pad=0.5, fd_taper=None, pad_to_pow2=True): same_sampling_rate(a, b) assert abs(a.tmin - b.tmin) < a.deltat * 0.001 deltat = a.deltat npad = int(round(a.data_len()*pad + tshift / deltat)) ndata = max(a.data_len(), b.data_len()) ndata_pad = ndata + npad if pad_to_pow2: ntrans = nextpow2(ndata_pad) else: ntrans = ndata aspec = num.fft.rfft(a.ydata, ntrans) bspec = num.fft.rfft(b.ydata, ntrans) out = aspec * num.conj(bspec) bautocorr = bspec*num.conj(bspec) denom = num.maximum(bautocorr, waterlevel * bautocorr.max()) out /= denom df = 1/(ntrans*deltat) if fd_taper is not None: fd_taper(out, 0.0, df) ydata = num.roll(num.fft.irfft(out), int(round(tshift/deltat))) c = a.copy(data=False) c.set_ydata(ydata[:ndata]) c.set_codes(*merge_codes(a, b, '/')) return c def assert_same_sampling_rate(a, b, eps=1.0e-6): assert same_sampling_rate(a, b, eps), \ 'Sampling rates differ: %g != %g' % (a.deltat, b.deltat)
[docs]def same_sampling_rate(a, b, eps=1.0e-6): ''' Check if two traces have the same sampling rate. :param a,b: input traces :param eps: relative tolerance ''' return abs(a.deltat - b.deltat) < (a.deltat + b.deltat)*eps
[docs]def fix_deltat_rounding_errors(deltat): ''' Try to undo sampling rate rounding errors. Fix rounding errors of sampling intervals when these are read from single precision floating point values. Assumes that the true sampling rate or sampling interval was an integer value. No correction will be applied if this would change the sampling rate by more than 0.001%. ''' if deltat <= 1.0: deltat_new = 1.0 / round(1.0 / deltat) else: deltat_new = round(deltat) if abs(deltat_new - deltat) / deltat > 1e-5: deltat_new = deltat return deltat_new
[docs]def merge_codes(a, b, sep='-'): ''' Merge network-station-location-channel codes of a pair of traces. ''' o = [] for xa, xb in zip(a.nslc_id, b.nslc_id): if xa == xb: o.append(xa) else: o.append(sep.join((xa, xb))) return o
[docs]class Taper(Object): ''' Base class for tapers. Does nothing by default. ''' def __call__(self, y, x0, dx): pass
[docs]class CosTaper(Taper): ''' Cosine Taper. :param a: start of fading in :param b: end of fading in :param c: start of fading out :param d: end of fading out ''' a = Float.T() b = Float.T() c = Float.T() d = Float.T() def __init__(self, a, b, c, d): Taper.__init__(self, a=a, b=b, c=c, d=d) def __call__(self, y, x0, dx): apply_costaper(self.a, self.b, self.c, self.d, y, x0, dx) def span(self, y, x0, dx): return span_costaper(self.a, self.b, self.c, self.d, y, x0, dx) def time_span(self): return self.a, self.d
[docs]class CosFader(Taper): ''' Cosine Fader. :param xfade: fade in and fade out time in seconds (optional) :param xfrac: fade in and fade out as fraction between 0. and 1. (optional) Only one argument can be set. The other should to be ``None``. ''' xfade = Float.T(optional=True) xfrac = Float.T(optional=True) def __init__(self, xfade=None, xfrac=None): Taper.__init__(self, xfade=xfade, xfrac=xfrac) assert (xfade is None) != (xfrac is None) self._xfade = xfade self._xfrac = xfrac def __call__(self, y, x0, dx): xfade = self._xfade xlen = (y.size - 1)*dx if xfade is None: xfade = xlen * self._xfrac a = x0 b = x0 + xfade c = x0 + xlen - xfade d = x0 + xlen apply_costaper(a, b, c, d, y, x0, dx) def span(self, y, x0, dx): return 0, y.size def time_span(self): return None, None
def none_min(l): if None in l: return None else: return min(x for x in l if x is not None) def none_max(l): if None in l: return None else: return max(x for x in l if x is not None)
[docs]class MultiplyTaper(Taper): ''' Multiplication of several tapers. ''' tapers = List.T(Taper.T()) def __init__(self, tapers=None): if tapers is None: tapers = [] Taper.__init__(self, tapers=tapers) def __call__(self, y, x0, dx): for taper in self.tapers: taper(y, x0, dx) def span(self, y, x0, dx): spans = [] for taper in self.tapers: spans.append(taper.span(y, x0, dx)) mins, maxs = list(zip(*spans)) return min(mins), max(maxs) def time_span(self): spans = [] for taper in self.tapers: spans.append(taper.time_span()) mins, maxs = list(zip(*spans)) return none_min(mins), none_max(maxs)
[docs]class GaussTaper(Taper): ''' Frequency domain Gaussian filter. ''' alpha = Float.T() def __init__(self, alpha): Taper.__init__(self, alpha=alpha) self._alpha = alpha def __call__(self, y, x0, dx): f = x0 + num.arange(y.size)*dx y *= num.exp(-num.pi**2 / (self._alpha**2) * f**2)
[docs]class FrequencyResponse(Object): ''' Evaluates frequency response at given frequencies. ''' def evaluate(self, freqs): coefs = num.ones(freqs.size, dtype=num.complex) return coefs
[docs]class Evalresp(FrequencyResponse): ''' Calls evalresp and generates values of the instrument response transfer function. :param respfile: response file in evalresp format :param trace: trace for which the response is to be extracted from the file :param target: ``'dis'`` for displacement or ``'vel'`` for velocity ''' respfile = String.T() nslc_id = Tuple.T(4, String.T()) target = String.T(default='dis') instant = Float.T() def __init__( self, respfile, trace=None, target='dis', nslc_id=None, time=None): if trace is not None: nslc_id = trace.nslc_id time = (trace.tmin + trace.tmax) / 2. FrequencyResponse.__init__( self, respfile=respfile, nslc_id=nslc_id, instant=time, target=target) def evaluate(self, freqs): network, station, location, channel = self.nslc_id x = evalresp.evalresp( sta_list=station, cha_list=channel, net_code=network, locid=location, instant=self.instant, freqs=freqs, units=self.target.upper(), file=self.respfile, rtype='CS') transfer = x[0][4] return transfer
[docs]class InverseEvalresp(FrequencyResponse): ''' Calls evalresp and generates values of the inverse instrument response for deconvolution of instrument response. :param respfile: response file in evalresp format :param trace: trace for which the response is to be extracted from the file :param target: ``'dis'`` for displacement or ``'vel'`` for velocity ''' respfile = String.T() nslc_id = Tuple.T(4, String.T()) target = String.T(default='dis') instant = Float.T() def __init__(self, respfile, trace, target='dis'): FrequencyResponse.__init__( self, respfile=respfile, nslc_id=trace.nslc_id, instant=(trace.tmin + trace.tmax)/2., target=target) def evaluate(self, freqs): network, station, location, channel = self.nslc_id x = evalresp.evalresp(sta_list=station, cha_list=channel, net_code=network, locid=location, instant=self.instant, freqs=freqs, units=self.target.upper(), file=self.respfile, rtype='CS') transfer = x[0][4] return 1./transfer
[docs]class PoleZeroResponse(FrequencyResponse): ''' Evaluates frequency response from pole-zero representation. :param zeros: :py:class:`numpy.array` containing complex positions of zeros :param poles: :py:class:`numpy.array` containing complex positions of poles :param constant: gain as floating point number :: (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ... T(f) = constant * ---------------------------------------------------- (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ... The poles and zeros should be given as angular frequencies, not in Hz. ''' zeros = List.T(Complex.T()) poles = List.T(Complex.T()) constant = Complex.T(default=1.0+0j) def __init__(self, zeros=None, poles=None, constant=1.0+0j): if zeros is None: zeros = [] if poles is None: poles = [] FrequencyResponse.__init__( self, zeros=zeros, poles=poles, constant=constant) def evaluate(self, freqs): jomeg = 1.0j * 2.*num.pi*freqs a = num.ones(freqs.size, dtype=num.complex)*self.constant for z in self.zeros: a *= jomeg-z for p in self.poles: a /= jomeg-p return a
[docs]class ButterworthResponse(FrequencyResponse): ''' Butterworth frequency response. :param corner: corner frequency of the response :param order: order of the response :param type: either ``high`` or ``low`` ''' corner = Float.T(default=1.0) order = Int.T(default=4) type = StringChoice.T(choices=['low', 'high'], default='low') def evaluate(self, freqs): b, a = signal.butter( int(self.order), float(self.corner), self.type, analog=True) w, h = signal.freqs(b, a, freqs) return h
[docs]class SampledResponse(FrequencyResponse): ''' Interpolates frequency response given at a set of sampled frequencies. :param frequencies,values: frequencies and values of the sampled response function. :param left,right: values to return when input is out of range. If set to ``None`` (the default) the endpoints are returned. ''' frequencies = Array.T(shape=(None,), dtype=num.float, serialize_as='list') values = Array.T(shape=(None,), dtype=num.complex, serialize_as='list') left = Complex.T(optional=True) right = Complex.T(optional=True) def __init__(self, frequencies, values, left=None, right=None): FrequencyResponse.__init__( self, frequencies=asarray_1d(frequencies, num.float), values=asarray_1d(values, num.complex)) def evaluate(self, freqs): ereal = num.interp( freqs, self.frequencies, num.real(self.values), left=self.left, right=self.right) eimag = num.interp( freqs, self.frequencies, num.imag(self.values), left=self.left, right=self.right) transfer = ereal + 1.0j*eimag return transfer
[docs] def inverse(self): ''' Get inverse as a new :py:class:`SampledResponse` object. ''' def inv_or_none(x): if x is not None: return 1./x return SampledResponse( self.frequencies, 1./self.values, left=inv_or_none(self.left), right=inv_or_none(self.right))
[docs]class IntegrationResponse(FrequencyResponse): ''' The integration response, optionally multiplied by a constant gain. :param n: exponent (integer) :param gain: gain factor (float) :: gain T(f) = -------------- (j*2*pi * f)^n ''' n = Int.T(optional=True, default=1) gain = Float.T(optional=True, default=1.0) def __init__(self, n=1, gain=1.0): FrequencyResponse.__init__(self, n=n, gain=gain) def evaluate(self, freqs): nonzero = freqs != 0.0 resp = num.empty(freqs.size, dtype=num.complex) resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n resp[num.logical_not(nonzero)] = 0.0 return resp
[docs]class DifferentiationResponse(FrequencyResponse): ''' The differentiation response, optionally multiplied by a constant gain. :param n: exponent (integer) :param gain: gain factor (float) :: T(f) = gain * (j*2*pi * f)^n ''' n = Int.T(optional=True, default=1) gain = Float.T(optional=True, default=1.0) def __init__(self, n=1, gain=1.0): FrequencyResponse.__init__(self, n=n, gain=gain) def evaluate(self, freqs): return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
[docs]class AnalogFilterResponse(FrequencyResponse): ''' Frequency response of an analog filter. (see :py:func:`scipy.signal.freqs`). ''' b = List.T(Float.T()) a = List.T(Float.T()) def __init__(self, b, a): FrequencyResponse.__init__(self, b=b, a=a) def evaluate(self, freqs): return signal.freqs(self.b, self.a, freqs/(2.*num.pi))[1]
[docs]class MultiplyResponse(FrequencyResponse): ''' Multiplication of several :py:class:`FrequencyResponse` objects. ''' responses = List.T(FrequencyResponse.T()) def __init__(self, responses=None): if responses is None: responses = [] FrequencyResponse.__init__(self, responses=responses) def evaluate(self, freqs): a = num.ones(freqs.size, dtype=num.complex) for resp in self.responses: a *= resp.evaluate(freqs) return a
def asarray_1d(x, dtype): if isinstance(x, (list, tuple)) and x and isinstance(x[0], (str, newstr)): return num.asarray(list(map(dtype, x)), dtype=dtype) else: a = num.asarray(x, dtype=dtype) if not a.ndim == 1: raise ValueError('could not convert to 1D array') return a cached_coefficients = {} def _get_cached_filter_coefs(order, corners, btype): ck = (order, tuple(corners), btype) if ck not in cached_coefficients: if len(corners) == 0: cached_coefficients[ck] = signal.butter( order, corners[0], btype=btype) else: cached_coefficients[ck] = signal.butter( order, corners, btype=btype) return cached_coefficients[ck] class _globals(object): _numpy_has_correlate_flip_bug = None def _default_key(tr): return (tr.network, tr.station, tr.location, tr.channel)
[docs]def numpy_has_correlate_flip_bug(): ''' Check if NumPy's correlate function reveals old behaviour ''' if _globals._numpy_has_correlate_flip_bug is None: a = num.array([0, 0, 1, 0, 0, 0, 0]) b = num.array([0, 0, 0, 0, 1, 0, 0, 0]) ab = num.correlate(a, b, mode='same') ba = num.correlate(b, a, mode='same') _globals._numpy_has_correlate_flip_bug = num.all(ab == ba) return _globals._numpy_has_correlate_flip_bug
[docs]def numpy_correlate_fixed(a, b, mode='valid', use_fft=False): ''' Call :py:func:`numpy.correlate` with fixes. c[k] = sum_i a[i+k] * conj(b[i]) Note that the result produced by newer numpy.correlate is always flipped with respect to the formula given in its documentation (if ascending k assumed for the output). ''' if use_fft: if a.size < b.size: c = signal.fftconvolve(b[::-1], a, mode=mode) else: c = signal.fftconvolve(a, b[::-1], mode=mode) return c else: buggy = numpy_has_correlate_flip_bug() a = num.asarray(a) b = num.asarray(b) if buggy: b = num.conj(b) c = num.correlate(a, b, mode=mode) if buggy and a.size < b.size: return c[::-1] else: return c
[docs]def numpy_correlate_emulate(a, b, mode='valid'): ''' Slow version of :py:func:`numpy.correlate` for comparison. ''' a = num.asarray(a) b = num.asarray(b) kmin = -(b.size-1) klen = a.size-kmin kmin, kmax = numpy_correlate_lag_range(a, b, mode=mode) kmin = int(kmin) kmax = int(kmax) klen = kmax - kmin + 1 c = num.zeros(klen, dtype=num.find_common_type((b.dtype, a.dtype), ())) for k in range(kmin, kmin+klen): imin = max(0, -k) ilen = min(b.size, a.size-k) - imin c[k-kmin] = num.sum( a[imin+k:imin+ilen+k] * num.conj(b[imin:imin+ilen])) return c
[docs]def numpy_correlate_lag_range(a, b, mode='valid', use_fft=False): ''' Get range of lags for which :py:func:`numpy.correlate` produces values. ''' a = num.asarray(a) b = num.asarray(b) kmin = -(b.size-1) if mode == 'full': klen = a.size-kmin elif mode == 'same': klen = max(a.size, b.size) kmin += (a.size+b.size-1 - max(a.size, b.size)) // 2 + \ int(not use_fft and a.size % 2 == 0 and b.size > a.size) elif mode == 'valid': klen = abs(a.size - b.size) + 1 kmin += min(a.size, b.size) - 1 return kmin, kmin + klen - 1
[docs]def autocorr(x, nshifts): ''' Compute biased estimate of the first autocorrelation coefficients. :param x: input array :param nshifts: number of coefficients to calculate ''' mean = num.mean(x) std = num.std(x) n = x.size xdm = x - mean r = num.zeros(nshifts) for k in range(nshifts): r[k] = 1./((n-num.abs(k))*std) * num.sum(xdm[:n-k] * xdm[k:]) return r
[docs]def yulewalker(x, order): ''' Compute autoregression coefficients using Yule-Walker method. :param x: input array :param order: number of coefficients to produce A biased estimate of the autocorrelation is used. The Yule-Walker equations are solved by :py:func:`numpy.linalg.inv` instead of Levinson-Durbin recursion which is normally used. ''' gamma = autocorr(x, order+1) d = gamma[1:1+order] a = num.zeros((order, order)) gamma2 = num.concatenate((gamma[::-1], gamma[1:order])) for i in range(order): ioff = order-i a[i, :] = gamma2[ioff:ioff+order] return num.dot(num.linalg.inv(a), -d)
def moving_avg(x, n): n = int(n) cx = x.cumsum() nn = len(x) y = num.zeros(nn, dtype=cx.dtype) y[n//2:n//2+(nn-n)] = (cx[n:]-cx[:-n])/n y[:n//2] = y[n//2] y[n//2+(nn-n):] = y[n//2+(nn-n)-1] return y def moving_sum(x, n, mode='valid'): n = int(n) cx = x.cumsum() nn = len(x) if mode == 'valid': if nn-n+1 <= 0: return num.zeros(0, dtype=cx.dtype) y = num.zeros(nn-n+1, dtype=cx.dtype) y[0] = cx[n-1] y[1:nn-n+1] = cx[n:nn]-cx[0:nn-n] if mode == 'full': y = num.zeros(nn+n-1, dtype=cx.dtype) if n <= nn: y[0:n] = cx[0:n] y[n:nn] = cx[n:nn]-cx[0:nn-n] y[nn:nn+n-1] = cx[-1]-cx[nn-n:nn-1] else: y[0:nn] = cx[0:nn] y[nn:n] = cx[nn-1] y[n:nn+n-1] = cx[nn-1] - cx[0:nn-1] if mode == 'same': n1 = (n-1)//2 y = num.zeros(nn, dtype=cx.dtype) if n <= nn: y[0:n-n1] = cx[n1:n] y[n-n1:nn-n1] = cx[n:nn]-cx[0:nn-n] y[nn-n1:nn] = cx[nn-1] - cx[nn-n:nn-n+n1] else: y[0:max(0, nn-n1)] = cx[min(n1, nn):nn] y[max(nn-n1, 0):min(n-n1, nn)] = cx[nn-1] y[min(n-n1, nn):nn] = cx[nn-1] - cx[0:max(0, nn-(n-n1))] return y def nextpow2(i): return 2**int(math.ceil(math.log(i)/math.log(2.))) def snapper_w_offset(nmax, offset, delta, snapfun=math.ceil): def snap(x): return max(0, min(int(snapfun((x-offset)/delta)), nmax)) return snap def snapper(nmax, delta, snapfun=math.ceil): def snap(x): return max(0, min(int(snapfun(x/delta)), nmax)) return snap def apply_costaper(a, b, c, d, y, x0, dx): hi = snapper_w_offset(y.size, x0, dx) y[:hi(a)] = 0. y[hi(a):hi(b)] *= 0.5 \ - 0.5*num.cos((dx*num.arange(hi(a), hi(b))-(a-x0))/(b-a)*num.pi) y[hi(c):hi(d)] *= 0.5 \ + 0.5*num.cos((dx*num.arange(hi(c), hi(d))-(c-x0))/(d-c)*num.pi) y[hi(d):] = 0. def span_costaper(a, b, c, d, y, x0, dx): hi = snapper_w_offset(y.size, x0, dx) return hi(a), hi(d) - hi(a) def costaper(a, b, c, d, nfreqs, deltaf): hi = snapper(nfreqs, deltaf) tap = num.zeros(nfreqs) tap[hi(a):hi(b)] = 0.5 \ - 0.5*num.cos((deltaf*num.arange(hi(a), hi(b))-a)/(b-a)*num.pi) tap[hi(b):hi(c)] = 1. tap[hi(c):hi(d)] = 0.5 \ + 0.5*num.cos((deltaf*num.arange(hi(c), hi(d))-c)/(d-c)*num.pi) return tap def t2ind(t, tdelta, snap=round): return int(snap(t/tdelta))
[docs]def hilbert(x, N=None): ''' Return the hilbert transform of x of length N. (from scipy.signal, but changed to use fft and ifft from numpy.fft) ''' x = num.asarray(x) if N is None: N = len(x) if N <= 0: raise ValueError("N must be positive.") if num.iscomplexobj(x): logger.warning('imaginary part of x ignored.') x = num.real(x) Xf = num.fft.fft(x, N, axis=0) h = num.zeros(N) if N % 2 == 0: h[0] = h[N//2] = 1 h[1:N//2] = 2 else: h[0] = 1 h[1:(N+1)//2] = 2 if len(x.shape) > 1: h = h[:, num.newaxis] x = num.fft.ifft(Xf*h) return x
def near(a, b, eps): return abs(a-b) < eps def coroutine(func): def wrapper(*args, **kwargs): gen = func(*args, **kwargs) next(gen) return gen wrapper.__name__ = func.__name__ wrapper.__dict__ = func.__dict__ wrapper.__doc__ = func.__doc__ return wrapper
[docs]class States(object): ''' Utility to store channel-specific state in coroutines. ''' def __init__(self): self._states = {} def get(self, tr): k = tr.nslc_id if k in self._states: tmin, deltat, dtype, value = self._states[k] if (near(tmin, tr.tmin, deltat/100.) and near(deltat, tr.deltat, deltat/10000.) and dtype == tr.ydata.dtype): return value return None def set(self, tr, value): k = tr.nslc_id if k in self._states and self._states[k][-1] is not value: self.free(self._states[k][-1]) self._states[k] = (tr.tmax+tr.deltat, tr.deltat, tr.ydata.dtype, value) def free(self, value): pass
@coroutine def co_list_append(list): while True: list.append((yield))
[docs]class ScipyBug(Exception): pass
[docs]@coroutine def co_lfilter(target, b, a): ''' Successively filter broken continuous trace data (coroutine). Create coroutine which takes :py:class:`Trace` objects, filters their data through :py:func:`scipy.signal.lfilter` and sends new :py:class:`Trace` objects containing the filtered data to target. This is useful, if one wants to filter a long continuous time series, which is split into many successive traces without producing filter artifacts at trace boundaries. Filter states are kept *per channel*, specifically, for each (network, station, location, channel) combination occuring in the input traces, a separate state is created and maintained. This makes it possible to filter multichannel or multistation data with only one :py:func:`co_lfilter` instance. Filter state is reset, when gaps occur. Use it like this:: from pyrocko.trace import co_lfilter, co_list_append filtered_traces = [] pipe = co_lfilter(co_list_append(filtered_traces), a, b) for trace in traces: pipe.send(trace) pipe.close() ''' try: states = States() output = None while True: input = (yield) zi = states.get(input) if zi is None: zi = num.zeros(max(len(a), len(b))-1, dtype=num.float) output = input.copy(data=False) try: ydata, zf = signal.lfilter(b, a, input.get_ydata(), zi=zi) except ValueError: raise ScipyBug( 'signal.lfilter failed: could be related to a bug ' 'in some older scipy versions, e.g. on opensuse42.1') output.set_ydata(ydata) states.set(input, zf) target.send(output) except GeneratorExit: target.close()
def co_antialias(target, q, n=None, ftype='fir'): b, a, n = util.decimate_coeffs(q, n, ftype) anti = co_lfilter(target, b, a) return anti @coroutine def co_dropsamples(target, q, nfir): try: states = States() while True: tr = (yield) newdeltat = q * tr.deltat ioffset = states.get(tr) if ioffset is None: # for fir filter, the first nfir samples are pulluted by # boundary effects; cut it off. # for iir this may be (much) more, we do not correct for that. # put sample instances to a time which is a multiple of the # new sampling interval. newtmin_want = math.ceil( (tr.tmin+(nfir+1)*tr.deltat) / newdeltat) * newdeltat \ - (nfir/2*tr.deltat) ioffset = int(round((newtmin_want - tr.tmin)/tr.deltat)) if ioffset < 0: ioffset = ioffset % q newtmin_have = tr.tmin + ioffset * tr.deltat newtr = tr.copy(data=False) newtr.deltat = newdeltat # because the fir kernel shifts data by nfir/2 samples: newtr.tmin = newtmin_have - (nfir/2*tr.deltat) newtr.set_ydata(tr.get_ydata()[ioffset::q].copy()) states.set(tr, (ioffset % q - tr.data_len() % q) % q) target.send(newtr) except GeneratorExit: target.close()
[docs]def co_downsample(target, q, n=None, ftype='fir'): ''' Successively downsample broken continuous trace data (coroutine). Create coroutine which takes :py:class:`Trace` objects, downsamples their data and sends new :py:class:`Trace` objects containing the downsampled data to target. This is useful, if one wants to downsample a long continuous time series, which is split into many successive traces without producing filter artifacts and gaps at trace boundaries. Filter states are kept *per channel*, specifically, for each (network, station, location, channel) combination occuring in the input traces, a separate state is created and maintained. This makes it possible to filter multichannel or multistation data with only one :py:func:`co_lfilter` instance. Filter state is reset, when gaps occur. The sampling instances are choosen so that they occur at (or as close as possible) to even multiples of the sampling interval of the downsampled trace (based on system time). ''' b, a, n = util.decimate_coeffs(q, n, ftype) return co_antialias(co_dropsamples(target, q, n), q, n, ftype)
@coroutine def co_downsample_to(target, deltat): decimators = {} try: while True: tr = (yield) ratio = deltat / tr.deltat rratio = round(ratio) if abs(rratio - ratio)/ratio > 0.0001: raise util.UnavailableDecimation('ratio = %g' % ratio) deci_seq = tuple(x for x in util.decitab(int(rratio)) if x != 1) if deci_seq not in decimators: pipe = target for q in deci_seq[::-1]: pipe = co_downsample(pipe, q) decimators[deci_seq] = pipe decimators[deci_seq].send(tr) except GeneratorExit: for g in decimators.values(): g.close()
[docs]class DomainChoice(StringChoice): choices = [ 'time_domain', 'frequency_domain', 'envelope', 'absolute', 'cc_max_norm']
[docs]class MisfitSetup(Object): ''' Contains misfit setup to be used in :py:func:`trace.misfit` :param description: Description of the setup :param norm: L-norm classifier :param taper: Object of :py:class:`Taper` :param filter: Object of :py:class:`FrequencyResponse` :param domain: ['time_domain', 'frequency_domain', 'envelope', 'absolute', 'cc_max_norm'] Can be dumped to a yaml file. ''' xmltagname = 'misfitsetup' description = String.T(optional=True) norm = Int.T(optional=False) taper = Taper.T(optional=False) filter = FrequencyResponse.T(optional=True) domain = DomainChoice.T(default='time_domain')
[docs]def equalize_sampling_rates(trace_1, trace_2): ''' Equalize sampling rates of two traces (reduce higher sampling rate to lower). :param trace_1: :py:class:`Trace` object :param trace_2: :py:class:`Trace` object Returns a copy of the resampled trace if resampling is needed. ''' if same_sampling_rate(trace_1, trace_2): return trace_1, trace_2 if trace_1.deltat < trace_2.deltat: t1_out = trace_1.copy() t1_out.downsample_to(deltat=trace_2.deltat, snap=True) logger.debug('Trace downsampled (return copy of trace): %s' % '.'.join(t1_out.nslc_id)) return t1_out, trace_2 elif trace_1.deltat > trace_2.deltat: t2_out = trace_2.copy() t2_out.downsample_to(deltat=trace_1.deltat, snap=True) logger.debug('Trace downsampled (return copy of trace): %s' % '.'.join(t2_out.nslc_id)) return trace_1, t2_out
[docs]def Lx_norm(u, v, norm=2): ''' Calculate the misfit denominator *m* and the normalization devisor *n* according to norm. The normalization divisor *n* is calculated from ``v``. :param u: :py:class:`numpy.array` :param v: :py:class:`numpy.array` :param norm: (default = 2) ``u`` and ``v`` must be of same size. ''' if norm == 1: return ( num.sum(num.abs(v-u)), num.sum(num.abs(v))) elif norm == 2: return ( num.sqrt(num.sum((v-u)**2)), num.sqrt(num.sum(v**2))) else: return ( num.power(num.sum(num.abs(num.power(v - u, norm))), 1./norm), num.power(num.sum(num.abs(num.power(v, norm))), 1./norm))
def do_downsample(tr, deltat): if abs(tr.deltat - deltat) / tr.deltat > 1e-6: tr = tr.copy() tr.downsample_to(deltat, snap=True, demean=False) else: if tr.tmin/tr.deltat > 1e-6 or tr.tmax/tr.deltat > 1e-6: tr = tr.copy() tr.snap() return tr def do_extend(tr, tmin, tmax): if tmin < tr.tmin or tmax > tr.tmax: tr = tr.copy() tr.extend(tmin=tmin, tmax=tmax, fillmethod='repeat') return tr def do_pre_taper(tr, taper): return tr.taper(taper, inplace=False, chop=True) def do_fft(tr, filter): if filter is None: return tr else: ndata = tr.ydata.size nfft = nextpow2(ndata) padded = num.zeros(nfft, dtype=num.float) padded[:ndata] = tr.ydata spectrum = num.fft.rfft(padded) df = 1.0 / (tr.deltat * nfft) frequencies = num.arange(spectrum.size)*df return [tr, frequencies, spectrum] def do_filter(inp, filter): if filter is None: return inp else: tr, frequencies, spectrum = inp spectrum *= filter.evaluate(frequencies) return [tr, frequencies, spectrum] def do_ifft(inp): if isinstance(inp, Trace): return inp else: tr, _, spectrum = inp ndata = tr.ydata.size tr = tr.copy(data=False) tr.set_ydata(num.fft.irfft(spectrum)[:ndata]) return tr def check_alignment(t1, t2): if abs(t1.tmin-t2.tmin) > t1.deltat * 1e-4 or \ abs(t1.tmax - t2.tmax) > t1.deltat * 1e-4 or \ t1.ydata.shape != t2.ydata.shape: raise MisalignedTraces( 'Cannot calculate misfit of %s and %s due to misaligned ' 'traces.' % ('.'.join(t1.nslc_id), '.'.join(t2.nslc_id)))