Python Performance & Profiling
Apply your skills with a real-world coding challenge. Try to solve it yourself first!
Here is a practical coding challenge based on the concepts taught in the Python Performance & Profiling materials.
Coding Challenge: The Bottleneck Detective and the High-Speed Pipeline
Problem Description
Imagine you have just inherited the backend code for a massive data aggregation dashboard. The system is designed to load millions of user profiles, fetch their profile images from a third-party API, and run a heavy mathematical graphical filter on the images before saving them.
Currently, the application feels incredibly sluggish and takes minutes to run. Instead of guessing at the problem, you ran cProfile (the "Macro Detective") and discovered three critical bottlenecks:
1. Memory Exhaustion: The system is choking on memory overhead because standard Python classes are creating millions of fat, hidden __dict__ structures.
2. I/O Freezes: The CPU sits idle because the program makes synchronous, blocking API calls to download images one by one.
3. CPU Lock: The heavy graphical filters are severely bottlenecked by Python's Global Interpreter Lock (GIL), preventing true multi-core processing.
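A profiling run like the one described above might look roughly like this. It is a minimal sketch, not the dashboard's real code: slow_download, legacy_pipeline, and profile_report are hypothetical names, and the sleep is shortened so the example finishes quickly.

```python
import cProfile
import io
import pstats
import time


def slow_download(url):
    # Hypothetical stand-in for a blocking network call.
    time.sleep(0.01)
    return f"raw_data_for_{url}"


def legacy_pipeline(urls):
    # Downloads run strictly one after another.
    return [slow_download(url) for url in urls]


def profile_report(func, *args, top=5):
    """Run func under cProfile and return the top entries by cumulative time."""
    profiler = cProfile.Profile()
    profiler.runcall(func, *args)
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("cumulative").print_stats(top)
    return buffer.getvalue()


if __name__ == "__main__":
    print(profile_report(legacy_pipeline, ["u1", "u2", "u3"]))
```

Sorting by cumulative time puts the functions that dominate the wall-clock time near the top of the report, which is usually where to start looking.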
Your challenge is to refactor the legacy pipeline using modern, production-grade architectural fixes. You need to eliminate the memory overhead using Python 3.10+ dataclass optimizations, prevent I/O freezing by running the blocking calls in threads managed by asyncio, and unlock true CPU parallelism by bypassing the GIL for the heavy mathematical calculations.
Difficulty Level: Advanced
Input & Output Specifications
* Input:
  * user_data: A large list of dictionaries containing raw user info (e.g., {"name": "Alice", "id": 1, "image_url": "http://api.example.com/img1"}).
* Output:
  * Returns a fully processed list of optimized User dataclass instances.
  * The User objects must not have a __dict__ attribute.
  * The overall execution time must be measurably and drastically lower than that of the synchronous legacy code (which can be measured using timeit).
Starter Code Boilerplate
import time
import asyncio
from dataclasses import dataclass
import multiprocessing
# --- LEGACY SLOW CODE (For Reference) ---
class LegacyUser:
    def __init__(self, name, user_id, image_url):
        self.name = name
        self.user_id = user_id
        self.image_url = image_url
        self.filtered_image = None


def download_image_sync(url):
    time.sleep(0.1)  # Simulates a slow, blocking I/O network request
    return f"raw_data_for_{url}"


def apply_heavy_filter(image_data, intensity):
    time.sleep(0.2)  # Simulates heavy CPU-bound mathematical calculations
    return f"filtered_{image_data}_at_{intensity}"


# --- YOUR CHALLENGE: REFACTOR THE PIPELINE ---

# 1. Optimize this Dataclass to eliminate the __dict__ trap
@dataclass
class User:
    name: str
    user_id: int
    image_url: str
    filtered_image: str | None = None  # None until the filter has run


# 2. Fix the I/O Bottleneck: Convert this to safely wrap the blocking download
async def fetch_all_images(users):
    # TODO: Use asyncio to run 'download_image_sync' in separate threads
    pass


# 3. Fix the CPU Bottleneck: Bypass the GIL for heavy calculations
def process_images_parallel(image_payloads):
    # TODO: Use multiprocessing.Pool and starmap to process filters in parallel
    pass


# Main execution pipeline
async def main_pipeline(raw_data):
    # 1. Create optimized User objects
    # 2. Fetch images concurrently (I/O Bound fix)
    # 3. Apply heavy filters in parallel (CPU Bound fix)
    return []  # Return the processed User objects


if __name__ == '__main__':
    # Try running cProfile or timeit on your finished pipeline!
    pass
Hints
* Fixing Memory Overhead: By default, instances store their attributes in a hidden per-instance dictionary (__dict__), which is costly at scale. To fix this, add the slots=True parameter (Python 3.10+) to your @dataclass decorator; this generates __slots__, which reserves fixed storage for the declared fields and removes the per-instance dictionary.
* Fixing Network Waiting: download_image_sync is a blocking function. Inside your async function, wrap each call in asyncio.to_thread(download_image_sync, url) (available since Python 3.9), then collect the resulting awaitables with asyncio.gather() so the downloads run concurrently without blocking the event loop.
* Fixing Heavy Calculations: Standard multithreading does not speed up CPU-bound Python code because of the GIL. Use multiprocessing.Pool() combined with pool.starmap() to run the apply_heavy_filter function across multiple CPU cores by passing a list of (image_data, intensity) tuples.
* Micro-Benchmarking: To verify your speed improvements, use the timeit module instead of time.time(). By default, timeit temporarily disables garbage collection during timing and can repeat the measurement many times, which gives more consistent results than a single time.time() delta. It cannot remove background OS noise, so compare the minimum of several repeats.
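The threading and benchmarking hints can be tried together on a toy function first. This is a sketch under stated assumptions rather than the challenge solution: blocking_fetch, fetch_concurrently, fetch_sequentially, and URLS are hypothetical names, and the 0.1s sleep stands in for a real network request.

```python
import asyncio
import time
import timeit


def blocking_fetch(url):
    # Hypothetical stand-in for a synchronous HTTP call.
    time.sleep(0.1)
    return f"raw_data_for_{url}"


async def fetch_concurrently(urls):
    # Each blocking call runs in a worker thread; gather awaits them together
    # and returns the results in the same order as the inputs.
    pending = [asyncio.to_thread(blocking_fetch, url) for url in urls]
    return await asyncio.gather(*pending)


def fetch_sequentially(urls):
    return [blocking_fetch(url) for url in urls]


URLS = [f"img{i}" for i in range(5)]

if __name__ == "__main__":
    # Take the minimum of a few repeats to reduce the effect of OS noise.
    sequential = min(timeit.repeat(lambda: fetch_sequentially(URLS), number=1, repeat=3))
    concurrent = min(timeit.repeat(lambda: asyncio.run(fetch_concurrently(URLS)), number=1, repeat=3))
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

asyncio.to_thread uses the event loop's default thread pool, so very large batches are limited by that pool's worker count rather than all running at once.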
Test Cases
- Test Case 1 (Memory Optimization Check):
  - Input: user = User(name="Test", user_id=1, image_url="url")
  - Expected Behavior: Calling hasattr(user, '__dict__') must return False, proving that the hidden per-instance dictionary has been removed.
- Test Case 2 (I/O Concurrency Validation):
  - Input: Fetching 10 images with an artificial 0.1s sleep time each.
  - Expected Behavior: The total download phase should take just over 0.1s (run concurrently) rather than the 1.0s it would take synchronously, proving the event loop successfully yielded control via threaded operations.
- Test Case 3 (CPU Parallelism Validation):
  - Input: Passing 4 images to process_images_parallel with a 0.2s CPU sleep time.
  - Expected Behavior: Using multiprocessing.Pool, the total execution time should ideally be around 0.2s (if running on a machine with 4+ cores) rather than the 0.8s expected from a GIL-locked sequential execution.
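The pattern behind Test Case 3 can be sketched on a toy workload. The names toy_filter, build_payloads, and filter_in_parallel are hypothetical, and the busy loop is an assumed stand-in for a real image filter; actual timings depend on core count and process start-up cost.

```python
import multiprocessing
import time


def toy_filter(image_data, intensity):
    # Hypothetical CPU-bound stand-in: a pure-Python loop that holds the GIL.
    checksum = 0
    for i in range(200_000):
        checksum = (checksum + i * intensity) % 1_000_003
    return f"filtered_{image_data}_at_{intensity}_{checksum}"


def build_payloads(images, intensity):
    # starmap unpacks each tuple into the worker's positional arguments.
    return [(image, intensity) for image in images]


def filter_in_parallel(payloads, workers=4):
    # Each worker is a separate process with its own interpreter and GIL.
    with multiprocessing.Pool(processes=workers) as pool:
        return pool.starmap(toy_filter, payloads)


if __name__ == "__main__":  # required where workers are spawned, not forked
    payloads = build_payloads(["img1", "img2", "img3", "img4"], 5)
    start = time.perf_counter()
    results = filter_in_parallel(payloads)
    print(results)
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

The worker function is defined at module level because the pool has to pickle a reference to it; lambdas and nested functions generally cannot be sent to worker processes this way.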