import re
import glob
import os
import time
from datetime import datetime
import utm
import scipy.io
import numpy as num
from kite import util
__all__ = ['Gamma', 'Matlab', 'ISCE', 'GMTSAR', 'ROI_PAC', 'SARscape']
try:
from osgeo import gdal
__all__.append('LiCSAR')
__all__.append('ARIA')
except ImportError:
pass
d2r = num.pi/180.
km = 1e3
op = os.path
LAMBDA_SENTINEL = 0.055465763
def check_required(required, params):
for r in required:
if r not in params:
return False
return True
def safe_cast(val, to_type, default=None):
try:
return to_type(val)
except (ValueError, TypeError):
return default
class HeaderError(Exception):
pass
class AttribDict(dict):
def __getattr__(self, item):
return self[item]
def __setattr__(self, item, value):
self[item] = value
class SceneIO(object):
""" Prototype class for SARIO objects. """
def __init__(self, scene=None):
if scene is not None:
self._log = scene._log.getChild('IO/%s' % self.__class__.__name__)
else:
import logging
self._log = logging.getLogger('SceneIO/%s'
% self.__class__.__name__)
self.container = AttribDict(
phi=0., # Look orientation counter-clockwise angle from east
theta=0., # Look elevation angle (up from horizontal in degree)
# 90 deg North
displacement=None, # Displacement towards LOS
frame=AttribDict(
llLon=None, # Lower left corner latitude
llLat=None, # Lower left corner londgitude
dN=None, # Pixel delta in north, meter or degree
dE=None, # Pixel delta in east, meter or degree
spacing='meter', # Pixel spacing unit
),
# Meta information
meta=AttribDict(
title=None,
orbital_node=None,
satellite_name=None,
wavelength=None,
time_master=None,
time_slave=None
),
# All extra information
extra={}
)
def read(self, filename, **kwargs):
""" Read function of the file format
:param filename: file to read
:type filename: string
:param kwargs: Keyword arguments
:type kwargs: {dict}
"""
raise NotImplementedError('read not implemented')
def write(self, filename, **kwargs):
""" Write method for IO
:param filename: file to write to
:type filename: string
:param **kwargs: Keyword arguments
:type **kwargs: {dict}
"""
raise NotImplementedError('write not implemented')
def validate(self, filename, **kwargs):
""" Validate file format
:param filename: file to validate
:type filename: string
:returns: Validation
:rtype: {bool}
"""
pass
raise NotImplementedError('validate not implemented')
[docs]class Matlab(SceneIO):
"""
Variable naming conventions for Matlab :file:`.mat` container:
================== ==================== ===================== =====
Property Matlab ``.mat`` name type unit
================== ==================== ===================== =====
Scene.displacement ``ig_`` n x m array [m]
Scene.phi ``phi`` float or n x m array [rad]
Scene.theta ``theta`` float or n x m array [rad]
Scene.frame.x ``xx`` n x 1 vector [m]
Scene.frame.y ``yy`` m x 1 vector [m]
Scene.utm_zone ``utm_zone`` str ('33T')
================== ==================== ===================== =====
Displacement is expected to be in meters. Note that the displacement maps
could also be pixel offset maps rather than unwrapped SAR interferograms.
For SAR azimuth pixel offset maps calculate ``phi`` from the heading
direction and set ``theta=0.``. For SAR range pixel offsets use the same
LOS angles as for InSAR.
"""
[docs] def validate(self, filename, **kwargs):
if filename[-4:] == '.mat':
return True
else:
return False
try:
variables = self.io.whosmat(filename)
if len(variables) > 50:
return False
return True
except ValueError:
return False
[docs] def read(self, filename, **kwargs):
c = self.container
mat = scipy.io.loadmat(filename)
utm_e = None
utm_n = None
utm_zone = None
utm_zone_letter = None
phi0 = None
theta0 = None
for mat_k, v in mat.items():
for io_k in c.keys():
if io_k in mat_k:
c[io_k] = num.rot90(mat[mat_k])
elif 'ig_' in mat_k:
c.displacement = num.rot90(mat[mat_k])
elif 'xx' in mat_k:
utm_e = mat[mat_k].flatten()
elif 'yy' in mat_k:
utm_n = mat[mat_k].flatten()
elif 'utm_zone' in mat_k:
utm_zone = int(mat['utm_zone'][0][:-1])
utm_zone_letter = str(mat['utm_zone'][0][-1])
elif 'phi' in mat_k:
phi0 = mat[mat_k].flatten()
elif 'theta' in mat_k:
theta0 = mat[mat_k].flatten()
if len(theta0) == 1:
c.theta = num.ones(num.shape(c.displacement)) * theta0
if len(theta0) == 1:
c.phi = num.ones(num.shape(c.displacement)) * phi0
if utm_zone is None:
utm_zone = 33
utm_zone_letter = 'N'
self._log.warning(
'Variable utm_zone not defined. Defaulting to UTM Zone %d%s!'
% (utm_zone, utm_zone_letter))
if not (num.all(utm_e) or num.all(utm_n)):
self._log.warning(
'Could not find referencing UTM vectors in .mat file!')
utm_e = num.linspace(100000, 110000, c.displacement.shape[0])
utm_n = num.linspace(1100000, 1110000, c.displacement.shape[1])
if utm_e.min() < 1e4 or utm_n.min() < 1e4:
utm_e *= km
utm_n *= km
c.frame.dE = num.abs(utm_e[1] - utm_e[0])
c.frame.dN = num.abs(utm_n[1] - utm_n[0])
try:
c.frame.llLat, c.frame.llLon =\
utm.to_latlon(utm_e.min(), utm_n.min(),
utm_zone, utm_zone_letter)
except utm.error.OutOfRangeError:
self._log.warning(
'Could not interpret spatial vectors,'
' referencing to 0, 0 (lat, lon)')
c.frame.llLat, c.frame.llLon = (0., 0.)
return c
[docs]class Gamma(SceneIO):
"""
Reading geocoded displacement maps (unwrapped igs) originating
from GAMMA software.
.. note :: Expects:
* [:file:`*`] Binary file from Gamma with displacement in radians
* [:file:`*.slc.par`] If you want to translate radians to
meters using the `radar_frequency`.
* [:file:`*par`] Parameter file, describing ``corner_lat, corner_lon,
nlines, width, post_lat, post_lon`` or ``post_north, post_east,
corner_east, corner_north, nlines, width``.
* [:file:`*theta*`, :file:`*phi*`] Two look vector files,
generated by GAMMA command ``look_vector``.
.. warning ::
* Data has to be georeferenced to latitude/longitude or UTM!
* Look vector files - expected to have a particular name
"""
@staticmethod
def _parseParameterFile(filename):
params = {}
rc = re.compile(r'^(\w*):\s*([a-zA-Z0-9+-.*]*\s[a-zA-Z0-9_]*).*')
with open(filename, mode='r') as par:
for line in par:
parsed = rc.match(line)
if parsed is None:
continue
groups = parsed.groups()
params[groups[0]] = safe_cast(groups[1], float,
default=groups[1].strip())
return params
def _getParameters(self, path, log=False):
required_utm = ['post_north', 'post_east', 'corner_east',
'corner_north', 'nlines', 'width']
required_lat_lon = ['corner_lat', 'corner_lon', 'nlines',
'width', 'post_lat', 'post_lon']
path = op.dirname(op.realpath(path))
par_files = glob.glob('%s/*par' % path)
for file in par_files:
params = self._parseParameterFile(file)
if check_required(required_utm, params)\
or check_required(required_lat_lon, params):
if not log:
self._log.info('Found parameter file %s' % file)
return params
raise ImportError(
'Parameter file does not hold required parameters')
def _getSLCParameters(self, path):
required_params = ['radar_frequency']
path = op.dirname(op.realpath(path))
par_files = glob.glob('%s/*slc.par' % path)
for file in par_files:
params = self._parseParameterFile(file)
if check_required(required_params, params):
self._log.info('Found SLC parameter file %s' % file)
return params
raise ImportError('Could not find SLC parameter file *.slc.par'
' with parameters %s' % required_params)
[docs] def validate(self, filename, **kwargs):
try:
par_file = kwargs.pop('par_file', filename)
self._getParameters(par_file)
return True
except ImportError:
return False
def _getLOSAngles(self, filename, pattern):
path = op.dirname(op.realpath(filename))
phi_files = glob.glob('%s/%s' % (path, pattern))
if len(phi_files) == 0:
self._log.warning('Could not find LOS file %s, '
'defaulting to angle to 0. [rad]' % pattern)
return 0.
elif len(phi_files) > 1:
self._log.warning('Found multiple LOS files %s, '
'defaulting to angle 0. [rad]' % pattern)
return 0.
filename = phi_files[0]
self._log.info('Loading LOS %s from %s' % (pattern, filename))
return num.memmap(filename, mode='r', dtype='>f4')
[docs] def read(self, filename, **kwargs):
"""
:param filename: Gamma software parameter file
:type filename: str
:param par_file: Corresponding parameter (:file:`*par`) file.
(optional)
:type par_file: str
:returns: Import dictionary
:rtype: dict
:raises: ImportError
"""
par_file = kwargs.pop('par_file', filename)
params = self._getParameters(par_file, log=True)
try:
params_slc = self._getSLCParameters(par_file)
except ImportError as e:
raise e
fill = None
ncols = int(params['width'])
nlines = int(params['nlines'])
radar_frequency = params_slc.get('radar_frequency', None)
displ = num.fromfile(filename, dtype='>f4')
# Resize array if last line is not scanned completely
if (displ.size % ncols) != 0:
fill = num.empty(ncols - displ.size % ncols)
fill.fill(num.nan)
displ = num.append(displ, fill)
displ = displ.reshape(nlines, ncols)
displ[displ == -0.] = num.nan
displ = num.flipud(displ)
if radar_frequency is not None:
radar_frequency = float(radar_frequency)
self._log.info('Scaling displacement by radar_frequency %f GHz'
% (radar_frequency/1e9))
wavelength = util.C / radar_frequency
displ /= -4*num.pi
displ *= wavelength
else:
wavelength = 'None'
self._log.warning(
'Could not determine radar_frequency from *.slc.par file!'
' Leaving displacement to radians.')
phi = self._getLOSAngles(filename, '*phi*')
theta = self._getLOSAngles(filename, '*theta*')
theta = theta
if isinstance(phi, num.ndarray):
phi = phi.reshape(nlines, ncols)
phi = num.flipud(phi)
if isinstance(theta, num.ndarray):
theta = theta.reshape(nlines, ncols)
theta = num.flipud(theta)
if fill is not None:
theta = num.append(theta, fill)
phi = num.append(phi, fill)
c = self.container
c.displacement = displ
c.theta = theta
c.phi = phi
c.meta.wavelength = wavelength
c.meta.title = params.get('title', 'None')
c.bin_file = filename
c.par_file = par_file
if params['DEM_projection'] == 'UTM':
utm_zone = params['projection_zone']
try:
utm_zone_letter = utm.latitude_to_zone_letter(
params['center_latitude'])
except ValueError:
self._log.warning('Could not parse UTM Zone letter,'
' defaulting to N!')
utm_zone_letter = 'N'
self._log.info('Using UTM reference: Zone %d%s'
% (utm_zone, utm_zone_letter))
dN = params['post_north']
dE = params['post_east']
utm_corn_e = params['corner_east']
utm_corn_n = params['corner_north']
utm_corn_eo = utm_corn_e + dE * displ.shape[1]
utm_corn_no = utm_corn_n + dN * displ.shape[0]
utm_e = num.linspace(utm_corn_e, utm_corn_eo, displ.shape[1])
utm_n = num.linspace(utm_corn_n, utm_corn_no, displ.shape[0])
llLat, llLon = utm.to_latlon(utm_e.min(), utm_n.min(),
utm_zone, utm_zone_letter)
c.frame.llLat = llLat
c.frame.llLon = llLon
c.frame.dE = abs(dE)
c.frame.dN = abs(dN)
else:
self._log.info('Using Lat/Lon reference')
c.frame.spacing = 'degree'
c.frame.llLat = params['corner_lat'] \
+ params['post_lat'] * nlines
c.frame.llLon = params['corner_lon']
c.frame.dE = abs(params['post_lon'])
c.frame.dN = abs(params['post_lat'])
return c
[docs]class ROI_PAC(SceneIO):
"""
.. note:: Expects:
* Binary file from ROI_PAC (:file:`*`)
* Parameter file (:file:`<binary_file>.rsc`),
describing ``WIDTH, FILE_LENGTH, X_FIRST, Y_FIRST, X_STEP,
Y_STEP, WAVELENGTH``
* If the georeferencing is in UTM coordinates, further needed
entries in parameter file are 'X_UNIT' and 'Y_UNIT' that give
'meters' and 'LAT_REF3' as well as 'LON_REF3'.
The unwrapped displacement is expected in radians and will be scaled
to meters by ``WAVELENGTH`` parsed from the :file:`*.rsc` file.
"""
[docs] def validate(self, filename, **kwargs):
try:
par_file = kwargs.pop('par_file',
self._getParameterFile(filename))
self._parseParameterFile(par_file)
return True
except ImportError:
return False
def _getParameterFile(self, bin_file):
par_file = op.realpath(bin_file) + '.rsc'
try:
self._parseParameterFile(par_file)
self._log.info('Found parameter file %s' % par_file)
return par_file
except (ImportError, IOError):
raise ImportError('Could not find ROI_PAC parameter file (%s)'
% par_file)
@staticmethod
def _parseParameterFile(par_file):
params = {}
required_L0 = ['WIDTH', 'FILE_LENGTH', 'X_FIRST', 'Y_FIRST', 'X_STEP',
'Y_STEP', 'WAVELENGTH']
required_utm = ['X_UNIT', 'LAT_REF1', 'LON_REF1']
rc = re.compile(r'([\w]*)\s*([\w.+-]*)')
with open(par_file, 'r') as par:
for line in par:
parsed = rc.match(line)
if parsed is None:
continue
groups = parsed.groups()
params[groups[0]] = safe_cast(groups[1], float,
default=groups[1].strip())
if check_required(required_L0, params):
if check_required(required_utm, params):
geo_ref = 'all'
return params, geo_ref
else:
geo_ref = 'latlon'
return params, geo_ref
raise ImportError(
'Parameter file %s does not hold the basic \
required parameters' % par_file)
[docs] def read(self, filename, **kwargs):
"""
:param filename: ROI_PAC binary file
:type filename: str
:param par_file: Corresponding parameter (:file:`*rsc`) file.
(optional)
:type par_file: str
:returns: Import dictionary
:rtype: dict
:raises: ImportError
"""
par_file = kwargs.pop('par_file', self._getParameterFile(filename))
par, geo_ref = self._parseParameterFile(par_file)
nlines = int(par['FILE_LENGTH'])
ncols = int(par['WIDTH'])
wavelength = par['WAVELENGTH']
heading = par['HEADING_DEG']
if geo_ref == 'latlon':
lat_ref = par['Y_FIRST']
lon_ref = par['X_FIRST']
elif geo_ref == 'all':
lat_ref = par['LAT_REF3']
lon_ref = par['LON_REF3']
look_ref1 = par['LOOK_REF1']
look_ref2 = par['LOOK_REF2']
look_ref3 = par['LOOK_REF3']
look_ref4 = par['LOOK_REF4']
utm_zone_letter = utm.latitude_to_zone_letter(lat_ref)
utm_zone = utm.latlon_to_zone_number(lat_ref, lon_ref)
look = num.mean(
num.array([look_ref1, look_ref2, look_ref3, look_ref4]))
data = num.memmap(filename, dtype='<f4')
data = data.reshape(nlines, ncols*2)
displ = data[:, ncols:]
displ = num.flipud(displ)
displ[displ == -0.] = num.nan
displ = displ / (4.*num.pi) * wavelength
z_scale = par.get('Z_SCALE', 1.)
z_offset = par.get('Z_OFFSET', 0.)
displ += z_offset
displ *= z_scale
c = self.container
c.displacement = displ
c.theta = num.deg2rad(90. - look)
c.phi = num.deg2rad(-heading + 180.)
c.meta.title = par.get('TITLE', 'None')
c.meta.wavelength = par['WAVELENGTH']
c.bin_file = filename
c.par_file = par_file
if geo_ref == 'all':
if par['X_UNIT'] == 'meters':
c.frame.spacing = 'meter'
c.frame.dE = par['X_STEP']
c.frame.dN = -par['Y_STEP']
geo_ref = 'utm'
elif par['X_UNIT'] == 'degree':
c.frame.spacing = 'degree'
geo_ref = 'latlon'
elif geo_ref == 'latlon':
self._log.info('Georeferencing is in Lat-Lon [degrees].')
c.frame.spacing = 'degree'
c.frame.llLat = par['Y_FIRST'] + par['Y_STEP'] * nlines
c.frame.llLon = par['X_FIRST']
# c_utm_0 = utm.from_latlon(lat_ref, lon_ref)
# c_utm_1 = utm.from_latlon(lat_ref + par['Y_STEP'],
# lon_ref + par['X_STEP'])
# c.frame.dE = c_utm_1[0] - c_utm_0[0]
# c.frame.dN = abs(c_utm_1[1] - c_utm_0[1])
c.frame.dE = par['X_STEP']
c.frame.dN = -par['Y_STEP']
elif geo_ref == 'utm':
self._log.info('Georeferencing is in UTM (zone %d%s)',
utm_zone, utm_zone_letter)
y_ll = par['Y_FIRST'] + par['Y_STEP'] * nlines
c.frame.llLat, c.frame.llLon = utm.to_latlon(
par['X_FIRST'], y_ll, utm_zone,
zone_letter=utm_zone_letter)
return self.container
class ISCEXMLParser(object):
def __init__(self, filename):
import xml.etree.ElementTree as ET
self.root = ET.parse(filename).getroot()
@staticmethod
def type_convert(value):
for t in (float, int, str):
try:
return t(value)
except ValueError:
continue
raise ValueError('Could not convert value')
def getProperty(self, name):
name = name.lower()
for child in self.root.iter():
child_name = child.get('name')
if isinstance(child_name, str):
child_name = child_name.lower()
if child_name == name.lower():
if child.tag == 'property':
return self.type_convert(child.find('value').text)
elif child.tag == 'component':
values = {}
for prop in child.iter('property'):
values[prop.get('name')] =\
self.type_convert(prop.find('value').text)
return values
return None
[docs]class ISCE(SceneIO):
"""
Reading geocoded, unwraped displacement maps
processed with ISCE software (https://winsar.unavco.org/isce.html).
.. note :: Expects:
* Unwrapped displacement binary (:file:`*.unw.geo`)
* Metadata XML (:file:`*.unw.geo.xml`)
* LOS binary data (:file:`*.rdr.geo`)
.. note ::
When using ``gdal_translate`` to crop the scene, use the argument
``-co SCHEME=BIL`` to make the output
.. note ::
Data are in radians but no transformation to
meters yet, as ``wavelength`` or at least sensor name is not
provided in the XML file.
"""
[docs] def validate(self, filename, **kwargs):
try:
self._getDisplacementFile(filename)
self._getLOSFile(filename)
return True
except ImportError:
return False
def _getLOSFile(self, path):
if not op.isdir(path):
path = op.dirname(path)
rdr_files = glob.glob(op.join(path, '*.rdr.geo'))
if len(rdr_files) == 0:
raise ImportError('Could not find LOS file (*.rdr.geo)')
rdr_file = rdr_files[0]
self._log.info('Found LOS file: %s', rdr_file)
return rdr_file
def _getDisplacementFile(self, path):
if op.isfile(path):
disp_file = path
else:
files = glob.glob(op.join(path, '*.unw.geo'))
if len(files) == 0:
raise ImportError('Could not find displacement file '
'(.unw.geo) at %s', path)
disp_file = files[0]
if not op.isfile('%s.xml' % disp_file):
raise ImportError('Could not find displacement XML file '
'(%s.unw.geo.xml)' % op.basename(disp_file))
self._log.info('Found Displacement file: %s', disp_file)
return disp_file
[docs] def read(self, path, **kwargs):
path = op.abspath(path)
c = self.container
xml_file = self._getDisplacementFile(path) + '.xml'
self._log.info('Parsing ISCE XML file %s' % xml_file)
isce_xml = ISCEXMLParser(xml_file)
coord_lon = isce_xml.getProperty('coordinate1')
coord_lat = isce_xml.getProperty('coordinate2')
c.frame.dN = num.abs(coord_lat['delta'])
c.frame.dE = num.abs(coord_lon['delta'])
nlon = int(coord_lon['size'])
nlat = int(coord_lat['size'])
c.frame.spacing = 'degree'
c.frame.llLat = coord_lat['startingvalue'] +\
(nlat * coord_lat['delta'])
c.frame.llLon = coord_lon['startingvalue']
displ = num.memmap(self._getDisplacementFile(path),
dtype='<f4')\
.reshape(nlat, nlon*2)[:, nlon:]
displ = num.flipud(displ)
displ[displ == 0.] = num.nan
c.displacement = displ
los_file = self._getLOSFile(path)
los_data = num.fromfile(los_file, dtype='<f4')\
.reshape(nlat*2, nlon)
theta = num.flipud(los_data[0::2, :])
phi = num.flipud(los_data[1::2, :])
def los_is_degree():
return num.abs(theta).max() > num.pi or num.abs(phi).max() > num.pi
if not los_is_degree():
raise ImportError(
'The LOS file (%s) seems to be in radians! '
'Change it to degree!' % op.basename(los_file))
phi[phi == 0.] = num.nan
theta[theta == 0.] = num.nan
phi *= d2r
theta *= d2r
phi = num.pi/2 + phi
theta = num.pi/2 - theta
c.phi = phi
c.theta = theta
return c
[docs]class GMTSAR(SceneIO):
"""
Reading GMTSAR grid files.
.. note ::
Expects:
* Displacement grid (NetCDF, :file:`*los_ll.grd`) in meter
(in case use "gmt grdmath los_cm_ll.grd 0.01 MUL = los_m_ll.grd')
* LOS binary data (see instruction, :file:`*.los.enu`)
Calculate the corresponding unit look vectors with GMT5SAR ``SAT_look``:
.. code-block:: sh
gmt grd2xyz los_ll.grd | gmt grdtrack -Gdem.grd | \\
awk {'print $1, $2, $4'} | \\
SAT_look 20050731.PRM -bos > 20050731.los.enu
"""
[docs] def validate(self, filename, **kwargs):
try:
if self._getDisplacementFile(filename)[-4:] == '.grd':
return True
except ImportError:
return False
return False
def _getLOSFile(self, path):
if not op.isdir(path):
path = op.dirname(path)
los_files = glob.glob(op.join(path, '*.los.*'))
if len(los_files) == 0:
self._log.warning(GMTSAR.__doc__)
raise ImportError('Could not find LOS file (*.los.*)')
los_file = los_files[0]
self._log.debug('Found LOS file: %s', los_file)
return los_file
def _getDisplacementFile(self, path):
if op.isfile(path):
return path
else:
files = glob.glob(op.join(path, '*.grd'))
if len(files) == 0:
raise ImportError('Could not find displacement file '
'(*.grd) at %s', path)
disp_file = files[0]
self._log.debug('Found Displacement file: %s', disp_file)
return disp_file
[docs] def read(self, path, **kwargs):
from scipy.io import netcdf
path = op.abspath(path)
c = self.container
grd = netcdf.netcdf_file(self._getDisplacementFile(path),
mode='r', version=2)
displ = grd.variables['z'][:].copy()
c.displacement = displ
shape = c.displacement.shape
# LatLon
c.frame.spacing = 'degree'
c.frame.llLat = grd.variables['lat'][:].min()
c.frame.llLon = grd.variables['lon'][:].min()
c.frame.dN = (grd.variables['lat'][:].max() -
c.frame.llLat) / shape[0]
c.frame.dE = (grd.variables['lon'][:].max() -
c.frame.llLon) / shape[1]
# Theta and Phi
try:
los = num.memmap(self._getLOSFile(path), dtype='<f4')
e = los[3::6].copy().reshape(shape)
n = los[4::6].copy().reshape(shape)
u = los[5::6].copy().reshape(shape)
phi = num.arctan(n/e)
theta = num.arcsin(u)
# phi[n < 0] += num.pi
c.phi = phi
c.theta = theta
except ImportError:
self._log.warning(self.__doc__)
self._log.warning('Defaulting theta to pi/2 and phi to 0. [rad]')
c.theta = num.pi/2
c.phi = 0.
return c
[docs]class SARscape(SceneIO):
"""
Reading SARscape :file:`*_disp` files.
.. note ::
Expects:
* Header file in :file:`*_disp.hdr`
* Displacement data in cm in :file:`*_disp`
* LOS data in :file:`*disp_ILOS` and :file:`*disp_ALOS` files.
"""
[docs] def read(self, filename, **kwargs):
header = self.parseHeaderFile(filename)
def load_data(filename):
self._log.debug('Loading %s' % filename)
return num.flipud(
num.fromfile(filename, dtype=num.float32)
.reshape((header.lines, header.samples)))
displacement = load_data(filename)
theta_file, phi_file = self.getLOSFiles(filename)
if not theta_file:
theta = num.full_like(displacement, 0.)
else:
theta = load_data(theta_file)
theta = num.deg2rad(theta)
if not phi_file:
phi = num.full_like(displacement, num.pi/2)
else:
phi = load_data(phi_file)
phi = num.pi/2 - num.rad2deg(phi)
c = self.container
c.displacement = displacement
c.phi = phi
c.theta = theta
map_info = header.map_info
c.frame.dE = float(map_info[5])
c.frame.dN = dN = float(map_info[6])
c.frame.spacing = 'meter'
c.frame.llLat, c.frame.llLon = utm.to_latlon(
float(map_info[3]) - header.lines * dN,
float(map_info[4]),
zone_number=int(map_info[7]),
northern=True if map_info[8] == 'Northern' else False)
return c
def parseHeaderFile(self, filename):
hdr_file = self._getHDRFile(filename)
conf = re.compile(r'^(.+)\s+=\s+(.+)\n', re.MULTILINE)
header = AttribDict()
with open(hdr_file) as f:
s = f.read()
linebreaks = re.compile(r'{(.+)\n?(.+)}')
s = linebreaks.sub(r'{ \g<1> \g<2> }', s)
for match in conf.finditer(s):
groups = match.groups()
key = groups[0].strip().replace(' ', '_')
value = groups[1].strip()
try:
value = int(value)
except ValueError:
pass
header[key] = value
header.map_info = header.map_info.strip('{} ').split(', ')
if not len(header.map_info) == 11:
raise HeaderError('`map info` header is not consistent!')
if header.map_info[0] != 'UTM':
raise HeaderError('`map info` is not UTM!')
return header
def getLOSFiles(self, filename):
ilos_file = op.abspath(filename + '_ILOS')
if not op.exists(ilos_file):
self._log.warning('Could not find ILOS file! (%s)' % ilos_file)
ilos_file = False
alos_file = op.abspath(filename + '_ALOS')
if not op.exists(alos_file):
self._log.warning('Could not find ALOS file! (%s)' % alos_file)
alos_file = False
return ilos_file, alos_file
def _getHDRFile(self, filename):
hdr_file = op.abspath(op.splitext(filename)[0] + '.hdr')
if not op.exists(hdr_file):
raise OSError('SARscape .hdr file not found (%s)' % hdr_file)
return hdr_file
[docs] def validate(self, filename, **kwargs):
val = re.compile(r'SARscape|ENVI Standard', re.MULTILINE)
try:
hdr_file = self._getHDRFile(filename)
except OSError:
return False
with open(hdr_file) as f:
res = val.search(f.read())
if res is not None:
return True
return False
[docs]class LiCSAR(SceneIO):
'''
Import unwrapped Geotiffs from the
`COMET LiCSAR Portal <https://comet.nerc.ac.uk/COMET-LiCS-portal/>`_.
.. note ::
Requires the Python package
`gdal/osgeo <https://pypi.org/project/GDAL/>`_! Or through
Expects:
* Unwrapped geotiff in :file:`*.unw.tif`
* LOS data in :file:`*.geo.[NEU].tif` files
See also the download script in :mod:`kite.clients`.
'''
def _getLOS(self, filename, component):
path = op.dirname(filename)
fn = glob.glob(op.join(path, component))
if len(fn) != 1:
raise ImportError('Cannot find LOS vector file %s!' % component)
dataset = gdal.Open(fn[0], gdal.GA_ReadOnly)
return self._readBandData(dataset)
@staticmethod
def _readBandData(dataset, band=1):
band = dataset.GetRasterBand(band)
array = band.ReadAsArray()
array[array == band.GetNoDataValue()] = num.nan
return num.flipud(array)
[docs] def read(self, filename, **kwargs):
dataset = gdal.Open(filename, gdal.GA_ReadOnly)
georef = dataset.GetGeoTransform()
llLon = georef[0]
llLat = georef[3] + dataset.RasterYSize * georef[5]
c = self.container
c.frame.spacing = 'degree'
c.frame.llLat = llLat
c.frame.llLon = llLon
c.frame.dE = georef[1]
c.frame.dN = abs(georef[5])
displacement = self._readBandData(dataset)
c.displacement = -displacement / (4*num.pi) * LAMBDA_SENTINEL
try:
los_n = self._getLOS(filename, '*.geo.N.tif')
los_e = self._getLOS(filename, '*.geo.E.tif')
los_u = self._getLOS(filename, '*.geo.U.tif')
except ImportError:
self._log.warning(
'Cannot find LOS angle files *.geo.[NEU].tif,'
' using static Sentinel-1 descending angles.')
heading = 83.
incident = 50.
un = num.sin(d2r*incident) * num.cos(d2r*heading)
ue = num.sin(d2r*incident) * num.sin(d2r*heading)
uz = num.cos(d2r*incident)
los_n = num.full_like(c.displacement, un)
los_e = num.full_like(c.displacement, ue)
los_u = num.full_like(c.displacement, uz)
c.phi = num.arctan2(los_n, los_e)
c.theta = num.arcsin(los_u)
c.meta.title = dataset.GetDescription()
return c
[docs] def validate(self, filename, **kwargs):
if gdal.IdentifyDriver(filename) is None:
return False
return True
[docs]class ARIA(SceneIO):
'''
Import unwrapped InSAR scenes from the
`NASA/JPL ARIA <https://aria.jpl.nasa.gov/>`_ GUNW data products.
.. note ::
Requires the Python package
`gdal/osgeo <https://pypi.org/project/GDAL/>`_! Or through
Expects:
* Extracted layers: unwrappedPhase, lookAngle, incidenceAngle,
connectedComponents
Use ``ariaExtract.py`` to extract the layers:
.. code-block:: sh
ariaExtract.py -w ascending -f aria-data.nc -d download \\
-l unwrappedPhase,incidenceAngle,lookAngle
'''
@staticmethod
def _readBandData(dataset, band=1):
band = dataset.GetRasterBand(band)
array = band.ReadAsArray()
if array.dtype != num.int16 and array.dtype != num.int:
array[array == band.GetNoDataValue()] = num.nan
return num.flipud(array)
@staticmethod
def _dataset_from_dir(folder):
files = set(f for f in os.scandir(folder) if f.is_file())
for f in files:
if op.splitext(f.name)[-1] == '':
break
else:
raise ImportError('could not load dataset from %s' % folder)
return gdal.Open(f.path, gdal.GA_ReadOnly)
[docs] def read(self, folder, **kwargs):
unw_phase = self._dataset_from_dir(op.join(folder, 'unwrappedPhase'))
georef = unw_phase.GetGeoTransform()
llLon = georef[0]
llLat = georef[3] + unw_phase.RasterYSize * georef[5]
c = self.container
c.frame.spacing = 'degree'
c.frame.llLat = llLat
c.frame.llLon = llLon
c.frame.dE = georef[1]
c.frame.dN = abs(georef[5])
conn_comp = self._dataset_from_dir(op.join(
folder, 'connectedComponents'))
displacement = self._readBandData(unw_phase)
conn_mask = self._readBandData(conn_comp) # Mask from snaphu
displacement *= num.where(conn_mask, 1., num.nan)
c.displacement = displacement / (4*num.pi) * LAMBDA_SENTINEL
inc_angle = self._dataset_from_dir(op.join(folder, 'incidenceAngle'))
azi_angle = self._dataset_from_dir(op.join(folder, 'azimuthAngle'))
c.theta = num.pi/2 - self._readBandData(inc_angle) * d2r
c.phi = self._readBandData(azi_angle) * d2r
c.meta.scene_id = op.basename(unw_phase.GetDescription())
c.meta.scene_title = c.meta.scene_id
t_slave, t_master = c.meta.scene_id.split('_')
c.meta.time_master = datetime(*time.strptime(t_master, '%Y%m%d')[:6]) \
.timestamp()
c.meta.time_slave = datetime(*time.strptime(t_slave, '%Y%m%d')[:6]) \
.timestamp()
c.meta.satellite_name = 'undefined (ARIA)'
return c
[docs] def validate(self, folder, **kwargs):
expected_dirs = set(
['unwrappedPhase', 'incidenceAngle', 'lookAngle',
'connectedComponents'])
if not op.isdir(folder):
return False
dirs = set(d.name for d in os.scandir(folder) if d.is_dir())
if not expected_dirs - dirs:
return True
return False