Data Collection Techniques: Practical Web Scraping Methods

Master practical data collection techniques with Python. Learn API integration, web scraping strategies, handling dynamic content, data extraction patterns, and building robust collection pipelines.

📅 Published: October 22, 2025 ✏️ Updated: October 28, 2025 By Ojaswi Athghara
#data-collection #scraping #api #python #automation #methods

The Evolution of My Data Collection Strategy

Three years ago, I manually copied data from websites into spreadsheets. Eight hours of mind-numbing work every week. Then I discovered automated data collection—my eight-hour task became a five-minute script.

But I learned the hard way: not all data collection methods are equal. Some websites block scrapers. Others change structure weekly. I needed a robust toolkit of techniques, not just one approach.

This guide shares practical data collection methods I've refined through countless projects. You'll learn when to use APIs, when to scrape, how to handle dynamic content, and how to build resilient collection systems.

The Data Collection Hierarchy

Before writing code, choose the right approach:

1. Official APIs (Best Option)

Pros: Legal, documented, stable, structured data
Cons: Rate limits, authentication required, limited data

import requests

# Example: GitHub API
headers = {'Authorization': 'token YOUR_TOKEN'}
response = requests.get('https://api.github.com/repos/python/cpython', headers=headers)
data = response.json()

print(f"Stars: {data['stargazers_count']}")
print(f"Forks: {data['forks_count']}")

2. Web Scraping (When No API Exists)

Pros: Access any public data, flexible
Cons: Legal gray area, maintenance heavy, can be blocked
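
As a rough illustration of what static scraping involves, the sketch below pulls link text and URLs out of an HTML string. It uses only the standard library's html.parser so it runs without extra installs; with BeautifulSoup the same job is usually shorter. The LinkCollector class, extract_links helper, and sample markup are made up for this example.

```python
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collect (text, href) pairs for every <a> tag in a document."""

    def __init__(self):
        super().__init__()
        self.links = []
        self._href = None  # href of the <a> tag currently open, if any
        self._text = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            self._href = dict(attrs).get("href", "")
            self._text = []

    def handle_data(self, data):
        # Only keep text that appears inside an open <a> tag
        if self._href is not None:
            self._text.append(data)

    def handle_endtag(self, tag):
        if tag == "a" and self._href is not None:
            self.links.append(("".join(self._text).strip(), self._href))
            self._href = None


def extract_links(html):
    """Return a list of (link text, href) tuples found in the HTML string."""
    parser = LinkCollector()
    parser.feed(html)
    return parser.links


# Hypothetical markup standing in for a downloaded page
SAMPLE_HTML = """
<ul>
  <li><a href="/items/1">First item</a></li>
  <li><a href="/items/2">Second <b>item</b></a></li>
</ul>
"""

links = extract_links(SAMPLE_HTML)
print(links)
```

Note that the extraction depends entirely on the page's markup, which is why scrapers need maintenance whenever a site changes its structure.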

3. Third-Party Data Services

Pros: Clean, reliable, legal
Cons: Expensive, limited customization

API Integration Strategies

RESTful API Basics

import requests
import time

class APIClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.session = requests.Session()
    
    def get(self, endpoint, params=None):
        """Make GET request with error handling"""
        url = f"{self.base_url}/{endpoint}"
        
        try:
            response = self.session.get(url, headers=self.headers, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error: {e}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None
    
    def paginate(self, endpoint, max_pages=10):
        """Handle pagination automatically"""
        all_data = []
        page = 1
        
        while page <= max_pages:
            params = {'page': page, 'per_page': 100}
            data = self.get(endpoint, params)
            
            if not data or len(data) == 0:
                break
            
            all_data.extend(data)
            page += 1
            time.sleep(0.5)  # Rate limiting
        
        return all_data

# Usage
client = APIClient('https://api.example.com', 'your_api_key')
repos = client.paginate('repositories')
print(f"Collected {len(repos)} repositories")
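
The pagination loop can also be tried out without a network by passing in the function that fetches a page. The sketch below is one way to do that, assuming page numbers start at 1 and an empty page marks the end; the paginate generator, fake_fetch, and FAKE_PAGES are illustrative names, not part of any API.

```python
def paginate(fetch_page, max_pages=10):
    """Yield items page by page until an empty page or max_pages is reached.

    fetch_page is any callable that takes a 1-based page number and returns
    a list of items (or None / an empty list when there is nothing left).
    """
    for page in range(1, max_pages + 1):
        items = fetch_page(page)
        if not items:
            break
        yield from items


# Stand-in for an API call: three pages of fake data, then nothing
FAKE_PAGES = {1: ["a", "b"], 2: ["c", "d"], 3: ["e"]}


def fake_fetch(page):
    return FAKE_PAGES.get(page, [])


collected = list(paginate(fake_fetch))
capped = list(paginate(fake_fetch, max_pages=2))
print(collected)
print(capped)
```

Because the fetch function is injected, the same loop works with a real client method such as `lambda page: client.get('repositories', {'page': page})`.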

Handling Rate Limits

import time
from datetime import datetime, timedelta

import requests

class RateLimitedAPI:
    def __init__(self, requests_per_hour=1000):
        self.requests_per_hour = requests_per_hour
        self.requests = []
    
    def make_request(self, url):
        """Make request respecting rate limits"""
        self._wait_if_needed()
        
        response = requests.get(url, timeout=10)
        self.requests.append(datetime.now())
        
        # Check for rate limit headers
        if 'X-RateLimit-Remaining' in response.headers:
            remaining = int(response.headers['X-RateLimit-Remaining'])
            if remaining < 10:
                reset_time = int(response.headers['X-RateLimit-Reset'])
                wait_seconds = reset_time - time.time()
                if wait_seconds > 0:
                    print(f"Near rate limit. Waiting {wait_seconds:.0f}s...")
                    time.sleep(wait_seconds)
        
        return response.json()
    
    def _wait_if_needed(self):
        """Wait if we've hit our self-imposed rate limit"""
        now = datetime.now()
        hour_ago = now - timedelta(hours=1)
        
        # Remove requests older than 1 hour
        self.requests = [r for r in self.requests if r > hour_ago]
        
        if len(self.requests) >= self.requests_per_hour:
            oldest = self.requests[0]
            wait_seconds = 3600 - (now - oldest).total_seconds()
            if wait_seconds > 0:
                print(f"Rate limit reached. Waiting {wait_seconds:.0f}s...")
                time.sleep(wait_seconds)
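
The sliding-window check in _wait_if_needed is easier to reason about as a pure function that takes the current time as an argument. The sketch below is a simplified variant under that assumption; seconds_until_allowed and its parameters are hypothetical names, and times are plain numbers of seconds rather than datetime objects.

```python
def seconds_until_allowed(timestamps, now, limit, window=3600.0):
    """Return how long to wait before the next request fits in the window.

    timestamps: send times (in seconds) of earlier requests, oldest first.
    now: the current time, in the same units.
    limit: maximum number of requests allowed per window.
    """
    # Only requests inside the window count against the limit
    recent = [t for t in timestamps if t > now - window]
    if len(recent) < limit:
        return 0.0
    # A slot frees up once this request falls out of the window
    blocking = recent[len(recent) - limit]
    return max(0.0, blocking + window - now)


# Three requests allowed per 60 seconds, sent at t=0, 10, and 20
wait_at_30 = seconds_until_allowed([0.0, 10.0, 20.0], now=30.0, limit=3, window=60.0)
wait_at_61 = seconds_until_allowed([0.0, 10.0, 20.0], now=61.0, limit=3, window=60.0)
print(wait_at_30, wait_at_61)
```

At t=30 the window is full, so the caller waits until the request sent at t=0 expires at t=60. At t=61 that request has already dropped out and no wait is needed.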

Advanced Web Scraping Techniques

Handling Dynamic JavaScript Content

import time

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

class DynamicScraper:
    def __init__(self, headless=True):
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)
    
    def scrape_dynamic_page(self, url, selector):
        """Scrape page with JavaScript rendering"""
        self.driver.get(url)
        
        # Wait for content to load
        element = self.wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, selector))
        )
        
        # Scroll to load lazy content
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        
        # Extract data
        elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
        data = [elem.text for elem in elements]
        
        return data
    
    def handle_infinite_scroll(self, url, item_selector, max_scrolls=10):
        """Handle infinite scrolling pages"""
        self.driver.get(url)
        all_items = set()
        
        for _ in range(max_scrolls):
            # Record the items rendered so far
            items = self.driver.find_elements(By.CSS_SELECTOR, item_selector)
            all_items.update(item.text for item in items)
            
            # Scroll down and give new content time to load
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)
            
            # Record anything the scroll loaded; stop if nothing new appeared
            new_items = self.driver.find_elements(By.CSS_SELECTOR, item_selector)
            all_items.update(item.text for item in new_items)
            if len(new_items) == len(items):
                break
        
        return list(all_items)
    
    def close(self):
        self.driver.quit()

Handling Authentication

import requests

class AuthenticatedScraper:
    def __init__(self):
        self.session = requests.Session()
        self.logged_in = False
    
    def login(self, login_url, credentials):
        """Login and maintain session"""
        response = self.session.post(login_url, data=credentials)
        
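        # Many sites return 200 even for a rejected login, so in practice
        # also check the response body or the cookies that were set.
        if response.status_code == 200: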
            self.logged_in = True
            print("Login successful")
        else:
            print("Login failed")
        
        return self.logged_in
    
    def scrape_protected_page(self, url):
        """Scrape page requiring authentication"""
        if not self.logged_in:
            print("Not logged in!")
            return None
        
        response = self.session.get(url)
        return response.text
    
    def save_cookies(self, filename='cookies.txt'):
        """Save session cookies"""
        with open(filename, 'w') as f:
            for cookie in self.session.cookies:
                f.write(f"{cookie.name}={cookie.value}\n")
    
    def load_cookies(self, filename='cookies.txt'):
        """Load saved cookies"""
        try:
            with open(filename, 'r') as f:
                for line in f:
                    name, value = line.strip().split('=', 1)
                    self.session.cookies.set(name, value)
            self.logged_in = True
        except FileNotFoundError:
            print("Cookie file not found")

Robust Error Handling

Retry Logic with Exponential Backoff

import time
from functools import wraps

import requests

def retry_with_backoff(max_retries=3, backoff_factor=2):
    """Decorator for retrying failed requests"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    
                    wait_time = backoff_factor ** attempt
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {wait_time}s...")
                    time.sleep(wait_time)
        
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def fetch_with_retry(url):
    """Fetch URL with automatic retries"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response
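
To see what the decorator's wait schedule looks like, the sketch below computes the delays on their own. It mirrors the backoff_factor ** attempt formula above and adds an optional random jitter, which is a common extension and not part of the decorator itself; backoff_delays and its jitter and rng parameters are hypothetical.

```python
import random


def backoff_delays(max_retries=3, backoff_factor=2, jitter=0.0, rng=random):
    """Return the wait in seconds before each retry.

    There are max_retries - 1 waits, because the final failure is re-raised
    instead of being retried.
    """
    delays = []
    for attempt in range(max_retries - 1):
        base = backoff_factor ** attempt
        # Add up to jitter * base extra seconds so clients do not retry in lockstep
        delays.append(base + rng.uniform(0, jitter * base))
    return delays


plain = backoff_delays(max_retries=4)
jittered = backoff_delays(max_retries=4, jitter=0.5, rng=random.Random(0))
print(plain)
print(jittered)
```

With four attempts and no jitter, the waits are 1, 2, and 4 seconds. With jitter=0.5, each wait falls somewhere between the base value and 1.5 times the base value.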

Circuit Breaker Pattern

from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == 'OPEN':
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = 'HALF_OPEN'
                print("Circuit breaker: Attempting recovery...")
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise  # Re-raise with the original traceback intact
    
    def on_success(self):
        """Handle successful call"""
        self.failures = 0
        if self.state == 'HALF_OPEN':
            self.state = 'CLOSED'
            print("Circuit breaker: Recovered")
    
    def on_failure(self):
        """Handle failed call"""
        self.failures += 1
        self.last_failure_time = datetime.now()
        
        if self.failures >= self.failure_threshold:
            self.state = 'OPEN'
            print(f"Circuit breaker: OPEN after {self.failures} failures")
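
The state transitions are easiest to follow in a small simulation. The sketch below uses MiniBreaker, a condensed stand-in for the CircuitBreaker class above with an injectable clock so the demo does not have to sleep; the class name, the clock parameter, and the failing and healthy functions are assumptions made for this example.

```python
import time


class MiniBreaker:
    """Condensed circuit breaker with an injectable clock for testing."""

    def __init__(self, failure_threshold=2, timeout=60, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None
        self.state = "CLOSED"

    def call(self, func):
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.timeout:
                raise RuntimeError("Circuit breaker is OPEN")
            self.state = "HALF_OPEN"  # Timeout elapsed: allow one trial call
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "OPEN"
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "CLOSED"
        return result


now = [0.0]  # Fake clock value, advanced by hand
breaker = MiniBreaker(failure_threshold=2, timeout=60, clock=lambda: now[0])


def failing():
    raise ConnectionError("upstream down")


def healthy():
    return "ok"


states = []
for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
    states.append(breaker.state)

try:
    breaker.call(healthy)  # Rejected while open; healthy() never runs
except RuntimeError:
    states.append("REJECTED")

now[0] = 61.0  # Pretend the timeout has passed
result = breaker.call(healthy)  # Trial call succeeds and closes the breaker
states.append(breaker.state)
print(states, result)
```

The breaker stays closed after the first failure, opens on the second, rejects calls outright while open, and closes again once a trial call succeeds after the timeout.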

Data Extraction Patterns

Structured Data Extraction

from bs4 import BeautifulSoup
import re

class DataExtractor:
    def __init__(self, html):
        self.soup = BeautifulSoup(html, 'lxml')
    
    def extract_prices(self):
        """Extract prices with various formats"""
        price_patterns = [
            r'\$[\d,]+\.?\d*',
            r'€[\d,]+\.?\d*',
            r'£[\d,]+\.?\d*',
            r'[\d,]+\.?\d*\s*USD'
        ]
        
        prices = []
        text = self.soup.get_text()
        
        for pattern in price_patterns:
            matches = re.findall(pattern, text)
            prices.extend(matches)
        
        return prices
    
    def extract_emails(self):
        """Extract email addresses"""
        email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        text = self.soup.get_text()
        return re.findall(email_pattern, text)
    
    def extract_phone_numbers(self):
        """Extract phone numbers"""
        phone_patterns = [
            r'\+?\d{1,3}[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}',
            r'\d{3}[-.\s]?\d{3}[-.\s]?\d{4}'
        ]
        
        phones = []
        text = self.soup.get_text()
        
        for pattern in phone_patterns:
            matches = re.findall(pattern, text)
            phones.extend(matches)
        
        return phones
    
    def extract_dates(self):
        """Extract dates in various formats"""
        date_patterns = [
            r'\d{4}-\d{2}-\d{2}',
            r'\d{2}/\d{2}/\d{4}',
            r'[A-Z][a-z]+\s+\d{1,2},\s+\d{4}'
        ]
        
        dates = []
        text = self.soup.get_text()
        
        for pattern in date_patterns:
            matches = re.findall(pattern, text)
            dates.extend(matches)
        
        return dates
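
The extractor returns raw strings, so a normalization step usually follows. The sketch below shows one possible approach; parse_price, parse_date, and DATE_FORMATS are hypothetical helpers. It assumes slash-separated dates are month-first (US style) and that month names are in English, which depends on the locale in effect.

```python
import re
from datetime import datetime


def parse_price(text):
    """Convert a matched price string such as '$1,299.99' to a float."""
    digits = re.sub(r"[^\d.]", "", text)  # Drop currency symbols, commas, spaces
    try:
        return float(digits)
    except ValueError:
        return None  # Nothing numeric was left


# Formats matching the three date patterns above (month-first is an assumption)
DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%B %d, %Y")


def parse_date(text):
    """Try each known format and return an ISO date string, or None."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date().isoformat()
        except ValueError:
            continue
    return None


print(parse_price("$1,299.99"), parse_price("1,000 USD"))
print(parse_date("10/22/2025"), parse_date("October 22, 2025"))
```

A value like 03/04/2025 is ambiguous between March 4 and April 3, so the format list should be adjusted to match the source site.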

Complete Collection Pipeline

import logging
from pathlib import Path

import pandas as pd
import requests
from bs4 import BeautifulSoup

class DataCollectionPipeline:
    def __init__(self, output_dir='data'):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.output_dir / 'pipeline.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def collect(self, sources):
        """Collect data from multiple sources"""
        all_data = []
        
        for source in sources:
            self.logger.info(f"Collecting from {source['name']}...")
            
            try:
                if source['type'] == 'api':
                    data = self._collect_from_api(source)
                elif source['type'] == 'scrape':
                    data = self._collect_from_scraping(source)
                else:
                    self.logger.warning(f"Unknown source type: {source['type']}")
                    continue
                
                all_data.extend(data)
                self.logger.info(f"Collected {len(data)} items from {source['name']}")
                
            except Exception as e:
                self.logger.error(f"Failed to collect from {source['name']}: {e}")
        
        return all_data
    
    def _collect_from_api(self, source):
        """Collect data from API"""
        response = requests.get(source['url'], headers=source.get('headers', {}))
        response.raise_for_status()
        return response.json()
    
    def _collect_from_scraping(self, source):
        """Collect data via web scraping"""
        response = requests.get(source['url'], timeout=10)
        response.raise_for_status()  # Don't parse an error page as data
        soup = BeautifulSoup(response.text, 'lxml')
        
        items = soup.select(source['selector'])
        data = []
        
        for item in items:
            data.append({
                'text': item.get_text(strip=True),
                'url': item.get('href', ''),
                'source': source['name']
            })
        
        return data
    
    def validate(self, data):
        """Validate collected data"""
        self.logger.info("Validating data...")
        
        # Remove duplicates
        df = pd.DataFrame(data)
        before = len(df)
        df = df.drop_duplicates()
        after = len(df)
        
        if before > after:
            self.logger.info(f"Removed {before - after} duplicates")
        
        # Remove empty entries
        df = df.dropna(how='all')
        
        return df.to_dict('records')
    
    def save(self, data, filename='collected_data'):
        """Save data in multiple formats"""
        df = pd.DataFrame(data)
        
        # Save as CSV
        csv_path = self.output_dir / f'{filename}.csv'
        df.to_csv(csv_path, index=False)
        self.logger.info(f"Saved CSV: {csv_path}")
        
        # Save as JSON
        json_path = self.output_dir / f'{filename}.json'
        df.to_json(json_path, orient='records', indent=2)
        self.logger.info(f"Saved JSON: {json_path}")
        
        return csv_path
    
    def run(self, sources, filename='collected_data'):
        """Run complete pipeline"""
        self.logger.info("Starting data collection pipeline...")
        
        # Collect
        data = self.collect(sources)
        self.logger.info(f"Total items collected: {len(data)}")
        
        # Validate
        data = self.validate(data)
        self.logger.info(f"Items after validation: {len(data)}")
        
        # Save
        output_file = self.save(data, filename)
        
        self.logger.info("Pipeline completed successfully")
        return output_file

# Usage
pipeline = DataCollectionPipeline()

sources = [
    {
        'name': 'API Source',
        'type': 'api',
        'url': 'https://api.example.com/data'
    },
    {
        'name': 'Web Source',
        'type': 'scrape',
        'url': 'https://example.com/items',
        'selector': '.item'
    }
]

output = pipeline.run(sources, 'my_data')

Monitoring and Maintenance

Health Checks

import smtplib
from email.message import EmailMessage

class CollectionMonitor:
    def __init__(self, email_config=None):
        self.email_config = email_config
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_items_collected': 0
        }
    
    def record_request(self, success=True, items_count=0):
        """Record request metrics"""
        self.metrics['total_requests'] += 1
        
        if success:
            self.metrics['successful_requests'] += 1
            self.metrics['total_items_collected'] += items_count
        else:
            self.metrics['failed_requests'] += 1
    
    def get_success_rate(self):
        """Calculate success rate"""
        if self.metrics['total_requests'] == 0:
            return 0
        return (self.metrics['successful_requests'] / self.metrics['total_requests']) * 100
    
    def check_health(self):
        """Check if collection is healthy"""
        # With no requests recorded there is nothing to judge yet
        if self.metrics['total_requests'] == 0:
            return True
        
        success_rate = self.get_success_rate()
        
        if success_rate < 80:
            alert_msg = f"Low success rate: {success_rate:.2f}%"
            self.send_alert(alert_msg)
            return False
        
        return True
    
    def send_alert(self, message):
        """Send alert email"""
        if not self.email_config:
            print(f"ALERT: {message}")
            return
        
        msg = EmailMessage()
        msg['Subject'] = 'Data Collection Alert'
        msg['From'] = self.email_config['from']
        msg['To'] = self.email_config['to']
        msg.set_content(message)
        
        try:
            with smtplib.SMTP(self.email_config['smtp_server']) as server:
                server.send_message(msg)
        except Exception as e:
            print(f"Failed to send alert: {e}")
    
    def get_report(self):
        """Generate collection report"""
        return f"""
        Data Collection Report
        =====================
        Total Requests: {self.metrics['total_requests']}
        Successful: {self.metrics['successful_requests']}
        Failed: {self.metrics['failed_requests']}
        Success Rate: {self.get_success_rate():.2f}%
        Items Collected: {self.metrics['total_items_collected']}
        """

Best Practices Summary

1. Start with APIs

Always check for an official API before scraping. APIs are documented, stable, and return structured data.

2. Implement Robust Error Handling

import logging

import requests

logger = logging.getLogger(__name__)

# Always wrap in try-except
# (collect_data() stands in for your own collection function)
try:
    data = collect_data()
except requests.exceptions.Timeout:
    # Handle timeout
    pass
except requests.exceptions.HTTPError as e:
    # Handle HTTP errors
    pass
except Exception as e:
    # Catch-all for unexpected errors
    logger.error(f"Unexpected error: {e}")

3. Respect Websites

  • Follow robots.txt
  • Use appropriate User-Agent
  • Implement rate limiting
  • Cache responses during development
  • Scrape during off-peak hours
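
The robots.txt check in the list above can be sketched with the standard library's urllib.robotparser. To stay runnable offline, the example parses an inline robots.txt string; against a real site you would call set_url() and read() on the parser instead. The rules, the example.com URLs, and the my-collector/1.0 user agent are made up for illustration.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content; a real one lives at https://<site>/robots.txt
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 5
"""


def build_parser(robots_text):
    """Build a parser from robots.txt text that has already been downloaded."""
    parser = RobotFileParser()
    parser.parse(robots_text.splitlines())
    return parser


parser = build_parser(ROBOTS_TXT)
USER_AGENT = "my-collector/1.0"  # Hypothetical user agent string

allowed_public = parser.can_fetch(USER_AGENT, "https://example.com/items")
allowed_private = parser.can_fetch(USER_AGENT, "https://example.com/private/report")
delay = parser.crawl_delay(USER_AGENT)  # None if the file sets no Crawl-delay

print(allowed_public, allowed_private, delay)
```

Checking can_fetch before each request and sleeping for the crawl delay between requests covers the first and third items in the list.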

4. Monitor and Log

import logging

logging.basicConfig(
    filename='scraper.log',
    level=logging.INFO,
    format='%(asctime)s - %(message)s'
)

Your Data Collection Toolkit

You now have comprehensive data collection techniques:

  1. APIs - Integration and authentication
  2. Scraping - Static and dynamic content
  3. Error Handling - Retries and circuit breakers
  4. Extraction - Structured data patterns
  5. Pipelines - End-to-end automation
  6. Monitoring - Health checks and alerts

Remember: Good data collection is reliable, respectful, and maintainable.

My Data Collection Journey

Three years ago, I manually copied data into spreadsheets. Now I maintain automated pipelines collecting millions of records daily.

The transformation didn't happen overnight. It started with a single Python script using requests and BeautifulSoup. Then I learned proper error handling after my first scraper broke production. Added logging when debugging took hours. Implemented monitoring when I missed data gaps for weeks.

Each failure taught valuable lessons. Each success built confidence. The scrapers I build today reflect thousands of hours of trial, error, and iteration.

Your path will be similar: Start simple. Break things. Learn from failures. Build incrementally. Don't try to create the perfect solution immediately.

The secret nobody tells you: Even experienced engineers have scrapers break constantly. The difference is they build systems that detect and recover from failures automatically.

Next Steps

Start small, test thoroughly, and scale gradually. Every expert was once a beginner who kept practicing. Happy collecting!

