
Commit

Inference on the TPLS model (with missing values now handled).
kgdunn committed Mar 9, 2025
1 parent 5ee42aa commit ee7276c
Showing 4 changed files with 105 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -60,6 +60,6 @@ repos:

- repo: https://github.com/charliermarsh/ruff-pre-commit
# Ruff version. To eventually replace flake8?
rev: "v0.9.9"
rev: "v0.9.10"
hooks:
- id: ruff
5 changes: 4 additions & 1 deletion .vscode/launch.json
@@ -10,7 +10,10 @@
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false
"justMyCode": false,
"env": {
"PYDEVD_WARN_SLOW_RESOLVE_TIMEOUT": "5"
}
}
]
}
125 changes: 90 additions & 35 deletions process_improve/multivariate/methods.py
Expand Up @@ -605,11 +605,13 @@ def fit(self, X: DataMatrix, Y: DataMatrix) -> PLS_sklearn: # noqa: PLR0915
Abdi, "Partial least squares regression and projection on latent structure
regression (PLS Regression)", 2010, DOI: 10.1002/wics.51
"""
self.N, self.K = X.shape
self.Ny, self.M = Y.shape
assert (
self.Ny == self.N
), f"The X and Y arrays must have the same number of rows: X has {self.N} and Y has {self.Ny}."
self.N: int = X.shape[0]
self.K: int = X.shape[1]
self.Ny: int = Y.shape[0]
self.M: int = Y.shape[1]
assert self.Ny == self.N, (
f"The X and Y arrays must have the same number of rows: X has {self.N} and Y has {self.Ny}."
)

# Check if number of components is supported against maximum requested
min_dim = min(self.N, self.K)
@@ -1763,12 +1765,16 @@ def fit(self, X: dict[str, dict[str, pd.DataFrame]], y: None = None) -> TPLS: #
# Model parameters
# ----------------
self.t_scores: pd.DataFrame = pd.DataFrame(index=self.observation_names)
self.r_scores: dict[str, pd.DataFrame] = {
self.r_loadings: dict[str, pd.DataFrame] = {
key: pd.DataFrame(index=self.property_names[key]) for key in group_keys
}
# self.u_scores: pd.DataFrame = pd.DataFrame()
# self.w_scores: pd.DataFrame = pd.DataFrame()
self.w_loadings_z: dict[str, pd.DataFrame] = {
key: pd.DataFrame(index=self.condition_names[key]) for key in z_mats
}
# self.q_scores: pd.DataFrame = pd.DataFrame() # corrected from {}
self.w_loadings_super = pd.DataFrame(index=["Z", "F"])

self.spe: dict[str, dict[str, pd.DataFrame]] = {key: {} for key in self.required_blocks_}
self.spe_limit: dict[str, dict[str, Callable]] = {key: {} for key in self.required_blocks_}
self.hotellings_t2: pd.DataFrame = pd.DataFrame()
Expand Down Expand Up @@ -1797,14 +1803,32 @@ def _has_converged(self, starting_vector: np.ndarray, revised_vector: np.ndarray
max_iter = iterations >= self.max_iterations_
return bool(np.any([max_iter, converged]))

def _store_model_coefficients(self, pc_a: int, t_super_i: np.ndarray, r_i: dict[str, np.ndarray]) -> None:
def _store_model_coefficients(
self,
pc_a: int,
t_super_i: np.ndarray,
r_i: dict[str, np.ndarray],
w_i_z: dict[str, np.ndarray],
w_super_i: np.ndarray,
) -> None:
"""Store the model coefficients for later use."""

self.t_scores = self.t_scores.join(pd.DataFrame(t_super_i, index=self.observation_names, columns=[pc_a]))
self.r_scores = {
key: self.r_scores[key].join(pd.DataFrame(r_i[key], index=self.property_names[key], columns=[pc_a]))

# These are loadings really, not scores, for each group in the F block.
self.r_loadings = {
key: self.r_loadings[key].join(pd.DataFrame(r_i[key], index=self.property_names[key], columns=[pc_a]))
for key in r_i
}

# These are the loadings for the Z space
self.w_loadings_z = {
key: self.w_loadings_z[key].join(pd.DataFrame(w_i_z[key], index=self.condition_names[key], columns=[pc_a]))
for key in w_i_z
}

self.w_loadings_super = self.w_loadings_super.join(pd.DataFrame(w_super_i, index=["Z", "F"], columns=[pc_a]))

def _calculate_and_store_deflation_matrices(
self,
t_super_i: np.ndarray,
@@ -1980,18 +2004,18 @@ def _fit_iterative_regressions(self) -> None:
if self.n_conditions > 0:
# Step 7: w_i = Z_i' u / u'u. Regress the columns of Z on u_i, and store the slope coefficients
# in vectors w_i.
w_i = {
w_i_z = {
key: regress_a_space_on_b_row(df_z.T, u_super_i.T, self.not_na_z[key].T)
for key, df_z in zip(self.z_mats.keys(), self.z_mats.values(), strict=True)
}

# Step 8: Normalize joint w to unit length. See MB-PLS by Westerhuis et al. 1998. This is normal.
w_i_normalized = {key: w / np.linalg.norm(w) for key, w in w_i.items()}
w_i_z = {key: w / np.linalg.norm(w) for key, w in w_i_z.items()}

# Step 9: regress rows of Z on w_i, and store slope coefficients in t_z. There is an error in the
# paper here, but in figure 4 it is clear what should be happening.
t_zb = {
key: regress_a_space_on_b_row(df_z, w_i_normalized[key].T, self.not_na_z[key])
key: regress_a_space_on_b_row(df_z, w_i_z[key].T, self.not_na_z[key])
for key, df_z in zip(self.z_mats.keys(), self.z_mats.values(), strict=True)
}
t_z = np.concatenate(list(t_zb.values()), axis=1)
@@ -2000,8 +2024,8 @@ def _fit_iterative_regressions(self) -> None:
# Step 7: No Z block. Take an empty matrix across to the superblock.
t_z = np.zeros((t_f.shape[0], 0)) # empty matrix: in other words, no Z block

# Step 10: Combine t_f and t_z to form a joint t matrix.
t_combined = np.concatenate([t_f, t_z], axis=1)
# Step 10: Combine t_z and t_f to form a joint t matrix.
t_combined = np.concatenate([t_z, t_f], axis=1)

# Step 11: Build an inner PLS model: using the t_combined as the X matrix, and the Y (quality space)
# as the Y matrix.
@@ -2014,9 +2038,10 @@ def _fit_iterative_regressions(self) -> None:
u_super_i = inner_pls["u_i"] # only used for convergence check; not stored or used further
t_super_i = inner_pls["t_i"]
q_super_i = inner_pls["q_i"]
# wt_i = inner_pls["w_i"]
w_super_i = inner_pls["w_i"]

# After convergence. Step 12: Now store information.
# =================
delta_gap = float(np.linalg.norm(u_prior - u_super_i, ord=None) / np.linalg.norm(u_prior, ord=None))
self.fitting_statistics["iterations"].append(n_iter)
self.fitting_statistics["convergance_tolerance"].append(delta_gap)
@@ -2031,7 +2056,7 @@ def _fit_iterative_regressions(self) -> None:
# self.v_d_blocks
# self.q_y_blocks

self._store_model_coefficients(pc_a + 1, t_super_i=t_super_i, r_i=r_i) # , q_super_i=q_super_i, )
self._store_model_coefficients(pc_a + 1, t_super_i=t_super_i, r_i=r_i, w_i_z=w_i_z, w_super_i=w_super_i)

# Calculate and store the deflation vectors. See equation 7 on page 55.
self._calculate_and_store_deflation_matrices(t_super_i=t_super_i, q_super_i=q_super_i, r_i=r_i)
@@ -2042,7 +2067,7 @@ def _fit_iterative_regressions(self) -> None:
# Step 15: Calculate the final model limit
self._calculate_model_statistics_and_limits()

def predict(self, X: dict[str, dict[str, pd.DataFrame] | pd.DataFrame]) -> dict:
def predict(self, X: dict[str, dict[str, pd.DataFrame]]) -> dict:
"""
Model inference on new, already pre-processed data.
@@ -2062,7 +2087,7 @@ def predict(self, X: dict[str, dict[str, pd.DataFrame] | pd.DataFrame]) -> dict:
Parameters
----------
X : dict[str, dict[str, pd.DataFrame] | pd.DataFrame])
X : dict[str, dict[str, pd.DataFrame]])
The input samples.
Returns
@@ -2074,35 +2099,65 @@ def predict(self, X: dict[str, dict[str, pd.DataFrame] | pd.DataFrame]) -> dict:

# Check consistency of the data: the column names in the new data must match the column names in the training
# data.

not_na_f = {key: ~np.isnan(X["F"][key].values) for key in X["F"]}
not_na_z = {key: ~np.isnan(X["Z"][key].values) for key in X["Z"]}
num_obs = X["F"][next(iter(X["F"]))].shape[0]
for key in X["F"]:
assert set(X["F"][key].columns) == set(
self.property_names[key]
), f"Columns in block F, group [{key}] must match training data column names"
assert X["F"][key].shape[0] == num_obs, "All formula blocks must have the same number of rows."
assert set(X["F"][key].columns) == set(self.property_names[key]), (
f"Columns in block F, group [{key}] must match training data column names"
)

for key in X["Z"]:
assert set(X["Z"][key].columns) == set(
self.condition_names[key]
), f"Columns names in block Z, group [{key}] must match training data column names."
assert X["Z"][key].shape[0] == num_obs, "All condition blocks must have the same number of rows."
assert set(X["Z"][key].columns) == set(self.condition_names[key]), (
f"Columns names in block Z, group [{key}] must match training data column names."
)

for pc_a in range(self.n_components):
# Regress the row of each new formula block on the r_scores, to get the t-score for that pc_a component.
# Add up the t-score as you go block by block.
t_super_i = 0
denom = 0
# Regress the rows of each new formula block on the r_loadings to get the t-score for
# that pc_a component. Add up the t-score as you go block by block.
score_f_a = np.zeros(num_obs)
denominators = np.zeros(num_obs)
for key in X["F"]:
# TODO: handle missing values
t_super_i += X["F"][key].values @ (denom_key := self.r_scores[key].iloc[:, pc_a].values.reshape(-1, 1))
denom += ssq(denom_key)
t_super_i /= denom
b_row = np.array(self.r_loadings[key].iloc[:, pc_a].values)
# Tile the loading row `num_obs` times, and map missing entries to zero, so they have no effect
denom = np.tile(b_row, (num_obs, 1)) * not_na_f[key]
score_f_a += np.array(np.sum(X["F"][key].values * denom, axis=1)) # numerator portion
denominators += np.sum((denom * not_na_f[key]) ** 2, axis=1)

denominators[denominators == 0] = np.nan  # Guard should not be needed; there should never be zeros here.
score_f_a /= denominators

# Repeat for the Z-space: regress the row of each new Z block on the w-loadings, to get the
# t-score for that pc_a. It seems redundant to divide by w'w, since w is already normalized, but if there
# are missing values, then that correction is needed, to avoid dividing by a larger value than is fair.
score_z_a = np.zeros(num_obs)
denominators = np.zeros(num_obs)
for key in X["Z"]:
b_row = np.array(self.w_loadings_z[key].iloc[:, pc_a].values)
denom = np.tile(b_row, (num_obs, 1)) * not_na_z[key]
score_z_a += np.array(np.sum(X["Z"][key].values * denom, axis=1))
denominators += np.sum((denom * not_na_z[key]) ** 2, axis=1)

denominators[denominators == 0] = np.nan  # Guard should not be needed; there should never be zeros here.
score_z_a /= denominators

# Multiply the individual block scores by the super-weights to get the super-scores.
# After transposing below, rows are the observations, and columns are the blocks: [Z, F]
super_scores = np.vstack([score_z_a, score_f_a]).T @ self.w_loadings_super.iloc[:, pc_a].values.reshape(
-1, 1
)

# Multiply by loadings_p
# Deflate to get the new F matrix for the next iteration

# Collect A of these values.
# Multiply by Q matrix to get Y-hat
# Calculate the SPE and T2 values: for all the spaces
# return this in a dict structure

return {}
return {"Y": pd.DataFrame(), "SPE_Z": pd.DataFrame(), "SPE_F": pd.DataFrame(), "T2": pd.DataFrame()}


class Plot:
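For context, the missing-value handling added to `predict` above uses the same pattern for both the F and Z blocks: the numerator sums x*w over observed entries only, and the denominator sums w**2 over those same entries. Below is a minimal standalone sketch of that projection, assuming a NumPy-only setting; the function and variable names (`project_with_missing`, `x_rows`, `loading`) are illustrative and are not part of the library.

import numpy as np


def project_with_missing(x_rows: np.ndarray, loading: np.ndarray) -> np.ndarray:
    """Score each row of `x_rows` against `loading`, ignoring NaN entries.

    Numerator: sum of x*w over the observed entries of each row.
    Denominator: sum of w**2 over those same entries, so a row is not
    penalized by weights it never saw.
    """
    not_na = ~np.isnan(x_rows)
    x_filled = np.where(not_na, x_rows, 0.0)
    w_tiled = np.tile(loading.reshape(1, -1), (x_rows.shape[0], 1)) * not_na
    numerator = np.sum(x_filled * w_tiled, axis=1)
    denominator = np.sum(w_tiled**2, axis=1)
    denominator[denominator == 0] = np.nan  # a row with no observed entries gives NaN
    return numerator / denominator


# Tiny check: the first row uses only its two observed columns.
x_new = np.array([[1.0, 2.0, np.nan], [1.0, 2.0, 3.0]])
w = np.array([0.5, 0.5, np.sqrt(0.5)])  # unit-length loading vector
print(project_with_missing(x_new, w))  # approx. [3.0, 3.621]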
20 changes: 10 additions & 10 deletions tests/test_multivariate.py
@@ -1455,22 +1455,22 @@ def test_tpls_model_fitting(fixture_tpls_example: dict) -> None:
tpls_test.fit(transformed_data)

# Test the model's predictions. Use the first few samples of the data as testing data points.
testing_sample = "L001"
testing_samples = ["L001", "L002", "L003"]
new_observation_raw = {
"Z": {"Conditions": fixture_tpls_example["Z"]["Conditions"].loc[[testing_sample]]},
"F": {key: val.loc[[testing_sample]] for key, val in fixture_tpls_example["F"].items()},
"Z": {"Conditions": fixture_tpls_example["Z"]["Conditions"].loc[testing_samples]},
"F": {key: val.loc[testing_samples] for key, val in fixture_tpls_example["F"].items()},
}
new_observation = preproc.transform(new_observation_raw)
assert len(new_observation["D"]) == 0
assert len(new_observation["Y"]) == 0
new_observations = preproc.transform(new_observation_raw)
assert len(new_observations["D"]) == 0
assert len(new_observations["Y"]) == 0

# Assert that these match the training data after it was preprocessed:
assert np.allclose(new_observation["Z"]["Conditions"], transformed_data["Z"]["Conditions"].loc[[testing_sample]])
for key in new_observation["F"]:
assert np.allclose(new_observation["F"][key], transformed_data["F"][key].loc[[testing_sample]])
assert np.allclose(new_observations["Z"]["Conditions"], transformed_data["Z"]["Conditions"].loc[testing_samples])
for key in new_observations["F"]:
assert np.allclose(new_observations["F"][key], transformed_data["F"][key].loc[testing_samples])

# OK, now use these to make predictions
predictions = tpls_test.predict(new_observation)
predictions = tpls_test.predict(new_observations)

# NEXT: do predictions for a selected subset of samples, and
# NEXT: do predictions for the entire data set
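As a rough sketch of where the NEXT comments above could lead, the keys below mirror the placeholder dictionary returned at the end of `predict`; the per-sample shape check is an assumption about the eventual implementation and is left commented out. This is an illustrative continuation of the test above, not committed code.

# Illustrative only: once `predict` returns populated frames, the test could continue:
for key in ("Y", "SPE_Z", "SPE_F", "T2"):
    assert key in predictions, f"Expected key [{key}] in the predictions dictionary."
    # Expected (not yet implemented): one row per predicted observation.
    # assert predictions[key].shape[0] == len(testing_samples)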
