# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

'''
Reader for `Silixa iDAS
<https://silixa.com/technology/idas-intelligent-distributed-acoustic-sensor/>`_
TDMS files.
'''

import logging
import os.path as op
import struct
import datetime
import mmap
import numpy as num

from pyrocko import trace

logger = logging.getLogger(__name__)


def write_property_dict(prop_dict, out_file):
    from pprint import pformat

    with open(out_file, 'w') as f:
        f.write('tdms_property_map=')
        f.write(pformat(prop_dict))


def type_not_supported(vargin):
    '''Raise a NotImplementedError for unsupported TDMS data types.'''
    raise NotImplementedError(
        'Reading of this tdsDataType is not implemented')
def parse_time_stamp(fractions, seconds):
    '''
    Convert a TDMS time representation to a datetime.

    fractions -- fractional seconds (in units of 2^-64 s)
    seconds -- number of seconds since 1904-01-01

    @rtype : datetime.datetime
    '''
    if (
            fractions is not None
            and seconds is not None
            and fractions + seconds > 0):

        return datetime.timedelta(
            0, fractions * 2 ** -64 + seconds
        ) + datetime.datetime(1904, 1, 1)
    else:
        return None
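
# Illustrative example (not part of the reader): zero seconds and 2**63
# fractions correspond to half a second past the TDMS epoch:
#
#     parse_time_stamp(2 ** 63, 0)
#     # -> datetime.datetime(1904, 1, 1, 0, 0, 0, 500000)
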
# Enum mapping TDMS data types to description strings, matching numpy
# dtype names where they exist. See Ref[2] for the enum values.
TDS_DATA_TYPE = {
    0x00: 'void',                   # tdsTypeVoid
    0x01: 'int8',                   # tdsTypeI8
    0x02: 'int16',                  # tdsTypeI16
    0x03: 'int32',                  # tdsTypeI32
    0x04: 'int64',                  # tdsTypeI64
    0x05: 'uint8',                  # tdsTypeU8
    0x06: 'uint16',                 # tdsTypeU16
    0x07: 'uint32',                 # tdsTypeU32
    0x08: 'uint64',                 # tdsTypeU64
    0x09: 'float32',                # tdsTypeSingleFloat
    0x0A: 'float64',                # tdsTypeDoubleFloat
    0x0B: 'float128',               # tdsTypeExtendedFloat
    0x19: 'singleFloatWithUnit',    # tdsTypeSingleFloatWithUnit
    0x1A: 'doubleFloatWithUnit',    # tdsTypeDoubleFloatWithUnit
    0x1B: 'extendedFloatWithUnit',  # tdsTypeExtendedFloatWithUnit
    0x20: 'str',                    # tdsTypeString
    0x21: 'bool',                   # tdsTypeBoolean
    0x44: 'datetime',               # tdsTypeTimeStamp
    0xFFFFFFFF: 'raw',              # tdsTypeDAQmxRawData
}

# Function mapping for reading TDMS data types (all little-endian):
TDS_READ_VAL = {
    'void': lambda f: None,  # tdsTypeVoid
    'int8': lambda f: struct.unpack('<b', f.read(1))[0],
    'int16': lambda f: struct.unpack('<h', f.read(2))[0],
    'int32': lambda f: struct.unpack('<i', f.read(4))[0],
    'int64': lambda f: struct.unpack('<q', f.read(8))[0],
    'uint8': lambda f: struct.unpack('<B', f.read(1))[0],
    'uint16': lambda f: struct.unpack('<H', f.read(2))[0],
    'uint32': lambda f: struct.unpack('<I', f.read(4))[0],
    'uint64': lambda f: struct.unpack('<Q', f.read(8))[0],
    'float32': lambda f: struct.unpack('<f', f.read(4))[0],
    'float64': lambda f: struct.unpack('<d', f.read(8))[0],
    'float128': type_not_supported,
    'singleFloatWithUnit': type_not_supported,
    'doubleFloatWithUnit': type_not_supported,
    'extendedFloatWithUnit': type_not_supported,
    'str': lambda f: f.read(struct.unpack('<i', f.read(4))[0]),
    'bool': lambda f: struct.unpack('<?', f.read(1))[0],
    'datetime': lambda f: parse_time_stamp(
        struct.unpack('<Q', f.read(8))[0],
        struct.unpack('<q', f.read(8))[0],
    ),
    'raw': type_not_supported,
}

DECIMATE_MASK = 0b00100000
LEAD_IN_LENGTH = 28
FILEINFO_NAMES = (
    'file_tag',
    'toc',
    'version',
    'next_segment_offset',
    'raw_data_offset',
)
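
# Illustrative example (not part of the reader): the reader callbacks
# accept any object with a file-like read(), so they can be exercised
# on an in-memory buffer:
#
#     import io
#     buf = io.BytesIO(struct.pack('<i', 42))
#     TDS_READ_VAL[TDS_DATA_TYPE[0x03]](buf)  # -> 42
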
class TdmsReader(object):
    '''A TDMS file reader object for reading properties and data.'''

    def __init__(self, filename):
        self._properties = None
        self._end_of_properties_offset = None
        self._data_type = None
        self._chunk_size = None
        self._raw_data = None
        self._raw_data2 = None  # The mapped data in the 'Next Segment'
        self._raw_last_chunk = None
        self._raw2_last_chunk = None

        self.file_size = op.getsize(filename)
        self._channel_length = None
        self._seg1_length = None
        self._seg2_length = None

        # TODO: Error if file not big enough to hold header
        self._tdms_file = open(filename, 'rb')
        # Read lead in (28 bytes):
        lead_in = self._tdms_file.read(LEAD_IN_LENGTH)
        # lead_in is 28 bytes:
        # [string of length 4][int32][int32][int64][int64]
        fields = struct.unpack('<4siiQQ', lead_in)

        if fields[0].decode() != 'TDSm':
            raise TypeError('Not a TDMS file (TDSm tag not found)')

        self.fileinfo = dict(zip(FILEINFO_NAMES, fields))
        self.fileinfo['decimated'] = not bool(
            self.fileinfo['toc'] & DECIMATE_MASK
        )
        # Make offsets relative to beginning of file:
        self.fileinfo['next_segment_offset'] += LEAD_IN_LENGTH
        self.fileinfo['raw_data_offset'] += LEAD_IN_LENGTH
        self.fileinfo['file_size'] = op.getsize(self._tdms_file.name)

        # TODO: Validate lead in:
        if self.fileinfo['next_segment_offset'] > self.file_size:
            self.fileinfo['next_segment_offset'] = self.file_size
            # raise ValueError(
            #     'Next Segment Offset too large in TDMS header')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._tdms_file.close()

    @property
    def channel_length(self):
        if self._properties is None:
            self.get_properties()

        rdo = num.int64(self.fileinfo['raw_data_offset'])
        nch = num.int64(self.n_channels)
        nso = self.fileinfo['next_segment_offset']
        return num.int64(
            (nso - rdo) / nch / num.dtype(self._data_type).itemsize)

    @property
    def n_channels(self):
        if self._properties is None:
            self.get_properties()
        return self.fileinfo['n_channels']

    def get_properties(self, mapped=False):
        '''
        Return a dictionary of properties. Read from file only if necessary.
        '''
        # Check if we already hold the properties in memory:
        if self._properties is None:
            self._properties = self._read_properties()
        return self._properties
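
    # Illustrative usage sketch (not part of the class; 'example.tdms'
    # is a placeholder path):
    #
    #     with TdmsReader('example.tdms') as reader:
    #         props = reader.get_properties()
    #         fs = props.get('SamplingFrequency[Hz]')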

    def _read_property(self):
        '''
        Read a single property from the TDMS file.
        Return the name, type and value of the property.
        '''
        # Read length of object path:
        var = struct.unpack('<i', self._tdms_file.read(4))[0]
        # Read property name and type:
        name, data_type = struct.unpack(
            '<{0}si'.format(var), self._tdms_file.read(var + 4)
        )
        # Lookup function to read and parse property value based on type:
        value = TDS_READ_VAL[TDS_DATA_TYPE[data_type]](self._tdms_file)
        name = name.decode()
        if data_type == 0x20:  # tdsTypeString
            value = value.decode()

        return name, data_type, value

    def _read_properties(self):
        '''Read the properties from the file.'''
        self._tdms_file.seek(LEAD_IN_LENGTH, 0)
        # Number of channels is total objects - file objects - group objects:
        self.fileinfo['n_channels'] = (
            struct.unpack('<i', self._tdms_file.read(4))[0] - 2
        )
        # Read length of object path:
        var = struct.unpack('<i', self._tdms_file.read(4))[0]
        # Skip over object path and raw data index:
        self._tdms_file.seek(var + 4, 1)
        # Read number of properties in this group:
        var = struct.unpack('<i', self._tdms_file.read(4))[0]
        # Loop through and read each property:
        properties = [self._read_property() for _ in range(var)]
        properties = {prop: value for (prop, _type, value) in properties}

        self._end_of_properties_offset = self._tdms_file.tell()

        self._read_chunk_size()
        # TODO: Add number of channels to properties
        return properties

    def _read_chunk_size(self):
        '''Read the data chunk size from the TDMS file header.'''
        if self._end_of_properties_offset is None:
            self._read_properties()

        self._tdms_file.seek(self._end_of_properties_offset, 0)
        # Skip over Group Information:
        var = struct.unpack('<i', self._tdms_file.read(4))[0]
        self._tdms_file.seek(var + 8, 1)
        # Skip over first channel path and length of index information:
        var = struct.unpack('<i', self._tdms_file.read(4))[0]
        self._tdms_file.seek(var + 4, 1)

        self._data_type = TDS_DATA_TYPE.get(
            struct.unpack('<i', self._tdms_file.read(4))[0]
        )
        if self._data_type not in ('int16', 'float32'):
            raise Exception(
                'Unsupported TDMS data type: %s' % self._data_type)

        # Read dimension of the raw data array (has to be 1):
        # dummy = struct.unpack('<i', self._tdms_file.read(4))[0]

        self._chunk_size = struct.unpack('<i', self._tdms_file.read(4))[0]
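
    # Illustrative sketch (not part of the class) of the on-disk property
    # layout parsed by _read_property, little-endian throughout:
    #
    #     name = b'SamplingFrequency[Hz]'
    #     blob = (struct.pack('<i', len(name)) + name  # length-prefixed name
    #             + struct.pack('<i', 0x0A)            # tdsTypeDoubleFloat
    #             + struct.pack('<d', 1000.0))         # property value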

    def get_data(self, first_ch=0, last_ch=None, first_s=0, last_s=None):
        '''
        Get a block of data from the TDMS file.

        first_ch -- The first channel to load
        last_ch -- The last channel to load (inclusive)
        first_s -- The first sample to load
        last_s -- The last sample to load (inclusive)
        '''
        if self._raw_data is None:
            self._initialise_data()
        if first_ch is None or first_ch < 0:
            first_ch = 0
        if last_ch is None or last_ch >= self.n_channels:
            last_ch = self.n_channels
        else:
            last_ch += 1
        if last_s is None or last_s > self._channel_length:
            last_s = self._channel_length
        else:
            last_s += 1
        nch = num.int64(max(last_ch - first_ch, 0))
        ns = num.int64(max(last_s - first_s, 0))

        # Allocate output container:
        data = num.empty((ns, nch), dtype=num.dtype(self._data_type))
        if data.size == 0:
            return data

        # 1. Index first block and reshape to (samples, channels):
        first_blk = first_s // self._chunk_size
        last_blk = last_s // self._chunk_size
        last_full_blk = min(last_blk + 1, self._raw_data.shape[1])
        nchunk = min(
            max(last_full_blk - first_blk, 0), self._raw_data.shape[1]
        )
        first_s_1a = max(first_s - first_blk * self._chunk_size, 0)
        last_s_1a = min(
            last_s - first_blk * self._chunk_size, nchunk * self._chunk_size
        )
        ind_s = 0
        ind_e = ind_s + max(last_s_1a - first_s_1a, 0)

        d = self._raw_data[:, first_blk:last_full_blk, first_ch:last_ch]
        d = d.reshape((self._chunk_size * nchunk, nch), order='F')
        data[ind_s:ind_e, :] = d[first_s_1a:last_s_1a, :]

        # 2. Index first additional samples:
        first_s_1b = max(
            first_s - self._raw_data.shape[1] * self._chunk_size, 0
        )
        last_s_1b = min(
            last_s - self._raw_data.shape[1] * self._chunk_size,
            self._raw_last_chunk.shape[0],
        )
        ind_s = ind_e
        ind_e = ind_s + max(last_s_1b - first_s_1b, 0)
        if ind_e > ind_s:
            data[ind_s:ind_e, :] = self._raw_last_chunk[
                first_s_1b:last_s_1b, first_ch:last_ch
            ]

        # 3. Index second block:
        first_s_2 = max(first_s - self._seg1_length, 0)
        last_s_2 = last_s - self._seg1_length
        if (first_s_2 > 0 or last_s_2 > 0) and self._raw_data2 is not None:
            first_blk_2 = max(first_s_2 // self._chunk_size, 0)
            last_blk_2 = max(last_s_2 // self._chunk_size, 0)
            last_full_blk_2 = min(last_blk_2 + 1, self._raw_data2.shape[1])
            nchunk_2 = min(
                max(last_full_blk_2 - first_blk_2, 0),
                self._raw_data2.shape[1]
            )
            first_s_2a = max(first_s_2 - first_blk_2 * self._chunk_size, 0)
            last_s_2a = min(
                last_s_2 - first_blk_2 * self._chunk_size,
                nchunk_2 * self._chunk_size,
            )
            ind_s = ind_e
            ind_e = ind_s + max(last_s_2a - first_s_2a, 0)
            if ind_e > ind_s:
                data[ind_s:ind_e, :] = self._raw_data2[
                    :, first_blk_2:last_full_blk_2, first_ch:last_ch
                ].reshape((self._chunk_size * nchunk_2, nch), order='F')[
                    first_s_2a:last_s_2a, :
                ]

        # 4. Index second additional samples:
        if (
                first_s_2 > 0 or last_s_2 > 0
        ) and self._raw2_last_chunk is not None:
            first_s_2b = max(
                first_s_2 - self._raw_data2.shape[1] * self._chunk_size, 0
            )
            last_s_2b = min(
                last_s_2 - self._raw_data2.shape[1] * self._chunk_size,
                self._raw2_last_chunk.shape[0],
            )
            ind_s = ind_e
            ind_e = ind_s + max(last_s_2b - first_s_2b, 0)
            if ind_e > ind_s:
                data[ind_s:ind_e, :] = self._raw2_last_chunk[
                    first_s_2b:last_s_2b, first_ch:last_ch
                ]

        # 5. Return the assembled block:
        if data.size == 0:
            data = data.reshape(0, 0)

        return data
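
    # Continuing the usage sketch above (illustrative): both channel and
    # sample bounds are inclusive, so four channels and 1000 samples come
    # back as a (samples, channels) array:
    #
    #     data = reader.get_data(first_ch=0, last_ch=3,
    #                            first_s=0, last_s=999)
    #     data.shape  # -> (1000, 4)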

    def _initialise_data(self):
        '''Initialise the memory map for the data array.'''
        if self._chunk_size is None:
            self._read_chunk_size()

        dmap = mmap.mmap(self._tdms_file.fileno(), 0, access=mmap.ACCESS_READ)
        rdo = num.int64(self.fileinfo['raw_data_offset'])
        nch = num.int64(self.n_channels)

        # TODO: Support streaming file type?
        # TODO: Is this a valid calculation for ChannelLength?
        nso = self.fileinfo['next_segment_offset']
        self._seg1_length = num.int64(
            (nso - rdo) / nch / num.dtype(self._data_type).itemsize
        )
        self._channel_length = self._seg1_length

        if self.fileinfo['decimated']:
            n_complete_blk = num.int64(self._seg1_length / self._chunk_size)
            ax_ord = 'C'
        else:
            n_complete_blk = num.int64(0)
            ax_ord = 'F'

        self._raw_data = num.ndarray(
            (n_complete_blk, nch, self._chunk_size),
            dtype=self._data_type,
            buffer=dmap,
            offset=rdo,
        )
        # Rotate the axes to [chunk_size, nblk, nch]:
        self._raw_data = num.rollaxis(self._raw_data, 2)

        additional_samples = num.int64(
            self._seg1_length - n_complete_blk * self._chunk_size
        )
        additional_samples_offset = (
            rdo
            + n_complete_blk * nch * self._chunk_size
            * num.dtype(self._data_type).itemsize
        )

        self._raw_last_chunk = num.ndarray(
            (nch, additional_samples),
            dtype=self._data_type,
            buffer=dmap,
            offset=additional_samples_offset,
            order=ax_ord,
        )
        # Rotate the axes to [samples, nch]:
        self._raw_last_chunk = num.rollaxis(self._raw_last_chunk, 1)

        if self.file_size == nso:
            self._seg2_length = 0
        else:
            self._tdms_file.seek(nso + 12, 0)
            (seg2_nso, seg2_rdo) = struct.unpack(
                '<qq', self._tdms_file.read(2 * 8)
            )
            self._seg2_length = num.int64(
                (seg2_nso - seg2_rdo)
                / nch / num.dtype(self._data_type).itemsize
            )
            if self.fileinfo['decimated']:
                n_complete_blk2 = num.int64(
                    self._seg2_length / self._chunk_size)
            else:
                n_complete_blk2 = num.int64(0)

            self._raw_data2 = num.ndarray(
                (n_complete_blk2, nch, self._chunk_size),
                dtype=self._data_type,
                buffer=dmap,
                offset=(nso + LEAD_IN_LENGTH + seg2_rdo),
            )
            self._raw_data2 = num.rollaxis(self._raw_data2, 2)

            additional_samples = num.int64(
                self._seg2_length - n_complete_blk2 * self._chunk_size
            )
            additional_samples_offset = (
                nso + LEAD_IN_LENGTH + seg2_rdo
                + n_complete_blk2 * nch * self._chunk_size
                * num.dtype(self._data_type).itemsize
            )

            self._raw2_last_chunk = num.ndarray(
                (nch, additional_samples),
                dtype=self._data_type,
                buffer=dmap,
                offset=additional_samples_offset,
                order=ax_ord,
            )
            # Rotate the axes to [samples, nch]:
            self._raw2_last_chunk = num.rollaxis(self._raw2_last_chunk, 1)

            if self._raw_data2.size != 0 or self._raw2_last_chunk.size != 0:
                pass
                # raise Exception('Second segment contains some data, '
                #                 'not currently supported')

        self._channel_length = self._seg1_length + self._seg2_length
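
    # Illustrative sketch (not part of the class): the rollaxis calls
    # above turn the on-disk (nblk, nch, chunk_size) layout into a
    # (chunk_size, nblk, nch) view without copying:
    #
    #     a = num.zeros((5, 3, 100))   # (nblk, nch, chunk_size)
    #     num.rollaxis(a, 2).shape     # -> (100, 5, 3)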


# Map of pyrocko meta keys to TDMS property names. The misspelled
# 'SystemInfomation' entries mirror the property names as they appear
# in the files.
META_KEYS = {
    'measure_length': 'MeasureLength[m]',
    'start_position': 'StartPosition[m]',
    'spatial_resolution': 'SpatialResolution[m]',
    'fibre_index': 'FibreIndex',
    'unit_calibration': 'Unit Calibration (nm)',
    'start_distance': 'Start Distance (m)',
    'stop_distance': 'Stop Distance (m)',
    'normalization': 'Normalization',
    'decimation_filter': 'Decimation Filter',
    'gauge_length': 'GaugeLength',
    'norm_offset': 'Norm Offset',
    'source_mode': 'Source Mode',
    'time_decimation': 'Time Decimation',
    'zero_offset': 'Zero Offset (m)',
    'p_parameter': 'P',
    'p_coefficients': 'P Coefficients',
    'idas_version': 'iDASVersion',
    'precice_sampling_freq': 'Precise Sampling Frequency (Hz)',
    'receiver_gain': 'Receiver Gain',
    'continuous_mode': 'Continuous Mode',
    'geo_lat': 'SystemInfomation.GPS.Latitude',
    'geo_lon': 'SystemInfomation.GPS.Longitude',
    'geo_elevation': 'SystemInfomation.GPS.Altitude',
    'channel': None,
    'unit': None,
}


def get_meta(tdms_properties):
    prop = tdms_properties

    deltat = 1. / prop['SamplingFrequency[Hz]']
    tmin = prop['GPSTimeStamp'].timestamp()

    fibre_meta = {key: prop.get(key_map, -1)
                  for key, key_map in META_KEYS.items()
                  if key_map is not None}

    coeff = fibre_meta['p_coefficients']
    try:
        coeff = tuple(map(float, coeff.split('\t')))
    except ValueError:
        coeff = tuple(map(float, coeff.split(';')))

    gain = fibre_meta['receiver_gain']
    try:
        gain = tuple(map(float, gain.split('\t')))
    except ValueError:
        gain = tuple(map(float, gain.split(';')))

    fibre_meta['p_coefficients'] = coeff
    fibre_meta['receiver_gain'] = gain

    fibre_meta['unit'] = 'radians'

    return deltat, tmin, fibre_meta


def iload(filename, load_data=True):
    tdms = TdmsReader(filename)
    deltat, tmin, meta = get_meta(tdms.get_properties())

    data = tdms.get_data().T.copy() if load_data else None

    for icha in range(tdms.n_channels):
        meta_cha = meta.copy()

        assert icha < 99999
        station = '%05i' % icha
        meta_cha['channel'] = icha

        nsamples = tdms.channel_length

        tr = trace.Trace(
            network='DA',
            station=station,
            ydata=None,
            deltat=deltat,
            tmin=tmin,
            tmax=tmin + (nsamples - 1) * deltat,
            meta=meta_cha)

        if data is not None:
            tr.set_ydata(data[icha])

        yield tr


def detect(first512):
    return first512.startswith(b'TDSm')
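

# Illustrative usage sketch (assumes an iDAS TDMS file at the
# placeholder path 'example.tdms'):
#
#     for tr in iload('example.tdms', load_data=True):
#         print(tr.station, tr.deltat, tr.tmin)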