# Source code for pyrocko.model.gnss

# - GPLv3
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

'''
GNSS station and campaign data model.
'''

import logging
import math
from collections import OrderedDict

import numpy as num

import pyrocko.orthodrome as od

from pyrocko.guts import (Object, Float, String, List, StringChoice,
                          DateTimestamp)
from pyrocko.model import Location

# Namespace prefix for the guts classes declared in this module
# (presumably read by pyrocko.guts when building type tags -- TODO confirm).
guts_prefix = 'pf.gnss'
# Module-level logger; GNSSCampaign.remove_station emits its warning here.
logger = logging.getLogger('pyrocko.model.gnss')

class GNSSComponent(Object):
    '''
    Component of a GNSSStation

    Holds one displacement value (``shift``) and its one-sigma uncertainty
    (``sigma``), both expressed in ``unit``. Components can be summed with
    ``+`` and ``+=``: shifts add linearly, sigmas add in quadrature.

    No unit conversion is performed when summing; both operands are assumed
    to share the same ``unit``.
    '''
    unit = StringChoice.T(
        choices=['mm', 'cm', 'm'],
        help='Unit of displacement',
        default='m')

    shift = Float.T(
        default=0.,
        help="Component's shift in unit")

    sigma = Float.T(
        default=0.,
        help='One sigma uncertainty of the measurement')

    def __add__(self, other):
        '''
        Return a new component holding the sum of ``self`` and ``other``.

        :param other: Component to add, must be an instance of this class.
        :returns: New component with summed shift, quadrature-summed sigma
            and the unit of ``self``.
        :raises AttributeError: If ``other`` is not an instance of this
            class.
        '''
        if not isinstance(other, self.__class__):
            raise AttributeError('Other has to be of instance %s'
                                 % self.__class__)

        comp = self.__class__()
        # Carry the unit over; otherwise the result silently falls back to
        # the default 'm' and disagrees with what __iadd__ would produce.
        comp.unit = self.unit
        comp.shift = self.shift + other.shift
        comp.sigma = math.sqrt(self.sigma**2 + other.sigma**2)
        return comp

    def __iadd__(self, other):
        '''
        Add ``other`` to this component in place and return ``self``.

        The shift is summed, the sigma is combined in quadrature and the
        unit of ``self`` is left untouched.
        '''
        self.shift += other.shift
        self.sigma = math.sqrt(self.sigma**2 + other.sigma**2)
        return self
