Source code for kite.sandbox_scene

import numpy as num
import time
from os import path as op

from pyrocko.guts import Object, List, Int, String
from pyrocko import guts

from kite.util import Subject, property_cached
from kite.scene import BaseScene, FrameConfig

# Import the modeling backends
from kite.sources import (DislocProcessor, PyrockoProcessor,

__processors__ = [DislocProcessor, PyrockoProcessor, CompoundModelProcessor]
km = 1e3
d2r = num.pi / 180.
r2d = 180. / num.pi

[docs]class SandboxSceneConfig(Object): frame = FrameConfig.T( default=FrameConfig(), help='Frame/reference configuration') extent_north = Int.T( default=800, help='Model size towards north in [px]') extent_east = Int.T( default=800, help='Model size towards east in [px]') sources = List.T( help='List of sources') reference_scene = String.T( optional=True, help='Reference kite.Scene container')
[docs]class SandboxScene(BaseScene): def __init__(self, config=None, **kwargs): self.evChanged = Subject() self.evModelUpdated = Subject() self.evConfigChanged = Subject() self._initialised = False self.config = config if config else SandboxSceneConfig() super().__init__(frame_config=self.config.frame, **kwargs) self.reference = None self._los_factors = None for attr in ('theta', 'phi'): data = kwargs.pop(attr, None) if data is not None: self.__setattr__(attr, data) self.setExtent(self.config.extent_east, self.config.extent_north) if self.config.reference_scene is not None: self.loadReferenceScene(self.config.reference_scene) @property def sources(self): """ :returns: List of sources attached sandbox :rtype: list """ return self.config.sources
[docs] def setExtent(self, east, north): """Set the sandbox's extent in pixels :param east: Pixels in East :type east: int :param north: Pixels in North :type north: int """ if self.reference is not None: self._log.warning('Cannot change a referenced model!') return self._log.debug('Changing model extent to %d px by %d px' % (east, north)) self.cols = east self.rows = north self._north = num.zeros((self.rows, self.cols)) self._east = num.zeros_like(self._north) self._down = num.zeros_like(self._north) self.theta = num.zeros_like(self._north) self.phi = num.zeros_like(self._north) self.theta.fill(num.pi/2) self.phi.fill(0.) self.config.extent_east = east self.config.extent_north = north self.frame.updateExtent() self._clearModel() self.processSources() self.evChanged.notify()
[docs] def setLOS(self, phi, theta): """Set the sandbox's LOS vector :param phi: phi in degree :type phi: int :param theta: theta in degree :type theta: int """ if self.reference is not None: self._log.warning('Cannot change a referenced model!') return self._log.debug( 'Changing model LOS to %d phi and %d theta', phi, theta) self.theta = num.full_like(self.theta, theta*r2d) self.phi = num.full_like(self.phi, phi*r2d) self.frame.updateExtent() self._clearModel() self.evChanged.notify()
@property def north(self): if not self._initialised: self.processSources() return self._north @property def east(self): if not self._initialised: self.processSources() return self._east @property def down(self): if not self._initialised: self.processSources() return self._down @property_cached def displacement(self): """ Displacement projected to LOS """ self.processSources() los_factors = self.los_rotation_factors self._displacement =\ (los_factors[:, :, 0] * -self._down + los_factors[:, :, 1] * self._east + los_factors[:, :, 2] * self._north) return self._displacement @property_cached def max_horizontal_displacement(self): """ Maximum horizontal displacement """ return num.sqrt(self._north**2 + self._east**2).max()
[docs] def addSource(self, source): """Add displacement source to sandbox :param source: Displacement Source :type source: :class:`kite.sources.meta.SandboxSource` """ if source not in self.sources: self.sources.append(source) source._sandbox = self source.evParametersChanged.subscribe(self._clearModel) self._clearModel() self._log.debug('Source %s added' % source.__class__.__name__)
[docs] def removeSource(self, source): """Remove displacement source from sandbox :param source: Displacement Source :type source: :class:`kite.sources.meta.SandboxSource` """ source.evParametersChanged.unsubscribe(self._clearModel) self.sources.remove(source) self._log.debug('Source %s removed' % source.__class__.__name__) del source self._clearModel()
[docs] def processSources(self): """ Process displacement sources and update displacements """ result = self._process(self.sources) self._north += result['north'].reshape(self.rows, self.cols) self._east += result['east'].reshape(self.rows, self.cols) self._down += result['down'].reshape(self.rows, self.cols) self._initialised = True
def processCustom(self, coordinates, sources, result_dict=None): return self._process(sources, result_dict) def _process(self, sources, result=None): if result is None: result = num.zeros( self.frame.npixel, dtype=[('north', num.float64), ('east', num.float64), ('down', num.float64)]) avail_processors = {} for proc in __processors__: avail_processors[proc.__implements__] = proc for impl in set([src.__implements__ for src in sources]): proc_sources = [src for src in sources if src.__implements__ == impl and src._cached_result is None] if not proc_sources: continue processor = avail_processors.get(impl, None) if processor is None: self._log.warning( 'Could not find source processor for %s', impl) continue t0 = time.time() proc_result = processor(self).process( proc_sources, sandbox=self, nthreads=0) src_name = proc_sources[0].__class__.__name__ self._log.debug('Processed %s (nsources:%d) using %s [%.4f s]', src_name, len(proc_sources), processor.__name__, time.time() - t0) result['north'] += proc_result['displacement.n'] result['east'] += proc_result['displacement.e'] result['down'] += proc_result['displacement.d'] return result
[docs] def loadReferenceScene(self, filename): """Load a reference kite scene container into the sandbox A reference scene could be actually measured InSAR displacements. :param filename: filename of the scene container to load [.npy, .yml] :type filename: str """ from .scene import Scene self._log.debug('Loading reference scene from %s', filename) scene = Scene.load(filename) self.setReferenceScene(scene) self.config.reference_scene = filename
[docs] def setReferenceScene(self, scene): """Set a reference scene. A reference scene could be actually measured InSAR displacements. :param scene: Kite scene :type scene: :class:`kite.Scene` """ self.config.frame = scene.config.frame self.setExtent(scene.cols, scene.rows) self.frame._updateConfig() self.phi = scene.phi self.theta = scene.theta self._los_factors = None self.reference = Reference(self, scene) self._log.debug( 'Reference scene set to', scene.meta.scene_id) self.config.reference_scene = scene.meta.filename self._clearModel()
[docs] def getKiteScene(self): """Return a :class:`kite.Scene` from current model. :returns: Scene :rtype: :class:`Scene` """ from .scene import Scene, SceneConfig self._log.debug('Creating kite.Scene from SandboxScene') config = SceneConfig() config.frame = self.frame.config config.meta.scene_id = 'Exported SandboxScene' return Scene( displacement=self.displacement, theta=self.theta, phi=self.phi, config=config)
def _clearModel(self): for arr in (self._north, self._east, self._down): arr.fill(0.) if self.reference: arr[self.reference.scene.displacement_mask] = num.nan self.displacement = None self._los_factors = None self._initialised = False self.max_horizontal_displacement = None self.evModelUpdated.notify()
[docs] def save(self, filename): """Save the sandbox as kite scene container :param filename: filename to save under :type filename: str """ _file, ext = op.splitext(filename) filename = filename if ext in ['.yml'] else filename + '.yml' self._log.debug('Saving model scene to %s' % filename) for source in self.sources: source.regularize() self.config.dump(filename='%s' % filename, header='kite.SandboxScene YAML Config')
[docs] @classmethod def load(cls, filename): """Load a :class:`kite.SandboxScene` :param filename: Config file to load [.yml] :type filename: str :returns: A sandbox from config file :rtype: :class:`kite.SandboxScene` """ config = guts.load(filename=filename) sandbox_scene = cls(config=config) sandbox_scene._log.debug('Loading config from %s' % filename) for source in sandbox_scene.sources: sandbox_scene.addSource(source) return sandbox_scene
class Reference(object): def __init__(self, model, scene): self.model = model self.scene = scene self.model.evModelUpdated.subscribe(self._clearRefence) def _clearRefence(self): self.difference = None @property_cached def difference(self): return self.scene.displacement - self.model.displacement def optimizeSource(self, callback=None): from scipy import optimize quadtree = self.scene.quadtree coordinates = quadtree.leaf_coordinates sources = self.model.sources model_result = num.zeros( coordinates.shape[0], dtype=[('north', num.float64), ('east', num.float64), ('down', num.float64)]) if len(sources) > 1 or not sources: self._log.warning( 'We can optimize single, individual sources only!') return source = sources[0] source.evParametersChanged.mute() def misfit(model_displacement, lp_norm=2): p = lp_norm mf = num.sum( num.abs(quadtree.leaf_medians - model_displacement)**p)**1./p return mf def kernel(model): source.setParametersArray(model) res = self.model.processCustom( coordinates, [source], model_result) model_displacement =\ (quadtree.leaf_los_rotation_factors[:, 0] * -res['down'] + quadtree.leaf_los_rotation_factors[:, 1] * res['east'] + quadtree.leaf_los_rotation_factors[:, 2] * res['north']) mf = misfit(model_displacement) model_result.fill(0.) return mf result = optimize.minimize( kernel, source.getParametersArray(), method='SLSQP', bounds=None, constraints=(), tol=None, callback=callback, options={ 'disp': True, 'iprint': 10, 'eps': 1.4901161193847656e-04, 'maxiter': 300, 'ftol': 1e-06}) source.evParametersChanged.unmute() self.model.sources[0].setParametersArray(result.x) return result class TestSandboxScene(SandboxScene): @classmethod def randomOkada(cls, nsources=1): from .sources import OkadaSource sandbox_scene = cls() def r(lo, hi): return float(num.random.randint( lo, high=hi, size=1)) for s in range(nsources): length = r(5000, 15000) sandbox_scene.addSource( OkadaSource( easting=r(length, sandbox_scene.frame.E.max()-length), northing=r(length, sandbox_scene.frame.N.max()-length), depth=r(0, 8000), strike=r(0, 360), dip=r(0, 90), slip=r(1, 5), rake=r(-180, 180), length=length, width=15. * length**.66,)) return sandbox_scene @classmethod def simpleOkada(cls, **kwargs): from .sources import OkadaSource sandbox_scene = cls() parameters = { 'easting': 50000, 'northing': 50000, 'depth': 0, 'strike': 180., 'dip': 20, 'slip': 2, 'rake': 90, 'length': 10000, 'width': 15. * 10000**.66, } parameters.update(kwargs) sandbox_scene.addSource(OkadaSource(**parameters)) return sandbox_scene