Python Concurrency: Threading, Multiprocessing, and Asyncio Guide

Master Python concurrency for faster programs. Learn threading vs multiprocessing, asyncio, GIL limitations, and parallel processing for data science and ML.

πŸ“… Published: September 2, 2025 ✏️ Updated: October 10, 2025 By Ojaswi Athghara
#python #threading #asyncio #parallel #gil


When My Script Took 10 Hours

I was training 10 ML models sequentially:

for model in models:
    model.train(X, y)  # 1 hour each = 10 hours total!

Then I discovered multiprocessing:

from multiprocessing import Pool

def train_model(model):
    model.train(X, y)  # same training call, wrapped so pool.map can call it

with Pool(10) as pool:
    pool.map(train_model, models)  # All at once = ~1 hour (given 10 cores)!

10x speedup! Concurrency changed everything.

Threading vs Multiprocessing vs Asyncio

Feature     | Threading | Multiprocessing | Asyncio
------------|-----------|-----------------|----------
Best for    | I/O bound | CPU bound       | I/O bound
Parallelism | No (GIL)  | Yes             | No
Memory      | Shared    | Separate        | Shared
Speed       | Medium    | Fast            | Very fast
Overhead    | Low       | High            | Very low
Complexity  | Medium    | Low             | High

When to Use Each

Threading:

  • Downloading files from multiple URLs
  • Making multiple API calls
  • Database queries
  • Reading/writing multiple files
  • Network operations with delays

Multiprocessing:

  • ML model training
  • Large-scale data processing
  • Image/video processing
  • Scientific computations
  • Any CPU-intensive calculation

Asyncio:

  • Web scraping thousands of pages
  • Handling many API requests
  • Real-time data streaming
  • WebSocket connections
  • High-performance web servers

Understanding the Global Interpreter Lock (GIL)

The GIL is Python's biggest concurrency limitation:

import threading
import time

# CPU-bound function (dummy arg lets pool.map pass it a value)
def cpu_work(_=0):
    result = 0
    for i in range(10000000):
        result += i ** 2
    return result

# Test with threading (doesn't help for CPU-bound!)
start = time.time()
threads = [threading.Thread(target=cpu_work) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~10s (no speedup!)

# Test with multiprocessing (actually parallel!)
# On Windows/macOS, wrap this block in: if __name__ == "__main__":
from multiprocessing import Pool
start = time.time()
with Pool(4) as pool:
    pool.map(cpu_work, range(4))
print(f"Multiprocessing: {time.time() - start:.2f}s")  # ~2.5s (4x faster!)

Why? The GIL allows only one thread to execute Python bytecode at a time, even on multi-core CPUs. Multiprocessing bypasses this by using separate processes.
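
The flip side: blocking I/O releases the GIL, which is exactly why threading does help for I/O-bound work. A minimal sketch, with time.sleep standing in for a blocking network call:

import threading
import time

def io_work():
    time.sleep(1)  # Blocking calls release the GIL while waiting

start = time.time()
threads = [threading.Thread(target=io_work) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"4 x 1s of 'I/O': {time.time() - start:.2f}s")  # ~1s, not 4s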

Threading: I/O-Bound Tasks

Basic Threading Example

import threading
import time
import requests

def download_file(url):
    print(f"Downloading {url}")
    response = requests.get(url)
    print(f"Finished {url}")
    return len(response.content)

urls = [
    'https://example.com/data1.csv',
    'https://example.com/data2.csv',
    'https://example.com/data3.csv',
]

# Sequential (slow) - 3 seconds per file = 9 seconds total
start = time.time()
for url in urls:
    download_file(url)
print(f"Sequential: {time.time() - start:.1f}s")  # ~9s

# Threading (fast) - all at once = 3 seconds total
start = time.time()
threads = []
for url in urls:
    thread = threading.Thread(target=download_file, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()  # Wait for all to finish

print(f"Threading: {time.time() - start:.1f}s")  # ~3s (3x faster!)

Thread Safety and Locks

When threads share data, you need synchronization:

import threading

# Unsafe - race condition!
counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # Not atomic!

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Counter: {counter}")  # Should be 1,000,000 but isn't!
# Safe - using lock
counter = 0
lock = threading.Lock()

def increment_safe():
    global counter
    for _ in range(100000):
        with lock:  # Acquire lock
            counter += 1
        # Lock automatically released

threads = [threading.Thread(target=increment_safe) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Counter: {counter}")  # Exactly 1,000,000!

ThreadPoolExecutor - Better Threading

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_url(url):
    response = requests.get(url)
    return url, len(response.content)

urls = [f'https://example.com/page{i}' for i in range(20)]

# Use ThreadPoolExecutor (cleaner API)
with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks
    futures = [executor.submit(fetch_url, url) for url in urls]
    
    # Get results as they complete (not in submission order)
    for future in as_completed(futures):
        url, size = future.result()
        print(f"{url}: {size} bytes")

Multiprocessing: CPU-Bound Tasks

Basic Multiprocessing

from multiprocessing import Pool
import time

def train_model(model_id):
    """Simulate CPU-intensive training."""
    result = 0
    for i in range(10000000):
        result += i ** 2
    return f"Model {model_id} trained"

model_ids = list(range(8))

# Sequential (slow) - 8 cores sitting idle
start = time.time()
results = [train_model(mid) for mid in model_ids]
print(f"Sequential: {time.time() - start:.1f}s")  # ~32s

# Multiprocessing (fast) - all cores working
# (On Windows/macOS, guard with: if __name__ == "__main__":)
start = time.time()
with Pool(8) as pool:
    results = pool.map(train_model, model_ids)
print(f"Multiprocessing: {time.time() - start:.1f}s")  # ~4s (8x faster!)

Sharing Data Between Processes

Processes don't share memory by default. Use special structures:

from multiprocessing import Process, Queue, Manager

# Using Queue for communication
def worker(queue, worker_id):
    result = f"Worker {worker_id} result"
    queue.put(result)

queue = Queue()
processes = [Process(target=worker, args=(queue, i)) for i in range(4)]
for p in processes: p.start()
for p in processes: p.join()

# Collect results (safe here: all workers have already joined)
while not queue.empty():
    print(queue.get())

# Using Manager for shared data structures
def process_data(shared_dict, key, value):
    shared_dict[key] = value * 2

with Manager() as manager:
    shared_dict = manager.dict()
    
    processes = [
        Process(target=process_data, args=(shared_dict, i, i*10))
        for i in range(5)
    ]
    
    for p in processes: p.start()
    for p in processes: p.join()
    
    print(dict(shared_dict))  # {0: 0, 1: 20, 2: 40, 3: 60, 4: 80}
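
For a single shared number, multiprocessing.Value is lighter than a Manager because it lives directly in shared memory. A minimal sketch of a shared counter (the explicit lock matters because += is read-modify-write):

from multiprocessing import Process, Value

def add(counter):
    for _ in range(1000):
        with counter.get_lock():  # += is not atomic, so lock it
            counter.value += 1

if __name__ == "__main__":
    counter = Value("i", 0)  # "i" = C int in shared memory
    processes = [Process(target=add, args=(counter,)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
    print(counter.value)  # 4000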

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import numpy as np

def process_chunk(data_chunk):
    """Process a chunk of data."""
    return np.mean(data_chunk) * 2

# Large dataset
data = np.random.rand(10000000)
chunks = np.array_split(data, 8)

# Process chunks in parallel
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_chunk, chunks))

final_result = np.mean(results)
print(f"Result: {final_result}")

Asyncio: Modern Async/Await

Understanding Async/Await

Asyncio uses cooperative multitasking - functions voluntarily yield control:

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # Yield control during wait
    print("Task 1 finished")
    return "Result 1"

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)  # Yield control during wait
    print("Task 2 finished")
    return "Result 2"

async def main():
    # Run both tasks concurrently
    results = await asyncio.gather(task1(), task2())
    print(f"Results: {results}")

asyncio.run(main())
# Output:
# Task 1 started
# Task 2 started
# Task 2 finished (after 1s)
# Task 1 finished (after 2s)
# Results: ['Result 1', 'Result 2']
# Total time: 2s (not 3s!)
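
gather isn't the only way to start coroutines; asyncio.create_task schedules one to run immediately so you can await it later. A minimal sketch reusing task1 and task2 from above:

async def main_tasks():
    t1 = asyncio.create_task(task1())  # starts running right away
    t2 = asyncio.create_task(task2())
    print(await t2)  # Result 2 (after ~1s)
    print(await t1)  # Result 1 (after ~2s total)

asyncio.run(main_tasks())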

Web Scraping with Asyncio

import asyncio
import aiohttp

async def fetch_data(session, url):
    async with session.get(url) as response:
        data = await response.text()
        return len(data)

async def main():
    urls = [f'https://example.com/api{i}' for i in range(100)]
    
    async with aiohttp.ClientSession() as session:
        # Create all tasks
        tasks = [fetch_data(session, url) for url in urls]
        
        # Run all concurrently
        results = await asyncio.gather(*tasks)
        print(f"Downloaded {sum(results)} bytes from {len(urls)} URLs")

# Run async code
asyncio.run(main())
# 100 requests in ~1s instead of ~100s run sequentially!
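
Firing thousands of requests at once can overwhelm a server or trip rate limits. A common pattern is to cap concurrency with asyncio.Semaphore; a sketch built on the fetch_data above:

async def fetch_limited(session, url, semaphore):
    async with semaphore:  # at most N requests in flight
        return await fetch_data(session, url)

async def main_limited():
    urls = [f'https://example.com/api{i}' for i in range(100)]
    semaphore = asyncio.Semaphore(10)  # cap at 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Downloaded {sum(results)} bytes")

asyncio.run(main_limited())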

Asyncio vs Threading for I/O

# Threading approach
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_sync(url):
    return requests.get(url).text

with ThreadPoolExecutor(max_workers=50) as executor:
    urls = [f'https://api.example.com/{i}' for i in range(100)]
    results = list(executor.map(fetch_sync, urls))  # consume while pool is open

# Asyncio approach (more efficient)
import asyncio
import aiohttp

async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [f'https://api.example.com/{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

asyncio.run(main())

Asyncio advantages:

  • Lower memory overhead
  • Can handle thousands of connections
  • Better performance for I/O-bound tasks
  • Single-threaded (no thread safety issues)
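
And if you're stuck with a blocking library (like requests) inside async code, asyncio.to_thread (Python 3.9+) runs it in a worker thread so the event loop stays responsive. A minimal sketch:

import asyncio
import requests

async def fetch_blocking_safely():
    urls = ['https://example.com', 'https://example.org']
    # Each blocking call runs in its own thread
    responses = await asyncio.gather(
        *(asyncio.to_thread(requests.get, url) for url in urls)
    )
    print([len(r.content) for r in responses])

asyncio.run(fetch_blocking_safely())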

Real-World ML Example

from multiprocessing import Pool
from functools import partial

def process_batch(data_batch, model_params):
    """Process one batch of data."""
    # `preprocess` and `model` are assumed to be defined at module level
    # so each worker process can import them (large models are loaded
    # once per worker)
    processed = [preprocess(x) for x in data_batch]
    # Predict
    predictions = model.predict(processed)
    return predictions

def parallel_batch_processing(data, model_params, n_workers=4):
    """Process data in parallel batches."""
    # Split data into batches
    batch_size = len(data) // n_workers
    batches = [data[i:i+batch_size] for i in range(0, len(data), batch_size)]
    
    # Process in parallel
    with Pool(n_workers) as pool:
        results = pool.map(
            partial(process_batch, model_params=model_params),
            batches
        )
    
    # Combine results
    return [pred for batch in results for pred in batch]

# Usage
predictions = parallel_batch_processing(test_data, model_params, n_workers=8)

Common Pitfalls

Pitfall 1: Using Threading for CPU-Bound Work

# Wrong - threading doesn't help CPU-bound tasks!
import threading

def cpu_intensive():
    return sum(i**2 for i in range(10000000))

threads = [threading.Thread(target=cpu_intensive) for _ in range(4)]
# No speedup due to GIL!
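
The fix is to hand the same function to a process pool instead; a minimal sketch:

# Right - processes sidestep the GIL
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(_=0):  # dummy arg so it works with map
    return sum(i**2 for i in range(10000000))

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_intensive, range(4)))  # true parallelism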

Pitfall 2: Forgetting to Join Processes/Threads

# Wrong - the main thread races ahead, so code after this
# may run before the workers have finished
threads = [threading.Thread(target=work) for _ in range(10)]
for t in threads:
    t.start()
# Missing: for t in threads: t.join()

Pitfall 3: Shared State Without Locks

# Wrong - race condition!
totals = {"count": 0}

def add_items():
    for _ in range(1000):
        # Read-modify-write is not atomic, even with the GIL
        totals["count"] = totals["count"] + 1

threads = [threading.Thread(target=add_items) for _ in range(10)]

Pitfall 4: Too Many Workers

# Wrong - diminishing returns and overhead
with Pool(100) as pool:  # Too many for 8-core CPU!
    pool.map(cpu_work, data)

# Right - match CPU cores
import os
optimal_workers = os.cpu_count()
with Pool(optimal_workers) as pool:
    pool.map(cpu_work, data)

Choosing the Right Approach

Decision Tree

Is your task CPU-bound (calculations, data processing)?
β”œβ”€ YES β†’ Use multiprocessing
└─ NO β†’ Is it I/O-bound?
    β”œβ”€ YES β†’ Do you need to make 1000s of requests?
    β”‚   β”œβ”€ YES β†’ Use asyncio
    β”‚   └─ NO β†’ Use threading
    └─ NO β†’ Don't use concurrency

Performance Comparison

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    """Top-level function: process pools can't pickle lambdas."""
    return x**2

# Test data
data = list(range(1000000))

# Sequential
start = time.time()
result = [square(x) for x in data]
print(f"Sequential: {time.time() - start:.2f}s")

# Threading (no help for CPU-bound!)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    result = list(executor.map(square, data))
print(f"Threading: {time.time() - start:.2f}s")  # Similar to sequential

# Multiprocessing (parallel; chunksize amortizes the pickling overhead)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    result = list(executor.map(square, data, chunksize=100000))
print(f"Multiprocessing: {time.time() - start:.2f}s")  # Speedup grows with per-item work

Best Practices

1. Use Context Managers

# Good - automatic cleanup
with Pool(4) as pool:
    results = pool.map(work, data)

# Risky - manual cleanup is easy to forget or skip on an exception
pool = Pool(4)
results = pool.map(work, data)
pool.close()
pool.join()

2. Profile Before Optimizing

Don't guess where bottlenecks areβ€”measure them first using profiling tools!

import cProfile

def my_program():
    # Your code here
    pass

# Profile to find bottlenecks
cProfile.run('my_program()')

# Or use line_profiler for line-by-line analysis

3. Use ProcessPoolExecutor for Simplicity

# Simpler than Pool
from concurrent.futures import ProcessPoolExecutor, as_completed

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(work, item) for item in data]
    
    for future in as_completed(futures):
        result = future.result()
        process(result)

4. Handle Exceptions Properly

from concurrent.futures import ProcessPoolExecutor

def risky_work(x):
    if x < 0:
        raise ValueError("Negative value")
    return x ** 2

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(risky_work, x) for x in data]
    
    for future in futures:
        try:
            result = future.result()
        except Exception as e:
            print(f"Error: {e}")

5. Optimize Worker Count

import os

# For CPU-bound: use CPU count
cpu_workers = os.cpu_count()

# For I/O-bound: can use more
io_workers = cpu_workers * 5

# For asyncio: thousands possible
async_connections = 1000

Key Takeaways

  1. GIL limits threading - only useful for I/O-bound tasks
  2. Multiprocessing bypasses GIL - true parallelism for CPU-bound work
  3. Asyncio is most efficient for I/O - handles thousands of connections
  4. Choose based on bottleneck: CPU β†’ multiprocessing, I/O β†’ asyncio/threading
  5. Profile first - don't guess where the slowness is
  6. Use context managers - prevents resource leaks
  7. Match worker count to task type - CPU cores for CPU-bound, more for I/O-bound

Understanding concurrency transforms slow scripts into fast, scalable applications. Start with the decision tree, profile your code, and apply the right tool for your bottleneck!


If this guide helped you understand concurrency, I'd love to hear about it! Connect with me on Twitter or LinkedIn.

Support My Work

If this guide helped you understand Python concurrency, choose between threading and multiprocessing, or build faster parallel programs, I'd really appreciate your support! Creating comprehensive tutorials on advanced Python concepts like this takes significant time and effort. Your support helps me continue sharing knowledge and creating more helpful resources for Python developers.

β˜• Buy me a coffee - Every contribution, big or small, means the world to me and keeps me motivated to create more content!


Cover image by Ralfs Blumbergs on Unsplash
