Heating controller with neural thermal model written in Python
Jacek Kowalski
2018-06-24 66a9fb40efe1311b34a3cee3f83f10c6990759af
commit | author | age
425bf7 1 import abc
JK 2 import json
3
4 import numpy
5 import scipy.linalg
6 from keras import Sequential
7 from keras.layers import Dense
8 from keras.models import load_model
9
# Registry mapping a config ``'model_type'`` name to the class implementing
# it; ``Model.load`` and ``Model.generate`` look classes up here.
model_types = dict()
11
12
class Model(abc.ABC):
    """Abstract base of all thermal models and dispatcher over ``model_types``.

    Concrete models implement ``get_temperature``, ``save`` and ``_load``
    (and override ``generate``).  Deriving from ``abc.ABC`` makes the
    abstract markers effective: a subclass that forgets one of them can no
    longer be instantiated, instead of failing later when the method is used.
    """

    @abc.abstractmethod
    def get_temperature(self, flat_history: numpy.ndarray) -> float:
        """Return the temperature predicted for the given flat history."""

    @abc.abstractmethod
    def save(self, filename):
        """Persist the model to ``filename``."""

    @classmethod
    def load(cls, config, filename):
        """Load a model of the type named by ``config['model_type']``.

        Raises:
            KeyError: if the key is missing from ``config`` or the type is
                not registered in ``model_types``.
        """
        return model_types[config['model_type']]._load(filename)

    # ``abc.abstractclassmethod`` is deprecated since Python 3.3; stacking
    # ``classmethod`` over ``abstractmethod`` is the supported spelling.
    @classmethod
    @abc.abstractmethod
    def _load(cls, filename):
        """Create an instance of this concrete model from ``filename``."""

    @classmethod
    def generate(cls, config: dict):
        """Build a model of the type named by ``config['model_type']``.

        Delegates to the registered class' own ``generate``.

        Raises:
            KeyError: if the key is missing from ``config`` or the type is
                not registered in ``model_types``.
        """
        return model_types[config['model_type']].generate(config)
33
34
class LinearModel(Model):
    """Linear model: the prediction is a weighted sum of the flat history."""

    def __init__(self, model):
        """Store the coefficient vector (one weight per history value)."""
        self.model = model

    def get_temperature(self, flat_history):
        """Return the dot product of the coefficients and ``flat_history``.

        ``numpy.dot`` replaces a builtin ``sum`` over the element-wise
        product, which iterated the array in Python.  The result is converted
        to a plain ``float`` to match the return type declared on ``Model``.
        """
        return float(numpy.dot(self.model, flat_history))

    def save(self, filename):
        """Write the coefficients to ``filename`` as a JSON list."""
        with open(filename, 'w') as fp:
            json.dump(self.model.tolist(), fp)

    @classmethod
    def _load(cls, filename):
        """Read a JSON list of coefficients from ``filename``."""
        with open(filename, 'r') as fp:
            coefficients = json.load(fp)
        return cls(numpy.array(coefficients))

    @classmethod
    def generate(cls, config: dict):
        """Fit the coefficients by least squares over the sliding window.

        Every window contributes one row (``get_model_values()``) and one
        target (``get_model_target()``); ``scipy.linalg.lstsq`` solves the
        resulting system.
        """
        # Imported here as in the original code (presumably to avoid an
        # import cycle with ArgParser -- TODO confirm).
        from .ArgParser import get_sliding_window_from_config

        xs = []
        ys = []
        for window in get_sliding_window_from_config(config):
            xs.append(window.get_model_values())
            ys.append(window.get_model_target())

        # lstsq returns (solution, residues, rank, singular values).
        solution = scipy.linalg.lstsq(xs, ys)[0]
        return cls(solution)


