# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''
Reader for `DiGOS DATA-CUBE³ <https://digos.eu/the-seismic-data-recorder/>`_
raw data.
'''
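
# Minimal usage sketch (the file name is a made-up example): iterate over the
# time-corrected traces of a raw DATA-CUBE³ file with iload(), defined near
# the bottom of this module:
#
#     from pyrocko.io import datacube
#
#     for tr in datacube.iload('01-001.ABC', load_data=True):
#         print(tr)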
import os
import math
import logging
import warnings
import numpy as num
from pyrocko import trace, util, plot
from pyrocko.guts import Object, Int, String, Timestamp, List
from typing import NamedTuple
from . import io_common
logger = logging.getLogger(__name__)
HOUR = 3600.
DAY = 24 * HOUR
WEEK = 7 * DAY
WNRO = 1024 * WEEK  # GPS Week Number Rollover period: 619315200 s (~19.6 years)
N_GPS_TAGS_WANTED = 200 # must match definition in datacube_ext.c
APPLY_SUBSAMPLE_SHIFT_CORRECTION = True
def color(c):
c = plot.color(c)
return tuple(x/255. for x in c)
class DataCubeError(io_common.FileLoadError):
pass
class ControlPointError(Exception):
pass
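
# ``GPSTags`` is used throughout this module, but its definition did not
# survive in this excerpt. The class below is a reconstruction: the field
# order follows the unpacking in coordinates_from_gps() and the attribute
# names follow their uses further down; names and types are assumptions.
class GPSTags(NamedTuple):
    position: num.ndarray         # sample indices of the GPS tags
    gps_time: num.ndarray         # GPS time stamps [s]
    fix: num.ndarray              # non-zero where the receiver had a fix
    sat_count: num.ndarray        # number of satellites used
    lats: num.ndarray             # latitudes [deg]
    lons: num.ndarray             # longitudes [deg]
    elevations: num.ndarray       # elevations [m]
    temperatures: num.ndarray     # temperatures [deg C]
    gps_utc_offsets: num.ndarray  # GPS-UTC offsets reported by receiver [s]

    @classmethod
    def from_tuple(cls, t):
        return cls(*map(num.asarray, t))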
def check_WNRO(utc_gps_offsets: num.ndarray, leap_seconds: int) -> bool:
    '''
    Check for a GPS week number rollover (WNRO).

    Compares the GPS-UTC offsets recorded by the receiver with the
    leap-second count expected for the nominal recording time; a mismatch of
    more than 1 s indicates time stamps offset by a 1024-week rollover.
    '''
# give some tolerance, we expect larger offsets from 1024 week rollovers
max_utc_gps_offset = utc_gps_offsets.max()
if max_utc_gps_offset == 0:
return False
return abs(leap_seconds + max_utc_gps_offset) > 1
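
# Worked example (numbers assumed for illustration): for a correctly timed
# recording made in 2024, util.utc_gps_offset(t) gives -18 s while the
# receiver reports a GPS-UTC offset of +18 s, so the sum is 0 and no rollover
# is flagged. Thrown back by one 1024-week rollover, the same recording gets
# time stamps in 2004, where the leap-second count was 13 s; then
# abs(-13 + 18) = 5 > 1 and the check returns True.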
def make_control_point(ipos_block, t_block, tref, deltat):
    '''
    Fit a linear clock-drift model to a block of GPS tags and return a
    single (sample index, corrected time) control point at the block center.
    '''

    # reduce time (no drift would mean straight line)
tred = (t_block - tref) - ipos_block*deltat
# first round, remove outliers
q25, q75 = num.percentile(tred, (25., 75.))
iok = num.logical_and(q25 <= tred, tred <= q75)
# detrend
slope, offset = num.polyfit(ipos_block[iok], tred[iok], 1)
tred2 = tred - (offset + slope * ipos_block)
# second round, remove outliers based on detrended tred, refit
q25, q75 = num.percentile(tred2, (25., 75.))
iok = num.logical_and(q25 <= tred2, tred2 <= q75)
x = ipos_block[iok].copy()
ipos0 = x[0]
x -= ipos0
y = tred[iok].copy()
    if x.size < 2:
        raise ControlPointError(
            'Insufficient number of control points after QC.')
elif x.size < 5: # needed for older numpy versions
(slope, offset) = num.polyfit(x, y, 1)
else:
(slope, offset), cov = num.polyfit(x, y, 1, cov=True)
slope_err, offset_err = num.sqrt(num.diag(cov))
slope_err_limit = 1.0e-10
if ipos_block.size < N_GPS_TAGS_WANTED // 2:
slope_err_limit *= (200. / ipos_block.size)
offset_err_limit = 5.0e-3
    if slope_err > slope_err_limit:
        raise ControlPointError(
            'Slope error too large: %g (limit: %g)' % (
                slope_err, slope_err_limit))
    if offset_err > offset_err_limit:
        raise ControlPointError(
            'Offset error too large: %g (limit: %g)' % (
                offset_err, offset_err_limit))
ic = ipos_block[ipos_block.size//2]
tc = offset + slope * (ic - ipos0)
return ic, tc + ic * deltat + tref
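
# Sketch of the reduction above, with synthetic numbers: if the true sample
# spacing deviates from deltat by 2 ppm, the reduced times follow
# tred = const + 2e-6 * deltat * ipos_block, so the two-pass linear fit
# recovers the drift rate as the slope and the clock error at the reference
# time as the offset; the returned control point is the drift-corrected
# absolute time at the block-center sample.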
def analyse_gps_tags(header, gps_tags, offset, nsamples):
    '''
    Derive a piecewise-linear time correction from GPS tags.

    Returns ``(tmin, tmax, icontrol, tcontrol, ok)``, where ``icontrol`` and
    ``tcontrol`` map sample indices to corrected times and ``ok`` flags the
    GPS tags which passed the consistency checks. Falls back to timing from
    the file header if no reliable correction can be constructed.
    '''
    ipos, times, fix, sat_count = gps_tags
deltat = 1.0 / int(header['S_RATE'])
tquartz = offset + ipos * deltat
toff = times - tquartz
toff_median = num.median(toff)
n = times.size
dtdt = (times[1:n] - times[0:n-1]) / (tquartz[1:n] - tquartz[0:n-1])
ok = abs(toff_median - toff) < 10.
xok = num.abs(dtdt - 1.0) < 0.00001
    # Drop the first and last tag and any tag with an outlier time offset or
    # an inconsistent local clock rate on either side.
    if ok.size >= 1:
        ok[0] = False
        ok[1:n] &= xok
        ok[0:n-1] &= xok
        ok[n-1] = False
ipos = ipos[ok]
times = times[ok]
fix = fix[ok]
sat_count = sat_count[ok]
blocksize = N_GPS_TAGS_WANTED // 2
if ipos.size < blocksize:
blocksize = max(10, ipos.size // 4)
warnings.warn(
'Small number of GPS tags found. '
'Reducing analysis block size to %i tags. '
'Time correction may be unreliable.' % blocksize)
try:
if ipos.size < blocksize:
raise ControlPointError(
'Cannot determine GPS time correction: '
'Too few GPS tags found: %i' % ipos.size)
j = 0
control_points = []
tref = num.median(times - ipos*deltat)
while j < ipos.size - blocksize:
ipos_block = ipos[j:j+blocksize]
t_block = times[j:j+blocksize]
try:
ic, tc = make_control_point(ipos_block, t_block, tref, deltat)
control_points.append((ic, tc))
except ControlPointError as e:
logger.debug(str(e))
j += blocksize
ipos_last = ipos[-blocksize:]
t_last = times[-blocksize:]
try:
ic, tc = make_control_point(ipos_last, t_last, tref, deltat)
control_points.append((ic, tc))
except ControlPointError as e:
logger.debug(str(e))
if len(control_points) < 2:
raise ControlPointError(
'Could not safely determine time corrections from GPS: '
'unable to construct two or more control points')
i0, t0 = control_points[0]
i1, t1 = control_points[1]
i2, t2 = control_points[-2]
i3, t3 = control_points[-1]
if len(control_points) == 2:
            tmin = t0 + (offset - i0) * deltat
tmax = t3 + (nsamples - i3 - 1) * deltat
else:
icontrol = num.array(
[x[0] for x in control_points], dtype=num.int64)
tcontrol = num.array(
[x[1] for x in control_points], dtype=float)
# robust against steps:
slope = num.median(
(tcontrol[1:] - tcontrol[:-1])
/ (icontrol[1:] - icontrol[:-1]))
tmin = t0 + (offset - i0) * slope
tmax = t2 + (offset + nsamples - 1 - i2) * slope
if offset < i0:
control_points[0:0] = [(offset, tmin)]
if offset + nsamples - 1 > i3:
control_points.append((offset + nsamples - 1, tmax))
icontrol = num.array([x[0] for x in control_points], dtype=num.int64)
tcontrol = num.array([x[1] for x in control_points], dtype=float)
# corrected 2021-10-26: This sub-sample time shift introduced by the
# Cube's ADC was previously not recognized.
if APPLY_SUBSAMPLE_SHIFT_CORRECTION:
tcontrol -= 0.199 * deltat + 0.0003
return tmin, tmax, icontrol, tcontrol, ok
except ControlPointError as e:
logger.error(str(e))
tmin = util.str_to_time(header['S_DATE'] + header['S_TIME'],
format='%y/%m/%d%H:%M:%S')
idat = int(header['DAT_NO'])
if idat == 0:
tmin = tmin + util.gps_utc_offset(tmin)
else:
tmin = util.day_start(tmin + idat * DAY) \
+ util.gps_utc_offset(tmin)
tmax = tmin + (nsamples - 1) * deltat
icontrol = num.array([offset, offset+nsamples - 1], dtype=num.int64)
tcontrol = num.array([tmin, tmax])
return tmin, tmax, icontrol, tcontrol, ok
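
# The (icontrol, tcontrol) pairs define a piecewise-linear mapping from raw
# sample index to corrected time; signal_ext.antidrift() in iload() consumes
# them directly. A manual lookup for a single sample index i (illustrative,
# not used by the reader itself) would be:
#
#     t_i = num.interp(i, icontrol, tcontrol)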
def plot_gnss_location_timeline(fns):
from matplotlib import pyplot as plt
from pyrocko.orthodrome import latlon_to_ne_numpy
not_ = num.logical_not
fig = plt.figure()
axes = []
for i in range(4):
axes.append(
fig.add_subplot(4, 1, i+1, sharex=axes[-1] if axes else None))
background_colors = [
color('aluminium1'),
color('aluminium2')]
tref = None
for ifn, fn in enumerate(fns):
header, gps_tags, nsamples = get_time_infos(fn)
_, t, fix, nsvs, lats, lons, elevations, _, gps_utc_offset = gps_tags
leap_seconds = util.utc_gps_offset(num.median(t))
if check_WNRO(gps_utc_offset, leap_seconds):
t += WNRO
fix = fix.astype(bool)
        if t.size < 2:
            logger.warning('Need at least 2 gps tags for plotting: %s' % fn)
            continue
if tref is None:
tref = util.day_start(t[0])
lat, lon, elevation = coordinates_from_gps(gps_tags)
norths, easts = latlon_to_ne_numpy(lat, lon, lats, lons)
for ax, data in zip(axes, (norths, easts, elevations, nsvs)):
tspan = t[num.array([0, -1])]
ax.axvspan(*((tspan - tref) / HOUR),
color=background_colors[ifn % 2])
med = num.median(data)
ax.plot(
(tspan - tref) / HOUR,
[med, med],
ls='--',
c='k',
lw=3,
alpha=0.5)
ax.plot(
(t[not_(fix)] - tref) / HOUR, data[not_(fix)], 'o',
ms=1.5,
mew=0,
color=color('scarletred2'))
ax.plot(
(t[fix] - tref) / HOUR, data[fix], 'o',
ms=1.5,
mew=0,
color=color('aluminium6'))
for ax in axes:
ax.grid(alpha=.3)
ax_lat, ax_lon, ax_elev, ax_nsv = axes
ax_lat.set_ylabel('Northing [m]')
ax_lon.set_ylabel('Easting [m]')
ax_elev.set_ylabel('Elevation [m]')
ax_nsv.set_ylabel('Number of Satellites')
ax_lat.get_xaxis().set_tick_params(labelbottom=False)
ax_lon.get_xaxis().set_tick_params(labelbottom=False)
ax_nsv.set_xlabel(
'Hours after %s' % util.time_to_str(tref, format='%Y-%m-%d'))
fig.suptitle(
u'Lat: %.5f\u00b0 Lon: %.5f\u00b0 Elevation: %g m' % (
lat, lon, elevation))
plt.show()
def coordinates_from_gps(gps_tags):
ipos, t, fix, nsvs, lats, lons, elevations, temps, gps_utc_offset = \
gps_tags
return tuple(num.median(x) for x in (lats, lons, elevations))
def extract_stations(fns):
import io
import sys
from pyrocko.model import Station
from pyrocko.guts import dump_all
stations = {}
for fn in fns:
sta_name = os.path.splitext(fn)[1].lstrip('.')
if sta_name in stations:
logger.warning('Cube %s already in list!', sta_name)
continue
header, gps_tags, nsamples = get_time_infos(fn)
lat, lon, elevation = coordinates_from_gps(gps_tags)
sta = Station(
network='',
station=sta_name,
name=sta_name,
location='',
lat=lat,
lon=lon,
elevation=elevation)
stations[sta_name] = sta
f = io.BytesIO()
dump_all(stations.values(), stream=f)
sys.stdout.write(f.getvalue().decode())
def plot_timeline(fns):
from matplotlib import pyplot as plt
fig = plt.figure()
axes = fig.gca()
if isinstance(fns, str):
fn = fns
if os.path.isdir(fn):
fns = [
os.path.join(fn, entry) for entry in sorted(os.listdir(fn))]
ipos, t, fix, nsvs, header, offset, nsamples = \
get_timing_context(fns)
else:
            ipos, t, fix, nsvs, _, header, offset, nsamples = \
                get_extended_timing_context(fn)
else:
ipos, t, fix, nsvs, header, offset, nsamples = \
get_timing_context(fns)
deltat = 1.0 / int(header['S_RATE'])
tref = num.median(t - ipos * deltat)
tref = round(tref / deltat) * deltat
if APPLY_SUBSAMPLE_SHIFT_CORRECTION:
tcorr = 0.199 * deltat + 0.0003
else:
tcorr = 0.0
x = ipos*deltat
y = (t - tref) - ipos*deltat - tcorr
bfix = fix != 0
bnofix = fix == 0
tmin, tmax, icontrol, tcontrol, ok = analyse_gps_tags(
header, (ipos, t, fix, nsvs), offset, nsamples)
la = num.logical_and
nok = num.logical_not(ok)
axes.plot(
x[la(bfix, ok)]/HOUR, y[la(bfix, ok)], '+',
ms=5, color=color('chameleon3'))
axes.plot(
x[la(bfix, nok)]/HOUR, y[la(bfix, nok)], '+',
ms=5, color=color('aluminium4'))
axes.plot(
x[la(bnofix, ok)]/HOUR, y[la(bnofix, ok)], 'x',
ms=5, color=color('chocolate3'))
axes.plot(
x[la(bnofix, nok)]/HOUR, y[la(bnofix, nok)], 'x',
ms=5, color=color('aluminium4'))
tred = tcontrol - icontrol*deltat - tref
axes.plot(icontrol*deltat/HOUR, tred, color=color('aluminium6'))
axes.plot(icontrol*deltat/HOUR, tred, 'o', ms=5, color=color('aluminium6'))
ymin = (math.floor(tred.min() / deltat)-1) * deltat
ymax = (math.ceil(tred.max() / deltat)+1) * deltat
# ymin = min(ymin, num.min(y))
# ymax = max(ymax, num.max(y))
if ymax - ymin < 1000 * deltat:
ygrid = math.floor(tred.min() / deltat) * deltat
while ygrid < ymax:
axes.axhline(ygrid, color=color('aluminium4'), alpha=0.3)
ygrid += deltat
xmin = icontrol[0]*deltat/HOUR
xmax = icontrol[-1]*deltat/HOUR
xsize = xmax - xmin
xmin -= xsize * 0.1
xmax += xsize * 0.1
axes.set_xlim(xmin, xmax)
axes.set_ylim(ymin, ymax)
axes.set_xlabel('Uncorrected (quartz) time [h]')
axes.set_ylabel('Relative time correction [s]')
plt.show()
g_dir_contexts = {}
class DirContextEntry(Object):
path = String.T()
tstart = Timestamp.T()
ifile = Int.T()
class DirContext(Object):
    path = String.T()
    mtime = Timestamp.T()
    entries = List.T(DirContextEntry.T())
def get_entry(self, fn):
path = os.path.abspath(fn)
for entry in self.entries:
if entry.path == path:
return entry
raise Exception('entry not found')
def iter_entries(self, fn, step=1):
current = self.get_entry(fn)
by_ifile = dict(
(entry.ifile, entry) for entry in self.entries
if entry.tstart == current.tstart)
icurrent = current.ifile
while True:
icurrent += step
try:
yield by_ifile[icurrent]
except KeyError:
break
def context(fn):
from pyrocko import datacube_ext
dpath = os.path.dirname(os.path.abspath(fn))
    mtimes = [os.stat(dpath).st_mtime]
dentries = sorted([os.path.join(dpath, f) for f in os.listdir(dpath)
if os.path.isfile(os.path.join(dpath, f))])
    for dentry in dentries:  # dentries already hold full paths
        mtimes.append(os.stat(dentry).st_mtime)
mtime = float(max(mtimes))
if dpath in g_dir_contexts:
dir_context = g_dir_contexts[dpath]
if dir_context.mtime == mtime:
return dir_context
del g_dir_contexts[dpath]
entries = []
    for fn2 in dentries:
if not os.path.isfile(fn2):
continue
with open(fn2, 'rb') as f:
first512 = f.read(512)
if not detect(first512):
continue
with open(fn2, 'rb') as f:
try:
header, data_arrays, gps_tags, nsamples, _ = \
datacube_ext.load(f.fileno(), 3, 0, -1, None)
except datacube_ext.DataCubeError as e:
e = DataCubeError(str(e))
                e.set_context('filename', fn2)
raise e
header = dict(header)
entries.append(DirContextEntry(
path=os.path.abspath(fn2),
tstart=util.str_to_time(
'20' + header['S_DATE'] + ' ' + header['S_TIME'],
format='%Y/%m/%d %H:%M:%S'),
ifile=int(header['DAT_NO'])))
    dir_context = DirContext(mtime=mtime, path=dpath, entries=entries)
    g_dir_contexts[dpath] = dir_context  # cache until the directory changes
return dir_context
def get_time_infos(fn):
from pyrocko import datacube_ext
with open(fn, 'rb') as f:
try:
header, _, gps_tags, nsamples, _ = datacube_ext.load(
f.fileno(), 1, 0, -1, None)
except datacube_ext.DataCubeError as e:
e = DataCubeError(str(e))
e.set_context('filename', fn)
raise e
return dict(header), GPSTags.from_tuple(gps_tags), nsamples
def get_timing_context(fns):
joined = [[], [], [], []]
ioff = 0
for fn in fns:
        header, gps_tags, nsamples = get_time_infos(fn)
        ipos = gps_tags.position
        ipos += ioff  # in-place shift, also updates gps_tags.position
for i in range(4):
joined[i].append(gps_tags[i])
ioff += nsamples
ipos, t, fix, nsvs = [num.concatenate(x) for x in joined]
nsamples = ioff
return ipos, t, fix, nsvs, header, 0, nsamples
def get_extended_timing_context(fn):
    '''
    Gather GPS timing information for ``fn``, pulling in GPS tags from
    neighbouring files of the same recording if the file itself contains no
    GPS fix.
    '''
    c = context(fn)
    header, gps_tags, nsamples_base = get_time_infos(fn)
ioff = 0
aggregated = [gps_tags]
nsamples_total = nsamples_base
if num.sum(gps_tags.fix) == 0:
ioff = nsamples_base
for entry in c.iter_entries(fn, 1):
_, gps_tags, nsamples = get_time_infos(entry.path)
ipos = gps_tags.position
ipos += ioff
aggregated.append(gps_tags)
nsamples_total += nsamples
if num.sum(gps_tags.fix) > 0:
break
ioff += nsamples
ioff = 0
for entry in c.iter_entries(fn, -1):
_, gps_tags, nsamples = get_time_infos(entry.path)
ioff -= nsamples
ipos = gps_tags.position
ipos += ioff
aggregated[0:0] = [gps_tags]
nsamples_total += nsamples
if num.sum(gps_tags.fix) > 0:
break
concat = num.concatenate
ipos = concat([tag.position for tag in aggregated])
time = concat([tag.gps_time for tag in aggregated])
fix = concat([tag.fix for tag in aggregated])
sat_count = concat([tag.sat_count for tag in aggregated])
gps_utc_offsets = concat([tag.gps_utc_offsets for tag in aggregated])
return ipos, time, fix, sat_count, gps_utc_offsets, header, \
0, nsamples_base
def iload(fn, load_data=True, interpolation='sinc', yield_gps_tags=False):
    '''
    Read a DATA-CUBE³ raw file, yielding one trace object per channel.

    With ``interpolation='sinc'``, samples are interpolated onto a
    GPS-corrected regular time grid; traces are split at leap seconds and
    shifted from GPS time to UTC.
    '''
from pyrocko import datacube_ext
from pyrocko import signal_ext
if interpolation not in ('sinc', 'off'):
raise NotImplementedError(
'no such interpolation method: %s' % interpolation)
with open(fn, 'rb') as f:
loadflag = 2 if load_data else 1
try:
header, data_arrays, gps_tags, nsamples, _ = datacube_ext.load(
f.fileno(), loadflag, 0, -1, None)
except datacube_ext.DataCubeError as e:
e = DataCubeError(str(e))
e.set_context('filename', fn)
raise e
header = dict(header)
deltat = 1.0 / int(header['S_RATE'])
nchannels = int(header['CH_NUM'])
ipos, times, fix, sat_count, utc_gps_offsets, \
header_, offset_, nsamples_ = get_extended_timing_context(fn)
tmin, tmax, icontrol, tcontrol, _ = analyse_gps_tags(
header_, (ipos, times, fix, sat_count), offset_, nsamples_)
if icontrol is None:
warnings.warn(
'No usable GPS timestamps found. Using datacube header '
'information to guess time. (file: "%s")' % fn)
tmin_ip = round(tmin / deltat) * deltat
if interpolation != 'off':
tmax_ip = round(tmax / deltat) * deltat
else:
tmax_ip = tmin_ip + (nsamples-1) * deltat
nsamples_ip = int(round((tmax_ip - tmin_ip)/deltat)) + 1
# to prevent problems with rounding errors:
tmax_ip = tmin_ip + (nsamples_ip-1) * deltat
leaps = num.array(
[x[0] + util.gps_utc_offset(x[0]) for x in util.read_leap_seconds2()],
dtype=float)
if load_data and icontrol is not None:
ncontrol_this = num.sum(
num.logical_and(0 <= icontrol, icontrol < nsamples))
if ncontrol_this <= 1:
warnings.warn(
'Extrapolating GPS time information from directory context '
'(insufficient number of GPS timestamps in file: "%s").' % fn)
for i in range(nchannels):
if load_data:
arr = data_arrays[i]
assert arr.size == nsamples
if interpolation == 'sinc' and icontrol is not None:
ydata = num.empty(nsamples_ip, dtype=float)
try:
signal_ext.antidrift(
icontrol, tcontrol,
arr.astype(float), tmin_ip, deltat, ydata)
except signal_ext.Error as e:
e = DataCubeError(str(e))
e.set_context('filename', fn)
e.set_context('n_control_points', icontrol.size)
e.set_context('n_samples_raw', arr.size)
e.set_context('n_samples_ip', ydata.size)
e.set_context('tmin_ip', util.time_to_str(tmin_ip))
raise e
ydata = num.round(ydata).astype(arr.dtype)
else:
ydata = arr
tr_tmin = tmin_ip
tr_tmax = None
else:
ydata = None
tr_tmin = tmin_ip
tr_tmax = tmax_ip
tr = trace.Trace('', header['DEV_NO'], '', 'p%i' % i, deltat=deltat,
ydata=ydata, tmin=tr_tmin, tmax=tr_tmax, meta=header)
bleaps = num.logical_and(tmin_ip <= leaps, leaps < tmax_ip)
if num.any(bleaps):
assert num.sum(bleaps) == 1
tcut = leaps[bleaps][0]
for tmin_cut, tmax_cut in [
(tr.tmin, tcut), (tcut, tr.tmax+tr.deltat)]:
try:
tr_cut = tr.chop(tmin_cut, tmax_cut, inplace=False)
ref_time = 0.5*(tr_cut.tmin+tr_cut.tmax)
leap_second = util.utc_gps_offset(ref_time)
if check_WNRO(utc_gps_offsets, leap_second):
tr_cut.shift(WNRO)
leap_second = util.utc_gps_offset(ref_time + WNRO)
tr_cut.shift(leap_second)
if yield_gps_tags:
yield tr_cut, gps_tags
else:
yield tr_cut
except trace.NoData:
pass
else:
t_ref = 0.5*(tr.tmin+tr.tmax)
leap_second = util.utc_gps_offset(t_ref)
if check_WNRO(utc_gps_offsets, leap_second):
tr.shift(WNRO)
leap_second = util.utc_gps_offset(t_ref + WNRO)
tr.shift(leap_second)
if yield_gps_tags:
yield tr, gps_tags
else:
yield tr
header_keys = {
str: b'GIPP_V DEV_NO E_NAME GPS_PO S_TIME S_DATE DAT_NO'.split(),
int: b'''P_AMPL CH_NUM S_RATE D_FILT C_MODE A_CHOP F_TIME GPS_TI GPS_OF
A_FILT A_PHAS GPS_ON ACQ_ON V_TCXO D_VOLT E_VOLT'''.split()}
all_header_keys = header_keys[str] + header_keys[int]
def detect(first512):
s = first512
if len(s) < 512:
return False
if ord(s[0:1]) >> 4 != 15:
return False
n = s.find(b'\x80')
if n == -1:
n = len(s)
s = s[1:n]
s = s.replace(b'\xf0', b'')
s = s.replace(b';', b' ')
s = s.replace(b'=', b' ')
kvs = s.split(b' ')
if len([k for k in all_header_keys if k in kvs]) == 0:
return False
return True
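
# Sketch of how detect() is used (mirrors the usage in context() above):
# read the first 512 bytes of a candidate file and test for DATA-CUBE³
# header markup:
#
#     with open(path, 'rb') as f:
#         if detect(f.read(512)):
#             ...  # treat as a DATA-CUBE³ raw file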
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Datacube reader')
parser.add_argument(
'action', choices=['timeline', 'gnss', 'stations'],
help='Action')
parser.add_argument(
'files', nargs='+')
parser.add_argument(
'--loglevel', '-l',
choices=['critical', 'error', 'warning', 'info', 'debug'],
default='info',
help='Set logger level. Default: %(default)s')
args = parser.parse_args()
util.setup_logging('pyrocko.io.datacube', args.loglevel)
logging.getLogger('matplotlib.font_manager').disabled = True
if args.action == 'timeline':
plot_timeline(args.files)
elif args.action == 'gnss':
plot_gnss_location_timeline(args.files)
elif args.action == 'stations':
extract_stations(args.files)
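
# Example invocations of the command line interface above (file names are
# hypothetical; running via ``-m`` assumes the module is importable as
# ``pyrocko.io.datacube``):
#
#     python -m pyrocko.io.datacube timeline 01-001.ABC
#     python -m pyrocko.io.datacube stations rawdata/*.ABC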