Building My First Data Pipeline: A Python Tutorial for Complete Beginners

Step-by-step guide to building your first data pipeline with Python: Learn ETL process, data extraction from APIs, transformation with pandas, loading to database, and automating your pipeline. Perfect for data engineering beginners.

πŸ“… Published: October 15, 2025 ✏️ Updated: October 28, 2025 By Ojaswi Athghara
#pipeline #tutorial #project #etl #beginner #python

Building My First Data Pipeline: A Python Tutorial for Complete Beginners

My First Pipeline (It Was Messy!)

I'd been reading about data engineering for weeksβ€”ETL this, data pipeline that. I thought I understood it.

Then I tried to build one.

Nothing worked. My code broke constantly. Data didn't look right. I had no idea if it was even working correctly.

But I kept at it. And after several attempts, failed experiments, and debugging sessions, I built my first functional data pipeline!

In this tutorial, I'm walking you through building a simple but real data pipeline from scratch. We'll extract data from an API, clean and transform it, and load it into a databaseβ€”the classic ETL process.

No fancy tools. Just Python. Perfect for beginners.

What We're Building

Project: Weather Data Pipeline

We'll build a pipeline that:

  1. Extracts weather data from a free API
  2. Transforms it (cleans, calculates averages)
  3. Loads it into a SQLite database
  4. Runs automatically on a schedule

Skills you'll learn:

  • Working with APIs
  • Data transformation with pandas
  • Database operations
  • Error handling
  • Basic automation

Prerequisites

You'll need:

  • Basic Python knowledge (variables, functions, loops)
  • Python 3.7+ installed
  • A code editor (VS Code, PyCharm, or similar)
  • Internet connection (for the API)

Time needed: 1-2 hours

Let's get started!

Step 1: Project Setup

First, let's set up our project structure.

Create Project Folder

# Create project directory
mkdir weather-pipeline
cd weather-pipeline

# Create subdirectories
mkdir data logs

# Create Python files
touch pipeline.py
touch config.py
touch database.py
touch utils.py

Project structure:

weather-pipeline/
β”œβ”€β”€ data/           # For temporary data files
β”œβ”€β”€ logs/           # For log files
β”œβ”€β”€ pipeline.py     # Main pipeline code
β”œβ”€β”€ config.py       # Configuration settings
β”œβ”€β”€ database.py     # Database operations
β”œβ”€β”€ utils.py        # Helper functions
└── requirements.txt

Install Required Libraries

Create requirements.txt:

requests==2.31.0
pandas==2.1.0
python-dotenv==1.0.0

Install them:

pip install -r requirements.txt

Step 2: Configuration Setup

Let's set up configuration in config.py:

import os
from datetime import datetime

# API Configuration
# Using OpenWeatherMap (free tier)
API_KEY = os.getenv('WEATHER_API_KEY', 'your_api_key_here')
API_BASE_URL = 'https://api.openweathermap.org/data/2.5/weather'

# Cities to track
CITIES = ['London', 'New York', 'Tokyo', 'Sydney', 'Mumbai']

# Database configuration
DATABASE_PATH = 'data/weather.db'

# Logging configuration
LOG_PATH = f'logs/pipeline_{datetime.now().strftime("%Y%m%d")}.log'

# Data quality thresholds
MIN_TEMP = -50  # Celsius
MAX_TEMP = 60   # Celsius

Getting an API key:

  1. Go to OpenWeatherMap
  2. Sign up for free
  3. Get your API key
  4. Replace your_api_key_here with it

Step 3: Setting Up the Database

Create database.py for database operations:

import sqlite3
from datetime import datetime
import config

def create_connection():
    """Create a database connection"""
    try:
        conn = sqlite3.connect(config.DATABASE_PATH)
        return conn
    except sqlite3.Error as e:
        print(f"Error connecting to database: {e}")
        return None

