Source code for pyrocko.squirrel.tool.commands.jackseis

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

'''
Implementation of :app:`squirrel jackseis`.
'''

import re
import sys
import logging
import os.path as op

import numpy as num

from pyrocko import io, trace, util
from pyrocko import progress
from pyrocko.has_paths import Path, HasPaths
from pyrocko.guts import Dict, String, Choice, Float, List, Timestamp, \
    StringChoice, IntChoice, Defer, load_all, clone
from pyrocko.squirrel.dataset import Dataset
from pyrocko.squirrel.client.local import LocalData
from pyrocko.squirrel.error import ToolError, SquirrelError
from pyrocko.squirrel.model import CodesNSLCE, QuantityType
from pyrocko.squirrel.operators.base import NetworkGrouping, StationGrouping, \
    ChannelGrouping, SensorGrouping
from pyrocko.squirrel.tool.common import ldq

tts = util.time_to_str

guts_prefix = 'jackseis'
logger = logging.getLogger('psq.cli.jackseis')


def dset(kwargs, keys, values):
    for k, v in zip(keys, values):
        kwargs[k] = v


def make_task(*args):
    return progress.task(*args, logger=logger)


def parse_rename_rule_from_string(s):
    s = s.strip()
    if re.match(r'^([^:,]*:[^:,]*,?)+', s):
        return dict(
            x.split(':') for x in s.strip(',').split(','))
    else:
        return s


class JackseisError(ToolError):
    pass


class Chain(object):
    def __init__(self, node, parent=None):
        self.node = node
        self.parent = parent

    def mcall(self, name, *args, **kwargs):
        ret = []
        if self.parent is not None:
            ret.append(self.parent.mcall(name, *args, **kwargs))

        ret.append(getattr(self.node, name)(*args, **kwargs))
        return ret

    def fcall(self, name, *args, **kwargs):
        v = getattr(self.node, name)(*args, **kwargs)
        if v is None and self.parent is not None:
            return self.parent.fcall(name, *args, **kwargs)
        else:
            return v

    def get(self, name):
        v = getattr(self.node, name)
        if v is None and self.parent is not None:
            return self.parent.get(name)
        else:
            return v

    def dget(self, name, k):
        v = getattr(self.node, name).get(k, None)
        if v is None and self.parent is not None:
            return self.parent.dget(name, k)
        else:
            return v


