Data preprocessing is a critical step in the machine learning pipeline. Automating this process ensures consistency, efficiency, and scalability. Apache Airflow is a powerful orchestration tool that allows you to schedule and monitor data preprocessing tasks programmatically.
This guide walks through setting up Apache Airflow on an Ubuntu cloud GPU server and automating data preprocessing using Airflow DAGs.
Prerequisites
Before proceeding, ensure you have the following:
- An Atlantic.Net Cloud GPU server running Ubuntu 24.04.
- CUDA Toolkit and cuDNN installed.
- Root or sudo privileges.
Step 1: Install Python and Required Libraries
1. Update the package index and install Python with the required dependencies:
apt update
apt install -y python3 python3-pip python3-venv
2. Create and activate a virtual environment:
python3 -m venv dag-env
source dag-env/bin/activate
3. Install required Python libraries:
pip install numpy pandas scikit-learn tensorflow torch graphviz
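Optionally, verify that the GPU-enabled libraries can see your CUDA device before moving on. This is a quick sanity check that assumes the torch package installed above:
python3 -c "import torch; print('CUDA available:', torch.cuda.is_available())"
If this prints False, revisit your CUDA Toolkit and cuDNN installation before continuing.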
Step 2: Install Apache Airflow
Apache Airflow lets you programmatically author, schedule, and monitor workflows defined as Python code.
1. Install Airflow.
pip install apache-airflow
2. Initialize the Airflow database.
airflow db init
3. Create a user for the Airflow web interface.
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com
4. Start the Airflow web server and scheduler.
airflow webserver --port 8080 &
airflow scheduler &
5. Access the Airflow UI at http://your-server-ip:8080.
6. Log in with the username and password you created.
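You can also confirm the installation from the command line; this simply prints the installed Airflow version:
airflow version
Keep in mind that the web server and scheduler started above are tied to your current shell session; for a longer-lived setup you would typically run them under a process manager such as systemd.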
Step 3: Create the DAG Directory
Apache Airflow looks for DAGs in a specific directory. By default, this directory is ~/airflow/dags. If it doesn’t exist, create it:
mkdir -p ~/airflow/dags
This is where you’ll place your Python scripts defining the workflows (DAGs).
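If you are unsure which directory your installation actually scans, you can query the dags_folder setting from airflow.cfg directly:
airflow config get-value core dags_folder
The output should match ~/airflow/dags expanded to an absolute path.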
Step 4: Create Sample Data Files
Before defining the DAG, let’s create a sample dataset (raw_data.csv) to use in the preprocessing pipeline.
Create a directory for your data:
mkdir -p ~/airflow/data
Create a Python script to generate sample data:
nano ~/airflow/data/generate_sample_data.py
Add the following code to generate a sample CSV file:
import pandas as pd
import numpy as np
# Create a sample dataset
data = {
    'feature1': np.random.rand(100),
    'feature2': np.random.rand(100),
    'target': np.random.randint(0, 2, 100)
}
# Introduce some missing values and duplicates
data['feature1'][10:15] = np.nan
data['feature2'][20:25] = np.nan
df = pd.DataFrame(data)
df = pd.concat([df, df.iloc[:5]]) # Add duplicates
# Save to CSV
df.to_csv('~/airflow/data/raw_data.csv', index=False)
print("Sample data saved to ~/airflow/data/raw_data.csv")
Run the script to generate the sample data:
python3 ~/airflow/data/generate_sample_data.py
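If you want to confirm that the file really contains the injected gaps and duplicate rows before building the DAG, a quick check from a Python shell (in the same virtual environment) looks like this:
import pandas as pd

df = pd.read_csv('~/airflow/data/raw_data.csv')
print(df.shape)                                   # expect (105, 3): 100 rows plus 5 duplicates
print(df.isna().sum())                            # expect 5 missing values in each feature column
print('duplicate rows:', df.duplicated().sum())   # expect 5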
Step 5: Define Your Data Preprocessing Pipeline
Create a Python script (data_preprocessing_dag.py) in the ~/airflow/dags directory to define your data preprocessing workflow.
nano ~/airflow/dags/data_preprocessing_dag.py
Add the following code to define the data preprocessing pipeline:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
from sklearn.preprocessing import StandardScaler
# Define file paths
RAW_DATA_PATH = '/root/airflow/data/raw_data.csv'
LOADED_DATA_PATH = '/root/airflow/data/loaded_data.pkl'  # intermediate hand-off from load to clean
CLEANED_DATA_PATH = '/root/airflow/data/cleaned_data.pkl'
TRANSFORMED_DATA_PATH = '/root/airflow/data/transformed_data.pkl'
PROCESSED_DATA_PATH = '/root/airflow/data/processed_data.csv'

# Define Python functions for each preprocessing step
def load_data():
    data = pd.read_csv(RAW_DATA_PATH)
    print("Data loaded successfully!")
    data.to_pickle(LOADED_DATA_PATH)  # Hand the raw data to the next task as a pickle

def clean_data():
    data = pd.read_pickle(LOADED_DATA_PATH)
    data.dropna(inplace=True)  # Remove missing values
    data.drop_duplicates(inplace=True)  # Remove duplicates
    print("Data cleaned successfully!")
    data.to_pickle(CLEANED_DATA_PATH)  # Save the cleaned data for the next task

def transform_data():
    data = pd.read_pickle(CLEANED_DATA_PATH)
    scaler = StandardScaler()
    data[["feature1", "feature2"]] = scaler.fit_transform(data[["feature1", "feature2"]])
    print("Data transformed successfully!")
    data.to_pickle(TRANSFORMED_DATA_PATH)  # Save the scaled data for the final task

def save_data():
    data = pd.read_pickle(TRANSFORMED_DATA_PATH)
    data.to_csv(PROCESSED_DATA_PATH, index=False)
    print("Processed data saved successfully!")
# Define the DAG
default_args = {
    'owner': 'admin',
    'start_date': datetime(2023, 10, 1),
    'retries': 1,
}

dag = DAG(
    'data_preprocessing_pipeline',
    default_args=default_args,
    description='Automated Data Preprocessing Pipeline',
    schedule_interval='@daily',  # Run daily
)
# Define tasks
load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

clean_task = PythonOperator(
    task_id='clean_data',
    python_callable=clean_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

save_task = PythonOperator(
    task_id='save_data',
    python_callable=save_data,
    dag=dag,
)
# Set task dependencies
load_task >> clean_task >> transform_task >> save_task
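Before relying on the schedule, it is worth confirming that the file parses cleanly and that a single task runs end to end. Importing the DAG file directly surfaces syntax errors, and Airflow's test mode executes one task without recording state in the metadata database:
python3 ~/airflow/dags/data_preprocessing_dag.py
airflow tasks test data_preprocessing_pipeline load_data 2023-10-01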
Step 6: Run the Pipeline
1. New DAGs are paused by default, so unpause the DAG and then trigger it.
airflow dags unpause data_preprocessing_pipeline
airflow dags trigger data_preprocessing_pipeline
2. Check the status of the DAG run.
airflow dags list-runs --dag-id data_preprocessing_pipeline
Output.
dag_id | run_id | state | execution_date | start_date | end_date
============================+======================================+=========+===========================+==================================+=========
data_preprocessing_pipeline | manual__2025-03-09T12:08:01+00:00 | running | 2025-03-09T12:08:01+00:00 | 2025-03-09T12:11:25.601992+00:00 |
data_preprocessing_pipeline | scheduled__2023-10-15T00:00:00+00:00 | running | 2023-10-15T00:00:00+00:00 | 2025-03-09T12:12:11.941213+00:00 |
3. Refresh the Airflow UI to see your new DAG (data_preprocessing_pipeline).
4. Click on data_preprocessing_pipeline in the DAG list to open it and watch the tasks progress through the pipeline.
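If a task fails, its logs are written to disk (by default under ~/airflow/logs) and are also viewable from the task's log view in the UI. You can additionally query the state of a specific run from the CLI, substituting the execution_date from your own list-runs output:
airflow dags state data_preprocessing_pipeline 2025-03-09T12:08:01+00:00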
Step 7: Verify the Output
After the DAG runs successfully, check the output files in the ~/airflow/data directory:
- Cleaned Data: cleaned_data.pkl
- Transformed Data: transformed_data.pkl
- Processed Data: processed_data.csv
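As a final check, you can load processed_data.csv and confirm that the scaling step ran; after StandardScaler, both feature columns should show a mean close to 0 and a standard deviation close to 1:
import pandas as pd

df = pd.read_csv('~/airflow/data/processed_data.csv')
print(df.shape)                                   # expect 90 rows: 105 minus 10 rows with NaNs minus 5 duplicates
print(df[['feature1', 'feature2']].describe())    # mean ~0 and std ~1 for both features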
Conclusion
You have successfully set up an automated data preprocessing pipeline using Apache Airflow on an Ubuntu cloud GPU server. This pipeline loads, cleans, transforms, and saves data for further machine learning tasks, ensuring a streamlined and reproducible preprocessing workflow.