Building My First Data Pipeline: A Python Tutorial for Complete Beginners
Step-by-step guide to building your first data pipeline with Python: learn the ETL process, extract data from APIs, transform it with pandas, load it into a database, and automate the whole pipeline. Perfect for data engineering beginners.

My First Pipeline (It Was Messy!)
I'd been reading about data engineering for weeks: ETL this, data pipeline that. I thought I understood it.
Then I tried to build one.
Nothing worked. My code broke constantly. Data didn't look right. I had no idea if it was even working correctly.
But I kept at it. And after several attempts, failed experiments, and debugging sessions, I built my first functional data pipeline!
In this tutorial, I'm walking you through building a simple but real data pipeline from scratch. We'll extract data from an API, clean and transform it, and load it into a database: the classic ETL process.
No fancy tools. Just Python. Perfect for beginners.
What We're Building
Project: Weather Data Pipeline
We'll build a pipeline that:
- Extracts weather data from a free API
- Transforms it (cleans, calculates averages)
- Loads it into a SQLite database
- Runs automatically on a schedule
Skills you'll learn:
- Working with APIs
- Data transformation with pandas
- Database operations
- Error handling
- Basic automation
Prerequisites
You'll need:
- Basic Python knowledge (variables, functions, loops)
- Python 3.7+ installed
- A code editor (VS Code, PyCharm, or similar)
- Internet connection (for the API)
Time needed: 1-2 hours
Let's get started!
Step 1: Project Setup
First, let's set up our project structure.
Create Project Folder
# Create project directory
mkdir weather-pipeline
cd weather-pipeline
# Create subdirectories
mkdir data logs
# Create Python files
touch pipeline.py
touch config.py
touch database.py
touch utils.py
Project structure:
weather-pipeline/
├── data/          # For temporary data files
├── logs/          # For log files
├── pipeline.py    # Main pipeline code
├── config.py      # Configuration settings
├── database.py    # Database operations
├── utils.py       # Helper functions
└── requirements.txt
Install Required Libraries
Create requirements.txt:
requests==2.31.0
pandas==2.1.0
python-dotenv==1.0.0
Install them:
pip install -r requirements.txt
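Optional, but a good habit: create and activate a virtual environment first so the pipeline's dependencies stay isolated from the rest of your system.
# Optional: isolate dependencies in a virtual environment
python -m venv venv
source venv/bin/activate   # Windows: venv\Scripts\activate
pip install -r requirements.txt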
Step 2: Configuration Setup
Let's set up configuration in config.py:
import os
from datetime import datetime
from dotenv import load_dotenv

# Load variables from a local .env file (this is what python-dotenv in requirements.txt is for)
load_dotenv()

# API Configuration
# Using OpenWeatherMap (free tier)
API_KEY = os.getenv('WEATHER_API_KEY', 'your_api_key_here')
API_BASE_URL = 'https://api.openweathermap.org/data/2.5/weather'

# Cities to track
CITIES = ['London', 'New York', 'Tokyo', 'Sydney', 'Mumbai']

# Database configuration
DATABASE_PATH = 'data/weather.db'

# Logging configuration
LOG_PATH = f'logs/pipeline_{datetime.now().strftime("%Y%m%d")}.log'

# Data quality thresholds
MIN_TEMP = -50  # Celsius
MAX_TEMP = 60   # Celsius
Getting an API key:
- Go to OpenWeatherMap
- Sign up for free
- Get your API key
- Replace your_api_key_here with it
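Hard-coding the key is fine for a first run, but since python-dotenv is already in requirements.txt, a tidier option is to put the key in a .env file next to config.py; the load_dotenv() call in config.py will pick it up. The value below is obviously a placeholder:
# .env  (keep this file out of version control)
WEATHER_API_KEY=your_real_api_key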
Step 3: Setting Up the Database
Create database.py for database operations:
import sqlite3
from datetime import datetime
import config

def create_connection():
    """Create a database connection"""
    try:
        conn = sqlite3.connect(config.DATABASE_PATH)
        return conn
    except sqlite3.Error as e:
        print(f"Error connecting to database: {e}")
        return None

def create_tables():
    """Create necessary tables if they don't exist"""
    conn = create_connection()
    if conn is None:
        return
    cursor = conn.cursor()
    # Create weather data table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS weather_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            city TEXT NOT NULL,
            country TEXT,
            temperature REAL,
            feels_like REAL,
            temp_min REAL,
            temp_max REAL,
            pressure INTEGER,
            humidity INTEGER,
            weather_main TEXT,
            weather_description TEXT,
            wind_speed REAL,
            clouds INTEGER,
            timestamp INTEGER,
            recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Create index on city and timestamp for faster queries
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_city_timestamp
        ON weather_data(city, timestamp)
    ''')
    conn.commit()
    conn.close()
    print("✓ Database tables created successfully")

def insert_weather_data(data):
    """Insert weather data into database"""
    conn = create_connection()
    if conn is None:
        return False
    cursor = conn.cursor()
    try:
        cursor.execute('''
            INSERT INTO weather_data (
                city, country, temperature, feels_like,
                temp_min, temp_max, pressure, humidity,
                weather_main, weather_description,
                wind_speed, clouds, timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            data['city'],
            data['country'],
            data['temperature'],
            data['feels_like'],
            data['temp_min'],
            data['temp_max'],
            data['pressure'],
            data['humidity'],
            data['weather_main'],
            data['weather_description'],
            data['wind_speed'],
            data['clouds'],
            data['timestamp']
        ))
        conn.commit()
        conn.close()
        return True
    except sqlite3.Error as e:
        print(f"Error inserting data: {e}")
        conn.close()
        return False

def get_latest_weather(city, hours=24):
    """Get latest weather data for a city"""
    conn = create_connection()
    if conn is None:
        return None
    cursor = conn.cursor()
    # Get data from the last N hours
    query = '''
        SELECT * FROM weather_data
        WHERE city = ?
        AND datetime(recorded_at) >= datetime('now', ? || ' hours')
        ORDER BY recorded_at DESC
    '''
    cursor.execute(query, (city, -hours))
    results = cursor.fetchall()
    conn.close()
    return results
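Before wiring up the rest of the pipeline, it's worth running this module once on its own. A throwaway check like the one below works; check_db.py is just a suggested name, not part of the project structure above:
# check_db.py - quick sanity check for database.py
import database as db

db.create_tables()                        # creates data/weather.db plus the table and index
rows = db.get_latest_weather('London')    # empty list until the pipeline has run
print(f"Rows for London in the last 24 hours: {0 if not rows else len(rows)}")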
Step 4: Building the Extract Function
Create the extraction logic in pipeline.py:
import requests
import config
from datetime import datetime

def extract_weather_data(city):
    """
    Extract weather data from API for a given city
    """
    print(f"Extracting data for {city}...")
    try:
        # Build API request
        params = {
            'q': city,
            'appid': config.API_KEY,
            'units': 'metric'  # Get temperature in Celsius
        }
        # Make API call
        response = requests.get(
            config.API_BASE_URL,
            params=params,
            timeout=10
        )
        # Check if request was successful
        if response.status_code == 200:
            data = response.json()
            print(f"✓ Successfully extracted data for {city}")
            return data
        else:
            print(f"✗ API error for {city}: {response.status_code}")
            return None
    except requests.exceptions.Timeout:
        print(f"✗ Timeout error for {city}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"✗ Request error for {city}: {e}")
        return None
    except Exception as e:
        print(f"✗ Unexpected error for {city}: {e}")
        return None

def extract_all_cities():
    """Extract weather data for all configured cities"""
    all_data = []
    for city in config.CITIES:
        data = extract_weather_data(city)
        if data:
            all_data.append(data)
    print(f"\nExtracted data for {len(all_data)}/{len(config.CITIES)} cities")
    return all_data
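To make the transformation step easier to follow, this is roughly the shape of the JSON that the current-weather endpoint returns, trimmed to the fields we actually use (the values here are only illustrative):
# Roughly what response.json() returns for one city (abridged; values illustrative)
{
    "name": "London",
    "sys": {"country": "GB"},
    "main": {"temp": 14.2, "feels_like": 13.5, "temp_min": 12.8,
             "temp_max": 15.6, "pressure": 1012, "humidity": 72},
    "weather": [{"main": "Clouds", "description": "overcast clouds"}],
    "wind": {"speed": 4.1},
    "clouds": {"all": 90},
    "dt": 1730109600
}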
Step 5: Building the Transform Function
Add transformation logic to pipeline.py:
import pandas as pd

def transform_weather_data(raw_data):
    """
    Transform raw API data into clean, structured format
    """
    print("\nTransforming data...")
    transformed_records = []
    for record in raw_data:
        try:
            # Extract relevant fields
            transformed = {
                'city': record['name'],
                'country': record['sys']['country'],
                'temperature': record['main']['temp'],
                'feels_like': record['main']['feels_like'],
                'temp_min': record['main']['temp_min'],
                'temp_max': record['main']['temp_max'],
                'pressure': record['main']['pressure'],
                'humidity': record['main']['humidity'],
                'weather_main': record['weather'][0]['main'],
                'weather_description': record['weather'][0]['description'],
                'wind_speed': record['wind']['speed'],
                'clouds': record['clouds']['all'],
                'timestamp': record['dt']
            }
            transformed_records.append(transformed)
        except KeyError as e:
            print(f"✗ Missing field in data: {e}")
            continue
        except Exception as e:
            print(f"✗ Error transforming record: {e}")
            continue

    # Create DataFrame for easier manipulation
    df = pd.DataFrame(transformed_records)

    # Data quality checks
    print("\nRunning data quality checks...")

    # Check for missing values
    missing = df.isnull().sum()
    if missing.any():
        print(f"⚠ Warning: Found missing values:\n{missing[missing > 0]}")

    # Validate temperature ranges
    invalid_temps = df[
        (df['temperature'] < config.MIN_TEMP) |
        (df['temperature'] > config.MAX_TEMP)
    ]
    if len(invalid_temps) > 0:
        print(f"⚠ Warning: Found {len(invalid_temps)} invalid temperatures")
        df = df[
            (df['temperature'] >= config.MIN_TEMP) &
            (df['temperature'] <= config.MAX_TEMP)
        ]

    # Remove duplicates
    original_count = len(df)
    df = df.drop_duplicates(subset=['city', 'timestamp'])
    if len(df) < original_count:
        print(f"✓ Removed {original_count - len(df)} duplicate records")

    # Add calculated fields
    df['temp_range'] = df['temp_max'] - df['temp_min']
    df['is_hot'] = df['temperature'] > 30
    df['is_cold'] = df['temperature'] < 10

    print(f"✓ Transformed {len(df)} records successfully")
    return df.to_dict('records')

def generate_summary_statistics(df):
    """Generate summary statistics"""
    summary = {
        'total_cities': df['city'].nunique(),
        'avg_temperature': df['temperature'].mean(),
        'min_temperature': df['temperature'].min(),
        'max_temperature': df['temperature'].max(),
        'avg_humidity': df['humidity'].mean(),
        'hottest_city': df.loc[df['temperature'].idxmax(), 'city'],
        'coldest_city': df.loc[df['temperature'].idxmin(), 'city']
    }
    return summary
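Because the transform step needs no API or database, you can exercise it with a hand-made record, either in a REPL or pasted temporarily at the bottom of pipeline.py. Every value below is made up:
# Quick check of transform_weather_data() with one fabricated record
fake_record = {
    "name": "Testville",
    "sys": {"country": "XX"},
    "main": {"temp": 21.0, "feels_like": 20.4, "temp_min": 19.0,
             "temp_max": 23.0, "pressure": 1015, "humidity": 55},
    "weather": [{"main": "Clear", "description": "clear sky"}],
    "wind": {"speed": 3.2},
    "clouds": {"all": 5},
    "dt": 1730109600
}
records = transform_weather_data([fake_record])
print(records[0]['city'], records[0]['temp_range'])   # Testville 4.0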
Step 6: Building the Load Function
Add loading logic to pipeline.py:
import database as db

def load_weather_data(transformed_data):
    """
    Load transformed data into database
    """
    print("\nLoading data to database...")
    success_count = 0
    fail_count = 0
    for record in transformed_data:
        if db.insert_weather_data(record):
            success_count += 1
        else:
            fail_count += 1
    print(f"✓ Loaded {success_count} records successfully")
    if fail_count > 0:
        print(f"✗ Failed to load {fail_count} records")
    return success_count, fail_count
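Opening a fresh connection per record is fine for five cities, but if a run ever loads hundreds of rows you'd want a single connection and one transaction. Here's a sketch of that alternative using sqlite3's executemany(); it's not required for this tutorial:
import sqlite3
import config

def load_weather_data_bulk(transformed_data):
    """Alternative loader: one connection, one transaction, many rows."""
    columns = ('city', 'country', 'temperature', 'feels_like', 'temp_min', 'temp_max',
               'pressure', 'humidity', 'weather_main', 'weather_description',
               'wind_speed', 'clouds', 'timestamp')
    rows = [tuple(record[col] for col in columns) for record in transformed_data]
    placeholders = ', '.join('?' * len(columns))
    conn = sqlite3.connect(config.DATABASE_PATH)
    try:
        with conn:  # commits on success, rolls back on error
            conn.executemany(
                f"INSERT INTO weather_data ({', '.join(columns)}) VALUES ({placeholders})",
                rows
            )
    finally:
        conn.close()
    return len(rows), 0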
Step 7: Building the Complete Pipeline
Now let's put it all together in pipeline.py:
from datetime import datetime
import pandas as pd

def run_pipeline():
    """
    Main pipeline orchestration function
    """
    print("="*50)
    print("WEATHER DATA PIPELINE")
    print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*50)
    try:
        # Step 1: Initialize database
        print("\n[1/4] Initializing database...")
        db.create_tables()

        # Step 2: Extract data
        print("\n[2/4] Extracting data from API...")
        raw_data = extract_all_cities()
        if not raw_data:
            print("✗ No data extracted. Exiting pipeline.")
            return

        # Step 3: Transform data
        print("\n[3/4] Transforming data...")
        transformed_data = transform_weather_data(raw_data)
        if not transformed_data:
            print("✗ No data to load. Exiting pipeline.")
            return

        # Step 4: Load data
        print("\n[4/4] Loading data to database...")
        success_count, fail_count = load_weather_data(transformed_data)

        # Generate summary
        print("\n" + "="*50)
        print("PIPELINE SUMMARY")
        print("="*50)
        df = pd.DataFrame(transformed_data)
        summary = generate_summary_statistics(df)
        print(f"Total cities processed: {summary['total_cities']}")
        print(f"Average temperature: {summary['avg_temperature']:.2f}°C")
        print(f"Temperature range: {summary['min_temperature']:.2f}°C to {summary['max_temperature']:.2f}°C")
        print(f"Average humidity: {summary['avg_humidity']:.2f}%")
        print(f"Hottest city: {summary['hottest_city']}")
        print(f"Coldest city: {summary['coldest_city']}")
        print(f"\nRecords loaded: {success_count}")
        print("\n✓ Pipeline completed successfully!")
        print(f"Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    except Exception as e:
        print(f"\n✗ Pipeline failed with error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    run_pipeline()
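One housekeeping note: because we added code to pipeline.py step by step, make sure the top of the finished file gathers all the imports used above in one place:
# Imports at the top of the finished pipeline.py
from datetime import datetime

import pandas as pd
import requests

import config
import database as db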
Step 8: Adding Logging
Create utils.py for logging functionality:
import logging
import config
from datetime import datetime

def setup_logging():
    """Setup logging configuration"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(config.LOG_PATH),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)

# Usage in pipeline.py
from utils import setup_logging

def run_pipeline_with_logging():
    """Pipeline with logging"""
    logger = setup_logging()
    logger.info("Pipeline started")
    try:
        # Your pipeline code
        run_pipeline()
        logger.info("Pipeline completed successfully")
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        raise
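From here you can gradually swap the pipeline's print() calls for logger calls so messages also land in the log file. A small illustrative pattern (extract_weather_data_logged is a made-up name, not part of the files above):
from utils import setup_logging

logger = setup_logging()

def extract_weather_data_logged(city):
    # Same as extract_weather_data(), but reporting through the logger
    logger.info(f"Extracting data for {city}...")
    data = extract_weather_data(city)
    if data is None:
        logger.warning(f"No data returned for {city}")
    return data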
Step 9: Testing the Pipeline
Let's run our pipeline!
python pipeline.py
Expected output:
==================================================
WEATHER DATA PIPELINE
Started at: 2025-10-28 10:30:00
==================================================
[1/4] Initializing database...
✓ Database tables created successfully
[2/4] Extracting data from API...
Extracting data for London...
✓ Successfully extracted data for London
Extracting data for New York...
✓ Successfully extracted data for New York
...
Extracted data for 5/5 cities
[3/4] Transforming data...
Running data quality checks...
✓ Transformed 5 records successfully
[4/4] Loading data to database...
✓ Loaded 5 records successfully
==================================================
PIPELINE SUMMARY
==================================================
Total cities processed: 5
Average temperature: 18.45°C
Temperature range: 10.20°C to 28.30°C
Average humidity: 65.40%
Hottest city: Mumbai
Coldest city: London
Records loaded: 5
✓ Pipeline completed successfully!
Finished at: 2025-10-28 10:30:15
Step 10: Querying the Data
Create query_data.py to analyze the data:
import sqlite3
import pandas as pd
import config

def query_latest_weather():
    """Query latest weather for all cities"""
    conn = sqlite3.connect(config.DATABASE_PATH)
    query = '''
        SELECT
            city,
            temperature,
            humidity,
            weather_description,
            recorded_at
        FROM weather_data
        WHERE id IN (
            SELECT MAX(id)
            FROM weather_data
            GROUP BY city
        )
        ORDER BY temperature DESC
    '''
    df = pd.read_sql_query(query, conn)
    conn.close()
    print("\nLatest Weather Data:")
    print(df.to_string(index=False))
def query_temperature_trends(city, days=7):
    """Query temperature trends for a city"""
    conn = sqlite3.connect(config.DATABASE_PATH)
    # Select date(recorded_at) so the output matches the GROUP BY granularity
    query = '''
        SELECT
            date(recorded_at) as date,
            AVG(temperature) as avg_temp,
            MIN(temperature) as min_temp,
            MAX(temperature) as max_temp
        FROM weather_data
        WHERE city = ?
        AND datetime(recorded_at) >= datetime('now', ? || ' days')
        GROUP BY date(recorded_at)
        ORDER BY date DESC
    '''
    df = pd.read_sql_query(query, conn, params=(city, -days))
    conn.close()
    print(f"\nTemperature Trends for {city}:")
    print(df.to_string(index=False))

if __name__ == "__main__":
    query_latest_weather()
    query_temperature_trends('London')
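Run it once the pipeline has loaded at least one batch of data:
python query_data.py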
Step 11: Automating the Pipeline
Option 1: Using Cron (Linux/Mac)
# Open crontab editor
crontab -e
# Add this line to run every hour
0 * * * * cd /path/to/weather-pipeline && python pipeline.py
# Or run every 6 hours
0 */6 * * * cd /path/to/weather-pipeline && python pipeline.py
Option 2: Using Windows Task Scheduler
# Create a batch file: run_pipeline.bat
cd C:\path\to\weather-pipeline
python pipeline.py
pause
# Then schedule it using Task Scheduler
Option 3: Simple Python Scheduler
import schedule
import time
from pipeline import run_pipeline

def job():
    """Job to run"""
    print("\n--- Scheduled pipeline run ---")
    run_pipeline()

# Schedule the job
schedule.every().hour.do(job)                 # Run every hour
# schedule.every().day.at("09:00").do(job)    # Run daily at 9 AM
# schedule.every(6).hours.do(job)             # Run every 6 hours

print("Scheduler started. Press Ctrl+C to stop.")
while True:
    schedule.run_pending()
    time.sleep(60)  # Check every minute
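Note that schedule is a third-party library and isn't in the requirements.txt from Step 1, so install it before running this script:
pip install schedule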
Common Issues and Solutions
Issue 1: API Rate Limits
Problem: Free tier has limited API calls.
Solution: Add delay between requests:
import time

def extract_all_cities():
    all_data = []
    for city in config.CITIES:
        data = extract_weather_data(city)
        if data:
            all_data.append(data)
        time.sleep(1)  # Wait 1 second between calls
    return all_data
Issue 2: Database Locked
Problem: SQLite database locked error.
Solution: Use proper connection management:
# Always close connections, even if the work in between fails
conn = create_connection()
try:
    # Do work with conn here
    ...
finally:
    if conn is not None:
        conn.close()
Issue 3: Missing Data
Problem: Some API responses missing fields.
Solution: Add defensive checks:
def safe_get(dictionary, keys, default=None):
    """Safely get nested dictionary values"""
    for key in keys:
        if dictionary is None:
            return default
        dictionary = dictionary.get(key, default)
    return dictionary

# Usage
temperature = safe_get(record, ['main', 'temp'], 0)
Enhancing Your Pipeline
Once the basic pipeline works, try these enhancements:
1. Add Email Notifications
import smtplib
from email.message import EmailMessage

def send_alert(subject, message):
    """Send email alert"""
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = 'your-email@example.com'
    msg['To'] = 'recipient@example.com'
    msg.set_content(message)
    # Note: with Gmail you'll typically need an app password, not your normal account password
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.login('your-email@example.com', 'your-password')
        smtp.send_message(msg)
2. Add Data Visualization
import matplotlib.pyplot as plt

def visualize_temperatures(df):
    """Create temperature visualization"""
    plt.figure(figsize=(10, 6))
    plt.bar(df['city'], df['temperature'])
    plt.xlabel('City')
    plt.ylabel('Temperature (°C)')
    plt.title('Current Temperatures')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('data/temperature_chart.png')
    print("✓ Chart saved to data/temperature_chart.png")
3. Add More Data Sources
Try combining multiple APIs:
- Weather data (OpenWeatherMap)
- Air quality data
- News headlines
- Social media trends
What I Learned Building This
Technical lessons:
- Error handling is crucial
- Data quality checks save headaches
- Logging helps debugging
- Start simple, then enhance
General lessons:
- Building beats reading
- Break big problems into small steps
- Every pipeline has quirks
- Documentation helps future you
Next Steps
Now that you've built your first pipeline, try:
- Add more features:
  - Historical data analysis
  - Alerting for extreme weather
  - Multiple data sources
  - Data visualization dashboard
- Improve reliability:
  - Better error handling
  - Retry logic (see the sketch after this list)
  - Monitoring and alerts
  - Unit tests
- Scale up:
  - Process more cities
  - Store more data
  - Add real-time processing
  - Move to the cloud (AWS, GCP)
- Learn new tools:
  - Apache Airflow for orchestration
  - PostgreSQL instead of SQLite
  - Docker for containerization
  - dbt for transformations
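As a taste of the reliability work, here is one simple way to wrap retry logic around the extract step. The retry count and delay are arbitrary choices, and in a bigger project you might reach for a library like tenacity instead:
import time

def extract_with_retry(city, retries=3, delay=5):
    """Call extract_weather_data() up to `retries` times before giving up."""
    for attempt in range(1, retries + 1):
        data = extract_weather_data(city)
        if data is not None:
            return data
        if attempt < retries:
            print(f"Attempt {attempt}/{retries} failed for {city}, retrying in {delay}s...")
            time.sleep(delay)
    return None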
Conclusion: You Built a Real Pipeline!
Congratulations! You just built a complete data pipeline from scratch.
It might seem simple, but you've learned the core concepts:
- ✓ Extracting data from APIs
- ✓ Transforming and cleaning data
- ✓ Loading data to databases
- ✓ Data quality checks
- ✓ Error handling
- ✓ Automation
These are the same concepts used in production pipelines at major companies. The tools might be fancier, but the principles are identical.
My advice: Don't stop here. Build more pipelines. Try different data sources. Experiment with new tools. Each pipeline teaches you something new.
The best way to learn data engineering is to build. So keep building!
Happy coding, and welcome to the world of data pipelines!
Built your first pipeline? Ran into issues or have questions? Connect with me on Twitter or LinkedIn. I'd love to hear about your experience!
Support My Work
If this tutorial helped you build your first data pipeline or understand ETL concepts better, I'd really appreciate your support! Creating detailed, hands-on tutorials with working code takes significant time and testing. Your support helps me continue creating practical, beginner-friendly content for aspiring data engineers.
☕ Buy me a coffee - Every contribution, big or small, means the world to me and keeps me motivated to create more content!
Cover image by ε€ εη½ on Unsplash