[docs]class OutputFormatChoice(StringChoice): choices = io.allowed_formats('save')
[docs]class OutputDataTypeChoice(StringChoice): choices = ['int32', 'int64', 'float32', 'float64'] name_to_dtype = { 'int32': num.int32, 'int64': num.int64, 'float32': num.float32, 'float64': num.float64}
[docs]class TraversalChoice(StringChoice): choices = ['network', 'station', 'channel', 'sensor'] name_to_grouping = { 'network': NetworkGrouping(), 'station': StationGrouping(), 'sensor': SensorGrouping(), 'channel': ChannelGrouping()}
[docs]class Converter(HasPaths): in_dataset = Dataset.T(optional=True) in_path = String.T(optional=True) in_paths = List.T(String.T(optional=True)) codes = List.T(CodesNSLCE.T(), optional=True) rename = Dict.T( String.T(), Choice.T([ String.T(), Dict.T(String.T(), String.T())])) tmin = Timestamp.T(optional=True) tmax = Timestamp.T(optional=True) tinc = Float.T(optional=True) downsample = Float.T(optional=True) quantity = QuantityType.T(optional=True) fmin = Float.T(optional=True) fmax = Float.T(optional=True) fcut_factor = Float.T(optional=True) fmin_cut = Float.T(optional=True) fmax_cut = Float.T(optional=True) out_path = Path.T(optional=True) out_sds_path = Path.T(optional=True) out_format = OutputFormatChoice.T(optional=True) out_data_type = OutputDataTypeChoice.T(optional=True) out_mseed_record_length = IntChoice.T( optional=True, choices=list(io.mseed.VALID_RECORD_LENGTHS)) out_mseed_steim = IntChoice.T( optional=True, choices=[1, 2]) out_meta_path = Path.T(optional=True) traversal = TraversalChoice.T(optional=True) parts = List.T(Defer('Converter.T')) def get_effective_frequency_taper(self, chain): fmin = chain.get('fmin') fmax = chain.get('fmax') if None in (fmin, fmax): if fmin is not None: raise JackseisError('Converter: fmax not specified.') if fmax is not None: raise JackseisError('Converter: fmin not specified.') return None fcut_factor = chain.get('fcut_factor') or 2.0 fmin_cut = chain.get('fmin_cut') fmax_cut = chain.get('fmax_cut') fmin_cut = fmin_cut if fmin_cut is not None else fmin / fcut_factor fmax_cut = fmax_cut if fmax_cut is not None else fmax * fcut_factor return fmin_cut, fmin, fmax, fmax_cut @classmethod def add_arguments(cls, p): p.add_squirrel_query_arguments(without=['time']) p.add_argument( '--tinc', dest='tinc', type=float, metavar='SECONDS', help='Set time length of output files [s].') p.add_argument( '--downsample', dest='downsample', type=float, metavar='RATE', help='Downsample to RATE [Hz].') p.add_argument( '--quantity', dest='quantity', metavar='QUANTITY', choices=QuantityType.choices, help=''' Restitute waveforms to selected ``QUANTITY``. Restitution is performed by multiplying the waveform spectra with a tapered inverse of the instrument response transfer function. The frequency band of the taper can be adjusted using the ``--band`` option. Choices: %s. '''.strip() % ldq(QuantityType.choices)) p.add_argument( '--band', metavar='FMIN,FMAX or FMIN,FMAX,CUTFACTOR or ' 'FMINCUT,FMIN,FMAX,FMAXCUT', help=''' Frequency band used in restitution (see ``--quantity``) or for (acausal) filtering. Waveform spectra are multiplied with a taper with cosine-shaped flanks and which is flat between ``FMIN`` and ``FMAX``. The flanks of the taper drop to zero at ``FMINCUT`` and ``FMAXCUT``. If ``CUTFACTOR`` is given, ``FMINCUT`` and ``FMAXCUT`` are set to ``FMIN/CUTFACTOR`` and ``FMAX*CUTFACTOR`` respectively. ``CUTFACTOR`` defaults to 2. '''.strip()) p.add_argument( '--out-path', dest='out_path', metavar='TEMPLATE', help=''' Set output path to ``TEMPLATE``. Available placeholders are ``%%n``: network, ``%%s``: station, ``%%l``: location, ``%%c``: channel, ``%%b``: begin time, ``%%e``: end time, ``%%j``: julian day of year. The following additional placeholders use the current processing window's begin and end times rather than trace begin and end times (to suppress producing many small files for gappy traces), ``%%(wmin_year)s``, ``%%(wmin_month)s``, ``%%(wmin_day)s``, ``%%(wmin)s``, ``%%(wmin_jday)s``, ``%%(wmax_year)s``, ``%%(wmax_month)s``, ``%%(wmax_day)s``, ``%%(wmax)s``, ``%%(wmax_jday)s``. Example: ``--out-path 'data/%%s/trace-%%s-%%c.mseed'`` '''.strip()) p.add_argument( '--out-sds-path', dest='out_sds_path', metavar='PATH', help=''' Set output path to create an SDS archive (https://www.seiscomp.de/seiscomp3/doc/applications/slarchive/SDS.html), rooted at PATH. Implies ``--tinc 86400``. Example: ``--out-sds-path data/sds`` '''.strip()) p.add_argument( '--out-format', dest='out_format', choices=io.allowed_formats('save'), metavar='FORMAT', help='Set output file format. Choices: %s' % io.allowed_formats( 'save', 'cli_help', 'mseed')) p.add_argument( '--out-data-type', dest='out_data_type', choices=OutputDataTypeChoice.choices, metavar='DTYPE', help='Set numerical data type. Choices: %s. The output file ' 'format must support the given type. By default, the data ' 'type is kept unchanged.' % ldq( OutputDataTypeChoice.choices)) p.add_argument( '--out-mseed-record-length', dest='out_mseed_record_length', type=int, choices=io.mseed.VALID_RECORD_LENGTHS, metavar='INT', help='Set the Mini-SEED record length in bytes. Choices: %s. ' 'Default is 4096 bytes, which is commonly used for archiving.' % ldq(str(b) for b in io.mseed.VALID_RECORD_LENGTHS)) p.add_argument( '--out-mseed-steim', dest='out_mseed_steim', type=int, choices=(1, 2), metavar='INT', help='Set the Mini-SEED STEIM compression. Choices: ``1`` or ' '``2``. Default is STEIM-2. Note: STEIM-2 is limited to 30 ' 'bit dynamic range.') p.add_argument( '--out-meta-path', dest='out_meta_path', metavar='PATH', help='Set output path for station metadata (StationXML) export.') p.add_argument( '--traversal', dest='traversal', metavar='GROUPING', choices=TraversalChoice.choices, help='By default the outermost processing loop is over time. ' 'Add outer loop with given GROUPING. Choices: %s' % ldq(TraversalChoice.choices)) p.add_argument( '--rename-network', dest='rename_network', metavar='REPLACEMENT', help=""" Replace network code. REPLACEMENT can be a string for direct replacement, a mapping for selective replacement, or a regular expression for tricky replacements. Examples: Direct replacement: ```XX``` - set all network codes to ```XX```. Mapping: ```AA:XX,BB:YY``` - replace ```AA``` with ```XX``` and ```BB``` with ```YY```. Regular expression: ```/A(\\d)/X\\1/``` - replace ```A1``` with ```X1``` and ```A2``` with ```X2``` etc. """.strip()) p.add_argument( '--rename-station', dest='rename_station', metavar='REPLACEMENT', help='Replace station code. See ``--rename-network``.') p.add_argument( '--rename-location', dest='rename_location', metavar='REPLACEMENT', help='Replace location code. See ``--rename-network``.') p.add_argument( '--rename-channel', dest='rename_channel', metavar='REPLACEMENT', help='Replace channel code. See ``--rename-network``.') p.add_argument( '--rename-extra', dest='rename_extra', metavar='REPLACEMENT', help='Replace extra code. See ``--rename-network``. Note: the ' '```extra``` code is not available in Mini-SEED.') @classmethod def from_arguments(cls, args): kwargs = args.squirrel_query rename = {} for (k, v) in [ ('network', args.rename_network), ('station', args.rename_station), ('location', args.rename_location), ('channel', args.rename_channel), ('extra', args.rename_extra)]: if v is not None: rename[k] = parse_rename_rule_from_string(v) if args.band: try: values = list(map(float, args.band.split(','))) if len(values) not in (2, 3, 4): raise ValueError() if len(values) == 2: dset(kwargs, 'fmin fmax'.split(), values) elif len(values) == 3: dset(kwargs, 'fmin fmax fcut_factor'.split(), values) elif len(values) == 4: dset(kwargs, 'fmin_cut fmin fmax fmax_cut'.split(), values) except ValueError: raise JackseisError( 'Invalid argument to --band: %s' % args.band) from None obj = cls( downsample=args.downsample, quantity=args.quantity, out_format=args.out_format, out_path=args.out_path, tinc=args.tinc, out_sds_path=args.out_sds_path, out_data_type=args.out_data_type, out_mseed_record_length=args.out_mseed_record_length, out_mseed_steim=args.out_mseed_steim, out_meta_path=args.out_meta_path, traversal=args.traversal, rename=rename, **kwargs) obj.validate() return obj def add_dataset(self, sq): if self.in_dataset is not None: sq.add_dataset(self.in_dataset) if self.in_path is not None: ds = Dataset(sources=[LocalData(paths=[self.in_path])]) ds.set_basepath_from(self) sq.add_dataset(ds) if self.in_paths: ds = Dataset(sources=[LocalData(paths=self.in_paths)]) ds.set_basepath_from(self) sq.add_dataset(ds) def get_effective_rename_rules(self, chain): d = {} for k in ['network', 'station', 'location', 'channel']: v = chain.dget('rename', k) if isinstance(v, str): m = re.match(r'/([^/]+)/([^/]*)/', v) if m: try: v = (re.compile(m.group(1)), m.group(2)) except Exception: raise JackseisError( 'Invalid replacement pattern: /%s/' % m.group(1)) d[k] = v return d def get_effective_out_path(self): nset = sum(x is not None for x in ( self.out_path, self.out_sds_path)) if nset > 1: raise JackseisError( 'More than one out of [out_path, out_sds_path] set.') is_sds = False if self.out_path: out_path = self.out_path elif self.out_sds_path: out_path = op.join( self.out_sds_path, '%(wmin_year)s/%(network)s/%(station)s/%(channel)s.D' '/%(network)s.%(station)s.%(location)s.%(channel)s.D' '.%(wmin_year)s.%(wmin_jday)s') is_sds = True else: out_path = None if out_path is not None: return self.expand_path(out_path), is_sds else: return None def get_effective_out_meta_path(self): if self.out_meta_path is not None: return self.expand_path(self.out_meta_path) else: return None def do_rename(self, rules, tr): rename = {} for k in ['network', 'station', 'location', 'channel']: v = rules.get(k, None) if isinstance(v, str): rename[k] = v elif isinstance(v, dict): try: oldval = getattr(tr, k) rename[k] = v[oldval] except KeyError: raise ToolError( 'No mapping defined for %s code "%s".' % (k, oldval)) elif isinstance(v, tuple): pat, repl = v oldval = getattr(tr, k) newval, n = pat.subn(repl, oldval) if n: rename[k] = newval tr.set_codes(**rename) def convert(self, args, chain=None): if chain is None: defaults = clone(g_defaults) defaults.set_basepath_from(self) chain = Chain(defaults) chain = Chain(self, chain) if self.parts: task = make_task('Jackseis parts') for part in task(self.parts): part.convert(args, chain) del task else: sq = args.make_squirrel() cli_overrides = Converter.from_arguments(args) cli_overrides.set_basepath('.') chain = Chain(cli_overrides, chain) chain.mcall('add_dataset', sq) tmin = chain.get('tmin') tmax = chain.get('tmax') tinc = chain.get('tinc') codes = chain.get('codes') downsample = chain.get('downsample') out_path, is_sds = chain.fcall('get_effective_out_path') \ or (None, False) if is_sds and tinc != 86400.: logger.warning('Setting time window to 1 day to generate SDS.') tinc = 86400.0 out_format = chain.get('out_format') out_data_type = chain.get('out_data_type') out_meta_path = chain.fcall('get_effective_out_meta_path') if out_meta_path is not None: sx = sq.get_stationxml(codes=codes, tmin=tmin, tmax=tmax) util.ensuredirs(out_meta_path) sx.dump_xml(filename=out_meta_path) if out_path is None: return target_deltat = None if downsample is not None: target_deltat = 1.0 / float(downsample) save_kwargs = {} if out_format == 'mseed': save_kwargs['record_length'] = chain.get( 'out_mseed_record_length') save_kwargs['steim'] = chain.get( 'out_mseed_steim') traversal = chain.get('traversal') if traversal is not None: grouping = TraversalChoice.name_to_grouping[traversal] else: grouping = None frequency_taper = self.get_effective_frequency_taper(chain) if frequency_taper is not None: if frequency_taper[0] != 0.0: frequency_taper_tpad = 1.0 / frequency_taper[0] else: if frequency_taper[1] == 0.0: raise JackseisError( 'Converter: fmin must be greater than zero.') frequency_taper_tpad = 2.0 / frequency_taper[1] else: frequency_taper_tpad = 0.0 quantity = chain.get('quantity') do_transfer = \ quantity is not None or frequency_taper is not None tpad = 0.0 if target_deltat is not None: tpad += target_deltat * 50. if do_transfer: tpad += frequency_taper_tpad task = None rename_rules = self.get_effective_rename_rules(chain) for batch in sq.chopper_waveforms( tmin=tmin, tmax=tmax, tpad=tpad, tinc=tinc, codes=codes, snap_window=True, grouping=grouping): if task is None: task = make_task( 'Jackseis blocks', batch.n * batch.ngroups) tlabel = '%s%s - %s' % ( 'groups %i / %i: ' % (batch.igroup, batch.ngroups) if batch.ngroups > 1 else '', util.time_to_str(batch.tmin), util.time_to_str(batch.tmax)) task.update(batch.i + batch.igroup * batch.n, tlabel) twmin = batch.tmin twmax = batch.tmax traces = batch.traces if target_deltat is not None: downsampled_traces = [] for tr in traces: try: tr.downsample_to( target_deltat, snap=True, demean=False, allow_upsample_max=4) downsampled_traces.append(tr) except (trace.TraceTooShort, trace.NoData) as e: logger.warning(str(e)) traces = downsampled_traces if do_transfer: restituted_traces = [] for tr in traces: try: if quantity is not None: resp = sq.get_response(tr).get_effective( input_quantity=quantity) else: resp = None restituted_traces.append(tr.transfer( frequency_taper_tpad, frequency_taper, transfer_function=resp, invert=True)) except (trace.NoData, trace.TraceTooShort, SquirrelError) as e: logger.warning(str(e)) traces = restituted_traces for tr in traces: self.do_rename(rename_rules, tr) if out_data_type: for tr in traces: tr.ydata = tr.ydata.astype( OutputDataTypeChoice.name_to_dtype[out_data_type]) chopped_traces = [] for tr in traces: try: otr = tr.chop(twmin, twmax, inplace=False) chopped_traces.append(otr) except trace.NoData: pass traces = chopped_traces if out_path is not None: try: io.save( traces, out_path, format=out_format, overwrite=args.force, additional=dict( wmin_year=tts(twmin, format='%Y'), wmin_month=tts(twmin, format='%m'), wmin_day=tts(twmin, format='%d'), wmin_jday=tts(twmin, format='%j'), wmin=tts(twmin, format='%Y-%m-%d_%H-%M-%S'), wmax_year=tts(twmax, format='%Y'), wmax_month=tts(twmax, format='%m'), wmax_day=tts(twmax, format='%d'), wmax_jday=tts(twmax, format='%j'), wmax=tts(twmax, format='%Y-%m-%d_%H-%M-%S')), **save_kwargs) except io.FileSaveError as e: raise JackseisError(str(e)) else: for tr in traces: print(tr.summary_stats) if task: task.done()
g_defaults = Converter( out_mseed_record_length=4096, out_format='mseed', out_mseed_steim=2) headline = 'Convert waveform archive data.' def make_subparser(subparsers): return subparsers.add_parser( 'jackseis', help=headline, description=headline) def setup(parser): parser.add_squirrel_selection_arguments() parser.add_argument( '--config', dest='config_path', metavar='NAME', help='File containing `jackseis.Converter` settings.') parser.add_argument( '--dump-config', dest='dump_config', action='store_true', default=False, help=''' Print configuration file snippet representing given command line arguments to standard output and exit. Only command line options affecting the conversion are included in the dump. Additional ``--config`` settings, data collection and data query options are ignored. '''.strip()) parser.add_argument( '--force', dest='force', action='store_true', default=False, help='Force overwriting of existing files.') Converter.add_arguments(parser) def run(parser, args): if args.dump_config: converter = Converter.from_arguments(args) print(converter) sys.exit(0) if args.config_path: try: converters = load_all(filename=args.config_path) except Exception as e: raise ToolError(str(e)) for converter in converters: if not isinstance(converter, Converter): raise ToolError( 'Config file should only contain ' '`jackseis.Converter` objects.') converter.set_basepath(op.dirname(args.config_path)) else: converter = Converter() converter.set_basepath('.') converters = [converter] with progress.view(): task = make_task('Jackseis jobs') for converter in task(converters): converter.convert(args)