class GNSSStation(Location):
    '''
    Representation of a GNSS station during a campaign measurement

    A station carries up to three optional displacement components
    (``north``, ``east``, ``up``) plus the pairwise correlation coefficients
    between them. Components left as ``None`` are excluded from
    :py:attr:`components` and from the matrices derived from it.
    '''

    code = String.T(
        help='Four letter station code',
        optional=True)

    style = StringChoice.T(
        choices=['static', 'rapid_static', 'kinematic'],
        default='static')

    survey_start = DateTimestamp.T(
        optional=True,
        help='Survey start time')

    survey_end = DateTimestamp.T(
        optional=True,
        help='Survey end time')

    correlation_ne = Float.T(
        default=0.,
        help='North-East component correlation')

    correlation_eu = Float.T(
        default=0.,
        help='East-Up component correlation')

    correlation_nu = Float.T(
        default=0.,
        help='North-Up component correlation')

    north = GNSSComponent.T(
        optional=True)

    east = GNSSComponent.T(
        optional=True)

    up = GNSSComponent.T(
        optional=True)

    def __eq__(self, other):
        '''
        Stations compare equal when their ``code`` attributes are equal.

        Anything without a ``code`` attribute compares unequal.
        '''
        try:
            same_code = self.code == other.code
        except AttributeError:
            return False
        return same_code

    def get_covariance_matrix(self):
        '''
        Build the covariance matrix of the available components.

        :returns: Square :py:class:`numpy.ndarray` with one row/column per
            available component, ordered north, east, up. Entry ``[i, j]``
            is ``correlation(i, j) * sigma_i * sigma_j``.
        '''
        comps = list(self.components.values())
        size = len(comps)
        covar = num.zeros((size, size))

        for row, comp_a in enumerate(comps):
            for col, comp_b in enumerate(comps):
                corr = self._get_comp_correlation(comp_a, comp_b)
                covar[row, col] = corr * comp_a.sigma * comp_b.sigma

        # Floating point multiplication is order dependent, so [i, j] and
        # [j, i] may differ in the last bits. Mirror the upper triangle into
        # the lower one to get an exactly symmetric matrix. The index
        # orderings of tril/triu only pair up as transposes for matrices up
        # to 3x3, which is the maximum here.
        lower = num.tril_indices_from(covar, k=-1)
        upper = num.triu_indices_from(covar, k=1)
        covar[lower] = covar[upper]
        return covar

    def get_correlation_matrix(self):
        '''
        Build the correlation matrix of the available components.

        :returns: Square :py:class:`numpy.ndarray` with one row/column per
            available component, ordered north, east, up. Off-diagonal
            entries hold the pairwise correlation coefficients, the diagonal
            holds the components' sigma values.
        '''
        # NOTE(review): the diagonal is filled with sigma rather than 1.0 as
        # in a conventional correlation matrix -- confirm this is intended.
        comps = list(self.components.values())
        size = len(comps)
        corr = num.zeros((size, size))
        corr[num.diag_indices_from(corr)] = num.array(
            [comp.sigma for comp in comps])

        for row, comp_a in enumerate(comps):
            for col, comp_b in enumerate(comps):
                if comp_a is not comp_b:
                    corr[row, col] = self._get_comp_correlation(
                        comp_a, comp_b)

        # Enforce exact symmetry, same reasoning as in
        # get_covariance_matrix.
        lower = num.tril_indices_from(corr, k=-1)
        upper = num.triu_indices_from(corr, k=1)
        corr[lower] = corr[upper]
        return corr

    def get_displacement_data(self):
        '''
        Return the shifts of the available components as a 1-D array.
        '''
        shifts = [comp.shift for comp in self.components.values()]
        return num.array(shifts)

    def get_component_mask(self):
        '''
        Return a boolean array of length three, ordered north, east, up.

        An entry is ``True`` where the corresponding component is set.
        '''
        slots = (self.north, self.east, self.up)
        return num.array([comp is not None for comp in slots], dtype=bool)

    @property
    def components(self):
        '''
        Ordered mapping ``name -> component`` of all components that are set.
        '''
        available = OrderedDict()
        for name in ('north', 'east', 'up'):
            comp = getattr(self, name)
            if comp is not None:
                available[name] = comp
        return available

    @property
    def ncomponents(self):
        '''
        Number of components that are set.
        '''
        return len(self.components)

    def _get_comp_correlation(self, comp1, comp2):
        '''
        Look up the correlation coefficient between two components.

        :returns: ``1.`` when both arguments are the same object, the
            matching ``correlation_*`` attribute for a known pair (in either
            order), and ``False`` when the pair is unknown.
        '''
        if comp1 is comp2:
            return 1.

        pair_correlation = {
            (self.north, self.east): self.correlation_ne,
            (self.east, self.up): self.correlation_eu,
            (self.north, self.up): self.correlation_nu
        }

        # Try the pair as given first, then in swapped order.
        for key in ((comp1, comp2), (comp2, comp1)):
            if key in pair_correlation:
                return pair_correlation[key]
        return False