model_types['linear'] = LinearModel
66
67
class NeuralModel(Model):
    """Model backed by a Keras network that maps a flat history to one value."""

    def __init__(self, model):
        """Wrap an already built Keras model."""
        self.model = model

    def get_temperature(self, flat_history):
        """Return the network's prediction for ``flat_history``.

        The history is reshaped into a batch containing a single row.  The
        first output of that row is converted to a plain ``float`` so the
        return type matches the one declared on ``Model``.
        """
        state = numpy.reshape(flat_history, [1, len(flat_history)])
        result = self.model.predict(state)
        return float(result[0][0])

    @staticmethod
    def _build_model(state_size):
        """Build and compile the dense network for ``state_size`` inputs.

        Three linear hidden layers of ``state_size / 2``, ``/ 4`` and ``/ 8``
        units (truncated; 1 when that gives 0) feed one linear output unit.
        """
        model = Sequential()
        model.add(Dense(int(state_size / 2) or 1, activation='linear',
                        input_dim=state_size,
                        kernel_initializer='random_uniform'))
        for divisor in (4, 8):
            model.add(Dense(int(state_size / divisor) or 1,
                            activation='linear',
                            kernel_initializer='random_uniform'))
        model.add(Dense(1, activation='linear',
                        kernel_initializer='random_uniform'))
        # NOTE(review): 'accuracy' is a classification metric and this is a
        # regression setup; kept unchanged so reported metrics stay the same.
        model.compile(optimizer='adagrad',
                      loss='mean_squared_error',
                      metrics=['accuracy'])
        return model

    def save(self, filename):
        """Save the Keras model to ``filename``."""
        self.model.save(filename)

    @classmethod
    def _load(cls, filename):
        """Load a Keras model from ``filename`` and wrap it."""
        return cls(load_model(filename))

    @classmethod
    def generate(cls, config: dict):
        """Build a network and train it on the configured sliding window.

        The input size is taken from the first window while iterating.  The
        previous code pulled it with a separate ``next()`` call, which dropped
        that window from the training data, required an iterator rather than
        any iterable, and leaked ``StopIteration`` when there was no data.

        Raises:
            ValueError: if the sliding window yields no data.
            KeyError: if ``config`` has no ``'model_epochs'`` entry.
        """
        # Imported here as in the original code (presumably to avoid an
        # import cycle with ArgParser -- TODO confirm).
        from .ArgParser import get_sliding_window_from_config

        xs = []
        ys = []
        state_size = None
        for window in get_sliding_window_from_config(config):
            if state_size is None:
                state_size = window.get_model_size()
            xs.append(window.get_model_values())
            # One-element row: the network has a single output unit.
            ys.append([window.get_next_value('temp_in')])

        if state_size is None:
            raise ValueError('cannot train a neural model: '
                             'the sliding window yielded no data')

        model = cls._build_model(state_size)
        model.fit(numpy.array(xs), numpy.array(ys),
                  epochs=config['model_epochs'], batch_size=32, shuffle=True)
        return cls(model)


model_types['neural'] = NeuralModel
120
121
class NeuralModelLess(NeuralModel):
    """Neural variant registered under the ``'neural_less'`` model type.

    NOTE(review): the network built here is layer-for-layer the same as the
    one built by ``NeuralModel._build_model`` -- confirm whether a smaller
    architecture was intended.
    """

    @staticmethod
    def _build_model(state_size):
        """Build and compile the dense network for ``state_size`` inputs.

        Linear hidden layers of ``state_size / 2``, ``/ 4`` and ``/ 8`` units
        (truncated; 1 when that gives 0) feed one linear output unit.
        """
        shared = {'activation': 'linear',
                  'kernel_initializer': 'random_uniform'}
        widths = [int(state_size / divisor) or 1 for divisor in (2, 4, 8)]

        network = Sequential()
        # Only the first layer declares the input dimension.
        network.add(Dense(widths[0], input_dim=state_size, **shared))
        for width in widths[1:]:
            network.add(Dense(width, **shared))
        network.add(Dense(1, **shared))
        network.compile(optimizer='adagrad',
                        loss='mean_squared_error',
                        metrics=['accuracy'])
        return network


