Python Concurrency: Threading, Multiprocessing, and Asyncio Guide
Master Python concurrency for faster programs. Learn threading vs multiprocessing, asyncio, GIL limitations, and parallel processing for data science and ML.

When My Script Took 10 Hours
I was training 10 ML models sequentially:
for model in models:
    model.train(X, y)  # 1 hour each = 10 hours total!
Then I discovered multiprocessing:
from multiprocessing import Pool

with Pool(10) as pool:
    pool.map(train_model, models)  # All at once = ~1 hour total!
10x speedup! Concurrency changed everything.
Threading vs Multiprocessing vs Asyncio
| Feature | Threading | Multiprocessing | Asyncio |
|---|---|---|---|
| Best for | I/O bound | CPU bound | I/O bound |
| Parallelism | No (GIL) | Yes | No |
| Memory | Shared | Separate | Shared |
| Speed | Medium | Fast (CPU-bound) | Very fast (I/O-bound) |
| Overhead | Low | High | Very low |
| Complexity | Medium | Medium | High |
When to Use Each
Threading:
- Downloading files from multiple URLs
- Making multiple API calls
- Database queries
- Reading/writing multiple files
- Network operations with delays
Multiprocessing:
- ML model training
- Large-scale data processing
- Image/video processing
- Scientific computations
- Any CPU-intensive calculation
Asyncio:
- Web scraping thousands of pages
- Handling many API requests
- Real-time data streaming
- WebSocket connections
- High-performance web servers
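These use cases map cleanly onto the standard library. As a minimal sketch (run_tasks is a hypothetical helper name, not a standard API), you can pick a pool based on the workload type; note that with a process pool, func must be a picklable top-level function:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def run_tasks(func, items, cpu_bound=False, max_workers=None):
    """Run func over items with a pool suited to the workload type."""
    pool_cls = ProcessPoolExecutor if cpu_bound else ThreadPoolExecutor
    with pool_cls(max_workers=max_workers) as executor:
        return list(executor.map(func, items))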
Understanding the Global Interpreter Lock (GIL)
The GIL is CPython's biggest concurrency limitation:
import threading
import time

# CPU-bound function (takes an ignored arg so it also works with pool.map)
def cpu_work(_=None):
    result = 0
    for i in range(10000000):
        result += i ** 2
    return result

# Test with threading (doesn't help for CPU-bound!)
start = time.time()
threads = [threading.Thread(target=cpu_work) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~10s (no speedup!)

# Test with multiprocessing (actually parallel!)
from multiprocessing import Pool

start = time.time()
with Pool(4) as pool:
    pool.map(cpu_work, range(4))
print(f"Multiprocessing: {time.time() - start:.2f}s")  # ~2.5s (4x faster!)
Why? The GIL allows only one thread to execute Python bytecode at a time, even on multi-core CPUs. Multiprocessing bypasses this by using separate processes.
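The flip side explains why threading still helps elsewhere: CPython releases the GIL whenever a thread blocks on I/O (and during time.sleep), so the waits overlap. A minimal sketch:

import threading
import time

def io_work():
    time.sleep(1)  # blocking wait; the GIL is released here

start = time.time()
threads = [threading.Thread(target=io_work) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"4 waits in threads: {time.time() - start:.2f}s")  # ~1s, not ~4s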
Threading: I/O-Bound Tasks
Basic Threading Example
import threading
import time
import requests

def download_file(url):
    print(f"Downloading {url}")
    response = requests.get(url)
    print(f"Finished {url}")
    return len(response.content)

urls = [
    'https://example.com/data1.csv',
    'https://example.com/data2.csv',
    'https://example.com/data3.csv',
]

# Sequential (slow) - 3 seconds per file = 9 seconds total
start = time.time()
for url in urls:
    download_file(url)
print(f"Sequential: {time.time() - start:.1f}s")  # ~9s

# Threading (fast) - all at once = 3 seconds total
start = time.time()
threads = []
for url in urls:
    thread = threading.Thread(target=download_file, args=(url,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()  # Wait for all to finish
print(f"Threading: {time.time() - start:.1f}s")  # ~3s (3x faster!)
Thread Safety and Locks
When threads share data, you need synchronization:
import threading

# Unsafe - race condition!
counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # Not atomic: read, add, write can interleave

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Counter: {counter}")  # Should be 1,000,000 but often isn't!

# Safe - using a lock
counter = 0
lock = threading.Lock()

def increment_safe():
    global counter
    for _ in range(100000):
        with lock:  # Acquire lock
            counter += 1
        # Lock automatically released

threads = [threading.Thread(target=increment_safe) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Counter: {counter}")  # Exactly 1,000,000!
ThreadPoolExecutor - Better Threading
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    response = requests.get(url)
    return url, len(response.content)

urls = [f'https://example.com/page{i}' for i in range(20)]

# Use ThreadPoolExecutor (cleaner API)
with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks
    futures = [executor.submit(fetch_url, url) for url in urls]
    # Get results (in submission order)
    for future in futures:
        url, size = future.result()
        print(f"{url}: {size} bytes")
Multiprocessing: CPU-Bound Tasks
Basic Multiprocessing
from multiprocessing import Pool
import time

def train_model(model_id):
    """Simulate CPU-intensive training."""
    result = 0
    for i in range(10000000):
        result += i ** 2
    return f"Model {model_id} trained"

model_ids = list(range(8))

# Sequential (slow) - 8 cores sitting idle
start = time.time()
results = [train_model(mid) for mid in model_ids]
print(f"Sequential: {time.time() - start:.1f}s")  # ~32s

# Multiprocessing (fast) - all cores working
start = time.time()
with Pool(8) as pool:
    results = pool.map(train_model, model_ids)
print(f"Multiprocessing: {time.time() - start:.1f}s")  # ~4s (8x faster!)
Sharing Data Between Processes
Processes don't share memory by default. Use special structures:
from multiprocessing import Process, Queue, Manager

# Using Queue for communication
def worker(queue, worker_id):
    result = f"Worker {worker_id} result"
    queue.put(result)

queue = Queue()
processes = [Process(target=worker, args=(queue, i)) for i in range(4)]
for p in processes: p.start()
for p in processes: p.join()

# Collect results
while not queue.empty():
    print(queue.get())

# Using Manager for shared data structures
def process_data(shared_dict, key, value):
    shared_dict[key] = value * 2

with Manager() as manager:
    shared_dict = manager.dict()
    processes = [
        Process(target=process_data, args=(shared_dict, i, i*10))
        for i in range(5)
    ]
    for p in processes: p.start()
    for p in processes: p.join()
    # Read the proxy while the manager is still alive
    print(dict(shared_dict))  # {0: 0, 1: 20, 2: 40, 3: 60, 4: 80}
ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import numpy as np

def process_chunk(data_chunk):
    """Process a chunk of data."""
    return np.mean(data_chunk) * 2

# Large dataset
data = np.random.rand(10000000)
chunks = np.array_split(data, 8)

# Process chunks in parallel
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_chunk, chunks))

final_result = np.mean(results)
print(f"Result: {final_result}")
Asyncio: Modern Async/Await
Understanding Async/Await
Asyncio uses cooperative multitasking, where coroutines voluntarily yield control at await points:
import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # Yield control during wait
    print("Task 1 finished")
    return "Result 1"

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)  # Yield control during wait
    print("Task 2 finished")
    return "Result 2"

async def main():
    # Run both tasks concurrently
    results = await asyncio.gather(task1(), task2())
    print(f"Results: {results}")

asyncio.run(main())
# Output:
# Task 1 started
# Task 2 started
# Task 2 finished (after 1s)
# Task 1 finished (after 2s)
# Results: ['Result 1', 'Result 2']
# Total time: 2s (not 3s!)
Web Scraping with Asyncio
import asyncio
import aiohttp

async def fetch_data(session, url):
    async with session.get(url) as response:
        data = await response.text()
        return len(data)

async def main():
    urls = [f'https://example.com/api{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        # Create all tasks
        tasks = [fetch_data(session, url) for url in urls]
        # Run all concurrently
        results = await asyncio.gather(*tasks)
    print(f"Downloaded {sum(results)} bytes from {len(urls)} URLs")

# Run async code
asyncio.run(main())
# 100 requests in ~1s instead of 100s!
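Firing thousands of requests at once can overwhelm a server or trip rate limits. A common pattern is to cap in-flight requests with asyncio.Semaphore; a minimal sketch with an arbitrary limit of 10:

import asyncio
import aiohttp

async def fetch_limited(session, semaphore, url):
    async with semaphore:  # wait for a free slot before requesting
        async with session.get(url) as response:
            return len(await response.text())

async def main():
    semaphore = asyncio.Semaphore(10)  # at most 10 requests in flight
    urls = [f'https://example.com/api{i}' for i in range(1000)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, semaphore, url) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} pages")

asyncio.run(main())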
Asyncio vs Threading for I/O
# Threading approach
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_sync(url):
    return requests.get(url).text

with ThreadPoolExecutor(max_workers=50) as executor:
    urls = [f'https://api.example.com/{i}' for i in range(100)]
    results = list(executor.map(fetch_sync, urls))

# Asyncio approach (more efficient)
import asyncio
import aiohttp

async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [f'https://api.example.com/{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

asyncio.run(main())
Asyncio advantages:
- Lower memory overhead
- Can handle thousands of connections
- Better performance for I/O-bound tasks
- Single-threaded (far fewer race conditions to guard against)
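One caveat: a single blocking call (like requests.get) inside a coroutine stalls the whole event loop. If an async library isn't an option, asyncio.to_thread (Python 3.9+) hands the blocking call to a worker thread; a minimal sketch:

import asyncio
import requests

def blocking_fetch(url):
    return len(requests.get(url).content)

async def main():
    urls = [f'https://example.com/page{i}' for i in range(10)]
    # Each blocking call runs in a thread; the event loop stays responsive
    sizes = await asyncio.gather(
        *(asyncio.to_thread(blocking_fetch, url) for url in urls)
    )
    print(f"Total bytes: {sum(sizes)}")

asyncio.run(main())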
Real-World ML Example
from multiprocessing import Pool
from functools import partial

def process_batch(data_batch, model_params):
    """Process one batch of data in a worker process."""
    # Rebuild the model inside the worker: model objects often don't pickle
    # cleanly, so pass lightweight params and construct locally.
    # build_model and preprocess are placeholders for your own code.
    model = build_model(model_params)
    processed = [preprocess(x) for x in data_batch]
    return model.predict(processed)

def parallel_batch_processing(data, model_params, n_workers=4):
    """Process data in parallel batches."""
    # Split data into batches
    batch_size = len(data) // n_workers
    batches = [data[i:i+batch_size] for i in range(0, len(data), batch_size)]
    # Process in parallel
    with Pool(n_workers) as pool:
        results = pool.map(
            partial(process_batch, model_params=model_params),
            batches
        )
    # Combine results
    return [pred for batch in results for pred in batch]

# Usage
predictions = parallel_batch_processing(test_data, model_params, n_workers=8)
Common Pitfalls
Pitfall 1: Using Threading for CPU-Bound Work
# Wrong - threading doesn't help CPU-bound tasks!
import threading

def cpu_intensive():
    return sum(i**2 for i in range(10000000))

threads = [threading.Thread(target=cpu_intensive) for _ in range(4)]
# No speedup due to GIL!
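The fix, as a minimal sketch with ProcessPoolExecutor (any process-based pool works):

# Right - processes give real parallelism for CPU-bound work
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(_=None):  # accepts (and ignores) the mapped value
    return sum(i**2 for i in range(10000000))

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_intensive, range(4)))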
Pitfall 2: Forgetting to Join Processes/Threads
# Wrong - the main thread races ahead before the work is done
threads = [threading.Thread(target=work) for _ in range(10)]
for t in threads:
    t.start()
# Missing: for t in threads: t.join()
Pitfall 3: Shared State Without Locks
# Wrong - shared mutable state without a lock
result = []

def append_items():
    for i in range(1000):
        result.append(i)  # a single append happens to be atomic in CPython,
                          # but compound updates (read-modify-write) are not

threads = [threading.Thread(target=append_items) for _ in range(10)]
Pitfall 4: Too Many Workers
# Wrong - diminishing returns and overhead
with Pool(100) as pool:  # Too many for an 8-core CPU!
    pool.map(cpu_work, data)

# Right - match CPU cores
import os

optimal_workers = os.cpu_count()
with Pool(optimal_workers) as pool:
    pool.map(cpu_work, data)
Choosing the Right Approach
Decision Tree
Is your task CPU-bound (calculations, data processing)?
├── YES → Use multiprocessing
└── NO → Is it I/O-bound?
    ├── YES → Do you need to make 1000s of requests?
    │   ├── YES → Use asyncio
    │   └── NO → Use threading
    └── NO → Don't use concurrency
Performance Comparison
import time

# Test data
data = list(range(1000000))

def square(x):
    return x ** 2  # top-level so worker processes can pickle it (lambdas can't be pickled)

# Sequential
start = time.time()
result = [square(x) for x in data]
print(f"Sequential: {time.time() - start:.2f}s")

# Threading (no help for CPU-bound!)
from concurrent.futures import ThreadPoolExecutor

start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    result = list(executor.map(square, data))
print(f"Threading: {time.time() - start:.2f}s")  # similar to sequential, or slower

# Multiprocessing (parallel - use chunksize so IPC overhead doesn't dominate)
from concurrent.futures import ProcessPoolExecutor

start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    result = list(executor.map(square, data, chunksize=10000))
print(f"Multiprocessing: {time.time() - start:.2f}s")  # real speedup once chunks amortize the overhead
Best Practices
1. Use Context Managers
# Good - automatic cleanup
with Pool(4) as pool:
    results = pool.map(work, data)

# Bad - manual cleanup
pool = Pool(4)
results = pool.map(work, data)
pool.close()
pool.join()
2. Profile Before Optimizing
Don't guess where bottlenecks are; measure them first with profiling tools!
import cProfile

def my_program():
    # Your code here
    pass

# Profile to find bottlenecks, sorted by cumulative time
cProfile.run('my_program()', sort='cumulative')
# Or use line_profiler for line-by-line analysis
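For quick before/after checks while parallelizing, a tiny timer helps; this context manager (timer is my own helper, built on time.perf_counter) is a minimal sketch:

import time
from contextlib import contextmanager

@contextmanager
def timer(label):
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{label}: {time.perf_counter() - start:.2f}s")

# Usage
with timer("squares"):
    _ = [x ** 2 for x in range(1000000)]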
3. Use ProcessPoolExecutor for Simplicity
# Simpler than Pool
from concurrent.futures import ProcessPoolExecutor, as_completed

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(work, item) for item in data]
    for future in as_completed(futures):
        result = future.result()
        process(result)  # handle each result as soon as it's ready
4. Handle Exceptions Properly
from concurrent.futures import ProcessPoolExecutor

def risky_work(x):
    if x < 0:
        raise ValueError("Negative value")
    return x ** 2

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(risky_work, x) for x in data]
    for future in futures:
        try:
            result = future.result()
        except Exception as e:
            print(f"Error: {e}")
5. Optimize Worker Count
import os

# For CPU-bound: use CPU count
cpu_workers = os.cpu_count()

# For I/O-bound: can use more
io_workers = cpu_workers * 5

# For asyncio: thousands possible
async_connections = 1000
Key Takeaways
- GIL limits threading - only useful for I/O-bound tasks
- Multiprocessing bypasses GIL - true parallelism for CPU-bound work
- Asyncio is most efficient for I/O - handles thousands of connections
- Choose based on bottleneck: CPU → multiprocessing, I/O → asyncio/threading
- Profile first - don't guess where the slowness is
- Use context managers - prevents resource leaks
- Match worker count to task type - CPU cores for CPU-bound, more for I/O-bound
Understanding concurrency transforms slow scripts into fast, scalable applications. Start with the decision tree, profile your code, and apply the right tool for your bottleneck!
If this guide helped you understand concurrency, I'd love to hear about it! Connect with me on Twitter or LinkedIn.
Support My Work
If this guide helped you understand Python concurrency, choose between threading and multiprocessing, or build faster parallel programs, I'd really appreciate your support! Creating comprehensive tutorials on advanced Python concepts like this takes significant time and effort. Your support helps me continue sharing knowledge and creating more helpful resources for Python developers.
☕ Buy me a coffee - Every contribution, big or small, means the world to me and keeps me motivated to create more content!
Cover image by Ralfs Blumbergs on Unsplash