class GNSSCampaign(Object):
    '''
    Collection of GNSS stations belonging to one campaign.

    The campaign-wide covariance and correlation matrices are computed on
    first request and cached; :py:meth:`add_station` and
    :py:meth:`remove_station` reset the cache.
    '''

    stations = List.T(
        GNSSStation.T(),
        help='List of GNSS campaign measurements')

    name = String.T(
        help='Campaign name',
        default='Unnamed campaign')

    survey_start = DateTimestamp.T(
        optional=True)

    survey_end = DateTimestamp.T(
        optional=True)

    def __init__(self, *args, **kwargs):
        Object.__init__(self, *args, **kwargs)
        # Lazily filled caches, see get_covariance_matrix and
        # get_correlation_matrix.
        self._cov_mat = None
        self._cor_mat = None

    def add_station(self, station):
        '''
        Append ``station`` to the campaign and drop the cached matrices.
        '''
        self._cor_mat = self._cov_mat = None
        return self.stations.append(station)

    def remove_station(self, station_code):
        '''
        Remove the station with the given code.

        Logs a warning and does nothing if no such station exists.
        '''
        try:
            self.stations.remove(self.get_station(station_code))
        except ValueError:
            logger.warning('Station {} does not exist in campaign, '
                           'do nothing.'.format(station_code))
        else:
            # Only a successful removal invalidates the cached matrices.
            self._cor_mat = self._cov_mat = None

    def get_station(self, station_code):
        '''
        Return the first station whose ``code`` equals ``station_code``.

        :raises ValueError: If no station carries that code.
        '''
        found = next(
            (sta for sta in self.stations if sta.code == station_code),
            None)
        if found is None:
            raise ValueError('Could not find station %s' % station_code)
        return found

    def get_center_latlon(self):
        '''
        Return the geographic midpoint of all stations, as computed by
        :py:func:`pyrocko.orthodrome.geographic_midpoint_locations`.
        '''
        return od.geographic_midpoint_locations(self.stations)

    def get_radius(self):
        '''
        Return half the distance between the minimum and maximum corner of
        the stations' coordinate bounding box.
        '''
        coords = self.coordinates
        first_col = coords[:, 0]
        second_col = coords[:, 1]
        diagonal = od.distance_accurate50m(
            first_col.min(), second_col.min(),
            first_col.max(), second_col.max())
        return diagonal / 2.

    def _assemble_block_diagonal(self, station_matrix):
        '''
        Place one square block per station along the diagonal of a matrix.

        :param station_matrix: Callable taking a station and returning its
            ``ncomponents x ncomponents`` matrix.
        :returns: :py:class:`numpy.ndarray` of shape
            ``(self.ncomponents, self.ncomponents)``, zero outside the
            per-station blocks.
        '''
        total = self.ncomponents
        assembled = num.zeros((total, total))

        start = 0
        for sta in self.stations:
            stop = start + sta.ncomponents
            assembled[start:stop, start:stop] = station_matrix(sta)
            start = stop
        return assembled

    def get_covariance_matrix(self):
        '''
        Return the block-diagonal covariance matrix of all stations.

        The result is cached until a station is added or removed.
        '''
        if self._cov_mat is None:
            self._cov_mat = self._assemble_block_diagonal(
                lambda sta: sta.get_covariance_matrix())
        return self._cov_mat

    def get_correlation_matrix(self):
        '''
        Return the block-diagonal correlation matrix of all stations.

        The result is cached until a station is added or removed.
        '''
        if self._cor_mat is None:
            self._cor_mat = self._assemble_block_diagonal(
                lambda sta: sta.get_correlation_matrix())
        return self._cor_mat

    def get_component_mask(self):
        '''
        Return the concatenated component masks of all stations.
        '''
        masks = [sta.get_component_mask() for sta in self.stations]
        return num.concatenate(masks)

    def dump(self, *args, **kwargs):
        '''
        Regularize the campaign, then delegate to ``Object.dump``.
        '''
        self.regularize()
        return Object.dump(self, *args, **kwargs)

    @property
    def coordinates(self):
        '''
        Array with one ``effective_latlon`` entry per station.
        '''
        return num.array([sta.effective_latlon for sta in self.stations])

    @property
    def nstations(self):
        '''
        Number of stations in the campaign.
        '''
        return len(self.stations)

    @property
    def ncomponents(self):
        '''
        Total number of set components over all stations.
        '''
        return sum(sta.ncomponents for sta in self.stations)