From Raw Data to Actionable Insights: Building an End-to-End Data Pipeline with Python and Apache Airflow

Data Architecture · Data Engineering

Apurva Prajapati

December 19, 2023
In today's blog, we look at how Python and Apache Airflow can be two of our greatest allies in ensuring data pipeline quality.

In modern times, data chaos is an all too familiar concept for many organizations. This chaos presents both an opportunity and a challenge: the opportunity to gain valuable insights that add business value and the challenge to extract those insights from the overwhelming volume of information. But how can we effectively navigate this data deluge and unlock its hidden potential?

Well, one of the most important aspects is ensuring that we have healthy data pipelines to begin with. In this blog we will look at how we can do this, with the help of Python and Apache Airflow.

Python & Apache Airflow: Mastering Data Pipeline Orchestration

Enter Python and Apache Airflow—a powerful duo that opens up a world of possibilities for building robust and scalable data pipelines. Maybe you have heard of them before, maybe not, but they are two of the most useful tools we have at our disposal to deal with the common challenges of building, maintaining, and scaling data pipelines.

Python, with its simplicity, versatility, and a rich ecosystem of libraries, has become a go-to language for data processing tasks. Its readability and ease of use make it an excellent choice for designing modular and maintainable data pipelines. Its extensive libraries, like pandas and NumPy, simplify working with large datasets.

While Python provides the building blocks, Apache Airflow serves as the orchestrator—the conductor guiding the entire symphony of data processing tasks. Airflow lets us design, schedule, and monitor workflows through Directed Acyclic Graphs (DAGs), providing a visual representation of a data pipeline. All of this helps us define complex workflows and monitor their execution, ensuring that the data is always flowing smoothly.

Getting Started with Data Pipelines using Python & Apache Airflow

Building an end-to-end data pipeline with Python and Apache Airflow involves several steps. Let's break down the process in detail so you can start applying it yourself!

Step 1: Install Required Tools and Libraries

Ensure you have the necessary tools and libraries installed on your machine:

  • Python: Install Python on your machine. You can download it from the official Python website.
  • Apache Airflow: Install Apache Airflow using pip:

pip install apache-airflow

  • Additional Libraries: Depending on your data processing requirements, install any additional Python libraries you might need. For example, if you're working with data manipulation, you might want to install pandas:

pip install pandas

Step 2: Initialize Airflow Environment

After installing Airflow, initialize the Airflow environment. Open a terminal and navigate to your preferred directory. Run the following commands:

airflow db init
airflow webserver
airflow scheduler

This initializes the Airflow metadata database and starts the webserver and scheduler.
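
Note that the webserver and scheduler each keep the terminal busy, so it is common to run them in separate terminal windows. Depending on your Airflow version, you may also need to create an admin user before you can log in to the web UI; for example (the username, password, and email below are placeholders):

airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com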

Step 3: Define Your Data Pipeline Workflow

Once the Airflow environment is initialized, you will find an "airflow.cfg" file that contains the "dags_folder" parameter. The directory specified with this parameter is known as the "Airflow DAGs directory". This directory will contain the Python files that you create for your data pipeline.
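
For reference, the relevant setting in airflow.cfg looks roughly like this (the path shown is just the common default under the Airflow home directory and may differ on your machine):

[core]
dags_folder = /home/your_user/airflow/dags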

Now, create a Python script in your Airflow DAGs directory to define your data pipeline workflow. A DAG (Directed Acyclic Graph) represents the workflow, consisting of tasks and their dependencies.

Here's a simple example using the OpenWeather API for some inspiration:

# File: weather_data_pipeline.py
# The data pipeline illustrated below makes an API call to OpenWeather API,
# extracts weather data for Barcelona city, transforms the received weather
# data from kelvin unit to celsius unit and loads the transformed data into a
# local SQLite database.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import requests
import sqlite3
default_args = {
    'owner': 'uptimal_lab',
    'start_date': datetime(2023, 12, 12),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='End-to-End Weather Data Pipeline',
    schedule_interval=timedelta(days=1),
)
def extract_data(**kwargs):
    api_key = 'YOUR_OPEN-WEATHER-MAP_API_KEY'
    city = 'Barcelona'  # Replace with your desired city
    url = f'http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}'
    response = requests.get(url)
    weather_data = response.json()
    return weather_data
def transform_data(**kwargs):
    weather_data = kwargs['task_instance'].xcom_pull(task_ids='extract_task')
    temperature_kelvin = weather_data['main']['temp']
    temperature_celsius = temperature_kelvin - 273.15  # Conversion to Celsius
    transformed_data = {
        'city': weather_data['name'],
        'temperature_celsius': round(temperature_celsius, 2)
    }
    return transformed_data
def load_data(**kwargs):
    transformed_data = kwargs['task_instance'].xcom_pull(task_ids='transform_task')
    
    conn = sqlite3.connect('/path/to/your/database.db')  # Replace with your SQLite database path
    cursor = conn.cursor()
    create_table_query = """
    CREATE TABLE IF NOT EXISTS weather (
        city TEXT,
        temperature_celsius REAL
    )
    """
    cursor.execute(create_table_query)
    insert_query = """
    INSERT INTO weather (city, temperature_celsius)
    VALUES (?, ?)
    """
    cursor.execute(insert_query, (transformed_data['city'], transformed_data['temperature_celsius']))
    conn.commit()
    conn.close()
extract_task = PythonOperator(
    task_id='extract_task',
    python_callable=extract_data,
    provide_context=True,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform_task',
    python_callable=transform_data,
    provide_context=True,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load_task',
    python_callable=load_data,
    provide_context=True,
    dag=dag,
)
# Define task dependencies
extract_task >> transform_task >> load_task
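
Before handing the DAG over to the scheduler, you can exercise it from the command line; the first command below runs a single task in isolation, while the second performs a full local test run of the DAG (the date is just a sample logical date):

airflow tasks test weather_data_pipeline extract_task 2023-12-12
airflow dags test weather_data_pipeline 2023-12-12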

Step 4: Implement Data Processing Logic

In each task, we need to implement the specific data processing logic using Python. Let's take a look at the tasks and the steps required for each, including extraction, transformation, and data loading.

4.1 Data Extraction

Data Extraction (extract_data): In this phase, retrieve data from a specified source. Depending on the nature of your project, the source could be an API, a database, or a file. Here's how you can implement this step:

I - API Data Extraction:

  • Use a Python library like requests to send HTTP requests to the API endpoint.
  • Handle authentication, if required, by including API keys or tokens in your requests.
  • Parse the API response using tools like the json or xml modules.

II - Database Data Extraction:

  • Connect to the database using a library such as sqlite3, psycopg2 (for PostgreSQL), or pymysql (for MySQL).
  • Execute SQL queries to retrieve the necessary data.
  • Fetch and store the results in a suitable data structure.

III - File Data Extraction:

  • Open and read files using built-in Python functions like open().
  • Parse the file content based on its format (e.g., JSON, CSV, XML) using relevant libraries, as sketched below.
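
For the file-based route, a minimal sketch could look like this (the CSV path and column names are placeholders, not part of the weather pipeline above):

import pandas as pd

def extract_from_csv(**kwargs):
    # Read a local CSV file; replace the path with your own data source.
    df = pd.read_csv('/path/to/your/weather_history.csv')
    # Return only the columns the downstream tasks are assumed to expect.
    return df[['city', 'temperature_celsius']].to_dict(orient='records')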

4.2 Data Transformation

Data Transformation (transform_data): Once you have the raw data, you might need to clean, reformat, or manipulate it to suit your analysis or application requirements. Here's how you can implement data transformation:

I - Cleaning and Validation:

  • Identify and handle missing or inconsistent data.
  • Validate data against predefined criteria.

II - Transformation Operations:

  • Use libraries like pandas for efficient data manipulation, as sketched after this list.
  • Apply transformations such as filtering, sorting, or aggregating data.
  • Convert data types and handle any necessary encoding or decoding.
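
For instance, a small pandas-based cleaning and aggregation step might look like this (the column names are assumptions for illustration):

import pandas as pd

def transform_readings(raw_records):
    # raw_records is assumed to be a list of dicts produced by an extract task.
    df = pd.DataFrame(raw_records)
    # Drop rows with missing temperatures and discard implausible values.
    df = df.dropna(subset=['temperature_celsius'])
    df = df[df['temperature_celsius'].between(-60, 60)]
    # Aggregate to one average temperature per city.
    return df.groupby('city', as_index=False)['temperature_celsius'].mean()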

4.3 Data Loading

Data Loading (load_data): In this final phase, you'll store the processed data in a destination of your choice. This could be another database, a file, or any other storage system. Here's how you can implement data loading:

I - Database Data Loading:

  • Connect to the destination database.
  • Use SQL queries or ORM frameworks (e.g., SQLAlchemy) to insert the transformed data; see the SQLAlchemy sketch at the end of this subsection.

II - File Data Loading:

  • Open a new file or overwrite an existing one based on your requirements.
  • Write the transformed data to the file using appropriate file-writing methods.

Other Data Loading Scenarios: For other scenarios, like pushing data to a messaging queue or a cloud storage service, use relevant APIs or libraries.
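
As an illustration of the SQLAlchemy route mentioned above, here is a minimal sketch (the database URL and the weather table mirror the example pipeline, but are assumptions you would adapt):

from sqlalchemy import create_engine, text

def load_with_sqlalchemy(transformed_data):
    # Connect to a local SQLite file; swap the URL for PostgreSQL, MySQL, etc.
    engine = create_engine('sqlite:///weather.db')
    with engine.begin() as conn:  # begin() commits automatically on success
        conn.execute(
            text("INSERT INTO weather (city, temperature_celsius) VALUES (:city, :temp)"),
            {"city": transformed_data['city'], "temp": transformed_data['temperature_celsius']},
        )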

Step 5: Pipeline Orchestration: Task Scheduling with Apache Airflow

Apache Airflow's DAGs allow data pipeline tasks to be scheduled and executed in a controlled and automated manner. In this step, we configure and schedule the tasks within Airflow, which lets us see how it triggers data processing, transformation, and storage.
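
For example, the daily interval used earlier can be replaced with a cron expression, and backfilling of past dates disabled, directly in the DAG definition (a sketch assuming Airflow 2.x):

dag = DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='End-to-End Weather Data Pipeline',
    schedule_interval='0 6 * * *',  # run once a day at 06:00
    catchup=False,                  # do not backfill runs for past dates
)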

Step 6: Data Analysis and Insights Generation

With cleaned and processed data, delve into exploratory data analysis and apply statistical and machine learning techniques using Python. This phase extracts meaningful insights, visualizations, and predictive models from the data, enabling informed decision-making.
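
As a small example of what that first look could be, here is a sketch that reads the weather table populated by the pipeline above into pandas (the database path is a placeholder):

import sqlite3
import pandas as pd

# Load the rows written by load_data into a DataFrame for analysis.
conn = sqlite3.connect('/path/to/your/database.db')
df = pd.read_sql_query('SELECT city, temperature_celsius FROM weather', conn)
conn.close()

# Simple exploratory summary: temperature statistics per city.
print(df.groupby('city')['temperature_celsius'].describe())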

Step 7: Monitor and Manage Your Data Pipeline

Run Apache Airflow by entering the command below in your command line tool or terminal.

airflow webserver --port 8080

This command starts the Airflow webserver, typically on port 8080. Once the webserver is running, you can perform the following actions to access the Apache Airflow UI.

  1. Open the http://localhost:8080 URL in your browser

  2. In the Airflow UI, navigate to the DAGs view

  3. Find the weather_data_pipeline DAG we created earlier and toggle the switch to enable it

In this UI, you can view the progress of tasks, check logs for task execution details and manually trigger or pause the pipelines.
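
Besides the UI, the same pipeline can be triggered and inspected from the command line; for example:

airflow dags trigger weather_data_pipeline
airflow dags list-runs --dag-id weather_data_pipeline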

Summary: From Data Deluge to Strategic Decisions

So, hopefully, this brief guide will help you get started and serve as a source of inspiration for creating pipelines that help grow both your business and your professional career.

Maybe it’s time to give it a try!
