# - GPLv3
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

Squirrel client to access FDSN web services for seismic waveforms and metadata.

import time
import os
import copy
import logging
import tempfile
import importlib.util
from collections import defaultdict
    import cPickle as pickle
except ImportError:
    import pickle
import os.path as op
from .base import Source, Constraint
from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
    order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
    codes_to_str_abbreviated, CodesNSLCE
from ..database import ExecuteGet1Error
from pyrocko.squirrel.error import SquirrelError
from pyrocko.client import fdsn

from pyrocko import util, trace, io
from import FileLoadError
from import stationxml
from pyrocko import progress
from pyrocko import has_paths

from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
    Duration, Bool, clone, dump_all_spickle

guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.fdsn')

g_sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet', 'ncedc', 'scedc']}

g_keys_conflicting_post_codes = {
    'network', 'station', 'location', 'channel', 'minlatitude', 'maxlatitude',
    'minlongitude', 'maxlongitude', 'latitude', 'longitude', 'minradius',

def make_task(*args):
    return progress.task(*args, logger=logger)

def diff(fn_a, fn_b):
        if os.stat(fn_a).st_size != os.stat(fn_b).st_size:
            return True

    except OSError:
        return True

    with open(fn_a, 'rb') as fa:
        with open(fn_b, 'rb') as fb:
            while True:
                a =
                b =
                if a != b:
                    return True

                if len(a) == 0 or len(b) == 0:
                    return False

def move_or_keep(fn_temp, fn):
    if op.exists(fn):
        if diff(fn, fn_temp):
            os.rename(fn_temp, fn)
            status = 'updated'
            status = 'upstream unchanged'

        os.rename(fn_temp, fn)
        status = 'new'

    return status

[docs]class Archive(Object): def add(self): raise NotImplementedError()
[docs]class MSeedArchive(Archive): template = String.T(default=op.join( '%(tmin_year)s', '%(tmin_month)s', '%(tmin_day)s', 'trace_%(network)s_%(station)s_%(location)s_%(channel)s' + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed')) def __init__(self, **kwargs): Archive.__init__(self, **kwargs) self._base_path = None def set_base_path(self, path): self._base_path = path def add(self, order, trs): path = op.join(self._base_path, self.template) fmt = '%Y-%m-%d_%H-%M-%S.6FRAC' return, path, overwrite=True, additional=dict( block_tmin_us=util.time_to_str(order.tmin, format=fmt), block_tmax_us=util.time_to_str(order.tmax, format=fmt)))
def combine_selections(selection): out = [] last = None for this in selection: if last and this[:4] == last[:4] and this[4] == last[5]: last = last[:5] + (this[5],) else: if last: out.append(last) last = this if last: out.append(last) return out def orders_sort_key(order): return (, order.tmin) def orders_to_selection(orders, pad=1.0): selection = [] nslc_to_deltat = {} for order in sorted(orders, key=orders_sort_key): selection.append( + (order.tmin, order.tmax)) nslc_to_deltat[] = order.deltat selection = combine_selections(selection) selection_padded = [] for (net, sta, loc, cha, tmin, tmax) in selection: deltat = nslc_to_deltat[net, sta, loc, cha] selection_padded.append(( net, sta, loc, cha, tmin-pad*deltat, tmax+pad*deltat)) return selection_padded def codes_to_selection(codes_list, tmin, tmax): if codes_list is None: return None selection = [] for codes in sorted(codes_list): selection.append( codes.nslc + (tmin, tmax)) return selection
[docs]class ErrorEntry(Object): time = Timestamp.T() order = WaveformOrder.T() kind = String.T() details = String.T(optional=True)
[docs]class ErrorAggregate(Object): site = String.T() kind = String.T() details = String.T() entries = List.T(ErrorEntry.T()) codes = List.T(CodesNSLCE.T()) time_spans = List.T(Tuple.T(2, Timestamp.T())) def __str__(self): codes = [str(x) for x in] scodes = '\n' + util.ewrap(codes, indent=' ') if codes else '<none>' tss = self.time_spans sspans = '\n' + util.ewrap(('%s - %s' % ( util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss), indent=' ') return ('FDSN "%s": download error summary for "%s" (%i)\n%s ' 'Codes:%s\n Time spans:%s') % (, self.kind, len(self.entries), ' Details: %s\n' % self.details if self.details else '', scodes, sspans)
[docs]class ErrorLog(Object): site = String.T() entries = List.T(ErrorEntry.T()) checkpoints = List.T(Int.T()) def append_checkpoint(self): self.checkpoints.append(len(self.entries)) def append(self, time, order, kind, details=''): entry = ErrorEntry(time=time, order=order, kind=kind, details=details) self.entries.append(entry) def iter_aggregates(self): by_kind_details = defaultdict(list) for entry in self.entries: by_kind_details[entry.kind, entry.details].append(entry) kind_details = sorted(by_kind_details.keys()) for kind, details in kind_details: entries = by_kind_details[kind, details] codes = sorted(set( for entry in entries)) selection = orders_to_selection(entry.order for entry in entries) time_spans = sorted(set(row[-2:] for row in selection)) yield ErrorAggregate(, kind=kind, details=details, entries=entries, codes=codes, time_spans=time_spans) def summarize_recent(self): ioff = self.checkpoints[-1] if self.checkpoints else 0 recent = self.entries[ioff:] kinds = sorted(set(entry.kind for entry in recent)) if recent: return '%i error%s (%s)' % ( len(recent), util.plural_s(recent), '; '.join(kinds)) else: return ''
class Aborted(SquirrelError): pass
[docs]class FDSNSource(Source, has_paths.HasPaths): ''' Squirrel data-source to transparently get data from FDSN web services. Attaching an :py:class:`FDSNSource` object to a :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download station and waveform data from an FDSN web service should the data not already happen to be available locally. ''' site = String.T( help='FDSN site url or alias name (see ' ':py:mod:`pyrocko.client.fdsn`).') query_args = Dict.T( String.T(), String.T(), optional=True, help='Common query arguments, which are appended to all queries.') codes = List.T( CodesNSLCE.T(), optional=True, help='List of codes patterns to query via POST parameters.') expires = Duration.T( optional=True, help='Expiration time [s]. Information older than this will be ' 'refreshed. This only applies to station-metadata. Waveforms do ' 'not expire. If set to ``None`` neither type of data expires.') cache_path = String.T( optional=True, help='Directory path where any downloaded waveforms and station ' 'meta-data are to be kept. By default the Squirrel ' "environment's cache directory is used.") shared_waveforms = Bool.T( default=False, help='If ``True``, waveforms are shared with other FDSN sources in ' 'the same Squirrel environment. If ``False``, they are kept ' 'separate.') user_credentials = Tuple.T( 2, String.T(), optional=True, help='User and password for FDSN servers requiring password ' 'authentication') auth_token = String.T( optional=True, help='Authentication token to be presented to the FDSN server.') auth_token_path = String.T( optional=True, help='Path to file containing the authentication token to be ' 'presented to the FDSN server.') hotfix_module_path = has_paths.Path.T( optional=True, help='Path to Python module to locally patch metadata errors.') def __init__(self, site, query_args=None, codes=None, **kwargs): if codes: codes = [CodesNSLCE(codes_) for codes_ in codes] if codes is not None and query_args is not None: conflicting = g_keys_conflicting_post_codes \ & set(query_args.keys()) if conflicting: raise SquirrelError( 'Cannot use %s in `query_args` when `codes` are also ' 'given.' % ' or '.join("'%s'" % k for k in conflicting)) Source.__init__( self, site=site, query_args=query_args, codes=codes, **kwargs) self._constraint = None self._hash = self.make_hash() self._source_id = 'client:fdsn:%s' % self._hash self._error_infos = [] def describe(self): return self._source_id def make_hash(self): s = s += 'notoken' \ if (self.auth_token is None and self.auth_token_path is None) \ else 'token' if self.user_credentials is not None: s += self.user_credentials[0] else: s += 'nocred' if self.query_args is not None: s += ','.join( '%s:%s' % (k, self.query_args[k]) for k in sorted(self.query_args.keys())) else: s += 'noqueryargs' if is not None: s += 'post_codes:' + ','.join( codes.safe_str for codes in return ehash(s) def get_hash(self): return self._hash def get_auth_token(self): if self.auth_token: return self.auth_token elif self.auth_token_path is not None: try: with open(self.auth_token_path, 'rb') as f: return'ascii') except OSError as e: raise FileLoadError( 'Cannot load auth token file (%s): %s' % (str(e), self.auth_token_path)) else: raise Exception( 'FDSNSource: auth_token and auth_token_path are mutually ' 'exclusive.') def setup(self, squirrel, check=True): self._cache_path = op.join( self.cache_path or squirrel._cache_path, 'fdsn') util.ensuredir(self._cache_path) self._load_constraint() self._archive = MSeedArchive() waveforms_path = self._get_waveforms_path() util.ensuredir(waveforms_path) self._archive.set_base_path(waveforms_path) squirrel.add( self._get_waveforms_path(), check=check) fn = self._get_channels_path() if os.path.exists(fn): squirrel.add(fn) squirrel.add_virtual( [], virtual_paths=[self._source_id]) responses_path = self._get_responses_path() if os.path.exists(responses_path): squirrel.add( responses_path, kinds=['response'], exclude=r'\.temp$') self._hotfix_module = None def _hotfix(self, query_type, sx): if self.hotfix_module_path is None: return if self._hotfix_module is None: module_path = self.expand_path(self.hotfix_module_path) spec = importlib.util.spec_from_file_location( 'hotfix_' + self._hash, module_path) self._hotfix_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(self._hotfix_module) hook = getattr( self._hotfix_module, 'stationxml_' + query_type + '_hook') return hook(sx) def _get_constraint_path(self): return op.join(self._cache_path, self._hash, 'constraint.pickle') def _get_channels_path(self): return op.join(self._cache_path, self._hash, 'channels.stationxml') def _get_responses_path(self, nslc=None): dirpath = op.join( self._cache_path, self._hash, 'responses') if nslc is None: return dirpath else: return op.join( dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc) def _get_waveforms_path(self): if self.shared_waveforms: return op.join(self._cache_path, 'waveforms') else: return op.join(self._cache_path, self._hash, 'waveforms') def _log_meta(self, message, log_prefix = 'FDSN "%s" metadata:' % target(' '.join((log_prefix, message))) def _log_responses(self, message, log_prefix = 'FDSN "%s" responses:' % target(' '.join((log_prefix, message))) def _log_info_data(self, *args): log_prefix = 'FDSN "%s" waveforms:' %' '.join((log_prefix,) + args)) def _str_expires(self, t, now): if t is None: return 'expires: never' else: expire = 'expires' if t > now else 'expired' return '%s: %s' % ( expire, util.time_to_str(t, format='%Y-%m-%d %H:%M:%S')) def update_channel_inventory(self, squirrel, constraint=None): if constraint is None: constraint = Constraint() expiration_time = self._get_channels_expiration_time() now = time.time() log_target = if self._constraint and self._constraint.contains(constraint) \ and (expiration_time is None or now < expiration_time): status = 'using cached' else: if self._constraint: constraint_temp = copy.deepcopy(self._constraint) constraint_temp.expand(constraint) constraint = constraint_temp try: channel_sx = self._do_channel_query(constraint) channel_sx.created = None # timestamp would ruin diff fn = self._get_channels_path() util.ensuredirs(fn) fn_temp = fn + '.%i.temp' % os.getpid() dump_all_spickle([channel_sx], filename=fn_temp) # channel_sx.dump_xml(filename=fn_temp) status = move_or_keep(fn_temp, fn) if status == 'upstream unchanged': squirrel.get_database().silent_touch(fn) self._constraint = constraint self._dump_constraint() except OSError as e: status = 'update failed (%s)' % str(e) log_target = logger.error expiration_time = self._get_channels_expiration_time() self._log_meta( '%s (%s)' % (status, self._str_expires(expiration_time, now)), target=log_target) fn = self._get_channels_path() if os.path.exists(fn): squirrel.add(fn) def _do_channel_query(self, constraint): extra_args = {} tmin = constraint.tmin \ if constraint.tmin is not None and constraint.tmin != g_tmin \ else g_tmin_queries tmax = constraint.tmax \ if constraint.tmax is not None and constraint.tmax != g_tmax \ else g_tmax if in g_sites_not_supporting['startbefore']: ktmin = 'starttime' ktmax = 'endtime' else: ktmin = 'endafter' ktmax = 'startbefore' if is None: extra_args[ktmin] = tmin extra_args[ktmax] = tmax if not in g_sites_not_supporting['includerestricted']: extra_args.update( includerestricted=( self.user_credentials is not None or self.auth_token is not None or self.auth_token_path is not None)) if self.query_args is not None: extra_args.update(self.query_args) self._log_meta('querying...') try: channel_sx = fdsn.station(, format='text', level='channel', selection=codes_to_selection(, tmin, tmax), **extra_args) self._hotfix('channel', channel_sx) return channel_sx except fdsn.EmptyResult: return stationxml.FDSNStationXML(source='dummy-empty-result') except fdsn.DownloadError as e: raise SquirrelError(str(e)) def _load_constraint(self): fn = self._get_constraint_path() if op.exists(fn): with open(fn, 'rb') as f: self._constraint = pickle.load(f) else: self._constraint = None def _dump_constraint(self): with open(self._get_constraint_path(), 'wb') as f: pickle.dump(self._constraint, f, protocol=2) def _get_expiration_time(self, path): if self.expires is None: return None try: t = os.stat(path)[8] return t + self.expires except OSError: return 0.0 def _get_channels_expiration_time(self): return self._get_expiration_time(self._get_channels_path()) def update_waveform_promises(self, squirrel, constraint): from ..base import gaps cpath = os.path.abspath(self._get_channels_path()) ctmin = constraint.tmin ctmax = constraint.tmax nuts = squirrel.iter_nuts( 'channel', path=cpath,, tmin=ctmin, tmax=ctmax) coverages = squirrel.get_coverage( 'waveform', if else None, tmin=ctmin, tmax=ctmax) codes_to_avail = defaultdict(list) for coverage in coverages: for tmin, tmax, _ in coverage.iter_spans(): codes_to_avail[].append((tmin, tmax)) def sgaps(nut): for tmin, tmax in gaps( codes_to_avail[], max(ctmin, nut.tmin) if ctmin is not None else nut.tmin, min(ctmax, nut.tmax) if ctmax is not None else nut.tmax): subnut = clone(nut) subnut.tmin = tmin subnut.tmax = tmax # ignore 1-sample gaps produced by rounding errors if subnut.tmax - subnut.tmin < 2*subnut.deltat: continue yield subnut def wanted(nuts): for nut in nuts: for nut in sgaps(nut): yield nut path = self._source_id squirrel.add_virtual( (make_waveform_promise_nut( file_path=path, **nut.waveform_promise_kwargs) for nut in wanted(nuts)), virtual_paths=[path])
[docs] def remove_waveform_promises(self, squirrel, from_database='selection'): ''' Remove waveform promises from live selection or global database. :param from_database: Remove from live selection ``'selection'`` or global database ``'global'``. ''' path = self._source_id if from_database == 'selection': squirrel.remove(path) elif from_database == 'global': squirrel.get_database().remove(path) else: raise ValueError( 'Values allowed for from_database: ("selection", "global")')
def _get_user_credentials(self): d = {} if self.user_credentials is not None: d['user'], d['passwd'] = self.user_credentials if self.auth_token is not None or self.auth_token_path is not None: d['token'] = self.get_auth_token() return d def download_waveforms( self, orders, success, batch_add, error_permanent, error_temporary): elog = ErrorLog( orders.sort(key=orders_sort_key) neach = 20 i = 0 task = make_task( 'FDSN "%s" waveforms: downloading' %, len(orders)) while i < len(orders): orders_now = orders[i:i+neach] selection_now = orders_to_selection(orders_now) nsamples_estimate = sum( order.estimate_nsamples() for order in orders_now) nsuccess = 0 elog.append_checkpoint() self._log_info_data( 'downloading, %s' % order_summary(orders_now)) all_paths = [] with tempfile.TemporaryDirectory() as tmpdir: try: data = fdsn.dataselect(, selection=selection_now, **self._get_user_credentials()) now = time.time() path = op.join(tmpdir, 'tmp.mseed') with open(path, 'wb') as f: nread = 0 while True: buf = nread += len(buf) if not buf: break f.write(buf) # abort if we get way more data than expected if nread > max( 1024 * 1000, nsamples_estimate * 4 * 10): raise Aborted('Too much data received.') trs = io.load(path) by_nslc = defaultdict(list) for tr in trs: by_nslc[tr.nslc_id].append(tr) for order in orders_now: trs_order = [] err_this = None for tr in by_nslc[]: try: order.validate(tr) trs_order.append(tr.chop( order.tmin, order.tmax, inplace=False)) except trace.NoData: err_this = ( 'empty result', 'empty sub-interval') except InvalidWaveform as e: err_this = ('invalid waveform', str(e)) if len(trs_order) == 0: if err_this is None: err_this = ('empty result', '') elog.append(now, order, *err_this) if order.is_near_real_time(): error_temporary(order) else: error_permanent(order) else: def tsame(ta, tb): return abs(tb - ta) < 2 * order.deltat if len(trs_order) != 1 \ or not tsame( trs_order[0].tmin, order.tmin) \ or not tsame( trs_order[0].tmax, order.tmax): if err_this: elog.append( now, order, 'partial result, %s' % err_this[0], err_this[1]) else: elog.append(now, order, 'partial result') paths = self._archive.add(order, trs_order) all_paths.extend(paths) nsuccess += 1 success(order, trs_order) except fdsn.EmptyResult: now = time.time() for order in orders_now: elog.append(now, order, 'empty result') if order.is_near_real_time(): error_temporary(order) else: error_permanent(order) except Aborted as e: now = time.time() for order in orders_now: elog.append(now, order, 'aborted', str(e)) error_permanent(order) except (util.HTTPError, fdsn.DownloadError) as e: now = time.time() for order in orders_now: elog.append(now, order, 'http error', str(e)) error_temporary(order) emessage = elog.summarize_recent() self._log_info_data( '%i download%s %ssuccessful' % ( nsuccess, util.plural_s(nsuccess), '(partially) ' if emessage else '') + (', %s' % emessage if emessage else '')) if all_paths: batch_add(all_paths) i += neach task.update(i) for agg in elog.iter_aggregates(): logger.warning(str(agg)) task.done() def _do_response_query(self, selection): extra_args = {} if not in g_sites_not_supporting['includerestricted']: extra_args.update( includerestricted=( self.user_credentials is not None or self.auth_token is not None or self.auth_token_path is not None)) self._log_responses('querying...') try: response_sx = fdsn.station(, level='response', selection=selection, **extra_args) self._hotfix('response', response_sx) return response_sx except fdsn.EmptyResult: return stationxml.FDSNStationXML(source='dummy-empty-result') except fdsn.DownloadError as e: raise SquirrelError(str(e)) def update_response_inventory(self, squirrel, constraint): cpath = os.path.abspath(self._get_channels_path()) nuts = squirrel.iter_nuts( 'channel', path=cpath, tmin = g_tmin_queries tmax = g_tmax selection = [] now = time.time() have = set() status = defaultdict(list) for nut in nuts: nslc = if nslc in have: continue have.add(nslc) fn = self._get_responses_path(nslc) expiration_time = self._get_expiration_time(fn) if os.path.exists(fn) \ and (expiration_time is None or now < expiration_time): status['using cached'].append(nslc) else: selection.append(nslc + (tmin, tmax)) dummy = stationxml.FDSNStationXML(source='dummy-empty') neach = 100 i = 0 fns = [] while i < len(selection): selection_now = selection[i:i+neach] i += neach try: sx = self._do_response_query(selection_now) except Exception as e: status['update failed (%s)' % str(e)].extend( entry[:4] for entry in selection_now) continue sx.created = None # timestamp would ruin diff by_nslc = dict(stationxml.split_channels(sx)) for entry in selection_now: nslc = entry[:4] response_sx = by_nslc.get(nslc, dummy) try: fn = self._get_responses_path(nslc) fn_temp = fn + '.%i.temp' % os.getpid() util.ensuredirs(fn_temp) dump_all_spickle([response_sx], filename=fn_temp) # response_sx.dump_xml(filename=fn_temp) status_this = move_or_keep(fn_temp, fn) if status_this == 'upstream unchanged': try: squirrel.get_database().silent_touch(fn) except ExecuteGet1Error: pass status[status_this].append(nslc) fns.append(fn) except OSError as e: status['update failed (%s)' % str(e)].append(nslc) for k in sorted(status): if k.find('failed') != -1: log_target = logger.error else: log_target = self._log_responses( '%s: %s' % ( k, codes_to_str_abbreviated( CodesNSLCE(tup) for tup in status[k])), target=log_target) if fns: squirrel.add(fns, kinds=['response'])
__all__ = [ 'FDSNSource', ]