Python Concurrency: Threading, Multiprocessing, and Asyncio Guide
Master Python concurrency for faster programs. Learn threading vs multiprocessing, asyncio, GIL limitations, and parallel processing for data science and ML.

When My Script Took 10 Hours
I was training 10 ML models sequentially:
for model in models:
    model.train(X, y)  # 1 hour each = 10 hours total!
Then I discovered multiprocessing:
from multiprocessing import Pool

with Pool(10) as pool:
    pool.map(train_model, models)  # All at once = ~1 hour total!
Roughly a 10x speedup on a machine with ten free cores. Concurrency changed everything.
Threading vs Multiprocessing vs Asyncio
| Feature | Threading | Multiprocessing | Asyncio |
|---|---|---|---|
| Best for | I/O-bound | CPU-bound | I/O-bound (many connections) |
| Parallelism | No (GIL) | Yes | No |
| Memory | Shared | Separate per process | Shared |
| Per-task overhead | Low | High | Very low |
| Scales to | Hundreds of threads | ~CPU core count | Thousands of tasks |
| Complexity | Medium | Medium | High |
When to Use Each
Threading:
- Downloading files from multiple URLs
- Making multiple API calls
- Database queries
- Reading/writing multiple files
- Network operations with delays
Multiprocessing:
- ML model training
- Large-scale data processing
- Image/video processing
- Scientific computations
- Any CPU-intensive calculation
Asyncio:
- Web scraping thousands of pages
- Handling many API requests
- Real-time data streaming
- WebSocket connections
- High-performance web servers
Understanding the Global Interpreter Lock (GIL)
The GIL is Python's biggest concurrency limitation:
import threading
import time

# CPU-bound function (the unused parameter lets pool.map pass it an argument)
def cpu_work(_=0):
    result = 0
    for i in range(10000000):
        result += i ** 2
    return result

# Test with threading (doesn't help for CPU-bound work!)
start = time.time()
threads = [threading.Thread(target=cpu_work) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~10s (no speedup!)

# Test with multiprocessing (actually parallel!)
from multiprocessing import Pool

start = time.time()
with Pool(4) as pool:
    pool.map(cpu_work, range(4))
print(f"Multiprocessing: {time.time() - start:.2f}s")  # ~2.5s (4x faster!)
Why? The GIL allows only one thread to execute Python bytecode at a time, even on a multi-core CPU. Multiprocessing sidesteps it by using separate processes, each with its own interpreter and its own GIL.
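The flip side: the GIL is released during blocking I/O (and inside many C extensions), which is exactly why threading still pays off for I/O-bound work. A minimal sketch, using time.sleep as a stand-in for any blocking call:

import threading
import time

def io_work():
    time.sleep(1)  # Blocking call: the GIL is released while waiting

start = time.perf_counter()
threads = [threading.Thread(target=io_work) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Four 1s waits overlapped: {time.perf_counter() - start:.2f}s")  # ~1s, not ~4s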
Threading: I/O-Bound Tasks
Basic Threading Example
import threading
import time
import requests

def download_file(url):
    print(f"Downloading {url}")
    response = requests.get(url)
    print(f"Finished {url}")
    return len(response.content)

urls = [
    'https://example.com/data1.csv',
    'https://example.com/data2.csv',
    'https://example.com/data3.csv',
]

# Sequential (slow) - at 3 seconds per file, 9 seconds total
start = time.time()
for url in urls:
    download_file(url)
print(f"Sequential: {time.time() - start:.1f}s")  # ~9s

# Threading (fast) - downloads overlap, ~3 seconds total
start = time.time()
threads = []
for url in urls:
    thread = threading.Thread(target=download_file, args=(url,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()  # Wait for all to finish
print(f"Threading: {time.time() - start:.1f}s")  # ~3s (3x faster!)
Thread Safety and Locks
When threads share data, you need synchronization:
import threading

# Unsafe - race condition!
counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # Read-modify-write: not atomic!

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Counter: {counter}")  # Should be 1,000,000 but often isn't!

# Safe - using a lock
counter = 0
lock = threading.Lock()

def increment_safe():
    global counter
    for _ in range(100000):
        with lock:  # Acquire lock; released automatically on exit
            counter += 1

threads = [threading.Thread(target=increment_safe) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Counter: {counter}")  # Exactly 1,000,000!
ThreadPoolExecutor - Better Threading
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    response = requests.get(url)
    return url, len(response.content)

urls = [f'https://example.com/page{i}' for i in range(20)]

# Use ThreadPoolExecutor (cleaner API)
with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks
    futures = [executor.submit(fetch_url, url) for url in urls]
    # Collect results (in submission order)
    for future in futures:
        url, size = future.result()
        print(f"{url}: {size} bytes")
Multiprocessing: CPU-Bound Tasks
Basic Multiprocessing
from multiprocessing import Pool
import time

def train_model(model_id):
    """Simulate CPU-intensive training."""
    result = 0
    for i in range(10000000):
        result += i ** 2
    return f"Model {model_id} trained"

model_ids = list(range(8))

# Sequential (slow) - 7 of 8 cores sitting idle
start = time.time()
results = [train_model(mid) for mid in model_ids]
print(f"Sequential: {time.time() - start:.1f}s")  # ~32s

# Multiprocessing (fast) - all cores working
start = time.time()
with Pool(8) as pool:
    results = pool.map(train_model, model_ids)
print(f"Multiprocessing: {time.time() - start:.1f}s")  # ~4s (8x faster!)
Sharing Data Between Processes
Processes don't share memory by default. Use special structures:
from multiprocessing import Process, Queue, Manager

# Using Queue for communication
def worker(queue, worker_id):
    result = f"Worker {worker_id} result"
    queue.put(result)

queue = Queue()
processes = [Process(target=worker, args=(queue, i)) for i in range(4)]
for p in processes: p.start()
for p in processes: p.join()

# Collect results
while not queue.empty():
    print(queue.get())

# Using Manager for shared data structures
def process_data(shared_dict, key, value):
    shared_dict[key] = value * 2

with Manager() as manager:
    shared_dict = manager.dict()
    processes = [
        Process(target=process_data, args=(shared_dict, i, i * 10))
        for i in range(5)
    ]
    for p in processes: p.start()
    for p in processes: p.join()
    print(dict(shared_dict))  # {0: 0, 1: 20, 2: 40, 3: 60, 4: 80}
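A Manager runs a separate server process and proxies every access, which is flexible but slow. For a simple counter or a flat numeric array, multiprocessing.Value and multiprocessing.Array give true shared memory with far less overhead. A minimal sketch:

from multiprocessing import Process, Value

def add_one(counter):
    with counter.get_lock():  # Each Value carries its own lock
        counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)  # 'i' = C signed int, initial value 0
    processes = [Process(target=add_one, args=(counter,)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
    print(counter.value)  # 4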
ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import numpy as np

def process_chunk(data_chunk):
    """Process a chunk of data."""
    return np.mean(data_chunk) * 2

# Large dataset
data = np.random.rand(10000000)
chunks = np.array_split(data, 8)

# Process chunks in parallel
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_chunk, chunks))

final_result = np.mean(results)
print(f"Result: {final_result}")
Asyncio: Modern Async/Await
Understanding Async/Await
Asyncio uses cooperative multitasking: coroutines voluntarily yield control at each await point:
import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # Yield control during the wait
    print("Task 1 finished")
    return "Result 1"

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)  # Yield control during the wait
    print("Task 2 finished")
    return "Result 2"

async def main():
    # Run both tasks concurrently
    results = await asyncio.gather(task1(), task2())
    print(f"Results: {results}")

asyncio.run(main())

# Output:
# Task 1 started
# Task 2 started
# Task 2 finished (after 1s)
# Task 1 finished (after 2s)
# Results: ['Result 1', 'Result 2']
# Total time: 2s (not 3s!)
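On Python 3.11+, asyncio.TaskGroup is a structured alternative to gather: if any task fails, the remaining tasks are cancelled and the error propagates cleanly. The same two tasks, sketched with a TaskGroup:

import asyncio

async def main():
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        t1 = tg.create_task(task1())
        t2 = tg.create_task(task2())
    # Leaving the block waits for both tasks
    print(t1.result(), t2.result())

asyncio.run(main())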
Web Scraping with Asyncio
import asyncio
import aiohttp

async def fetch_data(session, url):
    async with session.get(url) as response:
        data = await response.text()
        return len(data)

async def main():
    urls = [f'https://example.com/api{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        # Create all tasks
        tasks = [fetch_data(session, url) for url in urls]
        # Run all concurrently
        results = await asyncio.gather(*tasks)
        print(f"Downloaded {sum(results)} bytes from {len(urls)} URLs")

# Run async code
asyncio.run(main())
# 100 requests take roughly as long as the slowest one, not the sum of all 100
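Launching 100 (or 10,000) requests at once can overwhelm the server or trip rate limits. An asyncio.Semaphore caps how many are in flight at a time; a sketch assuming the same example.com URLs:

import asyncio
import aiohttp

async def fetch_limited(session, semaphore, url):
    async with semaphore:  # Wait for a free slot before requesting
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(10)  # At most 10 concurrent requests
    urls = [f'https://example.com/api{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks)

asyncio.run(main())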
Asyncio vs Threading for I/O
# Threading approach
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_sync(url):
    return requests.get(url).text

with ThreadPoolExecutor(max_workers=50) as executor:
    urls = [f'https://api.example.com/{i}' for i in range(100)]
    results = list(executor.map(fetch_sync, urls))

# Asyncio approach (more efficient)
import asyncio
import aiohttp

async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [f'https://api.example.com/{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

asyncio.run(main())
Asyncio advantages:
- Lower memory overhead per task
- Can handle thousands of open connections
- Better throughput for I/O-bound workloads
- Single-threaded, so most shared state needs no locks (with one caution, sketched below)
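The single-threaded model cuts both ways: one blocking call (like requests.get) inside a coroutine stalls the entire event loop. asyncio.to_thread (Python 3.9+) pushes blocking work onto a worker thread so the loop stays responsive:

import asyncio
import time

def blocking_io():
    time.sleep(1)  # Stands in for requests.get, disk I/O, etc.
    return "done"

async def main():
    # Each call runs in a worker thread; the event loop keeps scheduling
    results = await asyncio.gather(*(asyncio.to_thread(blocking_io) for _ in range(5)))
    print(results)  # ['done', 'done', 'done', 'done', 'done'] after ~1s

asyncio.run(main())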
Real-World ML Example
from multiprocessing import Pool
from functools import partial

def process_batch(data_batch, model_params):
    """Process one batch of data (preprocess and build_model are app-specific placeholders)."""
    model = build_model(model_params)  # Rebuild the model inside each worker process
    processed = [preprocess(x) for x in data_batch]
    return model.predict(processed)

def parallel_batch_processing(data, model_params, n_workers=4):
    """Process data in parallel batches."""
    # Split data into roughly one batch per worker
    batch_size = max(1, len(data) // n_workers)
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    # Process in parallel (model_params must be picklable to cross process boundaries)
    with Pool(n_workers) as pool:
        results = pool.map(
            partial(process_batch, model_params=model_params),
            batches
        )
    # Flatten per-batch predictions into one list
    return [pred for batch in results for pred in batch]

# Usage
predictions = parallel_batch_processing(test_data, model_params, n_workers=8)
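If joblib is installed (it's the backend scikit-learn uses for n_jobs), it wraps this same pattern with less boilerplate; a sketch with a toy function:

from joblib import Parallel, delayed

def square(x):
    return x * x

# n_jobs=-1 uses all available cores; results come back in input order
results = Parallel(n_jobs=-1)(delayed(square)(x) for x in range(100))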
Common Pitfalls
Pitfall 1: Using Threading for CPU-Bound Work
# Wrong - threading doesn't help CPU-bound tasks!
import threading

def cpu_intensive():
    return sum(i**2 for i in range(10000000))

threads = [threading.Thread(target=cpu_intensive) for _ in range(4)]
# No speedup due to GIL!
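The fix is to hand the same work to processes:

# Right - processes run the work in parallel
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(_=0):  # Parameter lets executor.map pass an argument
    return sum(i**2 for i in range(10000000))

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_intensive, range(4)))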
Pitfall 2: Forgetting to Join Processes/Threads
# Wrong - the main thread races ahead before the work finishes
threads = [threading.Thread(target=work) for _ in range(10)]
for t in threads:
    t.start()
# Missing: for t in threads: t.join()
Pitfall 3: Shared State Without Locks
# Wrong - race condition!
total = 0

def add_items():
    global total
    for i in range(1000):
        total += i  # Read-modify-write is not atomic!

threads = [threading.Thread(target=add_items) for _ in range(10)]
(Single list.append calls happen to be atomic in CPython, but compound updates like += are not; guard them with a lock.)
Pitfall 4: Too Many Workers
# Wrong - diminishing returns and scheduling overhead
with Pool(100) as pool:  # Far more processes than an 8-core CPU can run at once!
    pool.map(cpu_work, data)

# Right - match the CPU core count
import os

optimal_workers = os.cpu_count()
with Pool(optimal_workers) as pool:
    pool.map(cpu_work, data)
Choosing the Right Approach
Decision Tree
Is your task CPU-bound (calculations, data processing)?
├─ YES → Use multiprocessing
└─ NO → Is it I/O-bound?
    ├─ YES → Do you need to make 1000s of requests?
    │   ├─ YES → Use asyncio
    │   └─ NO → Use threading
    └─ NO → Don't use concurrency
Performance Comparison
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    return x ** 2  # Must be a module-level function: lambdas can't be pickled

# Test data
data = list(range(1000000))

# Sequential
start = time.time()
result = [square(x) for x in data]
print(f"Sequential: {time.time() - start:.2f}s")

# Threading (no help for CPU-bound work!)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    result = list(executor.map(square, data))
print(f"Threading: {time.time() - start:.2f}s")  # No faster - often slower due to overhead

# Multiprocessing (parallel, but only pays off if chunksize amortizes the IPC cost)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    result = list(executor.map(square, data, chunksize=10000))
print(f"Multiprocessing: {time.time() - start:.2f}s")  # Up to ~4x faster on 4 cores
Best Practices
1. Use Context Managers
# Good - automatic cleanup
with Pool(4) as pool:
    results = pool.map(work, data)

# Bad - cleanup is skipped if an exception is raised first
pool = Pool(4)
results = pool.map(work, data)
pool.close()
pool.join()
2. Profile Before Optimizing
Don't guess where bottlenecks are; measure them first with profiling tools!
import cProfile

def my_program():
    # Your code here
    pass

# Profile to find bottlenecks
cProfile.run('my_program()')
# Or use line_profiler for line-by-line analysis
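To surface the worst offenders first, sort the report, e.g. by cumulative time:

import cProfile

cProfile.run('my_program()', sort='cumulative')  # Slowest call trees at the top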
3. Use ProcessPoolExecutor for Simplicity
# Simpler than Pool
from concurrent.futures import ProcessPoolExecutor, as_completed

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(work, item) for item in data]
    for future in as_completed(futures):
        result = future.result()
        process(result)
4. Handle Exceptions Properly
from concurrent.futures import ProcessPoolExecutor

def risky_work(x):
    if x < 0:
        raise ValueError("Negative value")
    return x ** 2

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(risky_work, x) for x in data]
    for future in futures:
        try:
            result = future.result()  # Re-raises any exception from the worker
        except Exception as e:
            print(f"Error: {e}")
5. Optimize Worker Count
import os

# For CPU-bound: use the CPU count
cpu_workers = os.cpu_count()

# For I/O-bound: more workers than cores is fine (threads mostly wait)
io_workers = cpu_workers * 5

# For asyncio: thousands of concurrent tasks are possible
async_connections = 1000
Key Takeaways
- GIL limits threading - only useful for I/O-bound tasks
- Multiprocessing bypasses GIL - true parallelism for CPU-bound work
- Asyncio is most efficient for I/O - handles thousands of connections
- Choose based on bottleneck: CPU β multiprocessing, I/O β asyncio/threading
- Profile first - don't guess where the slowness is
- Use context managers - prevents resource leaks
- Match worker count to task type - CPU cores for CPU-bound, more for I/O-bound
Understanding concurrency transforms slow scripts into fast, scalable applications. Start with the decision tree, profile your code, and apply the right tool for your bottleneck!