model_types['neural_less'] = NeuralModelLess
141
142
class NeuralModelMore(NeuralModel):
    """Neural variant registered under the ``'neural_more'`` model type.

    NOTE(review): the network built here is layer-for-layer the same as the
    one built by ``NeuralModel._build_model`` -- confirm whether a larger
    architecture was intended.
    """

    @staticmethod
    def _build_model(state_size):
        """Build and compile the dense network for ``state_size`` inputs.

        Linear hidden layers of ``state_size / 2``, ``/ 4`` and ``/ 8`` units
        (truncated; 1 when that gives 0) feed one linear output unit.
        """
        def linear_layer(units, **extra):
            # Every layer shares activation and initializer.
            return Dense(units, activation='linear',
                         kernel_initializer='random_uniform', **extra)

        half = int(state_size / 2) or 1
        quarter = int(state_size / 4) or 1
        eighth = int(state_size / 8) or 1

        network = Sequential()
        network.add(linear_layer(half, input_dim=state_size))
        network.add(linear_layer(quarter))
        network.add(linear_layer(eighth))
        network.add(linear_layer(1))
        network.compile(optimizer='adagrad',
                        loss='mean_squared_error',
                        metrics=['accuracy'])
        return network


model_types['neural_more'] = NeuralModelMore
162
163
class NeuralModelRelu(NeuralModel):
    """Neural variant registered under the ``'neural_relu'`` model type.

    Same layer widths as ``NeuralModel``, but the second and third hidden
    layers use ReLU activation instead of linear.
    """

    @staticmethod
    def _build_model(state_size):
        """Build and compile the dense network for ``state_size`` inputs.

        Hidden layers: ``state_size / 2`` units (linear), then ``/ 4`` and
        ``/ 8`` units (ReLU); widths are truncated and become 1 when that
        gives 0.  The output is a single linear unit.
        """
        initializer = 'random_uniform'

        network = Sequential()
        # The input-facing layer stays linear and declares the input size.
        network.add(Dense(int(state_size / 2) or 1,
                          activation='linear',
                          input_dim=state_size,
                          kernel_initializer=initializer))
        for divisor in (4, 8):
            network.add(Dense(int(state_size / divisor) or 1,
                              activation='relu',
                              kernel_initializer=initializer))
        network.add(Dense(1, activation='linear',
                          kernel_initializer=initializer))
        network.compile(optimizer='adagrad',
                        loss='mean_squared_error',
                        metrics=['accuracy'])
        return network


model_types['neural_relu'] = NeuralModelRelu
183
184
class NeuralModelReduced(NeuralModel):
    """Neural variant registered under the ``'neural_half'`` model type.

    Uses a single hidden layer instead of the three built by ``NeuralModel``.
    """

    @staticmethod
    def _build_model(state_size):
        """Build and compile the dense network for ``state_size`` inputs.

        One linear hidden layer of ``state_size / 2`` units (truncated; 1
        when that gives 0) feeds one linear output unit.
        """
        hidden_units = int(state_size / 2) or 1
        layer_options = {'activation': 'linear',
                         'kernel_initializer': 'random_uniform'}

        network = Sequential()
        network.add(Dense(hidden_units, input_dim=state_size,
                          **layer_options))
        network.add(Dense(1, **layer_options))
        network.compile(optimizer='adagrad',
                        loss='mean_squared_error',
                        metrics=['accuracy'])
        return network


model_types['neural_half'] = NeuralModelReduced
200
201
class NeuralModelSqueezeExpand(NeuralModel):
    """Neural variant registered under the ``'neural_hqh'`` model type.

    The hidden layers narrow to a quarter of the input size and then widen
    back to a half before the output unit.
    """

    @staticmethod
    def _build_model(state_size):
        """Build and compile the dense network for ``state_size`` inputs.

        Linear hidden layers of ``state_size / 2``, ``/ 4`` and ``/ 2`` units
        (truncated; 1 when that gives 0) feed one linear output unit.
        """
        shared = {'activation': 'linear',
                  'kernel_initializer': 'random_uniform'}
        first, *remaining = [int(state_size / divisor) or 1
                             for divisor in (2, 4, 2)]

        network = Sequential()
        # Only the first layer declares the input dimension.
        network.add(Dense(first, input_dim=state_size, **shared))
        for width in remaining:
            network.add(Dense(width, **shared))
        network.add(Dense(1, **shared))
        network.compile(optimizer='adagrad',
                        loss='mean_squared_error',
                        metrics=['accuracy'])
        return network


model_types['neural_hqh'] = NeuralModelSqueezeExpand