Principal Component Analysis (PCA) – Part 4 – Python ML – OOP Basics
This article is originally published at https://www.stoltzmaniac.com
Goal of this post:

- Add principal component analysis (PCA)
- Refactor using inheritance
- Convert gradient descent to stochastic gradient descent
- Add new tests via `pytest`
What we are leaving for the next post:

- Discussing the need for packaging
- Starting to create an actual package
In the previous post, we updated our model to handle the general case of linear regression. In doing so, we realized that we would be expanding regression to include linear regression, logistic regression, decision trees, etc. Because these all share some characteristics, we created a base regression class (`class Regression`). After creating it, we realized that we would always need data input in a specific way.
All of this code can be found on GitHub.
We decided to plan ahead. For our entire ML library, we need our data to have the following input (at a minimum):
- Predictor variable(s)
- Response variable
- Consistent data format
- Test / Train split
- Randomization seed
- Preprocessing
Those six elements make up our `data_handler`. Next, we can look at how we process our data. For all types of regression we want to perform, we will need the following input:
- All of the previous `data_handler` input
- Learning rate for stochastic gradient descent (SGD)
For linear regression, we specifically need:
- All of the previous base `regression` input
- Error tolerance cutoff
- Batch size for SGD
- Maximum number of epochs cutoff
- Decay rate for SGD
For PCA, we specifically need:
- All of the previous `data_handler` input
- Percent of variance explained cutoff
Note that PCA does not require `regression` inheritance. However, the `dimensionalizer` may require a base class; we have not completed anything else in that section, so it's unclear at this point what we may need. Planning ahead for these cases would be a much better idea, but we are winging it in this series, so we will put that on hold until we need to add additional techniques.
Because this blog primarily focuses on data science topics, we'll look at this approach from the top down: first the machine learning model we care about, then what it pulls from. With that said, let's take a look at our `PrincipalComponentAnalysis` class.
```python
import numpy as np

from models.data_handler import PreProcessData


class PrincipalComponentAnalysis(PreProcessData):
    def __init__(
        self,
        predictor_vars,
        response_var,
        train_split,
        seed,
        scale_type,
        variance_explained_cutoff: float,
    ):
        """
        Returns object with PCA matrix and can be used to predict
        :param variance_explained_cutoff: float with value between 0 and 1,
            max cumulative variance explained cutoff
        """
        super().__init__(
            predictor_vars=predictor_vars,
            response_var=response_var,
            train_split=train_split,
            seed=seed,
            scale_type=scale_type,
        )
```
You'll notice that this object inherits everything from the `PreProcessData` class. That means everything that lives within that class can be utilized by `PrincipalComponentAnalysis`. In fact, the only new parameter PCA needs is `variance_explained_cutoff`; all of the others come straight from the data. This was a deliberate design choice: many models use PCA as a preprocessing step, so this class could potentially be refactored in the future to be used within another class.
We also see `super().__init__()` being called. If you are not familiar with `super()`, please take a look at the documentation and some examples. In short, it gives you access to the parent class, so calling `super().__init__()` runs the inherited class's constructor and pulls in everything it sets up (this also applies to multiple inheritance).
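If `super()` is new to you, here is a minimal sketch (with made-up classes, not part of our library) showing how a child class's `__init__` can delegate to its parent:

```python
class Base:
    def __init__(self, data):
        # Parent stores the raw data
        self.data = data


class Child(Base):
    def __init__(self, data, cutoff):
        # Run Base.__init__ first so self.data exists
        super().__init__(data)
        self.cutoff = cutoff


obj = Child(data=[1, 2, 3], cutoff=0.95)
print(obj.data, obj.cutoff)  # [1, 2, 3] 0.95
```

Next, we add the following (showing the entire class each time).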
```python
class PrincipalComponentAnalysis(PreProcessData):
    def __init__(
        self,
        predictor_vars,
        response_var,
        train_split,
        seed,
        scale_type,
        variance_explained_cutoff: float,
    ):
        """
        Returns object with PCA matrix and can be used to predict
        :param variance_explained_cutoff: float with value between 0 and 1,
            max cumulative variance explained cutoff
        """
        super().__init__(
            predictor_vars=predictor_vars,
            response_var=response_var,
            train_split=train_split,
            seed=seed,
            scale_type=scale_type,
        )
        self.variance_explained_cutoff = variance_explained_cutoff
        self.eigenvalues_all = []
        self.eigenvectors_all = []
        self.eigenvalues = []
        self.eigenvectors = []
        self.pca_predictor_vars = np.ndarray
        if (
            type(self.variance_explained_cutoff) != float
            or not 0 < self.variance_explained_cutoff < 1
        ):
            raise ValueError(
                f"variance_explained_cutoff needs to be a float between 0 and 1, "
                f"it is {self.variance_explained_cutoff}"
            )
```
What we are doing here is simply initializing new attributes on `self` so they live within the object and can be referenced later. This is not strictly necessary, but it is a good practice because it shows what data types and structures you expect. We also see `self.variance_explained_cutoff = variance_explained_cutoff`, which lets us limit the number of eigenvectors used for our PCA projections. This number represents the maximum cumulative variance explained, which caps the number of vectors required for our model. The following shows the entire class:
```python
import numpy as np

from models.data_handler import PreProcessData


class PrincipalComponentAnalysis(PreProcessData):
    def __init__(
        self,
        predictor_vars,
        response_var,
        train_split,
        seed,
        scale_type,
        variance_explained_cutoff: float,
    ):
        """
        Returns object with PCA matrix and can be used to predict
        :param variance_explained_cutoff: float with value between 0 and 1,
            max cumulative variance explained cutoff
        """
        super().__init__(
            predictor_vars=predictor_vars,
            response_var=response_var,
            train_split=train_split,
            seed=seed,
            scale_type=scale_type,
        )
        self.variance_explained_cutoff = variance_explained_cutoff
        self.eigenvalues_all = []
        self.eigenvectors_all = []
        self.eigenvalues = []
        self.eigenvectors = []
        # Placeholder; replaced with the projected array by build()
        self.pca_predictor_vars = np.ndarray
        if (
            type(self.variance_explained_cutoff) != float
            or not 0 < self.variance_explained_cutoff < 1
        ):
            raise ValueError(
                f"variance_explained_cutoff needs to be a float between 0 and 1, "
                f"it is {self.variance_explained_cutoff}"
            )
        self.calculate_eigens()

    def calculate_eigens(self):
        """
        Create eigenvalues and eigenvectors in descending order, check cutoff
        """
        # Eigendecomposition of the training predictors' covariance matrix
        covariance_matrix = np.cov(self.predictor_vars_train.T)
        eigenvalues, eigenvectors = np.linalg.eig(covariance_matrix)
        # Sort in descending order of eigenvalue
        idx = eigenvalues.argsort()[::-1]
        # Create "All" version
        self.eigenvalues_all = eigenvalues[idx]
        self.eigenvectors_all = eigenvectors[:, idx]
        # Create selected percentage version with cutoff
        eigenvalues_pct = self.eigenvalues_all / np.sum(self.eigenvalues_all)
        self.pct_var_exp_cumulative_all = np.cumsum(eigenvalues_pct)
        self.pct_var_exp_cumulative = self.pct_var_exp_cumulative_all[
            self.pct_var_exp_cumulative_all <= self.variance_explained_cutoff
        ]
        self.eigenvectors = self.eigenvectors_all[:, : len(self.pct_var_exp_cumulative)]
        self.eigenvalues = self.eigenvalues_all[: len(self.pct_var_exp_cumulative)]

    def build(self, data: np.ndarray):
        """
        Converts data outside of the train set
        :param data: np.ndarray
        """
        ret = data.dot(self.eigenvectors)
        self.pca_predictor_vars = ret
        return ret

    def __str__(self):
        return f"""
        Variance Explained Cutoff: {self.variance_explained_cutoff}
        PCA Variance Explained: {self.pct_var_exp_cumulative}
        Eigenvalues: {self.eigenvalues}
        Eigenvectors: {self.eigenvectors}
        """
```
It's interesting to note that `self.calculate_eigens()` is called in `__init__` before the `calculate_eigens()` method is even defined further down the class body. This works because Python looks up the method at runtime, not when the class is parsed, and by the time an object is instantiated the whole class body has been executed. The net effect is a handy little trick: our eigenvectors are computed as soon as we instantiate our object.
Looking at the `calculate_eigens()` method, we can see a fair amount of linear algebra: it computes the covariance matrix of the training predictors, takes its eigendecomposition, and sorts the eigenvalues (with their eigenvectors) in descending order. The implementation is straightforward and there is nothing clever about it. The method also computes the cumulative variance explained and keeps only the eigenvectors that fall under our cutoff.
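To make the cutoff logic concrete, here is a minimal standalone sketch of the same steps, using a made-up toy dataset rather than our actual data:

```python
import numpy as np

# Toy data: 3 columns, 100 rows; make column 2 nearly redundant with column 0
rng = np.random.default_rng(0)
x = rng.normal(size=(100, 3))
x[:, 2] = x[:, 0] * 0.9 + rng.normal(scale=0.1, size=100)

covariance_matrix = np.cov(x.T)
eigenvalues, eigenvectors = np.linalg.eig(covariance_matrix)

# Sort eigenvalues (and matching eigenvector columns) in descending order
idx = eigenvalues.argsort()[::-1]
eigenvalues = eigenvalues[idx]
eigenvectors = eigenvectors[:, idx]

# Cumulative proportion of variance explained
pct_cumulative = np.cumsum(eigenvalues / eigenvalues.sum())
print(pct_cumulative)  # e.g. something like [0.65, 0.99, 1.0]

# Keep only the components under a 0.95 cutoff
keep = pct_cumulative[pct_cumulative <= 0.95]
reduced_eigenvectors = eigenvectors[:, : len(keep)]
```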
The `build()` method allows us to project any new data by utilizing these eigenvectors.
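As a rough sketch of how the class might be used (the data here is hypothetical, and the `scale_type` value is an assumption; check `PreProcessData` in the repo for the valid options):

```python
import numpy as np

# Hypothetical data: 200 rows, 4 predictor columns, 1 response column
rng = np.random.default_rng(42)
predictors = rng.normal(size=(200, 4))
response = predictors @ np.array([1.0, 2.0, 0.5, -1.0]) + rng.normal(size=200)

pca = PrincipalComponentAnalysis(
    predictor_vars=predictors,
    response_var=response,
    train_split=0.7,
    seed=123,
    scale_type="normalize",  # assumed value; see PreProcessData for options
    variance_explained_cutoff=0.95,
)
print(pca)

# Project new data onto the retained eigenvectors
projected = pca.build(rng.normal(size=(10, 4)))
```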
When we run our model in the test suite, the `__str__()` output shows the variance explained cutoff, the cumulative variance explained, and the retained eigenvalues and eigenvectors (example output is in the GitHub repo).
Did you notice that this is all being trained on a training set? How did the data get split up? This is the beauty of inheritance. As mentioned, we inherited the `PreProcessData` class, which inherits the `SplitTestTrain` class, which inherits the `InputData` class, which inherits the `InputBase` class. Each of these classes helps us break things up and makes our code more flexible and maintainable. OOP experts might take issue with this setup, but each pass through this project makes improvements, and the code keeps getting cleaner and moving toward best practices.
Without going deep into the code, we can trace our data back through the chain to find that we can input data, have exceptions thrown along the way if there are issues, and truly gain confidence in our data pipeline.
In a nutshell, we are doing the following after putting data and parameters into the PrincipalComponentAnalysis
class:
- The data goes all the way back to the beginning of the
data_handler
and kicks off an entire chain of events - It is checked to be either a
numpy
array orpandas
series, and converted to anumpy
array if necessary - Moves on to be split into a test and training set based off of the
train_split
parameter and a seed is set to ensure that results are reproducible - Moves on to be preprocessed (only includes scaling at this point)
- Enters into the PCA calculations
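To visualize the chain described above, here is a skeleton sketch of the hierarchy (method bodies omitted; the real classes live in `models/data_handler.py` and carry the actual logic):

```python
class InputBase:
    """Bottom of the chain: accepts raw predictor/response input."""


class InputData(InputBase):
    """Validates types (numpy array or pandas series) and converts to numpy."""


class SplitTestTrain(InputData):
    """Splits into train/test sets using train_split and a seed."""


class PreProcessData(SplitTestTrain):
    """Applies preprocessing (currently just scaling via scale_type)."""


class PrincipalComponentAnalysis(PreProcessData):
    """Runs the PCA calculations on the preprocessed training data."""
```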
The `LinearRegression` class is very similar: the same chain of events occurs (minus the PCA step). One major thing has changed within `LinearRegression`: it now utilizes SGD rather than simple gradient descent. While there are practical advantages to SGD, the reason I implemented it was simply to better understand how it works.
Here’s how the class is defined:
```python
class LinearRegression(Regression):
    def __init__(
        self,
        predictor_vars,
        response_var,
        train_split,
        seed,
        scale_type,
        learning_rate,
        tolerance,
        batch_size,
        max_epochs,
        decay,
    ):
        """
        All inherited from Regression class
        """
        super().__init__(
            predictor_vars, response_var, train_split, seed, scale_type, learning_rate
        )
        self.tolerance = tolerance
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.decay = decay

        if not type(self.tolerance) == float or not 0 < self.tolerance < 1:
            raise ValueError(
                f"tolerance needs to be a float between 0 and 1, it is {self.tolerance}"
            )
        if not type(self.batch_size) == int:
            raise ValueError(
                f"batch_size needs to be an int and shorter than the predictor_vars_train, it is {self.batch_size}"
            )
        if not type(self.max_epochs) == int:
            raise ValueError(f"max_epochs needs to be an int, it is {self.max_epochs}")
        if not type(self.decay) == float or not 0 < self.decay < 1:
            raise ValueError(
                f"decay needs to be a float between 0 and 1, it is {self.decay}"
            )

        # Add ones column to allow for beta 0
        self.predictor_vars_train = np.c_[
            np.ones(len(self.predictor_vars_train), dtype="int64"),
            self.predictor_vars_train,
        ]
        self.predictor_vars_test = np.c_[
            np.ones(len(self.predictor_vars_test), dtype="int64"),
            self.predictor_vars_test,
        ]

        # Initialize betas
        if len(self.predictor_vars_train.shape) == 1:
            self.B = np.random.randn(1)
        else:
            self.B = np.random.randn(self.predictor_vars_train.shape[1])

        # Automatically fit
        # self.fit_stochastic_gradient_descent()

    def predict(self, values_to_predict: np.ndarray):
        data = np.c_[np.ones(len(values_to_predict), dtype="int64"), values_to_predict]
        predicted_values = data.dot(self.B).flatten()
        return predicted_values

    def predict_(self, values_to_predict: np.ndarray):
        predicted_values = values_to_predict.dot(self.B).flatten()
        return predicted_values

    def find_gradient(self, x, y):
        estimate = self.predict_(values_to_predict=x)
        error = y.flatten() - estimate
        mse = (1.0 / len(x)) * np.sum(np.power(error, 2))
        gradient = -(1.0 / len(x)) * error.dot(x)
        return {"gradient": gradient, "error": mse}

    def fit_stochastic_gradient_descent(self):
        epoch = 0
        error = 1
        while True:
            # Shuffle the training data each epoch
            order = np.random.permutation(len(self.predictor_vars_train))
            self.predictor_vars_train = self.predictor_vars_train[order]
            self.response_var_train = self.response_var_train[order]
            # Step through the data one mini-batch at a time
            b = 0
            while b < len(self.predictor_vars_train):
                tx = self.predictor_vars_train[b : b + self.batch_size]
                ty = self.response_var_train[b : b + self.batch_size]
                gradient_data = self.find_gradient(x=tx, y=ty)
                gradient = gradient_data["gradient"]
                error = gradient_data["error"]
                self.B -= self.learning_rate * gradient
                b += self.batch_size

            # Every 100 epochs, check convergence against the full training set
            if epoch % 100 == 0:
                epoch_gradient = self.find_gradient(
                    self.predictor_vars_train, self.response_var_train
                )
                print(f"Epoch: {epoch} - Error: {epoch_gradient['error']}")
                if abs(error - epoch_gradient["error"]) < self.tolerance:
                    print("Converged")
                    break
            if epoch >= self.max_epochs:
                print("Max Epochs limit reached")
                break
            epoch += 1
            # Decay only kicks in past epoch 1000 because of int(epoch / 1000)
            self.learning_rate = self.learning_rate * (self.decay ** int(epoch / 1000))

    def calculate_r_squared(self, predictor_vars, response_var):
        sum_sq_r = np.sum((response_var - self.predict_(predictor_vars)) ** 2)
        sum_sq_t = np.sum((response_var - response_var.mean()) ** 2)
        return 1 - (sum_sq_r / sum_sq_t)

    def __str__(self):
        return f"""
        Model Results
        -------------
        Betas: {[i for i in zip(range(len(self.B)), self.B)]}
        R^2 Train: {self.calculate_r_squared(self.predictor_vars_train, self.response_var_train[:, 0])}
        R^2 Test: {self.calculate_r_squared(self.predictor_vars_test, self.response_var_test[:, 0])}
        """
```
Without going through the whole thing, we can focus on the `fit_stochastic_gradient_descent()` method. Again, this is a very straightforward implementation and lacks the optimization and capabilities of the fantastic tools available via `tensorflow` or other ML frameworks. It does, however, let us tune many of the model's hyperparameters. Structuring this as an object pays off when doing a grid search for optimal values, since each candidate configuration is just another instance of the class.
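For example, a naive grid search over a couple of the SGD hyperparameters might look like the following sketch. The data and parameter values are made up for illustration, the `scale_type` value is an assumption, and indexing the response with `[:, 0]` assumes the data handler stores it as a column vector, as the class's own `__str__` suggests:

```python
import itertools

import numpy as np

# Hypothetical training data
rng = np.random.default_rng(7)
predictors = rng.normal(size=(500, 3))
response = predictors @ np.array([2.0, -1.0, 0.5]) + rng.normal(size=500)

best_r_squared, best_params = -np.inf, None
for learning_rate, batch_size in itertools.product([0.01, 0.001], [16, 64]):
    model = LinearRegression(
        predictor_vars=predictors,
        response_var=response,
        train_split=0.7,
        seed=123,
        scale_type="normalize",  # assumed value; see PreProcessData for options
        learning_rate=learning_rate,
        tolerance=0.0001,
        batch_size=batch_size,
        max_epochs=1000,
        decay=0.9,
    )
    model.fit_stochastic_gradient_descent()
    r_squared = model.calculate_r_squared(
        model.predictor_vars_test, model.response_var_test[:, 0]
    )
    if r_squared > best_r_squared:
        best_r_squared, best_params = r_squared, (learning_rate, batch_size)

print(best_params, best_r_squared)
```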
An example of the output from our model and our tests is shown in the README.md of the GitHub repo associated with this project.
Obviously, our contrived example was built so that we knew the R^2 and weights to expect before we started (important when testing). Again, we see that R^2 is shown for both the train and test sets. We could do the same for any other metric used to evaluate our model. This is one example of a handy feature that is not a default in many out-of-the-box packages.
As always, we need to write tests for our new functionality to ensure everything still holds up and cross our fingers that this does not break other code. All tests passed!
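A test for the new PCA functionality might look something like this sketch. The import path, fixture data, and assertions are hypothetical (not the repo's actual tests), and `scale_type` is again an assumed value:

```python
import numpy as np
import pytest

from models.principal_component_analysis import PrincipalComponentAnalysis  # assumed path


def test_pca_respects_variance_cutoff():
    rng = np.random.default_rng(0)
    pca = PrincipalComponentAnalysis(
        predictor_vars=rng.normal(size=(100, 4)),
        response_var=rng.normal(size=100),
        train_split=0.7,
        seed=1,
        scale_type="normalize",  # assumed value
        variance_explained_cutoff=0.95,
    )
    # Retained cumulative variance should never exceed the cutoff
    assert np.all(pca.pct_var_exp_cumulative <= 0.95)
    # Eigenvalue count should match the retained eigenvector columns
    assert len(pca.eigenvalues) == pca.eigenvectors.shape[1]


def test_pca_rejects_bad_cutoff():
    with pytest.raises(ValueError):
        PrincipalComponentAnalysis(
            predictor_vars=np.ones((10, 2)),
            response_var=np.ones(10),
            train_split=0.7,
            seed=1,
            scale_type="normalize",  # assumed value
            variance_explained_cutoff=1.5,  # invalid: must be between 0 and 1
        )
```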
Whew! That was another very long post. Please take a look at the code to make sure you understand what's going on; cloning the repo and tinkering with it is also a great way to learn.