Source code for tendril.utils.types.signalbase

# Copyright (C) 2015 Chintalagiri Shashank
#
# This file is part of Tendril.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from tendril.utils.types.time import timestamp_factory
from tendril.utils.types.time import TimeStamp
from tendril.utils.types.time import TimeSpan
from tendril.utils.types.time import TimeDelta

from tendril.utils.types.unitbase import Percentage

from collections import deque
import copy


[docs]class SignalBase(object): def __init__(self, unitclass): self._unitclass = unitclass @property def unitclass(self): return self._unitclass
[docs]class SignalErrorBar(object): def __init__(self, quantization=None, noise_floor=None, error_pc=None): self.quantization = quantization self.noise_floor = noise_floor self.error_pc = error_pc
[docs]class SignalPoint(SignalBase): def __init__(self, unitclass, value, ts=None): super(SignalPoint, self).__init__(unitclass) if not isinstance(value, unitclass): self.value = unitclass(value) else: self.value = value if ts is None: self.timestamp = timestamp_factory.now() else: self.timestamp = ts self._error_bar = None @property def error_bar(self): return self._error_bar @error_bar.setter def error_bar(self, value): self._error_bar = value def __repr__(self): return "<SignalPoint at " + repr(self.timestamp) + \ " :: " + repr(self.value) + " >"
[docs]class SignalWave(SignalBase): def __init__(self, unitclass, points=None, spacing=None, ts0=None, interpolation="piecewise_linear", buffer_size=None, use_point_ts=True, stabilization_length=5, stabilization_pc='1pc'): super(SignalWave, self).__init__(unitclass) self._stabilization_length = stabilization_length if isinstance(stabilization_pc, Percentage): self._stabilization_pc = stabilization_pc else: self._stabilization_pc = Percentage(stabilization_pc) self._buffer_size = buffer_size self._use_point_ts = use_point_ts if spacing is not None: if isinstance(spacing, TimeSpan): self._spacing = spacing.timedelta elif isinstance(spacing, TimeDelta): self._spacing = spacing else: raise TypeError("spacing must be an instance of TimeDelta or " "TimeSpan. Got " + repr(spacing)) if ts0 is not None: if not isinstance(ts0, TimeStamp): raise TypeError("ts0 must be an instance of TimeStamp") self._ts0 = ts0 else: self._ts0 = timestamp_factory.now() if points and isinstance(points, deque): if isinstance(points[0], tuple): self._points = points else: if not self._use_point_ts: span_spacing = self._spacing.timespan self._points = deque( [(ts0 + (span_spacing * idx).timedelta, point) for idx, point in enumerate(points)], maxlen=buffer_size ) else: self._points = deque([(point.timestamp, point) for point in points], maxlen=buffer_size) else: self._points = deque(maxlen=buffer_size) self._interpolation = interpolation @property def last_timestamp(self): if len(self._points) == 0: if self._ts0 is not None: return self._ts0 - self._spacing else: return None return self._points[-1][0] @property def stabilization_length(self): return self._stabilization_length @property def is_stable(self): lval = self._points[-1][1].value for i in range(self.stabilization_length): if abs(self._points[-1-i][1].value - lval) \ > abs(lval * self._stabilization_pc): return False return True @property def stabilized_value(self): acc = 0 for i in range(self.stabilization_length): acc += self._points[-1-i][1].value return acc / self.stabilization_length @property def ts0(self): return self._ts0 @property def max(self): return max(self._points, key=lambda x: x[1].value)[1].value @property def min(self): return min(self._points, key=lambda x: x[1].value)[1].value @property def spread(self): return self.max - self.min @property def mean(self): if self.unitclass == int: return int(round(float(sum([x[1].value for x in self._points])) / len(self._points))) return sum([x[1].value for x in self._points]) / len(self._points) @property def latest_point(self): return self._points[-1][1].value @property def points(self): return sorted(self._points, key=lambda x: x[0]) @property def spacing(self): return TimeSpan(self._spacing) @property def maxlen(self): return self._points.maxlen @maxlen.setter def maxlen(self, value): self._points = deque(self._points, maxlen=value)
[docs] def add_point(self, point, ts=None): if point.unitclass != self.unitclass: raise TypeError if ts is None: if self._use_point_ts is False: if self.last_timestamp is None: self._ts0 = timestamp_factory.now() ts = self.last_timestamp + self._spacing elif point.timestamp is not None: if self.last_timestamp is None: self._ts0 = point.timestamp ts = point.timestamp if ts is None: raise ValueError self._points.append((ts, point))
def __add__(self, other): if isinstance(other, SignalWave): if self.unitclass != other.unitclass: raise TypeError rval = copy.copy(self) rval.extend(other._points) return rval elif isinstance(other, SignalPoint): self.add_point(other) return self else: raise NotImplementedError def __radd__(self, other): if isinstance(other, SignalWave): if self.unitclass != other.unitclass: raise TypeError return self._points.extendleft(reversed(other._points)) elif isinstance(other, SignalPoint): if self.unitclass != other.unitclass: raise TypeError return self._points.appendleft(other) else: raise NotImplementedError def __len__(self): return len(self._points) def __getitem__(self, item): return self._points.__getitem__(item) def __setitem__(self, key, value): return self._points.__setitem__(key, value) def __delitem__(self, key): return self._points.__delitem__(key) def __iter__(self): return self._points.__iter__() def __reversed__(self): return self._points.__reversed__() def __contains__(self, item): return self._points.__contains__(item)
[docs] def clear(self): self._ts0 = self.last_timestamp self._points.clear()
[docs] def popleft(self, *args, **kwargs): return self._points.popleft(*args, **kwargs)
[docs] def pop(self, *args, **kwargs): return self._points.pop(*args, **kwargs)
[docs] def append(self, *args, **kwargs): return self._points.append(*args, **kwargs)
[docs] def appendleft(self, *args, **kwargs): return self._points.appendleft(*args, **kwargs)
[docs] def extend(self, *args, **kwargs): return self._points.extend(*args, **kwargs)
[docs] def extendleft(self, *args, **kwargs): return self._points.extendleft(*args, **kwargs)