# Source code for pyrocko.hamster_pile
# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import
import os
import logging
from . import pile, io
from . import trace as tracemod
# Module-level logger used for this module's diagnostic messages.
logger = logging.getLogger('pyrocko.hamster_pile')
class Processor(object):
    '''
    Pass-through trace processor with a per-channel buffer store.

    At most one buffered trace is kept per NSLC id (the ``nslc_id`` attribute
    of the traces handed in). Subclasses can use the buffer helpers to
    accumulate data between calls to :py:meth:`process`.
    '''

    def __init__(self):
        self._buffers = dict()

    def process(self, trace):
        '''
        Process one trace and return the resulting list of traces.

        The base implementation hands the trace back unchanged.
        '''
        return [trace]

    def get_buffer(self, trace):
        '''
        Get the buffered trace which ``trace`` would seamlessly continue.

        Returns the stored trace for the NSLC id of ``trace`` if ``trace``
        starts one sample after its end (within a tenth of a sampling
        interval) and both share data type and sampling interval. Returns
        ``None`` otherwise, or if nothing is stored for that id.
        '''
        try:
            held = self._buffers[trace.nslc_id]
        except KeyError:
            return None

        expected_tmin = held.tmax + held.deltat
        tolerance = 1.0e-1 * held.deltat
        joinable = (
            abs(expected_tmin - trace.tmin) < tolerance
            and held.ydata.dtype == trace.ydata.dtype
            and held.deltat == trace.deltat)

        return held if joinable else None

    def set_buffer(self, trace):
        '''
        Store ``trace`` as the buffer for its NSLC id, replacing any previous.
        '''
        self._buffers[trace.nslc_id] = trace

    def empty_buffer(self, trace):
        '''
        Remove the buffer stored for the NSLC id of ``trace``.

        Raises :py:exc:`KeyError` if no buffer is stored for that id.
        '''
        del self._buffers[trace.nslc_id]

    def flush_buffers(self):
        '''
        Clear the buffer store and return the traces it contained as a list.
        '''
        held, self._buffers = self._buffers, {}
        return list(held.values())
class Renamer(object):
    '''
    Processor which hands out renamed copies of incoming traces.

    :param mapping: callable taking a trace and returning the new codes as a
        sequence suitable for ``trace.set_codes(*codes)``, or ``None`` to
        discard the trace
    '''

    def __init__(self, mapping):
        self._mapping = mapping

    def process(self, trace):
        '''
        Return a one-element list with a renamed copy of ``trace``.

        The input trace is left untouched. An empty list is returned if the
        mapping yields ``None`` for this trace.
        '''
        codes = self._mapping(trace)
        if codes is not None:
            renamed = trace.copy()
            renamed.set_codes(*codes)
            return [renamed]

        return []
class Chain(Processor):
    '''
    Run traces through several processors, one after the other.

    Every trace produced by one processor is fed into the next one; the
    traces produced by the last processor are the result.

    :param processors: objects providing a ``process(trace)`` method which
        returns a list of traces, given in the order they should be applied
    '''

    def __init__(self, *processors):
        # Initialise the inherited buffer store so that the buffer helpers
        # of the base class are usable on a chain, too.
        Processor.__init__(self)
        self._processors = processors

    def process(self, trace):
        '''
        Feed ``trace`` through all processors of the chain.

        Returns the list of traces coming out of the last processor. A chain
        without processors returns ``[trace]``. If any stage produces no
        traces, the result is an empty list.
        '''
        traces = [trace]
        for p in self._processors:
            xtraces = []
            for tr in traces:
                # Hand each single trace (not the whole list) to the stage.
                xtraces.extend(p.process(tr))

            # The output of this stage is the input of the next one.
            traces = xtraces

        return traces
