Data Warehousing
Data warehousing in practice
26 December 2023
github

Definition

Data warehousing is a way for us to perform analytical queries without disrupting application performance. Data warehousing is also known as OLAP (online analytical processing). Generally, the queries we run for analysis are computationally expensive, so we cannot use an OLTP (online transaction processing) system to achieve our goals.


OLTP | OLAP
Updates in real time | Current and historical data
Optimized for fast transactions/processing | Optimized for complex queries with aggregations
Effectiveness measured by number of transactions | Effectiveness measured by response time
Usually in 3NF format | Star schema / snowflake schema
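
To make the contrast concrete, here is a rough sketch of the kind of query each system is built for. The accounts table and its columns are purely hypothetical; the sales aggregation mirrors the kind of queries built later in this article.

# Hypothetical OLTP query: touch a single row as fast as possible.
oltp_query = "SELECT balance FROM accounts WHERE account_id = 42;"

# OLAP-style query: scan and aggregate many rows (the same shape as the
# materialized views created in the Analyze section below).
olap_query = """
SELECT category, SUM(actual_price) AS revenue
FROM sales
GROUP BY category;
"""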

Design


Star Schema

A star schema combines multiple tables into one fact table without normalizing the records. In this configuration there will be a lot of duplicates, so disk efficiency might be compromised. Query processing, however, is better than with a snowflake schema since minimal joining is involved.


star
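
As a minimal sketch of what a star schema could look like for this data set, the fact table keeps the measures and references a single, denormalized dimension table. The table and column names below are illustrative, not the schema used later in this article.

# Illustrative star-schema DDL (hypothetical):
star_schema_ddl = """
CREATE TABLE dim_product (
  product_id   varchar(255) PRIMARY KEY,
  product_name text,
  category     varchar(255)   -- stored directly on the dimension, duplicates allowed
);

CREATE TABLE fact_sales (
  product_id       varchar(255) REFERENCES dim_product(product_id),
  actual_price     numeric,
  discounted_price numeric
);
"""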

Snowflake Schema

The snowflake schema takes a different approach to constructing the data warehouse. Instead of keeping duplicate records, we normalize every dimension table so that it takes less space on disk and has stronger data integrity.


star
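
Continuing the sketch above, the snowflake version normalizes the category out of the product dimension into its own table, trading an extra join for less duplication. Again, the names are illustrative only.

# Illustrative snowflake-schema DDL (hypothetical, for comparison with the star sketch):
snowflake_schema_ddl = """
CREATE TABLE dim_category (
  category_id serial PRIMARY KEY,
  category    varchar(255) UNIQUE
);

CREATE TABLE dim_product (
  product_id   varchar(255) PRIMARY KEY,
  product_name text,
  category_id  int REFERENCES dim_category(category_id)  -- extra join, no duplication
);
"""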

Process


A common way to construct a data warehouse is through extract, transform, load (ETL). Extract is where you access data from various sources and forms: an API, an RDBMS, web scraping, and many more. After the data is gathered, we transform it into the design that we have decided on, and finally we load it into our data warehouse. In this article, I am going to demonstrate this process with Apache Airflow.


Apache Airflow is an open source tool that is commonly used to build data pipelines. The whole idea behind Apache Airflow is defining a data pipeline as a DAG (Directed Acyclic Graph) so that it has a clear, repeatable pathway. More details for Apache Airflow can be found in this link
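
As a rough sketch of how this looks in code, the tasks described below can be wired together with Airflow's TaskFlow API. The dag_id, schedule, and start date here are assumptions for illustration, not taken from the project.

import pendulum
from airflow.decorators import dag, task

@dag(
    dag_id="amazon_sales_etl",          # hypothetical name
    schedule=None,                      # trigger manually from the UI
    start_date=pendulum.datetime(2023, 12, 1),
    catchup=False,
)
def sales_pipeline():
    @task()
    def transform():
        ...   # see the Transform section below

    @task()
    def load():
        ...   # see the Load section below

    @task()
    def analyze():
        ...   # see the Analyze section below

    # chain the tasks so Airflow runs them in order
    transform() >> load() >> analyze()

sales_pipeline()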


Extract

In this step, we extract the data from the data source. The data source can be an API, a flat file, an external database, or any other source of data. It is preferable that the data source is updatable so that our data warehouse can be refreshed automatically. In this project, however, I am using a CSV file for the sake of simplicity. Extracting the data with Python is quite straightforward; in this project I am going to use pandas to read the CSV file. I found this data set on kaggle
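
A minimal sketch of the extract step, assuming the Kaggle CSV has been downloaded locally (the path below is a placeholder):

import pandas as pd

sales_source = "data/amazon.csv"   # hypothetical path to the downloaded Kaggle export

# read the raw file and take a quick look at what we are working with
amz = pd.read_csv(sales_source)
print(amz.shape)
print(amz.columns.tolist())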


Transform

After extracting the data, I transform it into the format that I want. First, I selected the columns that I am interested in: product_id, product_name, category, actual_price, and discounted_price. Next, I reformatted the price columns to remove the currency symbol and convert them to a numeric type. Finally, I dropped all the duplicates.


from airflow.decorators import task
import pandas as pd

@task()
def transform():
  # keep only the columns we are interested in from the raw Kaggle CSV
  amz = pd.read_csv(sales_source)
  sales = amz.loc[:, ["product_id", "product_name", "category", "actual_price", "discounted_price"]]
  # strip the currency symbol and thousands separators, then convert to numbers
  sales['actual_price'] = pd.to_numeric(sales['actual_price'].str[1:].replace(',', '', regex=True), errors='coerce')
  sales['discounted_price'] = pd.to_numeric(sales['discounted_price'].str[1:].replace(',', '', regex=True), errors='coerce')
  # drop ';' from product names because ';' is the field separator used by the load step
  sales['product_name'] = sales['product_name'].str.replace(';', '')
  sales.drop_duplicates(inplace=True)
  # write the cleaned rows where the load task expects them: ';'-separated, no header or index
  sales.to_csv(sales_target, sep=';', index=False, header=False)

Load

The next step is loading the data into our data warehouse. In this case, I chose PostgreSQL. I defined a connection to the database, which here runs in a Docker container, and I created the table along with all of its columns. Loading the data is quite simple: we just need to use the .copy_from function provided by psycopg2 and our data gets populated.


import psycopg2

@task()
def load():
  # connect to the PostgreSQL instance (running in a Docker container)
  connection = psycopg2.connect(user="postgres",
                                password="postgres",
                                host="db",
                                port="5432",
                                database="products")
  cursor = connection.cursor()
  # start from a clean slate on every run
  cursor.execute(
      """
      DROP TABLE IF EXISTS sales CASCADE;
      """)
  connection.commit()
  cursor.execute(
      """
      CREATE TABLE IF NOT EXISTS sales (
        product_id  varchar(255) NOT NULL,
        product_name text NOT NULL,
        category varchar(255) NOT NULL,
        actual_price numeric NOT NULL,
        discounted_price numeric NOT NULL
      )
      """)
  connection.commit()
  # bulk-load the ';'-separated file produced by the transform task
  with open(sales_target, 'r') as f:
      cursor.copy_from(f, 'sales', sep=';')
  connection.commit()

Analyze

Finally, we can run our analytical queries, which are usually aggregate or group-by queries. Notice that I created materialized views instead of regular views so that the results are pre-calculated, which improves query performance.


@task()
def analyze():
  # open a fresh connection for this task (tasks do not share Python state)
  connection = psycopg2.connect(user="postgres",
                                password="postgres",
                                host="db",
                                port="5432",
                                database="products")
  cursor = connection.cursor()

  # total revenue across all products
  cursor.execute(
      """
      CREATE MATERIALIZED VIEW total_sales
      AS
      SELECT sum(actual_price) FROM sales;
      """)
  connection.commit()

  # average price
  cursor.execute(
      """
      CREATE MATERIALIZED VIEW average_sales
      AS
      SELECT AVG(actual_price) FROM sales;
      """)
  connection.commit()

  # five most expensive products
  cursor.execute(
      """
      CREATE MATERIALIZED VIEW top5_expensive
      AS
      SELECT product_id, product_name, actual_price FROM sales
      ORDER BY actual_price DESC
      LIMIT 5;
      """)
  connection.commit()

  # total discounted revenue
  cursor.execute(
      """
      CREATE MATERIALIZED VIEW total_discount
      AS
      SELECT sum(discounted_price) FROM sales;
      """)
  connection.commit()

  # average discounted price
  cursor.execute(
      """
      CREATE MATERIALIZED VIEW average_discount
      AS
      SELECT AVG(discounted_price) FROM sales;
      """)
  connection.commit()
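
One caveat: a materialized view is a pre-computed snapshot, so after new data is loaded it has to be refreshed explicitly. A sketch of what that could look like, reusing the same cursor and connection as above:

# refresh the pre-calculated results whenever the sales table changes
for view in ["total_sales", "average_sales", "top5_expensive",
             "total_discount", "average_discount"]:
    cursor.execute(f"REFRESH MATERIALIZED VIEW {view};")
connection.commit()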

Result

Up to this point, we should be able to see our DAG. If we click the DAG title and press the play button, the pipeline will run and the database should be populated. As we can see from the screenshots below, the adminer dashboard now shows a few materialized views, which are the results that we wanted. Every query on those materialized views will be performant since the results are pre-calculated.


star

star

star