# Heating controller with a neural thermal model, written in Python
# Jacek Kowalski
# 2018-06-24 66a9fb40efe1311b34a3cee3f83f10c6990759af
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import Iterable
except ImportError:  # interpreters that predate collections.abc
    from collections import Iterable
from copy import deepcopy

from .DataReader import DataReader
 
 
class SlidingWindow:
    """Bounded window over a stream of observation dicts.

    ``observations`` holds at most ``max_past + max_future + 1`` entries.
    ``max_past`` doubles as the index of the "next" observation inside that
    list: index ``max_past - 1`` is the "current" observation, lower indices
    are the past and higher indices are the future.  ``next()`` moves that
    pointer one step towards the future without touching the stored data.

    When ``plot_fields`` is given, ``plot`` maps each of those fields to a
    list that receives one value per added observation and is never
    truncated.  A field named ``<name>_calc`` is filled from the observation
    key ``<name>`` and is later overwritten by ``set_value(<name>, ...)``.
    """

    def __init__(self,
                 model_past_values=5, model_past_fields=None,
                 model_future_values=0, model_future_fields=None,
                 past_values=0, past_fields=None,
                 future_values=0, future_fields=None,
                 plot_fields=None, **kwargs):
        """Configure the window sizes and the fields read from observations.

        :param model_past_values: number of past observations in the model input
        :param model_past_fields: fields taken from each past observation for
            the model input; defaults to ``['temp_in', 'temp_out', 'mode']``
        :param model_future_values: number of future observations in the
            model input
        :param model_future_fields: fields taken from each future observation
            for the model input; defaults to an empty list
        :param past_values: number of past observations in the generic window
        :param past_fields: fields for the generic window's past part;
            defaults to ``['temp_in', 'temp_out']``
        :param future_values: number of future observations in the generic
            window
        :param future_fields: fields for the generic window's future part;
            defaults to ``model_future_fields`` (the same list object)
        :param plot_fields: optional iterable of field names to record in
            ``plot``; when empty or ``None`` nothing is recorded
        :param kwargs: accepted and ignored, so callers may pass a shared
            option dict
        """
        self.model_past_values = model_past_values
        if model_past_fields is None:
            model_past_fields = ['temp_in', 'temp_out', 'mode']
        self.model_past_fields = model_past_fields

        self.past_values = past_values
        if past_fields is None:
            past_fields = ['temp_in', 'temp_out']
        self.past_fields = past_fields

        # At least two past entries are kept so that both the "previous"
        # (offset -2) and "current" (offset -1) observations always exist.
        self.max_past = max(self.model_past_values, self.past_values, 2)

        self.model_future_values = model_future_values
        if model_future_fields is None:
            model_future_fields = []
        self.model_future_fields = model_future_fields

        self.future_values = future_values
        if future_fields is None:
            future_fields = model_future_fields
        self.future_fields = future_fields

        self.max_future = max(self.model_future_values, self.future_values, 1)

        self.observations = []  # list of dict

        self.plot = None
        if plot_fields:
            self.plot = {field: [] for field in plot_fields}

    def add_observation(self, observation: dict) -> bool:
        """Append an observation, dropping the oldest ones beyond capacity.

        :param observation: mapping that must contain every recorded plot
            field (with any ``_calc`` suffix stripped)
        :return: ``True`` once the window holds
            ``max_past + max_future + 1`` observations, ``False`` before that
        """
        capacity = self.max_past + self.max_future
        # Trim first, then append: the list ends up with capacity + 1 entries.
        self.observations = self.observations[-capacity:]
        self.observations.append(observation)

        if self.plot:
            for field, series in self.plot.items():
                # "<name>_calc" series start out as a copy of "<name>".
                source = field[:-5] if field.endswith('_calc') else field
                series.append(observation[source])

        return len(self.observations) > capacity

    # SLIDING

    def next(self):
        """Move the current position one observation towards the future."""
        self.max_past += 1
        self.max_future -= 1

    def copy(self, plot=False):
        """Return an independent copy of this window.

        The copy keeps the configuration, a deep copy of the observations and
        the current sliding position, so it reads and writes the same slots
        as the original even after ``next()`` has been called.

        :param plot: when true, the recorded plot series are deep-copied too;
            otherwise the copy has ``plot`` set to ``None``
        """
        result = SlidingWindow(model_past_values=self.model_past_values,
                               model_past_fields=self.model_past_fields,
                               model_future_values=self.model_future_values,
                               model_future_fields=self.model_future_fields,
                               past_values=self.past_values, past_fields=self.past_fields,
                               future_values=self.future_values, future_fields=self.future_fields)
        # The constructor recomputes the initial position; restore the slid
        # one so that the copy does not silently rewind.
        result.max_past = self.max_past
        result.max_future = self.max_future
        result.observations = deepcopy(self.observations)
        if plot:
            result.plot = deepcopy(self.plot)
        return result

    def _plot_index(self, offset):
        """Translate an observation offset into an index from the end of a plot series.

        Matches ``observations[max_past + offset]`` when the window is full,
        because the last plot entry belongs to the last stored observation.
        """
        return -self.max_future - 1 + offset

    # GET VALUE

    def get_value(self, field, offset=-1):
        """Return ``field`` of the observation at ``offset`` from the "next" slot.

        Offset ``-1`` is the current observation, ``0`` the next one.
        """
        return self.observations[self.max_past + offset][field]

    def get_previous_value(self, field):
        """Return ``field`` of the observation just before the current one."""
        return self.get_value(field, -2)

    def get_current_value(self, field):
        """Return ``field`` of the current observation."""
        return self.get_value(field, -1)

    def get_current_orig_value(self, field):
        """Return the recorded plot value of ``field`` at the current position.

        Reads from ``plot`` rather than ``observations``, so values replaced
        through ``set_value`` in ``observations`` do not affect the result.
        Requires ``field`` to be one of the recorded plot fields.
        """
        return self.plot[field][self._plot_index(-1)]

    def get_next_value(self, field):
        """Return ``field`` of the next observation."""
        return self.get_value(field, 0)

    # SET VALUE

    def set_value(self, field, value, offset=-1):
        """Overwrite ``field`` of the observation at ``offset``.

        When a ``<field>_calc`` plot series is recorded, the matching entry
        of that series is overwritten as well.
        """
        self.observations[self.max_past + offset][field] = value
        calc_field = field + '_calc'
        if self.plot and calc_field in self.plot:
            self.plot[calc_field][self._plot_index(offset)] = value

    def set_current_value(self, field, value):
        """Overwrite ``field`` of the current observation."""
        self.set_value(field, value, -1)

    def set_next_value(self, field, value):
        """Overwrite ``field`` of the next observation."""
        self.set_value(field, value, 0)

    # PAST/FUTURE HELPERS

    def _get_past_values(self, num=None, fields=None, offset=0) -> list:
        """Collect the last ``num`` past values of each field.

        The result is grouped by field: all values of the first field in
        chronological order, then all values of the second field, and so on.
        ``offset`` shifts every observation index by a fixed amount.
        """
        if num is None:
            num = self.past_values
        if fields is None:
            fields = self.past_fields
        return [
            self.observations[i + offset][j]
            for j in fields
            for i in range(self.max_past - num, self.max_past)
        ]

    def _get_future_values(self, num=None, fields=None, offset=0) -> list:
        """Collect ``num`` values of each field starting at the "next" slot.

        Grouping and ``offset`` behave as in ``_get_past_values``.
        """
        if num is None:
            num = self.future_values
        if fields is None:
            fields = self.future_fields
        return [
            self.observations[i + offset][j]
            for j in fields
            for i in range(self.max_past, self.max_past + num)
        ]

    # MODEL

    def get_model_values(self) -> list:
        """Return the model input: model past values followed by model future values."""
        past = self._get_past_values(num=self.model_past_values,
                                     fields=self.model_past_fields)
        future = self._get_future_values(num=self.model_future_values,
                                         fields=self.model_future_fields)
        return past + future

    def get_model_size(self) -> int:
        """Return the length of the list produced by ``get_model_values``."""
        return self.model_past_values * len(self.model_past_fields) \
            + self.model_future_values * len(self.model_future_fields)

    def get_model_target(self):
        """Return the model target: ``'temp_in'`` of the next observation."""
        return self.get_next_value('temp_in')

    # WINDOW

    def get_window_values(self, offset=0):
        """Return the generic window: past values followed by future values.

        :param offset: shift applied to every observation index
        """
        past = self._get_past_values(num=self.past_values,
                                     fields=self.past_fields, offset=offset)
        future = self._get_future_values(num=self.future_values,
                                         fields=self.future_fields, offset=offset)
        return past + future

    def get_window_size(self):
        """Return the length of the list produced by ``get_window_values``."""
        return self.past_values * len(self.past_fields) \
            + self.future_values * len(self.future_fields)

    # PLOT

    def get_plot_data(self, field):
        """Return the recorded series of ``field`` up to and including the current position.

        The entries for the "next" slot and everything after it are left out.
        """
        return self.plot[field][:self._plot_index(0)]
 
 
class SlidingWindowAdapter(Iterable):
    """Iterator that feeds items popped from a reader into one SlidingWindow.

    Every step hands back the same ``aggregate`` window object, advanced by
    at least one observation; callers that need a snapshot have to copy it.
    """

    def __init__(self, reader: DataReader, **kwargs):
        """Store the reader and build the window from ``kwargs``.

        :param reader: source of observations; only its ``pop()`` is used here
        :param kwargs: forwarded unchanged to the ``SlidingWindow`` constructor
        """
        self.reader = reader
        self.aggregate = SlidingWindow(**kwargs)

    def __iter__(self):
        """Return the adapter itself, which implements ``__next__``."""
        return self

    def __next__(self) -> SlidingWindow:
        """Pop observations until the window reports it is full, then return it.

        Anything raised by ``reader.pop()`` propagates to the caller.
        NOTE(review): presumably that is how exhaustion is signalled - confirm
        against DataReader.
        """
        window = self.aggregate
        while True:
            if window.add_observation(self.reader.pop()):
                return window