class Downsampler(Processor):
    '''
    Processor which downsamples incoming traces and renames the result.

    The resampling is delegated to the object returned by
    ``tracemod.co_downsample_to(deltat)``; traces are pushed into it with
    ``send()`` and it is closed when this processor is garbage collected.

    :param mapping: callable taking a trace and returning the codes for the
        downsampled trace (passed on as ``set_codes(*codes)``), or ``None``
        to discard the trace
    :param deltat: argument handed to ``tracemod.co_downsample_to``
        (presumably the target sampling interval in seconds - verify against
        that function)
    '''

    def __init__(self, mapping, deltat):
        Processor.__init__(self)
        self._mapping = mapping
        self._downsampler = tracemod.co_downsample_to(deltat)

    def process(self, trace):
        '''
        Downsample ``trace`` and return the result in a list.

        Returns an empty list if the mapping rejects the trace or if the
        downsampled trace reports a data length of zero.
        '''
        codes = self._mapping(trace)
        if codes is None:
            return []

        resampled = self._downsampler.send(trace)
        resampled.set_codes(*codes)
        return [] if resampled.data_len() == 0 else [resampled]

    def __del__(self):
        # Shut down the downsampling coroutine together with the processor.
        self._downsampler.close()
class Grower(Processor):
    '''
    Processor which glues consecutive traces together into longer ones.

    Incoming traces are appended to a per-channel buffer trace as long as
    they continue it seamlessly (see :py:meth:`Processor.get_buffer`). A
    buffer is handed out once it spans at least ``tflush``.

    :param tflush: minimum time span, ``tmax - tmin``, a buffer must reach
        before it is handed out. With ``None`` (the default) every trace is
        passed on immediately. This matches what the comparison against
        ``None`` yielded on Python 2; on Python 3 it raised
        :py:exc:`TypeError`.
    '''

    def __init__(self, tflush=None):
        Processor.__init__(self)
        self._tflush = tflush

    def process(self, trace):
        '''
        Add ``trace`` to its channel's buffer and return finished traces.

        Returns a list with zero, one or two traces. It holds a previously
        buffered trace which ``trace`` does not continue (gap, changed data
        type or sampling interval), followed by the current buffer if it has
        reached the flush length.

        Note that the incoming trace object itself becomes the buffer when a
        new buffer is started and is grown in place by later calls.
        '''
        out = []
        buffer = self.get_buffer(trace)
        if buffer is None:
            # Either nothing is buffered for this channel or the new trace
            # does not line up with it. Hand out the old buffer instead of
            # silently overwriting (and thereby losing) its data.
            stale = self._buffers.pop(trace.nslc_id, None)
            if stale is not None:
                out.append(stale)

            buffer = trace
            self.set_buffer(buffer)
        else:
            buffer.append(trace.ydata)

        # Ordering comparisons against None raise TypeError on Python 3, so
        # the "no flush length" case must be checked explicitly.
        if self._tflush is None \
                or buffer.tmax - buffer.tmin >= self._tflush:

            self.empty_buffer(buffer)
            out.append(buffer)

        return out
