# utils.py
import os
from typing import List, Tuple, Dict
import pandas as pd
from sklearn.decomposition import PCA
import sklearn.preprocessing
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.graphics.regressionplots import abline_plot

def load_data(directory: str) -> List[pd.DataFrame]:
    """Load every CSV in `directory`, merge the split experiments, and clean up."""
    # Sort the filenames so the index-based merging below is deterministic
    # (os.listdir returns entries in arbitrary order)
    _filepaths = [os.path.join(directory, filename) for filename in sorted(os.listdir(directory))]
    files = [pd.read_csv(path) for path in _filepaths]
    # Merge C13-1 and C13-2 as well as C7-1 and C7-2
    files[1] = pd.concat([files[1], files[2]])
    files[6] = pd.concat([files[6], files[7]])
    # Pop the higher index first so the lower one does not shift underneath it
    files.pop(7)
    files.pop(2)
    # Drop Timestamp column
    files = [df.drop("Timestamp", axis=1) for df in files]
    # Drop rows containing NaN values
    files = [df.dropna() for df in files]
    return files
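
# Usage sketch (the "data" directory name is a hypothetical placeholder):
#
#     dataframes = load_data("data")
#     print(len(dataframes), [df.shape for df in dataframes])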

def normalize_files(files: Dict[str, pd.DataFrame], normalization_norm: str) -> Dict[str, pd.DataFrame]:
    """Normalize each DataFrame in the dict in place, rebuilding the pd.DataFrames sklearn strips."""
    for name, df in files.items():
        column_names = df.columns
        # sklearn.preprocessing.normalize scales each row (sample) to unit norm by default
        files[name] = pd.DataFrame(sklearn.preprocessing.normalize(df, norm=normalization_norm), columns=column_names)
    return files
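
# Usage sketch (the dict keys and the "l2" choice are illustrative; sklearn
# also accepts "l1" and "max"):
#
#     named = {f"experiment_{i}": df for i, df in enumerate(dataframes)}
#     normalized = normalize_files(named, normalization_norm="l2")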

def pca_componentwise(files: List[pd.DataFrame], components: List[List[str]]) -> List[pd.DataFrame]:
    """Collapse each group of columns in `components` to its first principal component."""
    processed_files = []
    for file in files:
        new_columns = []
        for i, component in enumerate(components):
            component_subset = file[component]
            pca = PCA(n_components=1)
            new_col = pca.fit_transform(component_subset)
            # Label the column with the group index; otherwise every column would be named 0
            component_column = pd.DataFrame(new_col, columns=[i])
            new_columns.append(component_column)
        result = pd.concat(new_columns, axis=1)
        processed_files.append(result)
    return processed_files
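
# Usage sketch: `components` is a list of column-name groups; the sensor names
# below are hypothetical. The result keeps one PCA column per group, labelled
# 0, 1, ... so downstream code can still select column 0:
#
#     groups = [["acc_x", "acc_y", "acc_z"], ["temp_inner", "temp_outer"]]
#     reduced = pca_componentwise(list(normalized.values()), groups)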

def plot_all_experiments(datasets: List[pd.DataFrame], experiment_names: List[str],
                         test_train_split: float = None, savefig: str = None,
                         ols_line: bool = False, legend_kwargs: dict = None) -> None:
    """Scatter-plot the first column of every dataset, one subplot per experiment."""
    # Size the grid from the input instead of hard-coding 8 rows
    fig, ax = plt.subplots(len(datasets), 1)
    fig.set_size_inches(6, 3 * len(datasets))
    for i, dataset in enumerate(datasets):
        axis = ax[i]
        axis.scatter(x=dataset.index, y=dataset[0], s=1)
        axis.set_title(f"Experiment {experiment_names[i]}")
        if test_train_split is not None:
            # Mark the train/test boundary with a dashed vertical line
            test_train_split_index = int(len(dataset) * test_train_split)
            axis.axvline(x=test_train_split_index, color="grey", linestyle="--", linewidth=1)
        if ols_line:
            # Fit an OLS line against the row index and overlay it on the scatter
            x = sm.add_constant(dataset.index)
            y = dataset[0]
            abline_plot(model_results=sm.OLS(y, x).fit(), ax=axis, color="black", linewidth=1)
        if legend_kwargs:
            axis.legend(**legend_kwargs)
    plt.tight_layout()
    if savefig:
        plt.savefig(savefig)
    plt.show()
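
# Usage sketch (the output file name is hypothetical):
#
#     plot_all_experiments(reduced, [f"experiment_{i}" for i in range(len(reduced))],
#                          test_train_split=0.8, savefig="experiments.png", ols_line=True)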

def test_train_split(files: List[pd.DataFrame], split: float) -> Tuple[List[pd.DataFrame], List[pd.DataFrame]]:
    """Split each DataFrame chronologically: the first `split` fraction is train, the rest test."""
    def split_df_not_randomly(df: pd.DataFrame, split: float):
        split_index = int(len(df) * split)
        return df[:split_index], df[split_index:]
    train_files, test_files = zip(*[split_df_not_randomly(df, split) for df in files])
    # zip() yields tuples; convert to lists to match the annotated return type
    return list(train_files), list(test_files)
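
if __name__ == "__main__":
    # End-to-end sketch under stated assumptions: the "data" directory, the
    # experiment names, and the column groups are hypothetical placeholders,
    # not part of the original module.
    dataframes = load_data("data")
    named = {f"experiment_{i}": df for i, df in enumerate(dataframes)}
    normalized = normalize_files(named, normalization_norm="l2")
    groups = [["acc_x", "acc_y", "acc_z"]]  # hypothetical sensor columns
    reduced = pca_componentwise(list(normalized.values()), groups)
    train, test = test_train_split(reduced, split=0.8)
    plot_all_experiments(reduced, list(normalized.keys()), test_train_split=0.8, ols_line=True)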