Source code for pyrocko.gui.talkie

# https://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

import difflib
from collections import defaultdict

from pyrocko import util
from pyrocko.guts import Object, List, clone, path_to_str
from weakref import ref


class listdict(dict):
    def __missing__(self, k):
        self[k] = []
        return self[k]


def root_and_path(obj, name=None):
    root = obj
    path = []
    if name is not None:
        path.append(name)

    while True:
        try:
            root, name_at_parent = root._talkie_parent
            path.append(name_at_parent)
        except AttributeError:
            break

    return root, '.'.join(path[::-1])


def lclone(xs):
    return [clone(x) for x in xs]


g_uid = 0


def new_uid():
    global g_uid
    g_uid += 1
    return '#%i' % g_uid


def has_computed(cls):
    cls._computed = defaultdict(list)
    cls._computed_rev = defaultdict(list)

    for prop_name in dir(cls):
        try:
            prop = getattr(cls, prop_name)
            depends_on = prop.fget._computed_depends_on
            for name in depends_on:
                cls._computed_rev[name].append(prop_name)
                cls._computed[prop_name].append(name)

        except AttributeError:
            pass

    return cls


def computed(depends_on):
    def wrapper(func):
        func._computed_depends_on = depends_on
        return property(func)

    return wrapper


[docs]class Talkie(Object): _computed = () _computed_rev = None def __setattr__(self, name, value): try: t = self.T.get_property(name) except ValueError: Object.__setattr__(self, name, value) return if isinstance(t, List.T): value = TalkieList(value) oldvalue = getattr(self, name, None) if oldvalue: if isinstance(oldvalue, (Talkie, TalkieList)): oldvalue.unset_parent() if isinstance(value, (Talkie, TalkieList)): value.set_parent(self, name) Object.__setattr__(self, name, value) self.fire([name], value) if self._computed_rev is not None: for dname in self._computed_rev[name]: self.fire([dname], None) def fire(self, path, value): self.fire_event(path, value) if hasattr(self, '_talkie_parent'): root, name_at_parent = self._talkie_parent path.append(name_at_parent) root.fire(path, value) def set_parent(self, parent, name): Object.__setattr__(self, '_talkie_parent', (parent, name)) def unset_parent(self): Object.__delattr__(self, '_talkie_parent') def fire_event(self, path, value): pass def diff(self, other, path=()): assert type(self) is type(other), '%s %s' % (type(self), type(other)) for (s_prop, s_val), (o_prop, o_val) in zip( self.T.ipropvals(self), other.T.ipropvals(other)): if not s_prop.multivalued: if isinstance(s_val, Talkie) \ and type(s_val) is type(o_val): for x in s_val.diff(o_val, path + (s_prop.name,)): yield x else: if not equal(s_val, o_val): yield 'set', path + (s_prop.name,), clone(o_val) else: if issubclass(s_prop.content_t._cls, Talkie): sm = difflib.SequenceMatcher( None, type_eq_proxy_seq(s_val), type_eq_proxy_seq(o_val)) mode = 1 else: sm = difflib.SequenceMatcher( None, eq_proxy_seq(s_val), eq_proxy_seq(o_val)) mode = 2 for tag, i1, i2, j1, j2 in list(sm.get_opcodes()): if tag == 'equal' and mode == 1: for koff, (s_element, o_element) in enumerate(zip( s_val[i1:i2], o_val[j1:j2])): for x in s_element.diff( o_element, path + ((s_prop.name, i1+koff),)): yield x if tag == 'replace': yield ( 'replace', path + ((s_prop.name, i1, i2),), lclone(o_val[j1:j2])) elif tag == 'delete': yield ( 'delete', path + ((s_prop.name, i1, i2),), None) elif tag == 'insert': yield ( 'insert', path + ((s_prop.name, i1, i2),), lclone(o_val[j1:j2])) def str_diff(self, other): lines = [] for tag, path, value in self.diff(other): lines.append('%s %s' % (tag, path_to_str(path))) return '\n'.join(lines) def diff_update(self, other, path=()): assert type(self) is type(other), '%s %s' % (type(self), type(other)) for (s_prop, s_val), (o_prop, o_val) in zip( self.T.ipropvals(self), other.T.ipropvals(other)): if not s_prop.multivalued: if isinstance(s_val, Talkie) \ and type(s_val) is type(o_val): s_val.diff_update(o_val, path + (s_prop.name,)) else: if not equal(s_val, o_val): setattr(self, s_prop.name, clone(o_val)) else: if issubclass(s_prop.content_t._cls, Talkie): sm = difflib.SequenceMatcher( None, type_eq_proxy_seq(s_val), type_eq_proxy_seq(o_val)) mode = 1 else: sm = difflib.SequenceMatcher( None, eq_proxy_seq(s_val), eq_proxy_seq(o_val)) mode = 2 ioff = 0 for tag, i1, i2, j1, j2 in list(sm.get_opcodes()): if tag == 'equal' and mode == 1: for koff, (s_element, o_element) in enumerate(zip( s_val[i1+ioff:i2+ioff], o_val[j1:j2])): s_element.diff_update( o_element, path + ((s_prop.name, i1+ioff+koff),)) elif tag == 'replace': for _ in range(i1, i2): s_val.pop(i1+ioff) for j in range(j1, j2): s_val.insert(i1+ioff, clone(o_val[j])) ioff += (j2 - j1) - (i2 - i1) elif tag == 'delete': for _ in range(i1, i2): s_val.pop(i1 + ioff) ioff -= (i2 - i1) elif tag == 'insert': for j in range(j1, j2): s_val.insert(i1+ioff, clone(o_val[j])) ioff += (j2 - j1)
def equal(a, b): return str(a) == str(b) # to be replaced by recursive guts.equal def ghash(obj): return hash(str(obj)) class GutsEqProxy(object): def __init__(self, obj): self._obj = obj def __eq__(self, other): return equal(self._obj, other._obj) def __hash__(self): return ghash(self._obj) class TypeEqProxy(object): def __init__(self, obj): self._obj = obj def __eq__(self, other): return type(self._obj) is type(other._obj) # noqa def __hash__(self): return hash(type(self._obj)) def eq_proxy_seq(seq): return list(GutsEqProxy(x) for x in seq) def type_eq_proxy_seq(seq): return list(TypeEqProxy(x) for x in seq) class TalkieConnection(object): def __init__(self, talkie_root, path, listener): self._talkie_root = talkie_root self._listener = listener self._path = path self._ref_listener = ref(listener) def release(self): self._talkie_root.disconnect(self)
[docs]class TalkieRoot(Talkie): def __init__(self, **kwargs): self._listeners = listdict() Talkie.__init__(self, **kwargs) def talkie_connect(self, path, listener): connection = TalkieConnection(self, path, listener) self._listeners[path].append(connection._ref_listener) return connection def talkie_disconnect(self, connection): try: self._listeners[connection._path].remove( connection._ref_listener) except ValueError: pass def fire_event(self, path, value): path = '.'.join(path[::-1]) # print('fire_event:', path, value) parts = path.split('.') for i in range(len(parts)+1): subpath = '.'.join(parts[:i]) target_refs = self._listeners[subpath] delete = [] for target_ref in target_refs: target = target_ref() if target: target(path, value) else: delete.append(target_ref) for target_ref in delete: target_refs.remove(target_ref) def get(self, path): x = self for s in path.split('.'): x = getattr(x, s) return x def set(self, path, value): x = self p = path.split('.') for s in p[:-1]: x = getattr(x, s) setattr(x, p[-1], value)
class TalkieList(list): def fire(self, path, value): if self._talkie_parent: root, name_at_parent = self._talkie_parent path.append(name_at_parent) root.fire(path, value) def set_parent(self, parent, name): list.__setattr__(self, '_talkie_parent', (parent, name)) def unset_parent(self): list.__delattr__(self, '_talkie_parent') def append(self, element): retval = list.append(self, element) name = new_uid() if isinstance(element, (Talkie, TalkieList)): element.set_parent(self, name) self.fire([], self) return retval def insert(self, index, element): retval = list.insert(self, index, element) name = new_uid() if isinstance(element, (Talkie, TalkieList)): element.set_parent(self, name) self.fire([], self) return retval def remove(self, element): list.remove(self, element) if isinstance(element, (Talkie, TalkieList)): element.unset_parent() self.fire([], self) def pop(self, index=-1): element = list.pop(self, index) if isinstance(element, (Talkie, TalkieList)): element.unset_parent() self.fire([], self) return element def extend(self, elements): for element in elements: self.append(element) self.fire([], self) def __setitem__(self, key, value): try: element = self[key] if isinstance(element, (Talkie, TalkieList)): element.unset_parent() except IndexError: pass list.__setitem__(self, key, value) self.fire([], self) def __setslice__(self, *args, **kwargs): raise Exception('not implemented') def __iadd__(self, *args, **kwargs): raise Exception('not implemented') def __imul__(self, *args, **kwargs): raise Exception('not implemented') for method_name in ['reverse', 'sort']: def x(): list_meth = getattr(list, method_name) def meth(self, *args, **kwargs): retval = list_meth(self, *args, **kwargs) self.fire([], self) return retval return meth try: setattr(TalkieList, method_name, x()) except AttributeError: pass def drop_args_wrapper(f): def f_drop_args(*args): f() return f_drop_args class TalkieConnectionOwner(object): def __init__(self): self._connections = [] def talkie_connect(self, state, path, listener, drop_args=False): if drop_args: listener = drop_args_wrapper(listener) if not isinstance(path, str): return [ self.talkie_connect(state, path_, listener, False) for path_ in path] connection = state.talkie_connect(path, listener) self._connections.append(connection) return connection def talkie_disconnect_all(self): while self._connections: try: self._connections.pop().release() except Exception: pass def none_or(f): def g(x): if x is None: return '' else: return f(x) return g def noop(x): return x class TalkieStringer: def __init__(self, root): self._root = root self._formatters = { 'date': none_or(lambda v: util.time_to_str(v, format='%Y-%m-%d')), 'datetime': none_or(lambda v: util.time_to_str(v))} self._paths = [] def __getitem__(self, key): stringer = self class Proxy: def __init__(self, val, formatter, path, talkie_root): self._formatter = formatter self._val = val self._path = path self._talkie_root = talkie_root def register_path(self): stringer._paths.append( (self._talkie_root, '.'.join(self._path))) def _computed(self): return getattr(self._val, '_computed', ()) def __getattr__(self, key): if key in self._val.T.propnames \ or key in self._computed(): self._path.append(key) val = getattr(self._val, key) pval = self._proxy(val) if not isinstance(pval, (Proxy, ListProxy)): self.register_path() return pval else: return '{' + key + '}' def __format__(self, *args, **kwargs): self.register_path() return self._val.__format__(*args, **kwargs) def _proxy(self, val): if isinstance(val, TalkieRoot): return Proxy( val, self._formatter, [], val) elif isinstance(val, Object): return Proxy( val, self._formatter, self._path, self._talkie_root) elif isinstance(val, list): return ListProxy( val, self._formatter, self._path, self._talkie_root) else: return self._formatter(val) class ListProxy(Proxy): def __getitem__(self, i): self._path[-1] += '[%i]' % i return self._proxy(self._val[i]) key = key.split('|', 1) if len(key) == 2: key, formatter = key[0], self._formatters[key[1]] else: key, formatter = key[0], noop return getattr(Proxy(self._root, formatter, [], self._root), key) def get_paths(self): return self._paths