Building a Data Pipeline in Python – Part 5 of N – Database, ORM & SQLAlchemy
This article was originally published at https://www.stoltzmaniac.com
Adding data to your database
Anyone focused on ETL will eventually need a database. Here we will use a relational database, SQLite in this case, to store and process our data. If you are not a SQL expert, this can be daunting: relational databases expect you to understand keys, indices, relationships, data types, and so on. You still need that understanding to do things properly, but with an object relational mapper (ORM) such as SQLAlchemy in Python, you do not have to write the SQL yourself.
While the ORM handles a lot of the operations, there is one other very important thing to keep in mind: these tools can work with many types of databases. In our case, we’re using SQLite, but if you needed to switch to MySQL or SQL Server, you would change the connection string rather than rewrite your code! *This is mostly true; some operations are available in certain databases but not in others.*
SQLAlchemy writes all of the SQL for you behind the scenes. This kind of abstraction is extremely powerful, as long as you do not need to squeeze out maximum read/write performance.
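To see the SQL that SQLAlchemy writes for you, you can compile a query for a specific database dialect without connecting to anything. The sketch below assumes SQLAlchemy 1.4 or later (for `select()` on ORM classes and `sqlalchemy.orm.declarative_base`) and uses a cut-down `Country` model similar to the one defined later in this post. It only shows that the same Python query is rendered as slightly different SQL for SQLite and MySQL.

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.dialects import mysql, sqlite
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Country(Base):
    __tablename__ = 'country'
    id = Column(Integer, primary_key=True)
    country = Column(String, unique=True)


# One ORM query, written once in Python
stmt = select(Country).where(Country.country == 'USA')

# Render it as SQL for two different databases (no connection or driver needed)
sqlite_sql = str(stmt.compile(dialect=sqlite.dialect()))
mysql_sql = str(stmt.compile(dialect=mysql.dialect()))

print(sqlite_sql)  # parameter placeholder is ?
print(mysql_sql)   # parameter placeholder is %s
```

The query logic is identical; only the dialect-specific details (here, the parameter placeholder) change, which is what makes switching databases mostly a configuration change.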
In this example, we will look at a script that takes your data and inserts it into a database using SQLAlchemy. For simplicity, we only use a few columns of data to create three tables: country, orders, and status.
If you have not gone through the previous posts, please do so to understand where we are in terms of functionality. For convenience, we moved all of the analysis into one folder within our repository.
First, let’s go through the first few lines of our `utils.py` file:
This code imports our required libraries and provides a function to read in the file. In practice, you will want to add more cleaning steps to transform your data into the format you need before inserting it into the database.
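```python
import pandas as pd


def read_clean_data(filename: str) -> pd.DataFrame:
    # Read the raw CSV, then drop the leftover unnamed index column and the unused QTR_ID column
    return pd.read_csv(filename).drop(['Unnamed: 0', 'QTR_ID'], axis=1)
```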
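As an illustration of the kind of extra cleaning mentioned above, here is a sketch of an extended reader. The function name `read_clean_data_extended` and the specific steps (trimming whitespace in `STATUS` and `COUNTRY`, dropping rows that repeat an `ORDERNUMBER`/`ORDERLINENUMBER` pair) are assumptions chosen to fit the tables built later in this post, and the sample rows are made up.

```python
import io

import pandas as pd


def read_clean_data_extended(filename):
    """Hypothetical extension of read_clean_data with a few extra cleaning steps."""
    df = pd.read_csv(filename).drop(['Unnamed: 0', 'QTR_ID'], axis=1)
    # Trim stray whitespace so 'Shipped ' and 'Shipped' become one dimension value
    for col in ['STATUS', 'COUNTRY']:
        df[col] = df[col].str.strip()
    # Keep only the first row per (order number, line number) pair,
    # since that pair will be the composite primary key of the orders table
    df = df.drop_duplicates(subset=['ORDERNUMBER', 'ORDERLINENUMBER'])
    return df


# Made-up sample data; read_csv accepts a file-like object as well as a path
sample = io.StringIO(
    ",QTR_ID,ORDERNUMBER,ORDERLINENUMBER,QUANTITYORDERED,PRICEEACH,SALES,STATUS,COUNTRY\n"
    "0,1,10100,1,30,100.0,3000.0,Shipped ,USA\n"
    "1,1,10100,1,30,100.0,3000.0,Shipped,USA\n"
    "2,1,10100,2,50,55.09,2754.5,Shipped, France\n"
)
clean = read_clean_data_extended(sample)
print(clean[['ORDERNUMBER', 'ORDERLINENUMBER', 'STATUS', 'COUNTRY']])
```

Which steps you actually need depends on your data; the point is to fix problems in pandas before they turn into constraint errors in the database.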
In our `ETL.py` file, we will do our entire small project. Again, in a production setting you should split this up and make the proper adjustments.
Step 1: import dependencies.
`sqlalchemy` and `sqlalchemy.orm` are imported to create a connection to the database and start a “session” with it. Think of a session as being similar to shopping online. First, you connect to the website and browse around, and during your “session” you add things to your cart. You can see things get added to your cart, and browsing between pages does not affect your cart at all. At the very end, you check out.
In SQLAlchemy, you are creating a “session” in which you connect to the database, temporarily add / modify / remove data, and then make your changes final.
```python
from os import listdir
import sys

import sqlalchemy
from sqlalchemy.orm import sessionmaker

from utils import read_clean_data
from test_data import test_data_integrity
from models import Base, Status, Country, Orders
```
As you may have noticed, we also imported objects from `models.py`, a file we created to make our code a bit easier to read. Within this file, we defined the structure of the database we want, describing each table as a class.
As always, we place all of our imports at the top. Here we see everything we need to get started. The basic process we are following is this:
- Declare a base class that our model classes will inherit from
- Create a class with a `__tablename__` to be used in the database. Plain SQLAlchemy declarative classes generally require this attribute (some extensions, such as Flask-SQLAlchemy, can generate it from the class name), and being explicit about table names is wise anyway
- Create your primary key columns; these will allow you to easily create relationships between tables later on
- Create columns to house your data. These are based on the type of data you will store (integer, string, etc.)
- Add relationships. These attributes let you work with related objects directly, and SQLAlchemy fills in the matching foreign key values for you
In our case, we are creating 3 tables to hold the data we want: `status`, `country`, and `orders`. Therefore, we name our classes accordingly and set the table names.
```python
from sqlalchemy import Column, ForeignKey, Float, String, Integer
from sqlalchemy.ext.declarative import declarative_base  # also importable from sqlalchemy.orm in SQLAlchemy 1.4+
from sqlalchemy.orm import relationship

# Every model inherits from Base, which collects all tables in Base.metadata
Base = declarative_base()


class Status(Base):
    __tablename__ = 'status'
    id = Column(Integer, primary_key=True, autoincrement=True)
    status = Column(String, unique=True)


class Country(Base):
    __tablename__ = 'country'
    id = Column(Integer, primary_key=True, autoincrement=True)
    country = Column(String, unique=True)


class Orders(Base):
    __tablename__ = 'orders'
    # Composite primary key: the (order_number, order_line_number) pair must be unique
    order_number = Column(Integer, primary_key=True)
    order_line_number = Column(Integer, primary_key=True)
    quantity_ordered = Column(Integer)
    price_each = Column(Float)
    sales = Column(Float)
    # Foreign key columns plus relationships to the dimension tables
    status_id = Column(Integer, ForeignKey('status.id'))
    status = relationship(Status)
    country_id = Column(Integer, ForeignKey('country.id'))
    country = relationship(Country)
```
You will notice that we include `primary_key=True` for our `id` columns. This makes `id` the table's primary key, so each value must be unique. Because it is a single `Integer` primary key column, SQLAlchemy treats it as auto-incrementing by default, so we do not actually need the `autoincrement=True` parameter, but I wanted to show it for those who have never used it.
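If you want to check the default autoincrement behavior yourself, the sketch below uses an in-memory SQLite database (`sqlite://`) and a `Status` model declared without `autoincrement=True`; it assumes SQLAlchemy 1.4 or later. The ids are assigned by the database when the rows are inserted.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Status(Base):
    __tablename__ = 'status'
    id = Column(Integer, primary_key=True)  # note: no autoincrement=True
    status = Column(String, unique=True)


engine = create_engine('sqlite://')  # in-memory database, discarded when the program exits
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

shipped = Status(status='Shipped')
cancelled = Status(status='Cancelled')
print(shipped.id)  # None: there is no id until the row is inserted

session.add_all([shipped, cancelled])
session.commit()
print(shipped.id, cancelled.id)  # ids assigned by the database
```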
The first 2 classes can be thought of as “dimension” tables. These basically house the metadata surrounding our orders. We do not want to repeat this data throughout the main table, so the main table links to it rather than storing it. Database design must be done properly and thoroughly to avoid problems (please use an expert for this portion of your work).
Our `Orders` class holds all of our transactions; this kind of table is sometimes referred to as a “fact” table. It links to the dimension tables via the `status_id` and `country_id` columns. We add `ForeignKey(...)` to be explicit about how these are linked. You will also notice that two columns in this table are marked as primary keys; together they form a single composite primary key.
Remember, keys must be unique. In our case, neither `order_number` nor `order_line_number` is unique on its own, but when they are combined, there should never be more than one row with the same pair. If we violate this rule, the database rejects the insert and SQLAlchemy raises an `IntegrityError`.
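Here is a small sketch of the composite key in action, using an in-memory SQLite database, a trimmed `Orders` model, and made-up order values (SQLAlchemy 1.4 or later assumed). Two lines of the same order are accepted; repeating an existing pair is rejected with `IntegrityError`.

```python
from sqlalchemy import Column, Float, Integer, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Orders(Base):
    __tablename__ = 'orders'
    # Two primary key columns form one composite primary key
    order_number = Column(Integer, primary_key=True)
    order_line_number = Column(Integer, primary_key=True)
    sales = Column(Float)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

# Same order number, different line numbers: allowed
session = Session()
session.add_all([
    Orders(order_number=10100, order_line_number=1, sales=3000.0),
    Orders(order_number=10100, order_line_number=2, sales=2754.5),
])
session.commit()
session.close()

# Repeating an existing (order_number, order_line_number) pair: rejected by the database
session = Session()
session.add(Orders(order_number=10100, order_line_number=1, sales=99.0))
try:
    session.commit()
    duplicate_rejected = False
except IntegrityError:
    session.rollback()  # clear the failed transaction before reusing the session
    duplicate_rejected = True

row_count = session.query(Orders).count()
print(duplicate_rejected, row_count)  # True 2
```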
Now that we have a basic understanding of the tables we want to populate, let’s head back to our `ETL.py` file. After reading in our new data, we will insert it into our database once it passes our tests. Previously, we used a Jupyter Notebook to look at a report. In this example, we are assuming that our tests are rigorous enough to simply add the data if it passes them.
Finally, we get to our database! First, we need to tell SQLAlchemy where our database is located. We do this with a connection string (a database URL), and SQLAlchemy supports these for most relational databases. In our case, we are simply using SQLite. For SQLite, the database file is created if it doesn’t already exist; this will not be the case for other databases.
The `engine` is central to everything that follows. A lot happens behind the scenes: it manages the database driver and connections and knows which dialect of SQL to write for us.
```python
# Read the first file found in the new_data folder and clean it
new_filename = 'new_data/' + listdir('new_data')[0]
new_data = read_clean_data(new_filename)

# Stop the ETL if the new data fails the integrity tests
integrity_test = test_data_integrity(new_data)
if not integrity_test:
    sys.exit("[ERROR] - ETL Aborted - New data did not pass integrity tests!")

# Create a connection to the database - for SQLite, the file is created on first connection if it does not exist
database_location = "sqlite:///my_sales_db.sqlite"
print(f"Creating a connection to the database: {database_location}")
engine = sqlalchemy.create_engine(database_location)
```
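Switching databases is mostly a matter of changing this connection string. The sketch below only parses a few URLs with `make_url` and never connects; the MySQL and SQL Server URLs, their drivers (`pymysql`, `pyodbc`), hosts, credentials, and the `sales` database name are hypothetical placeholders, and actually connecting would require those drivers to be installed.

```python
from sqlalchemy.engine.url import make_url

# Only the SQLite URL is used in this post; the others are hypothetical examples
connection_strings = [
    "sqlite:///my_sales_db.sqlite",
    "mysql+pymysql://user:password@localhost:3306/sales",
    "mssql+pyodbc://user:password@localhost/sales?driver=ODBC+Driver+17+for+SQL+Server",
]

parsed = []
for s in connection_strings:
    url = make_url(s)
    # backend name = which kind of database; database = which file/database to use
    parsed.append((url.get_backend_name(), url.database))
    print(url.get_backend_name(), url.database)
```

Each of these strings could be passed to `sqlalchemy.create_engine` in place of `database_location`, leaving the rest of the ETL code unchanged (subject to the dialect caveats mentioned earlier).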
Next, we call `create_all` on the metadata of our `Base` class, which knows about all of our models, to build our database schema! This is a HUGE time saver and my favorite piece of the library. The `create_all` method only needs to be run once because the schema persists in the database; running it again is harmless because, by default, it skips tables that already exist. That also means it will not alter existing tables. Changing the structure of an existing database is called a migration (tools such as Alembic handle this for SQLAlchemy projects), but that is another topic altogether.
Next, we use `sessionmaker` to create the “session” we discussed earlier, and this is where the magic happens.
```python
# Create / Build Schema
Base.metadata.create_all(engine)

# Access data via a session
Session = sessionmaker(bind=engine)
session = Session()
```
Here’s a diagram of the schema that has been created! You can see the keys and relationships laid out.
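If you want to confirm the schema from code rather than from a diagram, SQLAlchemy's inspector can read it back from the database. This sketch uses an in-memory SQLite database and a cut-down copy of the models above (a few data columns omitted), and assumes SQLAlchemy 1.4 or later. It also calls `create_all` twice to show that repeating it is safe.

```python
import sqlalchemy
from sqlalchemy import Column, Float, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Status(Base):
    __tablename__ = 'status'
    id = Column(Integer, primary_key=True)
    status = Column(String, unique=True)


class Country(Base):
    __tablename__ = 'country'
    id = Column(Integer, primary_key=True)
    country = Column(String, unique=True)


class Orders(Base):
    __tablename__ = 'orders'
    order_number = Column(Integer, primary_key=True)
    order_line_number = Column(Integer, primary_key=True)
    sales = Column(Float)
    status_id = Column(Integer, ForeignKey('status.id'))
    status = relationship(Status)
    country_id = Column(Integer, ForeignKey('country.id'))
    country = relationship(Country)


engine = sqlalchemy.create_engine('sqlite://')
Base.metadata.create_all(engine)
Base.metadata.create_all(engine)  # safe to repeat: existing tables are skipped

# Read the schema back from the database itself
inspector = sqlalchemy.inspect(engine)
table_names = sorted(inspector.get_table_names())
orders_pk = sorted(inspector.get_pk_constraint('orders')['constrained_columns'])
referred = sorted(fk['referred_table'] for fk in inspector.get_foreign_keys('orders'))
print(table_names)  # ['country', 'orders', 'status']
print(orders_pk)    # ['order_line_number', 'order_number']
print(referred)     # ['country', 'status']
```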
In our example, we are going to simply pull all of our data and insert it into the database. You would not do this in a production environment for a number of reasons, but it illustrates what we are doing. We pull the “unique” values from our `STATUS` column into a list. We iterate through these values and use the `session.add()` method to tell SQLAlchemy we would like to add each string to our `status` table. When using `.add()`, we are simply loading the object into the session; it has not been written to the database at this point. Remember our shopping cart analogy: the item has been added to the cart, but we have not checked out. Once we call the `session.commit()` method, the data is written to the database (in shopping cart terms, we have now completed checkout). If the commit fails, we need to call `session.rollback()` to clear out the session and start over.
```python
# Insert data iteratively, begin by filling out the dimension tables
for status_ in list(new_data['STATUS'].unique()):
    status = Status(status=status_)
    session.add(status)
session.commit()
```
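The “cart” and “checkout” behavior can be observed directly. In this sketch (in-memory SQLite, a minimal `Status` model, SQLAlchemy 1.4 or later assumed), `session.new` holds objects that have been added but not yet written, `rollback()` discards them, and `commit()` writes them to the database.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Status(Base):
    __tablename__ = 'status'
    id = Column(Integer, primary_key=True)
    status = Column(String, unique=True)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# "Add to cart": the object is pending in the session, not yet in the database
session.add(Status(status='On Hold'))
pending_before_rollback = len(session.new)

# Abandon the cart: rollback discards the pending object
session.rollback()
count_after_rollback = session.query(Status).count()

# Add again and "check out": commit writes the row to the database
session.add(Status(status='Shipped'))
session.commit()
count_after_commit = session.query(Status).count()

print(pending_before_rollback, count_after_rollback, count_after_commit)  # 1 0 1
```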
We follow the same steps for `country`.
```python
for country_ in list(new_data['COUNTRY'].unique()):
    country = Country(country=country_)
    session.add(country)
session.commit()
```
`Orders` is a bit trickier…
Since we are adding multiple columns, we will use the pandas method `to_dict` (with `orient='records'`) to convert our `DataFrame` into a list of dictionaries, one per row.
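For anyone who has not used `to_dict` this way, here is a tiny sketch with a made-up `DataFrame` that borrows a few of the column names from this post. Each row becomes one dictionary keyed by column name, which is why the loop below can read values such as `order_['ORDERNUMBER']`.

```python
import pandas as pd

# Made-up rows using a few of the column names from this post
df = pd.DataFrame({
    'ORDERNUMBER': [10100, 10100],
    'ORDERLINENUMBER': [1, 2],
    'STATUS': ['Shipped', 'Shipped'],
})

# records is a list with one dictionary per row, keyed by column name
records = df.to_dict(orient='records')
for record in records:
    print(record['ORDERNUMBER'], record['ORDERLINENUMBER'], record['STATUS'])
# 10100 1 Shipped
# 10100 2 Shipped
```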
Here is more SQLAlchemy magic. To add relationships to the database, we simply look up our dimension rows as objects and then pass them as parameters to our `Orders` object!
Let’s take a look at the `status` and `country` objects:
- A query is sent to look up whether this value already exists in our `status` table. If it does, `.first()` returns that object, which we can then relate to the order; if it does not, `.first()` returns `None`
- The same process happens for `country`
Next, we take a look at the `Orders` object.
- We add in the rest of the data for the order. At the end, rather than populating the `status_id` or `country_id` fields ourselves, we simply pass in the `status` and `country` objects we retrieved with the previous queries
- We follow our steps to “add” and then “commit” like before
```python
for order_ in new_data.to_dict(orient='records'):
    # Look up the existing dimension rows for this order
    status = session.query(Status).filter(Status.status == order_['STATUS']).first()
    country = session.query(Country).filter(Country.country == order_['COUNTRY']).first()
    # Pass the related objects; SQLAlchemy fills in status_id and country_id
    order = Orders(
        order_number=order_['ORDERNUMBER'],
        order_line_number=order_['ORDERLINENUMBER'],
        quantity_ordered=order_['QUANTITYORDERED'],
        price_each=order_['PRICEEACH'],
        sales=order_['SALES'],
        status=status,
        country=country)
    session.add(order)
session.commit()
```
At this point, we have added all of our data to the database! Here’s what the orders table looks like. You can see that `status_id` and `country_id` have been automatically populated as integers, which is fantastic…
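To see why these columns fill themselves in, here is a self-contained sketch with an in-memory SQLite database and a trimmed copy of the models (SQLAlchemy 1.4 or later assumed; the order values are made up). Assigning objects to the `status` and `country` relationships is enough: when the session flushes, SQLAlchemy inserts the related rows first and copies their ids into `status_id` and `country_id`.

```python
from sqlalchemy import Column, Float, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()


class Status(Base):
    __tablename__ = 'status'
    id = Column(Integer, primary_key=True)
    status = Column(String, unique=True)


class Country(Base):
    __tablename__ = 'country'
    id = Column(Integer, primary_key=True)
    country = Column(String, unique=True)


class Orders(Base):
    __tablename__ = 'orders'
    order_number = Column(Integer, primary_key=True)
    order_line_number = Column(Integer, primary_key=True)
    sales = Column(Float)
    status_id = Column(Integer, ForeignKey('status.id'))
    status = relationship(Status)
    country_id = Column(Integer, ForeignKey('country.id'))
    country = relationship(Country)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

shipped = Status(status='Shipped')
usa = Country(country='USA')
order = Orders(order_number=10100, order_line_number=1, sales=3000.0,
               status=shipped, country=usa)
ids_before_commit = (order.status_id, order.country_id)

session.add(order)  # the related Status and Country objects are added along with it
session.commit()
ids_after_commit = (order.status_id, order.country_id)
print(ids_before_commit, ids_after_commit)  # (None, None) (1, 1)
```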
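We would continue down this road with new data; however, rather than listing unique values and adding them, we would be a bit more clever and use some deeper features of SQLAlchemy. Re-running the dimension loops as written would try to insert values that already exist, which violates the unique constraints on the `status` and `country` columns.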
Please understand, there are many ways to tackle these problems, and this is just one way to think about it. SQLAlchemy even has `Table` objects as an alternative to `Base` subclasses for creating tables (however, the class-based approach is far easier to interpret). There are also many ways to write `get_or_create`-style helpers (a common pattern rather than a built-in SQLAlchemy method) that would remove the need to populate the `country` and `status` tables separately. You could simply add this data, and if a value did not exist in the other table, a new entry would be made!
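One possible shape for such a helper is sketched below. `get_or_create` here is a hypothetical function written for this example, not part of SQLAlchemy; it relies on the session's default autoflush so that a value created earlier in the same session is found by later lookups. SQLAlchemy 1.4 or later and an in-memory SQLite database are assumed.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Country(Base):
    __tablename__ = 'country'
    id = Column(Integer, primary_key=True)
    country = Column(String, unique=True)


def get_or_create(session, model, **kwargs):
    """Hypothetical helper: return the first row matching kwargs, creating it if needed."""
    # The query autoflushes pending objects, so earlier creations are visible here
    instance = session.query(model).filter_by(**kwargs).first()
    if instance is None:
        instance = model(**kwargs)
        session.add(instance)
    return instance


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# 'USA' appears twice, but only one row is created for it
results = [get_or_create(session, Country, country=name) for name in ['USA', 'France', 'USA']]
session.commit()

countries = sorted(c.country for c in session.query(Country))
print(countries)  # ['France', 'USA']
```

Inside the order loop, `status = get_or_create(session, Status, status=order_['STATUS'])` would replace both the separate dimension loop and the lookup. Note that this check-then-insert pattern is not safe against several processes writing at once; the unique constraints on the dimension columns remain the real safeguard.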