# The ABC aliases in ``collections`` were removed in Python 3.10; the abstract
# base classes live in ``collections.abc``.
from collections.abc import Iterable
from copy import deepcopy

from .DataReader import DataReader


class SlidingWindow:
    """Buffer of observation dicts with a movable "current" position.

    ``observations`` holds the most recent observations.  The observation at
    index ``max_past - 1`` is treated as the current one and the observation
    at index ``max_past`` as the next one (see :meth:`get_value`).  Values
    before ``max_past`` are "past" values, values from ``max_past`` onwards
    are "future" values.

    Two sets of (count, fields) pairs are kept: the ``model_*`` set used by
    :meth:`get_model_values` and the plain set used by
    :meth:`get_window_values`.
    """

    def __init__(self, model_past_values=5, model_past_fields=None,
                 model_future_values=0, model_future_fields=None,
                 past_values=0, past_fields=None,
                 future_values=0, future_fields=None,
                 plot_fields=None, **kwargs):
        """Configure the window.

        :param model_past_values: number of past observations per field
            returned by :meth:`get_model_values`.
        :param model_past_fields: fields read from past observations for the
            model; defaults to ``['temp_in', 'temp_out', 'mode']``.
        :param model_future_values: number of future observations per field
            returned by :meth:`get_model_values`.
        :param model_future_fields: fields read from future observations for
            the model; defaults to ``[]``.
        :param past_values: number of past observations per field returned by
            :meth:`get_window_values`.
        :param past_fields: fields read from past observations for the
            window; defaults to ``['temp_in', 'temp_out']``.
        :param future_values: number of future observations per field
            returned by :meth:`get_window_values`.
        :param future_fields: fields read from future observations for the
            window; defaults to ``model_future_fields`` (the same list
            object).
        :param plot_fields: optional iterable of field names whose values are
            recorded on every :meth:`add_observation` call.
        :param kwargs: accepted and ignored.
        """
        self.model_past_values = model_past_values
        if model_past_fields is None:
            model_past_fields = ['temp_in', 'temp_out', 'mode']
        self.model_past_fields = model_past_fields

        self.past_values = past_values
        if past_fields is None:
            past_fields = ['temp_in', 'temp_out']
        self.past_fields = past_fields

        # At least two past observations are always kept so that
        # get_previous_value() (offset -2) has something to read.
        self.max_past = max(self.model_past_values, self.past_values, 2)

        self.model_future_values = model_future_values
        if model_future_fields is None:
            model_future_fields = []
        self.model_future_fields = model_future_fields

        self.future_values = future_values
        if future_fields is None:
            future_fields = model_future_fields
        self.future_fields = future_fields

        # At least one future observation is always kept so that
        # get_next_value() (offset 0) has something to read.
        self.max_future = max(self.model_future_values, self.future_values, 1)

        self.observations = []  # list of dict

        # Maps each plot field to the list of values recorded so far, or
        # None when no plot fields were requested.
        self.plot = {field: [] for field in plot_fields} if plot_fields else None

    def add_observation(self, observation: dict) -> bool:
        """Append ``observation`` and report whether the window is full.

        Before appending, the buffer is trimmed to its last
        ``max_past + max_future`` entries, so it never holds more than
        ``max_past + max_future + 1`` observations.  When plot fields are
        configured, the observation's value for each of them is appended to
        the matching plot list; a plot field ending in ``'_calc'`` records
        the value of the field without that suffix.

        :return: ``True`` once more than ``max_past + max_future``
            observations are buffered.
        :raises KeyError: if a plot field is missing from ``observation``.
        """
        self.observations = self.observations[-self.max_past - self.max_future:]
        self.observations.append(observation)

        if self.plot:
            for field in self.plot:
                if field.endswith('_calc'):
                    # 'x_calc' starts out as a copy of 'x'; set_value() may
                    # overwrite it later.
                    self.plot[field].append(observation[field[:-5]])
                else:
                    self.plot[field].append(observation[field])

        return len(self.observations) > self.max_past + self.max_future

    # SLIDING

    def next(self):
        """Move the current position one observation forward.

        The buffer itself is untouched; only the past/future split moves.
        """
        self.max_past += 1
        self.max_future -= 1

    def copy(self, plot=False):
        """Return a new window with the same configuration and a deep copy
        of the buffered observations.

        :param plot: when true, the recorded plot data is deep-copied as
            well; otherwise the copy has ``plot = None``.

        NOTE: the copy is built from the constructor arguments, so
        ``max_past`` / ``max_future`` are recomputed and any shift applied
        through :meth:`next` is not carried over.
        """
        result = SlidingWindow(
            model_past_values=self.model_past_values,
            model_past_fields=self.model_past_fields,
            model_future_values=self.model_future_values,
            model_future_fields=self.model_future_fields,
            past_values=self.past_values,
            past_fields=self.past_fields,
            future_values=self.future_values,
            future_fields=self.future_fields,
        )
        result.observations = deepcopy(self.observations)
        if plot:
            result.plot = deepcopy(self.plot)
        return result

    # GET VALUE

    def get_value(self, field, offset=-1):
        """Return ``field`` of the observation at ``max_past + offset``.

        ``offset=-1`` is the current observation, ``offset=0`` the next one.
        """
        return self.observations[self.max_past + offset][field]

    def get_previous_value(self, field):
        """Return ``field`` of the observation before the current one."""
        return self.get_value(field, -2)

    def get_current_value(self, field):
        """Return ``field`` of the current observation."""
        return self.get_value(field, -1)

    def get_current_orig_value(self, field):
        """Return the recorded plot value of ``field`` at the current position.

        Reads from the plot data rather than from ``observations``, so it
        requires ``field`` to be one of the configured plot fields.
        """
        offset = -1
        return self.plot[field][-self.max_future - 1 + offset]

    def get_next_value(self, field):
        """Return ``field`` of the observation after the current one."""
        return self.get_value(field, 0)

    # SET VALUE

    def set_value(self, field, value, offset=-1):
        """Store ``value`` as ``field`` of the observation at
        ``max_past + offset``.

        If ``field + '_calc'`` is a plot field, the matching plot entry is
        overwritten with ``value`` as well.
        """
        self.observations[self.max_past + offset][field] = value
        if self.plot and field + '_calc' in self.plot:
            self.plot[field + '_calc'][-self.max_future - 1 + offset] = value

    def set_current_value(self, field, value):
        """Store ``value`` as ``field`` of the current observation."""
        self.set_value(field, value, -1)

    def set_next_value(self, field, value):
        """Store ``value`` as ``field`` of the next observation."""
        self.set_value(field, value, 0)

    # PAST/FUTURE HELPERS

    def _get_past_values(self, num=None, fields=None, offset=0) -> list:
        """Return the last ``num`` past values of every field in ``fields``.

        The result is a flat list grouped by field: all values of the first
        field (oldest first), then all values of the second field, and so on.
        ``num`` and ``fields`` default to ``past_values`` / ``past_fields``;
        ``offset`` shifts every observation index that is read.
        """
        if num is None:
            num = self.past_values
        if fields is None:
            fields = self.past_fields
        return [
            self.observations[i + offset][j]
            for j in fields
            for i in range(self.max_past - num, self.max_past)
        ]

    def _get_future_values(self, num=None, fields=None, offset=0) -> list:
        """Return the first ``num`` future values of every field in ``fields``.

        Same layout as :meth:`_get_past_values`.  ``num`` and ``fields``
        default to ``future_values`` / ``future_fields``.
        """
        if num is None:
            num = self.future_values
        if fields is None:
            fields = self.future_fields
        return [
            self.observations[i + offset][j]
            for j in fields
            for i in range(self.max_past, self.max_past + num)
        ]

    # MODEL

    def get_model_values(self) -> list:
        """Return the model's past values followed by its future values."""
        return self._get_past_values(num=self.model_past_values, fields=self.model_past_fields) \
            + self._get_future_values(num=self.model_future_values, fields=self.model_future_fields)

    def get_model_size(self) -> int:
        """Return the length of the list produced by :meth:`get_model_values`."""
        return self.model_past_values * len(self.model_past_fields) \
            + self.model_future_values * len(self.model_future_fields)

    def get_model_target(self):
        """Return ``'temp_in'`` of the next observation."""
        return self.get_next_value('temp_in')

    # WINDOW

    def get_window_values(self, offset=0):
        """Return the window's past values followed by its future values.

        ``offset`` shifts every observation index that is read.
        """
        return self._get_past_values(num=self.past_values, fields=self.past_fields, offset=offset) \
            + self._get_future_values(num=self.future_values, fields=self.future_fields, offset=offset)

    def get_window_size(self):
        """Return the length of the list produced by :meth:`get_window_values`."""
        return self.past_values * len(self.past_fields) + self.future_values * len(self.future_fields)

    # PLOT

    def get_plot_data(self, field):
        """Return the recorded values of plot field ``field`` without the
        last ``max_future + 1`` entries."""
        return self.plot[field][:-self.max_future-1]


class SlidingWindowAdapter(Iterable):
    """Iterator that feeds a reader's output into one shared SlidingWindow.

    Every ``next()`` call returns the same :class:`SlidingWindow` instance
    (``self.aggregate``), advanced by at least one observation.
    """

    def __init__(self, reader: DataReader, **kwargs):
        """
        :param reader: source of observations; only its ``pop()`` method is
            used here.
        :param kwargs: forwarded unchanged to :class:`SlidingWindow`.
        """
        self.reader = reader
        self.aggregate = SlidingWindow(**kwargs)

    def __iter__(self):
        return self

    def __next__(self) -> SlidingWindow:
        """Pop observations from the reader until the window is full and
        return the window.

        NOTE(review): iteration only ends when ``reader.pop()`` raises;
        presumably it raises ``StopIteration`` when exhausted — confirm
        against ``DataReader``.
        """
        line = self.reader.pop()
        while not self.aggregate.add_observation(line):
            line = self.reader.pop()
        return self.aggregate