import abc
|
from copy import deepcopy
|
|
from numpy import binary_repr
|
from .Env import CentralHeatingHistoryEnv
|
from .Model import Model
|
from .Cost import Cost
|
from .SlidingWindow import SlidingWindow
|
|
|
class Controller(abc.ABC):
|
def __init__(self):
|
pass
|
|
@abc.abstractmethod
|
def act(self, window: SlidingWindow) -> int:
|
pass
|
|
|
class StaticController(Controller):
|
def __init__(self, output: int):
|
self.output = output
|
|
def act(self, window: SlidingWindow) -> int:
|
return self.output
|
|
|
class ThermostatController(Controller):
|
def __init__(self, temp_min, temp_max):
|
self.temp_min = temp_min
|
self.temp_max = temp_max
|
self.mode = 0
|
|
def act(self, window: SlidingWindow):
|
temp_in = window.get_current_value('temp_in')
|
|
if self.mode:
|
if temp_in >= self.temp_max:
|
self.mode = False
|
return False
|
else:
|
return True
|
|
if temp_in <= self.temp_min:
|
self.mode = True
|
return True
|
|
return False
|
|
|
class ReplayController(Controller):
|
def act(self, window: SlidingWindow):
|
return window.get_current_value('mode')
|
|
|
class CostController(Controller):
|
def __init__(self, model: Model, cost_calculator_class, *args, **kwargs):
|
self.model = model
|
self.cost_calculator_class = cost_calculator_class
|
|
def act(self, window: SlidingWindow):
|
saved_mode = window.get_current_value('mode')
|
saved_temp = window.get_next_value('temp_in')
|
|
window.set_current_value('mode', 0)
|
window.set_next_value('temp_in', self.model.get_temperature(window.get_model_values()))
|
cost_no = self.cost_calculator_class().compute_partial_cost(window)
|
cost_no = sum(cost_no.values())
|
window.set_current_value('mode', 1)
|
window.set_next_value('temp_in', self.model.get_temperature(window.get_model_values()))
|
cost_yes = self.cost_calculator_class().compute_partial_cost(window)
|
cost_yes = sum(cost_yes.values())
|
|
window.set_current_value('mode', saved_mode)
|
window.set_next_value('temp_in', saved_temp)
|
return cost_yes < cost_no
|
|
|
class BruteForceController(CostController):
|
def act(self, window: SlidingWindow):
|
future_values = window.future_values - window.model_future_values
|
|
last_mode = window.get_previous_value('mode')
|
|
best_cost = float('+inf')
|
best_option = None
|
|
for i in range(1 << future_values):
|
test_modes = binary_repr(i, future_values)
|
test_diffs = (-0.02, 0.02)
|
test_costs = [None] * len(test_diffs)
|
for i, diff in enumerate(test_diffs):
|
test_window = window.copy(plot=False)
|
test_costs[i] = self.cost_calculator_class()
|
for mode in test_modes:
|
mode = int(mode)
|
test_window.set_current_value('mode', mode)
|
model_calc = self.model.get_temperature(test_window.get_model_values())
|
model_calc += diff
|
test_window.set_next_value('temp_in', model_calc)
|
test_window.set_next_value('temp_out', test_window.get_next_value('temperature'))
|
test_costs[i].add_partial_cost(test_window)
|
test_window.next()
|
test_cost = max(*test_costs)
|
if test_cost.total() < best_cost or (test_cost.total() == best_cost and int(test_modes[0]) == last_mode):
|
best_cost = test_cost.total()
|
best_option = test_modes
|
return int(best_option[0])
|
|
|
def simulate(env: CentralHeatingHistoryEnv, controller: Controller, render=True):
|
state, reward, done, info = env.reset(), 0, False, None
|
while not done:
|
action = controller.act(env.window)
|
state, reward, done, info = env.step(action)
|
|
|
print(env.cost)
|
if render:
|
env.render()
|