import datetime

import gym
import gym.spaces
import matplotlib.dates
import matplotlib.pyplot as plot
import numpy

from .ArgParser import get_sliding_window_from_config
from .Cost import NegativeCost


class CentralHeatingHistoryEnv(gym.Env):
    """Gym environment that walks a sliding window and simulates 'temp_in'.

    Before ``reset()`` is called the caller must assign:

    * ``config`` -- passed to ``get_sliding_window_from_config`` to build the
      iterator of windows;
    * ``model`` -- an object whose ``get_temperature(model_values)`` returns
      the next value stored under ``'temp_in'``.

    ``cost_class`` may be replaced before ``reset()``; it is instantiated
    without arguments and its ``add_partial_cost(window)`` must return a
    mapping whose values are summed into the step reward.
    """

    metadata = {'render.modes': ['human']}

    # An episode ends as soon as the predicted 'temp_in' leaves this closed
    # interval. Units are whatever the model produces (not visible here).
    TEMP_IN_MIN = 15
    TEMP_IN_MAX = 27

    # Bounds advertised for every component of the observation vector.
    OBSERVATION_LOW = -50.
    OBSERVATION_HIGH = 50.

    def __init__(self):
        self.config = None
        self.model = None
        self.sliding_window_adapter = None
        self.window = None
        self.cost_class = NegativeCost
        self.cost = None

        # Required by OpenAI Gym
        self.action_space = gym.spaces.Discrete(2)

    def reset(self):
        """Start a new episode and return the first observation.

        Rebuilds the cost accumulator and the sliding-window iterator, takes
        the first window and (re)defines ``observation_space`` from its size.

        Returns:
            numpy.ndarray built from ``window.get_window_values()``.

        Raises:
            RuntimeError: if ``config`` or ``model`` is unset (or falsy).
            StopIteration: propagated if the window iterator is empty.
        """
        if not self.config or not self.model:
            raise RuntimeError("config and model must be set before use!")

        self.cost = self.cost_class()
        self.sliding_window_adapter = get_sliding_window_from_config(self.config)
        self.window = next(self.sliding_window_adapter)

        # Build the bounds directly as float32 so they match the declared
        # dtype of the Box instead of being down-cast from float64.
        window_size = self.window.get_window_size()
        observation_min = numpy.full(window_size, self.OBSERVATION_LOW, dtype=numpy.float32)
        observation_max = numpy.full(window_size, self.OBSERVATION_HIGH, dtype=numpy.float32)

        # Required by OpenAI Gym
        self.observation_space = gym.spaces.Box(observation_min, observation_max, dtype=numpy.float32)

        return numpy.array(self.window.get_window_values())

    def step(self, action):
        """Apply ``action`` to the current window and advance by one window.

        The action is written as the current ``'mode'`` value, the model
        predicts the next ``'temp_in'``, and that prediction is written into
        the window before the iterator is advanced.

        Args:
            action: value stored as the current ``'mode'``.

        Returns:
            Tuple ``(observation, cost, done, info)``. On a regular step
            ``cost`` is the sum of the partial costs and ``done`` is False.
            On termination (temperature out of range, or the window iterator
            is exhausted) ``observation`` is taken with ``offset=1``,
            ``done`` is True and ``cost`` is None. ``info`` is always None.
        """
        # NOTE(review): Gym conventionally expects a numeric reward and a
        # dict for info; None is kept here because existing callers may
        # depend on it -- confirm before changing.
        return_cost = None
        return_done = False
        return_info = None

        try:
            self.window.set_current_value('mode', action)
            next_temp_in = self.model.get_temperature(self.window.get_model_values())
            self.window.set_next_value('temp_in', next_temp_in)

            # Out-of-range temperature ends the episode through the same
            # path as an exhausted window iterator.
            if next_temp_in > self.TEMP_IN_MAX or next_temp_in < self.TEMP_IN_MIN:
                raise StopIteration()

            self.window = next(self.sliding_window_adapter)
            return_observation = self.window.get_window_values()
            partial_cost = self.cost.add_partial_cost(self.window)
            return_cost = sum(partial_cost.values())
        except StopIteration:
            # The window was not advanced, so read it shifted by one to
            # include the value just written with set_next_value().
            return_observation = self.window.get_window_values(offset=1)
            return_done = True

        return numpy.array(return_observation), return_cost, return_done, return_info

    def render(self, mode='human', close=False):
        """Plot every recorded series of the current window against time.

        Does nothing when ``close`` is true, when no window exists yet
        (``reset()`` has not been called) or when the window has no plot
        data. ``mode`` is accepted for Gym compatibility and is not used.
        Blocks until the matplotlib window is closed.
        """
        if close or self.window is None or not self.window.plot:
            return

        # 'time' holds POSIX timestamps; they are converted with the local
        # timezone by datetime.fromtimestamp().
        timestamps = self.window.get_plot_data('time')
        x = matplotlib.dates.date2num(
            [datetime.datetime.fromtimestamp(stamp) for stamp in timestamps])

        for series in self.window.plot.keys():
            if series == "time":
                continue
            plot.plot_date(x, self.window.get_plot_data(series), '-', label=series)

        plot.grid(True)
        plot.legend()
        plot.xlabel('date')
        plot.ylabel('temperature/state')
        plot.show(block=True)


class CentralHeatingInfiniteEnv(CentralHeatingHistoryEnv):
    """Variant of the history environment that can be stopped with Ctrl-C.

    Behaves like the parent class, except that a ``KeyboardInterrupt``
    raised while stepping is reported as the end of the episode.
    """

    def __init__(self):
        super().__init__()

    def step(self, action):
        """Step like the parent; turn ``KeyboardInterrupt`` into a terminal step.

        Returns:
            The parent's ``(observation, cost, done, info)`` tuple, or
            ``(None, 0, True, None)`` if the step was interrupted.
        """
        try:
            return super().step(action)
        except KeyboardInterrupt:
            return None, 0, True, None