def create_tables():
    """Create necessary tables if they don't exist"""
    conn = create_connection()
    if conn is None:
        return
    
    cursor = conn.cursor()
    
    # Create weather data table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS weather_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            city TEXT NOT NULL,
            country TEXT,
            temperature REAL,
            feels_like REAL,
            temp_min REAL,
            temp_max REAL,
            pressure INTEGER,
            humidity INTEGER,
            weather_main TEXT,
            weather_description TEXT,
            wind_speed REAL,
            clouds INTEGER,
            timestamp INTEGER,
            recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    # Create index on city and timestamp for faster queries
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_city_timestamp 
        ON weather_data(city, timestamp)
    ''')
    
    conn.commit()
    conn.close()
    print("βœ“ Database tables created successfully")

def insert_weather_data(data):
    """Insert weather data into database"""
    conn = create_connection()
    if conn is None:
        return False
    
    cursor = conn.cursor()
    
    try:
        cursor.execute('''
            INSERT INTO weather_data (
                city, country, temperature, feels_like, 
                temp_min, temp_max, pressure, humidity,
                weather_main, weather_description,
                wind_speed, clouds, timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            data['city'],
            data['country'],
            data['temperature'],
            data['feels_like'],
            data['temp_min'],
            data['temp_max'],
            data['pressure'],
            data['humidity'],
            data['weather_main'],
            data['weather_description'],
            data['wind_speed'],
            data['clouds'],
            data['timestamp']
        ))
        
        conn.commit()
        conn.close()
        return True
        
    except sqlite3.Error as e:
        print(f"Error inserting data: {e}")
        conn.close()
        return False

def get_latest_weather(city, hours=24):
    """Get latest weather data for a city"""
    conn = create_connection()
    if conn is None:
        return None
    
    cursor = conn.cursor()
    
    # Get data from last N hours
    query = '''
        SELECT * FROM weather_data
        WHERE city = ?
        AND datetime(recorded_at) >= datetime('now', ? || ' hours')
        ORDER BY recorded_at DESC
    '''
    
    cursor.execute(query, (city, -hours))
    results = cursor.fetchall()
    
    conn.close()
    return results

Step 4: Building the Extract Function

Create the extraction logic in pipeline.py:

import requests
import config
from datetime import datetime

