import abc
from copy import deepcopy

from numpy import binary_repr

from .Env import CentralHeatingHistoryEnv
from .Model import Model
from .Cost import Cost
from .SlidingWindow import SlidingWindow


class Controller(abc.ABC):
    """Abstract base for controllers that choose an action from a window."""

    def __init__(self):
        pass

    @abc.abstractmethod
    def act(self, window: SlidingWindow) -> int:
        """Return the action to apply for the current step of ``window``."""


class StaticController(Controller):
    """Controller that always answers with the same, preconfigured output."""

    def __init__(self, output: int):
        self.output = output

    def act(self, window: SlidingWindow) -> int:
        """Return the fixed output; ``window`` is not inspected."""
        return self.output


class ThermostatController(Controller):
    """Two-threshold (hysteresis) controller driven by ``'temp_in'``.

    The controller switches on once ``'temp_in'`` is at or below
    ``temp_min`` and stays on until it is at or above ``temp_max``.
    """

    def __init__(self, temp_min, temp_max):
        self.temp_min = temp_min
        self.temp_max = temp_max
        # Starts switched off; kept as a bool because act() only ever
        # stores True/False here.
        self.mode = False

    def act(self, window: SlidingWindow):
        """Update the on/off state from the current ``'temp_in'`` and return it."""
        temp_in = window.get_current_value('temp_in')
        if self.mode:
            # Currently on: keep going until the upper threshold is reached.
            if temp_in >= self.temp_max:
                self.mode = False
                return False
            return True
        # Currently off: switch on only at or below the lower threshold.
        if temp_in <= self.temp_min:
            self.mode = True
            return True
        return False


class ReplayController(Controller):
    """Controller that echoes the ``'mode'`` value already stored in the window."""

    def act(self, window: SlidingWindow):
        """Return the window's current ``'mode'`` value unchanged."""
        return window.get_current_value('mode')


class CostController(Controller):
    """Controller that compares the one-step cost of mode 0 against mode 1."""

    def __init__(self, model: Model, cost_calculator_class, *args, **kwargs):
        # Extra positional/keyword arguments are accepted and ignored.
        self.model = model
        self.cost_calculator_class = cost_calculator_class

    def _cost_for_mode(self, window: SlidingWindow, mode: int):
        """Write ``mode`` and the model's predicted next ``'temp_in'`` into
        ``window`` and return the summed partial cost of that state.

        The window is left modified; the caller is responsible for restoring it.
        """
        window.set_current_value('mode', mode)
        window.set_next_value(
            'temp_in', self.model.get_temperature(window.get_model_values())
        )
        partial_cost = self.cost_calculator_class().compute_partial_cost(window)
        return sum(partial_cost.values())

    def act(self, window: SlidingWindow):
        """Return True when mode 1 is strictly cheaper than mode 0.

        The window's current ``'mode'`` and next ``'temp_in'`` are overwritten
        while both options are evaluated and are always written back
        afterwards, even if the model or the cost calculator raises.
        """
        saved_mode = window.get_current_value('mode')
        saved_temp = window.get_next_value('temp_in')
        try:
            cost_no = self._cost_for_mode(window, 0)
            cost_yes = self._cost_for_mode(window, 1)
        finally:
            # Restore the caller's window no matter how the evaluation ended.
            window.set_current_value('mode', saved_mode)
            window.set_next_value('temp_in', saved_temp)
        return cost_yes < cost_no


class BruteForceController(CostController):
    """Controller that tries every mode sequence over the available horizon."""

    def act(self, window: SlidingWindow):
        """Return the first mode of the cheapest simulated mode sequence.

        Every bit string of length
        ``window.future_values - window.model_future_values`` is simulated on
        a copy of ``window``, once per temperature offset in ``test_diffs``.
        A candidate is scored by the larger of its per-offset costs. On equal
        totals, a candidate whose first mode equals the previous mode replaces
        the current best.
        """
        future_values = window.future_values - window.model_future_values
        last_mode = window.get_previous_value('mode')
        # Offsets added to each model prediction; loop-invariant.
        test_diffs = (-0.02, 0.02)

        best_cost = float('+inf')
        best_option = None
        for candidate in range(1 << future_values):
            # One character ('0' or '1') per simulated step.
            test_modes = binary_repr(candidate, future_values)

            test_costs = []
            for diff in test_diffs:
                test_window = window.copy(plot=False)
                cost = self.cost_calculator_class()
                for mode in test_modes:
                    test_window.set_current_value('mode', int(mode))
                    model_calc = self.model.get_temperature(
                        test_window.get_model_values()
                    )
                    model_calc += diff
                    test_window.set_next_value('temp_in', model_calc)
                    test_window.set_next_value(
                        'temp_out', test_window.get_next_value('temperature')
                    )
                    cost.add_partial_cost(test_window)
                    test_window.next()
                test_costs.append(cost)

            # NOTE(review): nesting reconstructed from collapsed source; this
            # comparison is assumed to run once per candidate -- confirm.
            # Relies on the cost objects being orderable by max().
            test_cost = max(*test_costs)
            total = test_cost.total()
            if total < best_cost or (
                total == best_cost and int(test_modes[0]) == last_mode
            ):
                best_cost = total
                best_option = test_modes

        # NOTE(review): if no candidate satisfies the comparison above,
        # best_option is still None here and this line raises TypeError.
        return int(best_option[0])


def simulate(env: CentralHeatingHistoryEnv, controller: Controller, render=True):
    """Drive ``env`` with ``controller`` until the environment reports done.

    Prints ``env.cost`` and, when ``render`` is true, calls ``env.render()``.
    """
    env.reset()
    done = False
    while not done:
        action = controller.act(env.window)
        _state, _reward, done, _info = env.step(action)
    # NOTE(review): the original formatting was lost; reporting the cost and
    # rendering are assumed to happen once after the loop, not on every
    # step -- confirm.
    print(env.cost)
    if render:
        env.render()