Data Collection Techniques: Practical Web Scraping Methods
Master practical data collection techniques with Python. Learn API integration, web scraping strategies, handling dynamic content, data extraction patterns, and building robust collection pipelines.

The Evolution of My Data Collection Strategy
Three years ago, I manually copied data from websites into spreadsheets. Eight hours of mind-numbing work every week. Then I discovered automated data collection—my eight-hour task became a five-minute script.
But I learned the hard way: not all data collection methods are equal. Some websites block scrapers. Others change structure weekly. I needed a robust toolkit of techniques, not just one approach.
This guide shares practical data collection methods I've refined through countless projects. You'll learn when to use APIs, when to scrape, how to handle dynamic content, and how to build resilient collection systems.
The Data Collection Hierarchy
Before writing code, choose the right approach:
1. Official APIs (Best Option)
Pros: Legal, documented, stable, structured data
Cons: Rate limits, authentication required, limited data
import requests
# Example: GitHub API
headers = {'Authorization': 'token YOUR_TOKEN'}
response = requests.get('https://api.github.com/repos/python/cpython', headers=headers)
data = response.json()
print(f"Stars: {data['stargazers_count']}")
print(f"Forks: {data['forks_count']}")
2. Web Scraping (When No API Exists)
Pros: Access any public data, flexible
Cons: Legal gray area, maintenance heavy, can be blocked
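When no API exists, a few lines of requests and BeautifulSoup usually cover static pages. Here is a minimal sketch, assuming a placeholder URL and CSS selector (both libraries appear throughout this guide):
import requests
from bs4 import BeautifulSoup

# Minimal static-page scrape (placeholder URL and selector)
response = requests.get('https://example.com/products', timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
titles = [tag.get_text(strip=True) for tag in soup.select('h2.product-title')]
print(f"Found {len(titles)} product titles")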
3. Third-Party Data Services
Pros: Clean, reliable, legal
Cons: Expensive, limited customization
API Integration Strategies
RESTful API Basics
import requests
import time

class APIClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.session = requests.Session()

    def get(self, endpoint, params=None):
        """Make GET request with error handling"""
        url = f"{self.base_url}/{endpoint}"
        try:
            response = self.session.get(url, headers=self.headers, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error: {e}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None

    def paginate(self, endpoint, max_pages=10):
        """Handle pagination automatically"""
        all_data = []
        page = 1
        while page <= max_pages:
            params = {'page': page, 'per_page': 100}
            data = self.get(endpoint, params)
            if not data or len(data) == 0:
                break
            all_data.extend(data)
            page += 1
            time.sleep(0.5)  # Rate limiting
        return all_data

# Usage
client = APIClient('https://api.example.com', 'your_api_key')
repos = client.paginate('repositories')
print(f"Collected {len(repos)} repositories")
Handling Rate Limits
import requests
import time
from datetime import datetime, timedelta

class RateLimitedAPI:
    def __init__(self, requests_per_hour=1000):
        self.requests_per_hour = requests_per_hour
        self.requests = []

    def make_request(self, url):
        """Make request respecting rate limits"""
        self._wait_if_needed()
        response = requests.get(url)
        self.requests.append(datetime.now())
        # Check for rate limit headers
        if 'X-RateLimit-Remaining' in response.headers:
            remaining = int(response.headers['X-RateLimit-Remaining'])
            if remaining < 10:
                reset_time = int(response.headers['X-RateLimit-Reset'])
                wait_seconds = reset_time - time.time()
                if wait_seconds > 0:
                    print(f"Near rate limit. Waiting {wait_seconds:.0f}s...")
                    time.sleep(wait_seconds)
        return response.json()

    def _wait_if_needed(self):
        """Wait if we've hit our self-imposed rate limit"""
        now = datetime.now()
        hour_ago = now - timedelta(hours=1)
        # Remove requests older than 1 hour
        self.requests = [r for r in self.requests if r > hour_ago]
        if len(self.requests) >= self.requests_per_hour:
            oldest = self.requests[0]
            wait_seconds = 3600 - (now - oldest).total_seconds()
            if wait_seconds > 0:
                print(f"Rate limit reached. Waiting {wait_seconds:.0f}s...")
                time.sleep(wait_seconds)
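A quick usage sketch, assuming a placeholder endpoint that returns a JSON list:
# Usage (placeholder endpoint)
api = RateLimitedAPI(requests_per_hour=500)
for page in range(1, 4):
    items = api.make_request(f'https://api.example.com/items?page={page}')
    print(f"Page {page}: {len(items)} items")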
Advanced Web Scraping Techniques
Handling Dynamic JavaScript Content
import time

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

class DynamicScraper:
    def __init__(self, headless=True):
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

    def scrape_dynamic_page(self, url, selector):
        """Scrape page with JavaScript rendering"""
        self.driver.get(url)
        # Wait for content to load
        self.wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, selector))
        )
        # Scroll to load lazy content
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        # Extract data
        elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
        data = [elem.text for elem in elements]
        return data

    def handle_infinite_scroll(self, url, item_selector, max_scrolls=10):
        """Handle infinite scrolling pages"""
        self.driver.get(url)
        all_items = set()
        for _ in range(max_scrolls):
            # Get current items
            items = self.driver.find_elements(By.CSS_SELECTOR, item_selector)
            all_items.update(item.text for item in items)
            # Scroll down
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)
            # Check if new items loaded
            new_items = self.driver.find_elements(By.CSS_SELECTOR, item_selector)
            if len(new_items) == len(items):
                break  # No new items loaded
        return list(all_items)

    def close(self):
        self.driver.quit()
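Usage looks like this, with a placeholder URL and CSS selector; wrapping the call in try/finally makes sure the browser is closed even if the scrape fails:
# Usage (placeholder URL and selector)
scraper = DynamicScraper(headless=True)
try:
    products = scraper.scrape_dynamic_page('https://example.com/products', '.product-card')
    print(f"Scraped {len(products)} products")
finally:
    scraper.close()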
Handling Authentication
import requests

class AuthenticatedScraper:
    def __init__(self):
        self.session = requests.Session()
        self.logged_in = False

    def login(self, login_url, credentials):
        """Login and maintain session"""
        response = self.session.post(login_url, data=credentials)
        # Note: many sites return 200 even for failed logins, so also check the page content
        if response.status_code == 200:
            self.logged_in = True
            print("Login successful")
        else:
            print("Login failed")
        return self.logged_in

    def scrape_protected_page(self, url):
        """Scrape page requiring authentication"""
        if not self.logged_in:
            print("Not logged in!")
            return None
        response = self.session.get(url)
        return response.text

    def save_cookies(self, filename='cookies.txt'):
        """Save session cookies"""
        with open(filename, 'w') as f:
            for cookie in self.session.cookies:
                f.write(f"{cookie.name}={cookie.value}\n")

    def load_cookies(self, filename='cookies.txt'):
        """Load saved cookies"""
        try:
            with open(filename, 'r') as f:
                for line in f:
                    name, value = line.strip().split('=', 1)
                    self.session.cookies.set(name, value)
            self.logged_in = True
        except FileNotFoundError:
            print("Cookie file not found")
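A usage sketch with placeholder login details; reusing saved cookies avoids logging in on every run:
# Usage (placeholder URL and credentials)
scraper = AuthenticatedScraper()
scraper.load_cookies()  # Reuse a previous session if cookies were saved
if not scraper.logged_in:
    scraper.login('https://example.com/login', {'username': 'me', 'password': 'secret'})
    scraper.save_cookies()
html = scraper.scrape_protected_page('https://example.com/dashboard')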
Robust Error Handling
Retry Logic with Exponential Backoff
import time
from functools import wraps

import requests

def retry_with_backoff(max_retries=3, backoff_factor=2):
    """Decorator for retrying failed requests"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    wait_time = backoff_factor ** attempt
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {wait_time}s...")
                    time.sleep(wait_time)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def fetch_with_retry(url):
    """Fetch URL with automatic retries"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response
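Calling the decorated helper is then a one-liner (the URL is a placeholder):
# Usage (placeholder URL)
response = fetch_with_retry('https://api.example.com/flaky-endpoint')
print(response.status_code)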
Circuit Breaker Pattern
from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == 'OPEN':
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = 'HALF_OPEN'
                print("Circuit breaker: Attempting recovery...")
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise e

    def on_success(self):
        """Handle successful call"""
        self.failures = 0
        if self.state == 'HALF_OPEN':
            self.state = 'CLOSED'
            print("Circuit breaker: Recovered")

    def on_failure(self):
        """Handle failed call"""
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = 'OPEN'
            print(f"Circuit breaker: OPEN after {self.failures} failures")
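A minimal usage sketch, wrapping the fetch_with_retry helper from the previous section around a placeholder URL:
# Usage (placeholder URL)
breaker = CircuitBreaker(failure_threshold=3, timeout=30)
try:
    response = breaker.call(fetch_with_retry, 'https://api.example.com/data')
    print(f"Status: {response.status_code}")
except Exception as e:
    print(f"Request blocked or failed: {e}")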
Data Extraction Patterns
Structured Data Extraction
from bs4 import BeautifulSoup
import re

class DataExtractor:
    def __init__(self, html):
        self.soup = BeautifulSoup(html, 'lxml')

    def extract_prices(self):
        """Extract prices with various formats"""
        price_patterns = [
            r'\$[\d,]+\.?\d*',
            r'€[\d,]+\.?\d*',
            r'£[\d,]+\.?\d*',
            r'[\d,]+\.?\d*\s*USD'
        ]
        prices = []
        text = self.soup.get_text()
        for pattern in price_patterns:
            matches = re.findall(pattern, text)
            prices.extend(matches)
        return prices

    def extract_emails(self):
        """Extract email addresses"""
        email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        text = self.soup.get_text()
        return re.findall(email_pattern, text)

    def extract_phone_numbers(self):
        """Extract phone numbers"""
        phone_patterns = [
            r'\+?\d{1,3}[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}',
            r'\d{3}[-.\s]?\d{3}[-.\s]?\d{4}'
        ]
        phones = []
        text = self.soup.get_text()
        for pattern in phone_patterns:
            matches = re.findall(pattern, text)
            phones.extend(matches)
        return phones

    def extract_dates(self):
        """Extract dates in various formats"""
        date_patterns = [
            r'\d{4}-\d{2}-\d{2}',
            r'\d{2}/\d{2}/\d{4}',
            r'[A-Z][a-z]+\s+\d{1,2},\s+\d{4}'
        ]
        dates = []
        text = self.soup.get_text()
        for pattern in date_patterns:
            matches = re.findall(pattern, text)
            dates.extend(matches)
        return dates
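A usage sketch with a small inline HTML snippet, so you can see what each extractor returns:
# Usage (inline HTML for illustration)
html = """
<div class="listing">
    <p>Contact sales@example.com or call 555-123-4567.</p>
    <p>Price: $1,299.99 (offer ends 2024-12-31)</p>
</div>
"""
extractor = DataExtractor(html)
print(extractor.extract_prices())         # ['$1,299.99']
print(extractor.extract_emails())         # ['sales@example.com']
print(extractor.extract_phone_numbers())  # ['555-123-4567']
print(extractor.extract_dates())          # ['2024-12-31']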
Complete Collection Pipeline
import logging
from pathlib import Path

import pandas as pd
import requests
from bs4 import BeautifulSoup

class DataCollectionPipeline:
    def __init__(self, output_dir='data'):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.output_dir / 'pipeline.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def collect(self, sources):
        """Collect data from multiple sources"""
        all_data = []
        for source in sources:
            self.logger.info(f"Collecting from {source['name']}...")
            try:
                if source['type'] == 'api':
                    data = self._collect_from_api(source)
                elif source['type'] == 'scrape':
                    data = self._collect_from_scraping(source)
                else:
                    self.logger.warning(f"Unknown source type: {source['type']}")
                    continue
                all_data.extend(data)
                self.logger.info(f"Collected {len(data)} items from {source['name']}")
            except Exception as e:
                self.logger.error(f"Failed to collect from {source['name']}: {e}")
        return all_data

    def _collect_from_api(self, source):
        """Collect data from API"""
        response = requests.get(source['url'], headers=source.get('headers', {}))
        response.raise_for_status()
        return response.json()

    def _collect_from_scraping(self, source):
        """Collect data via web scraping"""
        response = requests.get(source['url'])
        soup = BeautifulSoup(response.text, 'lxml')
        items = soup.select(source['selector'])
        data = []
        for item in items:
            data.append({
                'text': item.get_text(strip=True),
                'url': item.get('href', ''),
                'source': source['name']
            })
        return data

    def validate(self, data):
        """Validate collected data"""
        self.logger.info("Validating data...")
        # Remove duplicates
        df = pd.DataFrame(data)
        before = len(df)
        df = df.drop_duplicates()
        after = len(df)
        if before > after:
            self.logger.info(f"Removed {before - after} duplicates")
        # Remove empty entries
        df = df.dropna(how='all')
        return df.to_dict('records')

    def save(self, data, filename='collected_data'):
        """Save data in multiple formats"""
        df = pd.DataFrame(data)
        # Save as CSV
        csv_path = self.output_dir / f'{filename}.csv'
        df.to_csv(csv_path, index=False)
        self.logger.info(f"Saved CSV: {csv_path}")
        # Save as JSON
        json_path = self.output_dir / f'{filename}.json'
        df.to_json(json_path, orient='records', indent=2)
        self.logger.info(f"Saved JSON: {json_path}")
        return csv_path

    def run(self, sources, filename='collected_data'):
        """Run complete pipeline"""
        self.logger.info("Starting data collection pipeline...")
        # Collect
        data = self.collect(sources)
        self.logger.info(f"Total items collected: {len(data)}")
        # Validate
        data = self.validate(data)
        self.logger.info(f"Items after validation: {len(data)}")
        # Save
        output_file = self.save(data, filename)
        self.logger.info("Pipeline completed successfully")
        return output_file

# Usage
pipeline = DataCollectionPipeline()
sources = [
    {
        'name': 'API Source',
        'type': 'api',
        'url': 'https://api.example.com/data'
    },
    {
        'name': 'Web Source',
        'type': 'scrape',
        'url': 'https://example.com/items',
        'selector': '.item'
    }
]
output = pipeline.run(sources, 'my_data')
Monitoring and Maintenance
Health Checks
import smtplib
from email.message import EmailMessage

class CollectionMonitor:
    def __init__(self, email_config=None):
        self.email_config = email_config
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_items_collected': 0
        }

    def record_request(self, success=True, items_count=0):
        """Record request metrics"""
        self.metrics['total_requests'] += 1
        if success:
            self.metrics['successful_requests'] += 1
            self.metrics['total_items_collected'] += items_count
        else:
            self.metrics['failed_requests'] += 1

    def get_success_rate(self):
        """Calculate success rate"""
        if self.metrics['total_requests'] == 0:
            return 0
        return (self.metrics['successful_requests'] / self.metrics['total_requests']) * 100

    def check_health(self):
        """Check if collection is healthy"""
        success_rate = self.get_success_rate()
        if success_rate < 80:
            alert_msg = f"Low success rate: {success_rate:.2f}%"
            self.send_alert(alert_msg)
            return False
        return True

    def send_alert(self, message):
        """Send alert email"""
        if not self.email_config:
            print(f"ALERT: {message}")
            return
        msg = EmailMessage()
        msg['Subject'] = 'Data Collection Alert'
        msg['From'] = self.email_config['from']
        msg['To'] = self.email_config['to']
        msg.set_content(message)
        try:
            with smtplib.SMTP(self.email_config['smtp_server']) as server:
                server.send_message(msg)
        except Exception as e:
            print(f"Failed to send alert: {e}")

    def get_report(self):
        """Generate collection report"""
        return f"""
Data Collection Report
=====================
Total Requests: {self.metrics['total_requests']}
Successful: {self.metrics['successful_requests']}
Failed: {self.metrics['failed_requests']}
Success Rate: {self.get_success_rate():.2f}%
Items Collected: {self.metrics['total_items_collected']}
"""
Best Practices Summary
1. Start with APIs
Always check for official APIs before scraping:
- ProgrammableWeb API directory
- Check /robots.txt for API hints
- Look for developer documentation
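Skimming robots.txt takes seconds and often reveals API or sitemap endpoints. A quick sketch against a placeholder domain:
import requests

# Fetch robots.txt and skim it for API or sitemap hints (placeholder domain)
robots_txt = requests.get('https://example.com/robots.txt', timeout=10).text
for line in robots_txt.splitlines():
    if 'api' in line.lower() or 'sitemap' in line.lower():
        print(line)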
2. Implement Robust Error Handling
# Always wrap in try-except
try:
    data = collect_data()
except requests.exceptions.Timeout:
    # Handle timeout
    pass
except requests.exceptions.HTTPError as e:
    # Handle HTTP errors
    pass
except Exception as e:
    # Catch-all for unexpected errors
    logger.error(f"Unexpected error: {e}")
3. Respect Websites
- Follow robots.txt
- Use appropriate User-Agent
- Implement rate limiting
- Cache responses during development
- Scrape during off-peak hours
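Here is a small sketch of what "respectful" looks like in code, assuming a placeholder site, an honest bot User-Agent, and a self-imposed one-second delay:
import time
import requests
from urllib.robotparser import RobotFileParser

# A polite scraping session (placeholder site and contact address)
session = requests.Session()
session.headers.update({'User-Agent': 'MyCollectorBot/1.0 (contact@example.com)'})

robots = RobotFileParser('https://example.com/robots.txt')
robots.read()

for url in ['https://example.com/page/1', 'https://example.com/page/2']:
    if not robots.can_fetch(session.headers['User-Agent'], url):
        print(f"Skipping disallowed URL: {url}")
        continue
    response = session.get(url, timeout=10)
    print(url, response.status_code)
    time.sleep(1)  # Self-imposed delay between requests

For caching responses during development, a library such as requests-cache can wrap the session so repeated test runs don't hammer the site.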
4. Monitor and Log
import logging

logging.basicConfig(
    filename='scraper.log',
    level=logging.INFO,
    format='%(asctime)s - %(message)s'
)
Your Data Collection Toolkit
You now have comprehensive data collection techniques:
- APIs - Integration and authentication
- Scraping - Static and dynamic content
- Error Handling - Retries and circuit breakers
- Extraction - Structured data patterns
- Pipelines - End-to-end automation
- Monitoring - Health checks and alerts
Remember: Good data collection is reliable, respectful, and maintainable.
My Data Collection Journey
Three years ago, I manually copied data into spreadsheets. Now I maintain automated pipelines collecting millions of records daily.
The transformation didn't happen overnight. It started with a single Python script using requests and BeautifulSoup. Then I learned proper error handling after my first scraper broke production. Added logging when debugging took hours. Implemented monitoring when I missed data gaps for weeks.
Each failure taught valuable lessons. Each success built confidence. The scrapers I build today reflect thousands of hours of trial, error, and iteration.
Your path will be similar: Start simple. Break things. Learn from failures. Build incrementally. Don't try to create the perfect solution immediately.
The secret nobody tells you: Even experienced engineers have scrapers break constantly. The difference is they build systems that detect and recover from failures automatically.
Next Steps
- Learn Scrapy framework for production-grade scraping
- Explore Apache Airflow for pipeline orchestration
- Study BeautifulSoup documentation deeply
- Practice with public APIs before scraping
- Join data engineering communities to learn from others
Start small, test thoroughly, and scale gradually. Every expert was once a beginner who kept practicing. Happy collecting!
Found this guide helpful? Share it with your data engineering community! Connect with me on Twitter or LinkedIn for more data collection tips.
Support My Work
If this guide helped you master data collection techniques, build robust scrapers, or automate your data pipelines, I'd really appreciate your support! Creating comprehensive, free content like this takes significant time and effort. Your support helps me continue sharing knowledge and creating more helpful resources for developers.
☕ Buy me a coffee - Every contribution, big or small, means the world to me and keeps me motivated to create more content!
Cover image by Kike Salazar N on Unsplash