def extract_weather_data(city):
    """
    Extract weather data from API for a given city
    """
    print(f"Extracting data for {city}...")
    
    try:
        # Build API request
        params = {
            'q': city,
            'appid': config.API_KEY,
            'units': 'metric'  # Get temperature in Celsius
        }
        
        # Make API call
        response = requests.get(
            config.API_BASE_URL,
            params=params,
            timeout=10
        )
        
        # Check if request was successful
        if response.status_code == 200:
            data = response.json()
            print(f"βœ“ Successfully extracted data for {city}")
            return data
        else:
            print(f"βœ— API error for {city}: {response.status_code}")
            return None
            
    except requests.exceptions.Timeout:
        print(f"βœ— Timeout error for {city}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"βœ— Request error for {city}: {e}")
        return None
    except Exception as e:
        print(f"βœ— Unexpected error for {city}: {e}")
        return None

def extract_all_cities():
    """Extract weather data for all configured cities"""
    all_data = []
    
    for city in config.CITIES:
        data = extract_weather_data(city)
        if data:
            all_data.append(data)
    
    print(f"\nExtracted data for {len(all_data)}/{len(config.CITIES)} cities")
    return all_data

Step 5: Building the Transform Function

Add transformation logic to pipeline.py:

import pandas as pd

def transform_weather_data(raw_data):
    """
    Transform raw API data into clean, structured format
    """
    print("\nTransforming data...")
    
    transformed_records = []
    
    for record in raw_data:
        try:
            # Extract relevant fields
            transformed = {
                'city': record['name'],
                'country': record['sys']['country'],
                'temperature': record['main']['temp'],
                'feels_like': record['main']['feels_like'],
                'temp_min': record['main']['temp_min'],
                'temp_max': record['main']['temp_max'],
                'pressure': record['main']['pressure'],
                'humidity': record['main']['humidity'],
                'weather_main': record['weather'][0]['main'],
                'weather_description': record['weather'][0]['description'],
                'wind_speed': record['wind']['speed'],
                'clouds': record['clouds']['all'],
                'timestamp': record['dt']
            }
            
            transformed_records.append(transformed)
            
        except KeyError as e:
            print(f"βœ— Missing field in data: {e}")
            continue
        except Exception as e:
            print(f"βœ— Error transforming record: {e}")
            continue
    
    # Create DataFrame for easier manipulation
    df = pd.DataFrame(transformed_records)
    
    # Data quality checks
    print("\nRunning data quality checks...")
    
    # Check for missing values
    missing = df.isnull().sum()
    if missing.any():
        print(f"⚠ Warning: Found missing values:\n{missing[missing > 0]}")
    
    # Validate temperature ranges
    invalid_temps = df[
        (df['temperature'] < config.MIN_TEMP) | 
        (df['temperature'] > config.MAX_TEMP)
    ]
    if len(invalid_temps) > 0:
        print(f"⚠ Warning: Found {len(invalid_temps)} invalid temperatures")
        df = df[
            (df['temperature'] >= config.MIN_TEMP) & 
            (df['temperature'] <= config.MAX_TEMP)
        ]
    
    # Remove duplicates
    original_count = len(df)
    df = df.drop_duplicates(subset=['city', 'timestamp'])
    if len(df) < original_count:
        print(f"⚠ Removed {original_count - len(df)} duplicate records")
    
    # Add calculated fields
    df['temp_range'] = df['temp_max'] - df['temp_min']
    df['is_hot'] = df['temperature'] > 30
    df['is_cold'] = df['temperature'] < 10
    
    print(f"βœ“ Transformed {len(df)} records successfully")
    
    return df.to_dict('records')

def generate_summary_statistics(df):
    """Generate summary statistics"""
    summary = {
        'total_cities': df['city'].nunique(),
        'avg_temperature': df['temperature'].mean(),
        'min_temperature': df['temperature'].min(),
        'max_temperature': df['temperature'].max(),
        'avg_humidity': df['humidity'].mean(),
        'hottest_city': df.loc[df['temperature'].idxmax(), 'city'],
        'coldest_city': df.loc[df['temperature'].idxmin(), 'city']
    }
    
    return summary

Step 6: Building the Load Function

Add loading logic to pipeline.py:

import database as db

def load_weather_data(transformed_data):
    """
    Load transformed data into database
    """
    print("\nLoading data to database...")
    
    success_count = 0
    fail_count = 0
    
    for record in transformed_data:
        if db.insert_weather_data(record):
            success_count += 1
        else:
            fail_count += 1
    
    print(f"βœ“ Loaded {success_count} records successfully")
    if fail_count > 0:
        print(f"βœ— Failed to load {fail_count} records")
    
    return success_count, fail_count

Step 7: Building the Complete Pipeline

Now let's put it all together in pipeline.py:

from datetime import datetime
import pandas as pd

def run_pipeline():
    """
    Main pipeline orchestration function
    """
    print("="*50)
    print("WEATHER DATA PIPELINE")
    print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*50)
    
    try:
        # Step 1: Initialize database
        print("\n[1/4] Initializing database...")
        db.create_tables()
        
        # Step 2: Extract data
        print("\n[2/4] Extracting data from API...")
        raw_data = extract_all_cities()
        
        if not raw_data:
            print("βœ— No data extracted. Exiting pipeline.")
            return
        
        # Step 3: Transform data
        print("\n[3/4] Transforming data...")
        transformed_data = transform_weather_data(raw_data)
        
        if not transformed_data:
            print("βœ— No data to load. Exiting pipeline.")
            return
        
        # Step 4: Load data
        print("\n[4/4] Loading data to database...")
        success_count, fail_count = load_weather_data(transformed_data)
        
        # Generate summary
        print("\n" + "="*50)
        print("PIPELINE SUMMARY")
        print("="*50)
        
        df = pd.DataFrame(transformed_data)
        summary = generate_summary_statistics(df)
        
        print(f"Total cities processed: {summary['total_cities']}")
        print(f"Average temperature: {summary['avg_temperature']:.2f}Β°C")
        print(f"Temperature range: {summary['min_temperature']:.2f}Β°C to {summary['max_temperature']:.2f}Β°C")
        print(f"Average humidity: {summary['avg_humidity']:.2f}%")
        print(f"Hottest city: {summary['hottest_city']}")
        print(f"Coldest city: {summary['coldest_city']}")
        print(f"\nRecords loaded: {success_count}")
        
        print("\nβœ“ Pipeline completed successfully!")
        print(f"Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        
    except Exception as e:
        print(f"\nβœ— Pipeline failed with error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    run_pipeline()

Step 8: Adding Logging

Create utils.py for logging functionality:

import logging
import config
from datetime import datetime

def setup_logging():
    """Setup logging configuration"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(config.LOG_PATH),
            logging.StreamHandler()
        ]
    )
    
    return logging.getLogger(__name__)

# Usage in pipeline.py
def run_pipeline_with_logging():
    """Pipeline with logging"""
    logger = setup_logging()
    logger.info("Pipeline started")
    
    try:
        # Your pipeline code
        run_pipeline()
        logger.info("Pipeline completed successfully")
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        raise

Step 9: Testing the Pipeline

Let's run our pipeline!

python pipeline.py

Expected output:

==================================================
WEATHER DATA PIPELINE
Started at: 2025-10-28 10:30:00
==================================================

[1/4] Initializing database...
βœ“ Database tables created successfully

[2/4] Extracting data from API...
Extracting data for London...
βœ“ Successfully extracted data for London
Extracting data for New York...
βœ“ Successfully extracted data for New York
...

Extracted data for 5/5 cities

[3/4] Transforming data...
Running data quality checks...
βœ“ Transformed 5 records successfully

[4/4] Loading data to database...
βœ“ Loaded 5 records successfully

==================================================
PIPELINE SUMMARY
==================================================
Total cities processed: 5
Average temperature: 18.45Β°C
Temperature range: 10.20Β°C to 28.30Β°C
Average humidity: 65.40%
Hottest city: Mumbai
Coldest city: London

Records loaded: 5

βœ“ Pipeline completed successfully!
Finished at: 2025-10-28 10:30:15

Step 10: Querying the Data

Create query_data.py to analyze the data:

import sqlite3
import pandas as pd
import config

def query_latest_weather():
    """Query latest weather for all cities"""
    conn = sqlite3.connect(config.DATABASE_PATH)
    
    query = '''
        SELECT 
            city,
            temperature,
            humidity,
            weather_description,
            recorded_at
        FROM weather_data
        WHERE id IN (
            SELECT MAX(id)
            FROM weather_data
            GROUP BY city
        )
        ORDER BY temperature DESC
    '''
    
    df = pd.read_sql_query(query, conn)
    conn.close()
    
    print("\nLatest Weather Data:")
    print(df.to_string(index=False))

def query_temperature_trends(city, days=7):
    """Query temperature trends for a city"""
    conn = sqlite3.connect(config.DATABASE_PATH)
    
    query = '''
        SELECT 
            datetime(recorded_at) as date,
            AVG(temperature) as avg_temp,
            MIN(temperature) as min_temp,
            MAX(temperature) as max_temp
        FROM weather_data
        WHERE city = ?
        AND datetime(recorded_at) >= datetime('now', ? || ' days')
        GROUP BY date(recorded_at)
        ORDER BY date DESC
    '''
    
    df = pd.read_sql_query(query, conn, params=(city, -days))
    conn.close()
    
    print(f"\nTemperature Trends for {city}:")
    print(df.to_string(index=False))

if __name__ == "__main__":
    query_latest_weather()
    query_temperature_trends('London')

Step 11: Automating the Pipeline

Option 1: Using Cron (Linux/Mac)

# Open crontab editor
crontab -e

# Add this line to run every hour
0 * * * * cd /path/to/weather-pipeline && python pipeline.py

# Or run every 6 hours
0 */6 * * * cd /path/to/weather-pipeline && python pipeline.py

Option 2: Using Windows Task Scheduler

# Create a batch file: run_pipeline.bat
cd C:\path\to\weather-pipeline
python pipeline.py
pause

# Then schedule it using Task Scheduler

Option 3: Simple Python Scheduler

import schedule
import time
from pipeline import run_pipeline

def job():
    """Job to run"""
    print("\n--- Scheduled pipeline run ---")
    run_pipeline()

# Schedule the job
schedule.every().hour.do(job)  # Run every hour
# schedule.every().day.at("09:00").do(job)  # Run daily at 9 AM
# schedule.every(6).hours.do(job)  # Run every 6 hours

print("Scheduler started. Press Ctrl+C to stop.")

while True:
    schedule.run_pending()
    time.sleep(60)  # Check every minute

Common Issues and Solutions

Issue 1: API Rate Limits

Problem: Free tier has limited API calls.

Solution: Add delay between requests:

import time

def extract_all_cities():
    all_data = []
    for city in config.CITIES:
        data = extract_weather_data(city)
        if data:
            all_data.append(data)
        time.sleep(1)  # Wait 1 second between calls
    return all_data

Issue 2: Database Locked

Problem: SQLite database locked error.

Solution: Use proper connection management:

# Always close connections
try:
    conn = create_connection()
    # Do work
finally:
    conn.close()

Issue 3: Missing Data

Problem: Some API responses missing fields.

Solution: Add defensive checks:

def safe_get(dictionary, keys, default=None):
    """Safely get nested dictionary values"""
    for key in keys:
        if dictionary is None:
            return default
        dictionary = dictionary.get(key, default)
    return dictionary

# Usage
temperature = safe_get(record, ['main', 'temp'], 0)

Enhancing Your Pipeline

Once the basic pipeline works, try these enhancements:

1. Add Email Notifications

import smtplib
from email.message import EmailMessage

def send_alert(subject, message):
    """Send email alert"""
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = 'your-email@example.com'
    msg['To'] = 'recipient@example.com'
    msg.set_content(message)
    
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.login('your-email@example.com', 'your-password')
        smtp.send_message(msg)

2. Add Data Visualization

import matplotlib.pyplot as plt

def visualize_temperatures(df):
    """Create temperature visualization"""
    plt.figure(figsize=(10, 6))
    plt.bar(df['city'], df['temperature'])
    plt.xlabel('City')
    plt.ylabel('Temperature (Β°C)')
    plt.title('Current Temperatures')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('data/temperature_chart.png')
    print("βœ“ Chart saved to data/temperature_chart.png")

3. Add More Data Sources

Try combining multiple APIs:

  • Weather data (OpenWeatherMap)
  • Air quality data
  • News headlines
  • Social media trends

What I Learned Building This

Technical lessons:

  • Error handling is crucial
  • Data quality checks save headaches
  • Logging helps debugging
  • Start simple, then enhance

General lessons:

  • Building beats reading
  • Break big problems into small steps
  • Every pipeline has quirks
  • Documentation helps future you

Next Steps

Now that you've built your first pipeline, try:

  1. Add more features:
    • Historical data analysis
    • Alerting for extreme weather
    • Multiple data sources
    • Data visualization dashboard
  2. Improve reliability:
    • Better error handling
    • Retry logic
    • Monitoring and alerts
    • Unit tests
  3. Scale up:
    • Process more cities
    • Store more data
    • Add real-time processing
    • Move to cloud (AWS, GCP)
  4. Learn new tools:
    • Apache Airflow for orchestration
    • PostgreSQL instead of SQLite
    • Docker for containerization
    • dbt for transformations

Conclusion: You Built a Real Pipeline!

Congratulations! You just built a complete data pipeline from scratch.

It might seem simple, but you've learned the core concepts:

  • βœ… Extracting data from APIs
  • βœ… Transforming and cleaning data
  • βœ… Loading data to databases
  • βœ… Data quality checks
  • βœ… Error handling
  • βœ… Automation

These are the same concepts used in production pipelines at major companies. The tools might be fancier, but the principles are identical.

My advice: Don't stop here. Build more pipelines. Try different data sources. Experiment with new tools. Each pipeline teaches you something new.

The best way to learn data engineering is to build. So keep building!

Happy coding, and welcome to the world of data pipelines!


Built your first pipeline? Ran into issues or have questions? Connect with me on Twitter or LinkedIn. I'd love to hear about your experience!

Support My Work

If this tutorial helped you build your first data pipeline or understand ETL concepts better, I'd really appreciate your support! Creating detailed, hands-on tutorials with working code takes significant time and testing. Your support helps me continue creating practical, beginner-friendly content for aspiring data engineers.

β˜• Buy me a coffee - Every contribution, big or small, means the world to me and keeps me motivated to create more content!


Cover image by 倜 ε’”η½— on Unsplash

Related Blogs

Ojaswi Athghara

SDE, 4+ Years

Β© ojaswiat.com 2025-2027