Heating controller with neural thermal model written in Python
Jacek Kowalski
2018-06-24 425bf71fc0b24b547006686d83404c54b983de0b
commit | author | age
425bf7 1 from collections import Iterable
JK 2 from copy import deepcopy
3
4 from .DataReader import DataReader
5
6
class SlidingWindow:
    """Bounded buffer of observations split into a "past" and a "future" part.

    Observations are dicts stored in ``self.observations``.  Once the buffer
    is full it holds ``max_past + max_future + 1`` entries, laid out as::

        index:  0 .. max_past - 2 | max_past - 1 | max_past | max_past + 1 ..
        role:   older past        | current      | next     | further future

    Two views are built on top of that layout:

    * the *model* view (``model_*`` settings), returned by
      :meth:`get_model_values`;
    * the *window* view (``past_*`` / ``future_*`` settings), returned by
      :meth:`get_window_values`.

    Optionally a per-field history (``self.plot``) is recorded for every
    observation ever added; unlike ``observations`` it is never truncated.
    """

    def __init__(self,
                 model_past_values=5, model_past_fields=None,
                 model_future_values=0, model_future_fields=None,
                 past_values=0, past_fields=None,
                 future_values=0, future_fields=None,
                 plot_fields=None, **kwargs):
        """Configure the window.

        :param model_past_values: number of past observations in the model view
        :param model_past_fields: fields read from past observations for the
            model view (default ``['temp_in', 'temp_out', 'mode']``)
        :param model_future_values: number of future observations in the
            model view
        :param model_future_fields: fields read from future observations for
            the model view (default: none)
        :param past_values: number of past observations in the window view
        :param past_fields: fields read from past observations for the window
            view (default ``['temp_in', 'temp_out']``)
        :param future_values: number of future observations in the window view
        :param future_fields: fields read from future observations for the
            window view (default: the same list as ``model_future_fields``)
        :param plot_fields: names of the series to record in ``self.plot``;
            a name ending in ``_calc`` is fed from the field without that
            suffix.  Falsy value disables recording.
        :param kwargs: accepted and ignored
        """
        self.model_past_values = model_past_values
        if model_past_fields is None:
            model_past_fields = ['temp_in', 'temp_out', 'mode']
        self.model_past_fields = model_past_fields

        self.past_values = past_values
        if past_fields is None:
            past_fields = ['temp_in', 'temp_out']
        self.past_fields = past_fields

        # At least two past entries are kept so that both the "current"
        # (offset -1) and the "previous" (offset -2) observation exist.
        self.max_past = max(self.model_past_values, self.past_values, 2)

        self.model_future_values = model_future_values
        if model_future_fields is None:
            model_future_fields = []
        self.model_future_fields = model_future_fields

        self.future_values = future_values
        if future_fields is None:
            # Intentionally the very same list object as model_future_fields.
            future_fields = model_future_fields
        self.future_fields = future_fields

        # At least one future entry is kept beyond the "next" slot.
        self.max_future = max(self.model_future_values, self.future_values, 1)

        self.observations = []  # list of dict

        # Either None (recording disabled) or {series name: list of values}.
        self.plot = {field: [] for field in plot_fields} if plot_fields else None

    def add_observation(self, observation: dict) -> bool:
        """Append ``observation`` and report whether the buffer is full.

        The buffer is first trimmed to its newest ``max_past + max_future``
        entries, so after appending it holds at most one entry more.

        :return: ``True`` once ``max_past + max_future + 1`` observations are
            buffered, ``False`` while the buffer is still filling up
        :raises KeyError: if a recorded plot field is missing in
            ``observation``
        """
        capacity = self.max_past + self.max_future
        self.observations = self.observations[-capacity:]
        self.observations.append(observation)

        if self.plot:
            for name, series in self.plot.items():
                # "<field>_calc" series start out as a copy of "<field>".
                source = name[:-5] if name.endswith('_calc') else name
                series.append(observation[source])

        return len(self.observations) > capacity

    # SLIDING

    def next(self):
        """Move the past/future boundary one observation forward.

        Only the boundary moves; the buffered observations stay untouched.
        """
        self.max_past += 1
        self.max_future -= 1

    def copy(self, plot=False):
        """Return an independent copy of this window.

        Observations are deep-copied.  The past/future boundary
        (``max_past`` / ``max_future``) is carried over as well, so a copy
        taken after :meth:`next` addresses the same "current" observation as
        the original instead of falling back to the initial boundary.

        :param plot: when true the recorded plot series are deep-copied too,
            otherwise the copy records nothing (``plot`` is ``None``)
        """
        result = SlidingWindow(model_past_values=self.model_past_values,
                               model_past_fields=self.model_past_fields,
                               model_future_values=self.model_future_values,
                               model_future_fields=self.model_future_fields,
                               past_values=self.past_values, past_fields=self.past_fields,
                               future_values=self.future_values, future_fields=self.future_fields)
        # The constructor recomputes the initial boundary; restore the
        # current one so that indices into the copied observations match.
        result.max_past = self.max_past
        result.max_future = self.max_future
        result.observations = deepcopy(self.observations)
        if plot:
            result.plot = deepcopy(self.plot)
        return result

    # GET VALUE

    def get_value(self, field, offset=-1):
        """Return ``field`` of the observation at ``max_past + offset``.

        ``offset`` -1 is the current observation, 0 the next one, -2 the
        previous one.
        """
        return self.observations[self.max_past + offset][field]

    def get_previous_value(self, field):
        """Return ``field`` of the observation just before the current one."""
        return self.get_value(field, -2)

    def get_current_value(self, field):
        """Return ``field`` of the current observation."""
        return self.get_value(field, -1)

    def get_current_orig_value(self, field):
        """Return the recorded plot value of series ``field`` for the current slot.

        Reads ``self.plot`` rather than ``self.observations``, so a value
        overwritten through :meth:`set_value` does not show up here unless
        ``field`` itself is a ``*_calc`` series.  Requires plot recording.
        """
        return self.plot[field][self._plot_index(-1)]

    def get_next_value(self, field):
        """Return ``field`` of the next observation."""
        return self.get_value(field, 0)

    # SET VALUE

    def set_value(self, field, value, offset=-1):
        """Overwrite ``field`` of the observation at ``max_past + offset``.

        When a ``<field>_calc`` plot series is recorded, the matching entry
        of that series is overwritten too.
        """
        self.observations[self.max_past + offset][field] = value
        calc_name = field + '_calc'
        if self.plot and calc_name in self.plot:
            self.plot[calc_name][self._plot_index(offset)] = value

    def set_current_value(self, field, value):
        """Overwrite ``field`` of the current observation."""
        self.set_value(field, value, -1)

    def set_next_value(self, field, value):
        """Overwrite ``field`` of the next observation."""
        self.set_value(field, value, 0)

    # PAST/FUTURE HELPERS

    def _plot_index(self, offset):
        """Translate an observation offset into an index into a plot series.

        Plot series are never truncated, so they are addressed from the end:
        with a full buffer ``observations[max_past + offset]`` and
        ``series[-max_future - 1 + offset]`` refer to the same observation.
        """
        return -self.max_future - 1 + offset

    def _get_past_values(self, num=None, fields=None, offset=0) -> list:
        """Collect ``fields`` of the last ``num`` past observations.

        The result is grouped by field: all ``num`` values of the first
        field (oldest first), then all values of the second field, etc.

        :param num: number of observations (default ``self.past_values``)
        :param fields: field names (default ``self.past_fields``)
        :param offset: shift applied to every observation index
        """
        if num is None:
            num = self.past_values
        if fields is None:
            fields = self.past_fields
        indices = range(self.max_past - num, self.max_past)
        return [
            self.observations[index + offset][name]
            for name in fields
            for index in indices
        ]

    def _get_future_values(self, num=None, fields=None, offset=0) -> list:
        """Collect ``fields`` of the first ``num`` future observations.

        The "future" starts at the next observation (index ``max_past``).
        The result is grouped by field, like :meth:`_get_past_values`.

        :param num: number of observations (default ``self.future_values``)
        :param fields: field names (default ``self.future_fields``)
        :param offset: shift applied to every observation index
        """
        if num is None:
            num = self.future_values
        if fields is None:
            fields = self.future_fields
        indices = range(self.max_past, self.max_past + num)
        return [
            self.observations[index + offset][name]
            for name in fields
            for index in indices
        ]

    # MODEL

    def get_model_values(self) -> list:
        """Return the model view: past values followed by future values."""
        past = self._get_past_values(num=self.model_past_values,
                                     fields=self.model_past_fields)
        future = self._get_future_values(num=self.model_future_values,
                                         fields=self.model_future_fields)
        return past + future

    def get_model_size(self) -> int:
        """Return the length of the list produced by :meth:`get_model_values`."""
        return self.model_past_values * len(self.model_past_fields) \
               + self.model_future_values * len(self.model_future_fields)

    def get_model_target(self):
        """Return ``temp_in`` of the next observation."""
        return self.get_next_value('temp_in')

    # WINDOW

    def get_window_values(self, offset=0):
        """Return the window view: past values followed by future values.

        :param offset: shift applied to every observation index
        """
        past = self._get_past_values(num=self.past_values,
                                     fields=self.past_fields, offset=offset)
        future = self._get_future_values(num=self.future_values,
                                         fields=self.future_fields, offset=offset)
        return past + future

    def get_window_size(self):
        """Return the length of the list produced by :meth:`get_window_values`."""
        return self.past_values * len(self.past_fields) + self.future_values * len(self.future_fields)

    # PLOT

    def get_plot_data(self, field):
        """Return plot series ``field`` without its trailing ``max_future + 1`` entries.

        That drops the "next" slot and everything after it, so the returned
        list ends at the current observation.  Requires plot recording.
        """
        return self.plot[field][:self._plot_index(0)]
160
161
class SlidingWindowAdapter(Iterable):
    """Iterator that feeds records from a reader into one SlidingWindow.

    Every iteration step yields the same ``SlidingWindow`` instance
    (``self.aggregate``), advanced by at least one record.
    """
    # NOTE(review): this file imports ``Iterable`` from ``collections``; that
    # alias was removed in Python 3.10 (it lives in ``collections.abc``) --
    # confirm the interpreter version this module targets.

    def __init__(self, reader: DataReader, **kwargs):
        """Store ``reader`` and build the window from ``kwargs``.

        :param reader: source of records; only its ``pop()`` method is used
        :param kwargs: forwarded unchanged to the ``SlidingWindow`` constructor
        """
        self.aggregate = SlidingWindow(**kwargs)
        self.reader = reader

    def __iter__(self):
        """Return the adapter itself; it implements ``__next__`` directly."""
        return self

    def __next__(self) -> SlidingWindow:
        """Pop records into the window until it reports being ready.

        At least one record is always consumed.  Whatever ``reader.pop()``
        raises propagates to the caller unchanged.
        """
        while True:
            record = self.reader.pop()
            if self.aggregate.add_observation(record):
                return self.aggregate