class HamsterPile(pile.Pile):
    '''
    Pile which collects incoming traces in per-channel memory buffers.

    Each trace given to :py:meth:`insert_trace` is run through every
    configured processor. The resulting traces are appended to a buffer trace
    kept per NSLC id as long as they continue it seamlessly (start time
    within a tenth of a sample of the expected one, same sampling interval
    and data type); otherwise a new buffer is started. A buffer is "fixated"
    when it is replaced by a new one, when it gets longer than the fixation
    length, or when :py:meth:`fixate_all` is called. If a save path is set,
    fixating writes the buffer trace to disk; without a save path the buffer
    simply stays in the pile.

    :param fixation_length: buffer length in seconds above which a buffer is
        fixated, or ``None`` to disable length-based fixation
    :param path: filename template handed to ``io.save``, or ``None`` to
        keep everything in memory
    :param format: file format handed to ``io.save`` and ``load_files``
    :param forget_fixed: if true, files written on fixation are not loaded
        back into the pile
    :param processors: iterable of objects with a ``process(trace)`` method
        returning a list of traces; defaults to a single pass-through
        :py:class:`Processor`
    '''

    def __init__(
            self,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False,
            processors=None):

        pile.Pile.__init__(self)
        self._buffers = {}  # keys: nslc, values: MemTracesFile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed
        if processors is None:
            self._processors = [Processor()]
        else:
            self._processors = []
            for p in processors:
                self.add_processor(p)

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give ``None`` to disable.
        All current buffers are fixated before the new length takes effect.
        '''
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):
        '''
        Set the filename template used when buffers are written to disk.

        All current buffers are fixated (using the previous path setting)
        before the new path takes effect.
        '''
        self.fixate_all()
        self._path = path

    def add_processor(self, processor):
        '''
        Append a processor to the list applied to every inserted trace.

        All current buffers are fixated before the processor is added.
        '''
        self.fixate_all()
        self._processors.append(processor)

    def insert_trace(self, trace):
        '''
        Run ``trace`` through every processor and buffer the results.

        Each processor receives the original trace; processors are applied
        side by side, not chained.
        '''
        # Let the logging module do the formatting, so that no string is
        # built when debug output is disabled.
        logger.debug('Received a trace: %s', trace)

        for p in self._processors:
            for tr in p.process(trace):
                self._insert_trace(tr)

    def _insert_trace(self, trace):
        '''
        Append ``trace`` to its channel's buffer or start a new buffer.
        '''
        buf = self._append_to_buffer(trace)
        nslc = trace.nslc_id
        if buf is None:  # create new buffer trace
            if nslc in self._buffers:
                # The trace does not continue the existing buffer; finish
                # that one before replacing it.
                self._fixate(self._buffers[nslc])

            trbuf = trace.copy()
            buf = pile.MemTracesFile(None, [trbuf])
            self.add_file(buf)
            self._buffers[nslc] = buf

        buf.recursive_grow_update([trace])
        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                self._fixate(buf)
                del self._buffers[nslc]

    def _append_to_buffer(self, trace):
        '''
        Try to append the trace to the active buffer trace of its channel.

        Returns the buffer (the memory traces file holding the buffer trace)
        or ``None`` if unsuccessful, i.e. if there is no buffer for the
        channel or the trace does not seamlessly continue it.
        '''
        nslc = trace.nslc_id
        if nslc not in self._buffers:
            return None

        buf = self._buffers[nslc]
        trbuf = buf.get_traces()[0]
        if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat
                and trbuf.ydata.dtype == trace.ydata.dtype
                and trbuf.deltat == trace.deltat):

            trbuf.append(trace.ydata)
            return buf

        return None

    def fixate_all(self):
        '''
        Fixate all active buffers and forget about them.
        '''
        for buf in self._buffers.values():
            self._fixate(buf)

        self._buffers = {}

    def _fixate(self, buf):
        '''
        Write the buffer trace of ``buf`` to disk, if a save path is set.

        The memory buffer is then removed from the pile and, unless
        ``forget_fixed`` was requested, the written files are loaded into the
        pile in its place. Without a save path nothing happens.
        '''
        if self._path:
            trbuf = buf.get_traces()[0]
            fns = io.save([trbuf], self._path, format=self._format)

            self.remove_file(buf)
            if not self._forget_fixed:
                self.load_files(
                    fns, show_progress=False, fileformat=self._format)

    def drop_older(self, tmax, delete_disk_files=False):
        '''
        Drop all non-buffer files whose ``tmax`` is before the given ``tmax``.

        See :py:meth:`drop` for the meaning of ``delete_disk_files``.
        '''
        self.drop(
            condition=lambda file: file.tmax < tmax,
            delete_disk_files=delete_disk_files)

    def drop(self, condition, delete_disk_files=False):
        '''
        Remove files matching a condition from the pile.

        Files backing the currently active buffers are never removed.

        :param condition: callable taking a file of the pile and returning
            true if the file should be dropped
        :param delete_disk_files: if true, additionally delete the dropped
            files from disk where they have an existing ``abspath``
        '''
        candidates = []
        buffers = list(self._buffers.values())
        for file in self.iter_files():
            if condition(file) and file not in buffers:
                candidates.append(file)

        self.remove_files(candidates)
        if delete_disk_files:
            for file in candidates:
                if file.abspath and os.path.exists(file.abspath):
                    os.unlink(file.abspath)

    def __del__(self):
        # __init__ may have raised before the buffer store was created; do
        # not add an AttributeError on top of that during finalization.
        if hasattr(self, '_buffers'):
            self.fixate_all()