-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathClimateEnvironment.py
180 lines (149 loc) · 6.2 KB
/
ClimateEnvironment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# NumPy
import numpy as np
# Gymnasium provides the standard API for building custom reinforcement-learning environments
import gymnasium as gym
from gymnasium import spaces
# Torch
import torch
import torch.nn.functional as F
# Math
import math
class ClimateEnv(gym.Env):
    """Gymnasium environment that rewards accurate hourly temperature predictions.

    Each step exposes one row of weather features as the observation; the
    agent's action is its prediction of ``HourlyDryBulbTemperature`` for that
    same row. The reward is ``exp(-ERROR_SCALE * error)`` where ``error``
    combines squared error, absolute error and a small penalty for jumping
    away from the previous prediction.

    Rollouts always run over ``self.current_data``, which is the full dataset
    until ``update_end_date`` restricts it.
    """

    # Bounds of a valid temperature prediction (same units as the dataset's
    # target column). Shared by the action space and the clamp in ``step``.
    TEMP_MIN = -20.0
    TEMP_MAX = 95.0
    # Reward returned when the ground-truth temperature is missing (NaN).
    NAN_PENALTY = -10.0
    # Scale applied to the combined error before the exponential decay.
    ERROR_SCALE = 0.001
    # Divisor that down-weights the prediction-stability penalty.
    STABILITY_DIVISOR = 10
    # A debug line is printed whenever the step counter is a multiple of this.
    LOG_EVERY = 100

    def __init__(self, data):
        """Initialize the environment with the given dataset.

        Args:
            data (pd.DataFrame): The local climate dataset with columns:
                [
                    "HourlyVisibility",
                    "HourlyStationPressure",
                    "HourlyRelativeHumidity",
                    "HourlyWindDirection",
                    "HourlyWindSpeed",
                    "HourlyAltimeterSetting",
                    "HourlyWetBulbTemperature",
                    "HourlyDewPointTemperature",
                    "HourlyDryBulbTemperature",
                    "DATE"
                ]
        """
        super().__init__()
        self.data = data
        self.current_step = 0

        # Feature columns: everything except the target and DATE.
        self.feature_columns = [
            "HourlyVisibility",
            "HourlyStationPressure",
            "HourlyRelativeHumidity",
            "HourlyWindDirection",
            "HourlyWindSpeed",
            "HourlyAltimeterSetting",
            "HourlyWetBulbTemperature",
            "HourlyDewPointTemperature",
        ]
        self.target_column = "HourlyDryBulbTemperature"

        # Observation space: one continuous value per feature column.
        self.observation_space = spaces.Box(
            low=-9999.0,
            high=9999.0,
            shape=(len(self.feature_columns),),
            dtype=np.float32,
        )

        # Action space: a single continuous temperature prediction in
        # [TEMP_MIN, TEMP_MAX]. Bounds are built as float32 so they match the
        # declared dtype exactly.
        self.action_space = spaces.Box(
            low=np.array([self.TEMP_MIN], dtype=np.float32),
            high=np.array([self.TEMP_MAX], dtype=np.float32),
            shape=(1,),
            dtype=np.float32,
        )

        # Last (clamped) prediction; None until the first step of an episode.
        self.prev_action = None
        # Subset of ``data`` that rollouts actually iterate over.
        self.current_data = self.data.copy()

    def update_end_date(self, new_date):
        """Restrict rollouts to rows whose ``DATE`` is on or before ``new_date``.

        This avoids re-creating or re-registering the environment. The step
        counter and the previous prediction are cleared so the next rollout
        starts from the first row of the new subset.

        Args:
            new_date: Upper bound (inclusive) compared against the ``DATE``
                column with ``<=``.
        """
        if self.data.empty:
            self.current_data = self.data
        else:
            self.current_data = self.data[self.data["DATE"] <= new_date].copy()
        self.current_step = 0
        self.prev_action = None

    def reset(self, seed=None, options=None):
        """Reset the environment to the first row and return its observation.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset``.
            options: Unused; accepted for Gymnasium API compatibility.

        Returns:
            obs (np.ndarray): The feature vector at step 0.
            info (dict): Additional info dictionary (always empty).
        """
        super().reset(seed=seed)
        self.current_step = 0
        self.prev_action = None
        return self._get_observation(self.current_step), {}

    def step(self, action):
        """Score one temperature prediction and advance to the next row.

        Args:
            action (np.ndarray): The predicted temperature (shape = (1,)).

        Returns:
            obs (np.ndarray): Next observation.
            reward (float): ``NAN_PENALTY`` if the true temperature is NaN,
                otherwise ``exp(-ERROR_SCALE * combined_error)``.
            done (bool): True once the step counter reaches the last row of
                the active data.
            truncated (bool): Always False.
            info (dict): Always empty.
        """
        # as_tensor accepts arrays, scalars and tensors without the copy
        # warning that torch.tensor() emits for tensor inputs.
        action_tensor = torch.as_tensor(action, dtype=torch.float32)
        raw_prediction = action_tensor.item() if action_tensor.numel() > 0 else 0.0
        # Block predictions outside the valid range; every reward term below
        # uses this clamped value so the reward matches what is stored and logged.
        predicted_temp = min(max(raw_prediction, self.TEMP_MIN), self.TEMP_MAX)

        # Ground truth for the current row of the active subset.
        true_temp = self.current_data[self.target_column].iloc[self.current_step]

        if np.isnan(true_temp):
            print(f"[WARNING] NaN detected at step {self.current_step}, assigning penalty")
            reward = self.NAN_PENALTY
        else:
            pred_tensor = torch.tensor([predicted_temp], dtype=torch.float32)
            true_tensor = torch.tensor([float(true_temp)], dtype=torch.float32)

            mse_loss = F.mse_loss(pred_tensor, true_tensor)
            mae_loss = F.l1_loss(pred_tensor, true_tensor)

            # Stability penalty: scaled L1 distance to the previous prediction.
            # Guarding on prev_action (not the step counter) keeps this safe
            # even if the counter was changed without a prior step.
            if self.prev_action is not None:
                prev_tensor = torch.tensor([self.prev_action], dtype=torch.float32)
                stability_penalty = (
                    F.l1_loss(pred_tensor, prev_tensor) / self.STABILITY_DIVISOR
                )
            else:
                stability_penalty = 0.0

            # Exponential decay maps the combined error into (0, 1].
            raw_error = (mse_loss + mae_loss + stability_penalty).item()
            reward = math.exp(-self.ERROR_SCALE * raw_error)

        # Remember the clamped prediction for the next stability penalty.
        self.prev_action = predicted_temp

        self.current_step += 1
        done = self.current_step >= (len(self.current_data) - 1)
        truncated = False

        # NOTE: the environment resets itself when the episode ends, so the
        # observation returned alongside done=True is the first row of the
        # next episode, not the terminal row.
        if done:
            obs, _ = self.reset()
        else:
            obs = self._get_observation(self.current_step)

        if self.current_step % self.LOG_EVERY == 0:
            print(f"[DEBUG] Step {self.current_step} | True Temp: {true_temp:.2f} | Pred: {predicted_temp:.2f} | Reward: {reward:.8f}")

        return obs, reward, done, truncated, {}

    def _get_observation(self, step_idx):
        """Return the feature vector at the given row of the active data.

        Args:
            step_idx (int): Positional row index into ``self.current_data``.

        Returns:
            np.ndarray: The selected feature values as a float32 array.
        """
        row = self.current_data.iloc[step_idx][self.feature_columns]
        return row.values.astype(np.float32)