try:
    # The ABC aliases were removed from ``collections`` in Python 3.10;
    # ``collections.abc`` is their canonical home.
    from collections.abc import Iterable
except ImportError:  # pragma: no cover - interpreters without collections.abc
    from collections import Iterable
from copy import deepcopy

from .DataReader import DataReader
|
|
|
class SlidingWindow:
    """Bounded buffer of observation dicts with a movable "current" position.

    The buffer keeps at most ``max_past + max_future + 1`` observations.
    Index ``max_past - 1`` is treated as the *current* observation and index
    ``max_past`` as the *next* one; everything before is "past", everything
    from ``max_past`` on is "future".

    Two independent feature layouts are derived from the buffer:

    * the *model* layout (``model_past_*`` / ``model_future_*``), returned by
      :meth:`get_model_values`;
    * the *window* layout (``past_*`` / ``future_*``), returned by
      :meth:`get_window_values`.

    Optionally, selected fields are recorded into ``self.plot`` (a dict of
    lists) every time an observation is added.  A plot field named
    ``"<name>_calc"`` is seeded from the observation key ``"<name>"`` and is
    later overwritten by :meth:`set_value`.
    """

    def __init__(self,
                 model_past_values=5, model_past_fields=None,
                 model_future_values=0, model_future_fields=None,
                 past_values=0, past_fields=None,
                 future_values=0, future_fields=None,
                 plot_fields=None, **kwargs):
        """Configure the window.

        Args:
            model_past_values: number of past observations fed to the model.
            model_past_fields: observation keys read for the model's past
                part; defaults to ``['temp_in', 'temp_out', 'mode']``.
            model_future_values: number of future observations fed to the
                model.
            model_future_fields: observation keys read for the model's future
                part; defaults to an empty list.
            past_values: number of past observations in the window layout.
            past_fields: observation keys for the window's past part;
                defaults to ``['temp_in', 'temp_out']``.
            future_values: number of future observations in the window layout.
            future_fields: observation keys for the window's future part;
                defaults to ``model_future_fields`` (the same list object).
            plot_fields: optional iterable of field names to record; when
                empty or ``None`` no plot data is kept.
            **kwargs: accepted and ignored, so callers can pass a shared
                keyword set.
        """
        self.model_past_values = model_past_values
        if model_past_fields is None:
            model_past_fields = ['temp_in', 'temp_out', 'mode']
        self.model_past_fields = model_past_fields

        self.past_values = past_values
        if past_fields is None:
            past_fields = ['temp_in', 'temp_out']
        self.past_fields = past_fields

        # At least two past entries are always kept so that both the
        # "previous" and the "current" observation exist.
        self.max_past = max(self.model_past_values, self.past_values, 2)

        self.model_future_values = model_future_values
        if model_future_fields is None:
            model_future_fields = []
        self.model_future_fields = model_future_fields

        self.future_values = future_values
        if future_fields is None:
            future_fields = model_future_fields
        self.future_fields = future_fields

        # At least one future entry is always kept so that "next" exists.
        self.max_future = max(self.model_future_values, self.future_values, 1)

        self.observations = []  # list of dict

        self.plot = None
        if plot_fields:
            self.plot = {field: [] for field in plot_fields}

    def add_observation(self, observation: dict) -> bool:
        """Append ``observation`` and drop entries that fell out of the buffer.

        Returns:
            ``True`` once the buffer holds more than
            ``max_past + max_future`` observations, ``False`` while it is
            still filling up.
        """
        capacity = self.max_past + self.max_future
        # Keep the newest ``capacity`` entries, then add the new one, so the
        # buffer never exceeds ``capacity + 1`` items.
        self.observations = self.observations[-capacity:]
        self.observations.append(observation)

        if self.plot:
            for field, series in self.plot.items():
                # "<name>_calc" series start out as a copy of "<name>".
                source_key = field[:-5] if field.endswith('_calc') else field
                series.append(observation[source_key])

        return len(self.observations) > capacity

    # SLIDING

    def next(self):
        """Move the current position one observation forward.

        The total ``max_past + max_future`` stays constant; only the split
        between past and future changes.
        """
        self.max_past += 1
        self.max_future -= 1

    def copy(self, plot=False):
        """Return an independent copy of this window.

        Observations are deep-copied.  Plot data is deep-copied only when
        ``plot`` is true; otherwise the copy has ``plot = None``.  The field
        lists are shared with the original, not copied.

        The current position (``max_past`` / ``max_future``) is carried over,
        so copying a window that was advanced with :meth:`next` yields a copy
        pointing at the same observations as the original.
        """
        result = SlidingWindow(
            model_past_values=self.model_past_values,
            model_past_fields=self.model_past_fields,
            model_future_values=self.model_future_values,
            model_future_fields=self.model_future_fields,
            past_values=self.past_values, past_fields=self.past_fields,
            future_values=self.future_values, future_fields=self.future_fields,
        )
        # The constructor recomputes these from the configured sizes, which
        # would silently undo any earlier next() calls.
        result.max_past = self.max_past
        result.max_future = self.max_future
        result.observations = deepcopy(self.observations)
        if plot:
            result.plot = deepcopy(self.plot)
        return result

    # GET VALUE

    def get_value(self, field, offset=-1):
        """Return ``field`` of the observation at ``max_past + offset``.

        ``offset=-1`` addresses the current observation, ``0`` the next one.
        """
        return self.observations[self.max_past + offset][field]

    def get_previous_value(self, field):
        """Return ``field`` of the observation just before the current one."""
        return self.get_value(field, -2)

    def get_current_value(self, field):
        """Return ``field`` of the current observation."""
        return self.get_value(field, -1)

    def get_current_orig_value(self, field):
        """Return the recorded plot value of ``field`` at the current position.

        Reads from ``self.plot`` rather than from the observation buffer, so
        it is unaffected by :meth:`set_value` unless ``field`` itself is a
        ``"_calc"`` series.  Requires plot recording to be enabled for
        ``field``.
        """
        return self.plot[field][self._plot_index(-1)]

    def get_next_value(self, field):
        """Return ``field`` of the next observation."""
        return self.get_value(field, 0)

    # SET VALUE

    def set_value(self, field, value, offset=-1):
        """Overwrite ``field`` of the observation at ``max_past + offset``.

        When a ``"<field>_calc"`` plot series exists, the matching entry of
        that series is overwritten as well.
        """
        self.observations[self.max_past + offset][field] = value
        calc_field = field + '_calc'
        if self.plot and calc_field in self.plot:
            self.plot[calc_field][self._plot_index(offset)] = value

    def set_current_value(self, field, value):
        """Overwrite ``field`` of the current observation."""
        self.set_value(field, value, -1)

    def set_next_value(self, field, value):
        """Overwrite ``field`` of the next observation."""
        self.set_value(field, value, 0)

    # PAST/FUTURE HELPERS

    def _plot_index(self, offset):
        """Translate a buffer offset into an index from the end of a plot series.

        Plot series grow without bound while the buffer is trimmed, so the
        position is expressed relative to the newest entry.
        """
        return -self.max_future - 1 + offset

    def _get_past_values(self, num=None, fields=None, offset=0) -> list:
        """Collect the last ``num`` past values of each field.

        The result is grouped by field: all values of the first field in
        chronological order, then all values of the second field, and so on.
        ``num`` defaults to ``past_values`` and ``fields`` to ``past_fields``;
        ``offset`` shifts every buffer index that is read.
        """
        if num is None:
            num = self.past_values
        if fields is None:
            fields = self.past_fields
        start, stop = self.max_past - num, self.max_past
        return [
            self.observations[index + offset][field]
            for field in fields
            for index in range(start, stop)
        ]

    def _get_future_values(self, num=None, fields=None, offset=0) -> list:
        """Collect the first ``num`` future values of each field.

        Same grouping as :meth:`_get_past_values`.  ``num`` defaults to
        ``future_values`` and ``fields`` to ``future_fields``; ``offset``
        shifts every buffer index that is read.
        """
        if num is None:
            num = self.future_values
        if fields is None:
            fields = self.future_fields
        start, stop = self.max_past, self.max_past + num
        return [
            self.observations[index + offset][field]
            for field in fields
            for index in range(start, stop)
        ]

    # MODEL

    def get_model_values(self) -> list:
        """Return the model layout: past model values followed by future ones."""
        past = self._get_past_values(num=self.model_past_values,
                                     fields=self.model_past_fields)
        future = self._get_future_values(num=self.model_future_values,
                                         fields=self.model_future_fields)
        return past + future

    def get_model_size(self) -> int:
        """Return the length of the list produced by :meth:`get_model_values`."""
        return (self.model_past_values * len(self.model_past_fields)
                + self.model_future_values * len(self.model_future_fields))

    def get_model_target(self):
        """Return the ``'temp_in'`` value of the next observation."""
        return self.get_next_value('temp_in')

    # WINDOW

    def get_window_values(self, offset=0):
        """Return the window layout: past window values followed by future ones.

        ``offset`` shifts every buffer index that is read.
        """
        past = self._get_past_values(num=self.past_values,
                                     fields=self.past_fields, offset=offset)
        future = self._get_future_values(num=self.future_values,
                                         fields=self.future_fields,
                                         offset=offset)
        return past + future

    def get_window_size(self):
        """Return the length of the list produced by :meth:`get_window_values`."""
        return (self.past_values * len(self.past_fields)
                + self.future_values * len(self.future_fields))

    # PLOT

    def get_plot_data(self, field):
        """Return the recorded series for ``field`` without its newest
        ``max_future + 1`` entries."""
        return self.plot[field][:-self.max_future - 1]
|
|
|
class SlidingWindowAdapter(Iterable):
    """Iterator that feeds a reader's records into one shared SlidingWindow.

    Every step pulls records from ``reader`` until the window reports that it
    is full, then yields that window.  The *same* ``SlidingWindow`` instance
    (``self.aggregate``) is returned on every step.
    """

    def __init__(self, reader: DataReader, **kwargs):
        """Store ``reader`` and build the window from ``kwargs``.

        Args:
            reader: source of observations; only its ``pop()`` method is used.
            **kwargs: forwarded unchanged to ``SlidingWindow``.
        """
        self.reader = reader
        self.aggregate = SlidingWindow(**kwargs)

    def __iter__(self):
        """Return ``self``; the adapter is its own iterator."""
        return self

    def __next__(self) -> SlidingWindow:
        """Consume records until the window is full and return the window.

        This method never raises ``StopIteration`` itself; iteration ends only
        through whatever ``reader.pop()`` raises when it runs out of data
        (not visible from this class).
        """
        window = self.aggregate
        source = self.reader
        while True:
            # add_observation() returns True once the window holds enough data.
            if window.add_observation(source.pop()):
                return window
|