This content originally appeared on DEV Community and was authored by Sushant Gaurav
In this article, we will go through some of the most commonly asked practice problems and interview questions related to asyncio.
Practice Problems
Write a simple async program that fetches multiple URLs in parallel.
```python
import asyncio
import aiohttp


async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = [
        'https://example.com',
        'https://httpbin.org/get',
        'https://python.org'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    for i, content in enumerate(results):
        print(f"URL {urls[i]} returned {len(content)} characters")


asyncio.run(main())
```
Compare synchronous requests vs asyncio (time difference).
```python
import asyncio
import time

import aiohttp
import requests

urls = [
    'https://example.com',
    'https://httpbin.org/get',
    'https://python.org'
]


# Synchronous version: each request waits for the previous one to finish
def sync_fetch():
    for url in urls:
        response = requests.get(url)
        print(f"{url} fetched with {len(response.text)} characters")


# Read the body inside `async with` so the connection is released afterwards
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


# Async version: all requests are in flight at the same time
async def async_fetch():
    async with aiohttp.ClientSession() as session:
        texts = await asyncio.gather(*(fetch(session, url) for url in urls))
    for url, text in zip(urls, texts):
        print(f"{url} fetched with {len(text)} characters")


if __name__ == "__main__":
    print("Running synchronous fetch")
    start = time.perf_counter()
    sync_fetch()
    duration_sync = time.perf_counter() - start
    print(f"Synchronous fetching took {duration_sync:.2f} seconds\n")

    print("Running asynchronous fetch")
    start = time.perf_counter()
    asyncio.run(async_fetch())
    duration_async = time.perf_counter() - start
    print(f"Asynchronous fetching took {duration_async:.2f} seconds")
```
Create a toy chat server with asyncio streams.
```python
import asyncio

clients = set()


async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"New connection from {addr}")
    clients.add(writer)
    try:
        while True:
            data = await reader.readline()
            if not data:
                print(f"{addr} disconnected")
                break
            message = data.decode().strip()
            print(f"Received from {addr}: {message}")
            broadcast = f"{addr}: {message}\n"
            # Iterate over a copy: clients may join or leave while we await drain()
            for client in list(clients):
                if client is not writer:
                    client.write(broadcast.encode())
                    await client.drain()
    except ConnectionResetError:
        print(f"Connection reset by {addr}")
    finally:
        clients.discard(writer)
        writer.close()
        await writer.wait_closed()


async def main():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    addr = server.sockets[0].getsockname()
    print(f"Serving on {addr}")
    async with server:
        await server.serve_forever()


asyncio.run(main())
```
Create an async producer-consumer pipeline using asyncio.Queue.
```python
import asyncio
import random


async def producer(queue: asyncio.Queue, n: int):
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    await queue.put(None)  # Sentinel for consumer to stop


async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))


async def main():
    queue = asyncio.Queue()
    n_items = 10
    prod_task = asyncio.create_task(producer(queue, n_items))
    cons_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(prod_task, cons_task)


asyncio.run(main())
```
Write a UDP echo server with asyncio.
```python
import asyncio


class EchoServerProtocol(asyncio.DatagramProtocol):
    # Subclassing DatagramProtocol supplies default no-op callbacks
    # (connection_lost, error_received) that the transport may call.
    def connection_made(self, transport):
        self.transport = transport
        print("UDP server is up")

    def datagram_received(self, data, addr):
        message = data.decode()
        print(f"Received {message} from {addr}")
        self.transport.sendto(data, addr)  # Echo back


async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        EchoServerProtocol,
        local_addr=('127.0.0.1', 9999))
    try:
        await asyncio.sleep(3600)  # Run for 1 hour
    finally:
        transport.close()


asyncio.run(main())
```
Write an async program that reads from multiple files concurrently using aiofiles.
```python
import asyncio
import aiofiles


async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
    print(f"{filename}: {len(contents)} characters")
    return contents


async def main():
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = [read_file(f) for f in files]
    await asyncio.gather(*tasks)


asyncio.run(main())
```
Build an async retry mechanism for flaky network calls with exponential backoff.
```python
import asyncio
import random


async def flaky_network_call():
    if random.random() < 0.7:
        raise Exception("Network failure!")
    return "Success!"


# `coro` is a coroutine function (not a coroutine object), so every
# attempt calls it again and awaits a fresh coroutine.
async def retry(coro, retries=5, base_delay=1):
    for attempt in range(1, retries + 1):
        try:
            result = await coro()
            return result
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == retries:
                raise
            delay = base_delay * 2 ** (attempt - 1)  # Exponential backoff
            print(f"Retrying in {delay} seconds...")
            await asyncio.sleep(delay)


async def main():
    try:
        result = await retry(flaky_network_call)
        print("Result:", result)
    except Exception as e:
        print("All attempts failed:", e)


asyncio.run(main())
```
Interview Questions
Explain the event loop clearly.
The event loop is the heart of asyncio. It is a single-threaded scheduler that manages and executes all asynchronous tasks (coroutines, futures, callbacks). It is a continuous loop that:
- Picks up ready tasks.
- Executes them until they hit an `await` (pause point).
- Switches to other ready tasks while waiting (I/O, timers, etc.).
- Resumes paused tasks when their awaited I/O completes.
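The scheduling steps above can be observed in a small sketch. The names `background` and `log` are made up for illustration, and the delays are arbitrary; the point is only to show the loop running a plain callback, a task, and a timer while `main()` is paused.

```python
import asyncio


async def background(log: list) -> None:
    log.append("task started")
    await asyncio.sleep(0.01)  # pause point: resume in about 10 ms
    log.append("task resumed")


async def main() -> list:
    loop = asyncio.get_running_loop()
    log = []
    loop.call_later(0.05, log.append, "timer fired")  # timed callback
    loop.call_soon(log.append, "callback")            # plain callback
    task = asyncio.create_task(background(log))       # coroutine wrapped in a Task
    log.append("main before await")
    await asyncio.sleep(0.1)  # main pauses; the loop runs everything else
    await task
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

Nothing scheduled by `main()` runs until `main()` itself reaches an `await`, which is why "main before await" is logged first.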
Differences between asyncio and threading.
- `asyncio` achieves concurrency within one thread through cooperative multitasking.
- Threads are scheduled preemptively by the OS, but in CPython the GIL still allows only one thread to execute Python bytecode at a time.

| Feature | asyncio (Coroutines) | Threading |
|---|---|---|
| Type of multitasking | Cooperative | Preemptive |
| Scheduler | Event Loop | OS Scheduler |
| Switch Control | Voluntary (`await`) | OS-forced |
| I/O behavior | Non-blocking (single thread) | Blocking calls block only the calling thread |
| GIL effect | No issue for I/O tasks | GIL bottleneck for CPU tasks |
| Use case | High I/O workload (network, DB) | Blocking I/O and synchronous libraries |
How does asyncio avoid GIL problems for I/O?
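The Global Interpreter Lock (GIL) prevents multiple Python threads from executing Python bytecode simultaneously. asyncio sidesteps this constraint for I/O-bound operations because it runs everything in a single thread using non-blocking I/O, so there is no contention for the GIL.
- While waiting for I/O (like a network or file read), coroutines yield control using `await`, letting other coroutines run.
- There is no need for multiple threads to achieve concurrency, so the GIL is not a limiting factor.
- Blocking calls can still be offloaded to a worker thread, and CPU-heavy work to a process pool (threads do not help there, because they still share the GIL):

```python
# Blocking (I/O-bound) function: run it in a worker thread
result = await asyncio.to_thread(blocking_function)
# or, with an explicit executor (None selects the default thread pool;
# pass a concurrent.futures.ProcessPoolExecutor for CPU-bound work)
result = await loop.run_in_executor(None, blocking_function, *args)
```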
What is Structured concurrency (Trio/Curio as alternatives)?
Structured Concurrency is a design principle where the lifetime of child tasks is managed within the scope of their parent task. When the parent exits, all children are automatically cancelled or waited on. It makes sure that no background task leaks after the function returns, making code more predictable and reliable.
- In `asyncio`, Python 3.11 introduced `TaskGroup` to support structured concurrency.
- Each `TaskGroup` propagates errors for the whole group: if one task fails, the remaining tasks are cancelled and the failures are raised together as an `ExceptionGroup`.

```python
async with asyncio.TaskGroup() as tg:
    tg.create_task(fetch_data())
    tg.create_task(process_data())
# Here all tasks have completed; any errors are raised as an ExceptionGroup
```

Note: Other frameworks like Trio and Curio were early adopters of this model. They offer stricter and safer task hierarchies with automatic cleanup. Trio's approach influenced the `TaskGroup` design in Python 3.11.
Why can't `await function()` be used outside an async function, but can be used inside one?
`await` is only valid syntax inside the body of an `async def` function. It suspends the enclosing coroutine and hands control back to the event loop, so there has to be an enclosing coroutine to suspend. In a regular function or at module level, Python rejects it at compile time with a `SyntaxError`.
- The correct way to run top-level async code is:

```python
import asyncio

async def main():
    await asyncio.sleep(1)

asyncio.run(main())
```

Here, `asyncio.run()` starts and stops the event loop for you, allowing `await` to work inside `main()`.
How does asyncio.gather() differ from asyncio.create_task()?
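- `create_task()` schedules a coroutine on the event loop and returns a `Task` immediately. The task starts running at the next opportunity; `await` it later to get its result or exception.
- `gather()` runs multiple awaitables concurrently and waits for all of them to finish, returning the results as a list in the order the awaitables were passed.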
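A minimal sketch of both calls side by side. The coroutine `square` is a hypothetical stand-in for any I/O-bound operation.

```python
import asyncio


async def square(n: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return n * n


async def main():
    # create_task: schedule now, collect the result later
    task = asyncio.create_task(square(3))
    # ... other work could run here while the task makes progress ...
    single = await task

    # gather: run several awaitables concurrently; results keep input order
    many = await asyncio.gather(square(1), square(2), square(3))
    return single, many


if __name__ == "__main__":
    print(asyncio.run(main()))  # (9, [1, 4, 9])
```

In practice the two are often combined: tasks created with `create_task()` can be passed to `gather()` to wait for all of them.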
How is exception handling done in asyncio tasks?
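Exceptions raised inside a task are stored on the `Task` object.
- `await task` re-raises the stored exception, so a normal `try`/`except` works; a callback registered with `add_done_callback` can inspect it via `task.exception()`.
- With `TaskGroup` (Python 3.11+), failures from several tasks are raised together as an `ExceptionGroup`, which can be handled with `except*`.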
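The sketch below shows three common ways to get at a task's exception. It avoids `TaskGroup` so it also runs on Python versions before 3.11; `fail` and `ok` are illustrative coroutines.

```python
import asyncio


async def fail() -> None:
    await asyncio.sleep(0)
    raise ValueError("boom")


async def ok() -> str:
    await asyncio.sleep(0)
    return "fine"


async def main():
    task = asyncio.create_task(fail())
    try:
        await task  # the stored exception is re-raised here
    except ValueError as exc:
        caught = str(exc)

    # Once the task is done, the exception can also be inspected directly
    stored = task.exception()

    # return_exceptions=True returns exceptions as results instead of raising
    results = await asyncio.gather(ok(), fail(), return_exceptions=True)
    return caught, stored, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```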
What is the difference between blocking, non-blocking, and asynchronous calls?
| Type | Description | Example |
|---|---|---|
| Blocking | Waits until task completes | time.sleep(2) |
| Non-blocking | Returns immediately, requires polling | Setting socket to non-blocking mode |
| Asynchronous | Schedules work, frees loop without blocking | await asyncio.sleep(2) |
How do you run blocking code inside asyncio?
Use a thread or process executor:

```python
result = await asyncio.to_thread(blocking_function)
```

Or use `run_in_executor`:

```python
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function)
```

This keeps the blocking call off the single-threaded asyncio event loop.
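A runnable sketch of both approaches, assuming Python 3.9+ for `asyncio.to_thread`. The body of `blocking_function` here is made up: it sleeps and reports which thread it ran in, to show that the work happens outside the event loop's thread.

```python
import asyncio
import threading
import time


def blocking_function(seconds: float) -> str:
    time.sleep(seconds)  # blocks only the thread it runs in
    return threading.current_thread().name


async def main():
    loop_thread = threading.current_thread().name
    loop = asyncio.get_running_loop()
    # Both calls run the function in the default thread pool
    via_to_thread = await asyncio.to_thread(blocking_function, 0.05)
    via_executor = await loop.run_in_executor(None, blocking_function, 0.05)
    return loop_thread, via_to_thread, via_executor


if __name__ == "__main__":
    print(asyncio.run(main()))
```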
What changes were made from Python 3.11 onward in asyncio?
- New `TaskGroup` introduced for structured concurrency.
- New `asyncio.timeout()` context manager, recommended over `asyncio.wait_for()` for new code.
- Performance improvements to task scheduling and exception tracebacks.
- Removal of deprecated APIs, for example, the `@asyncio.coroutine` decorator for old generator-based coroutines.
- Improvements for `asyncio.StreamWriter`, connection handlers, and debug-level tracing.
What is the purpose of asyncio.run() and how is it different from await?
| Aspect | `asyncio.run()` | `await` |
|---|---|---|
| Purpose | Starts and manages event loop | Waits for an awaitable inside running loop |
| Where used | Top-level code | Inside async functions |
| Creates loop | Yes | No |
| Typical use | `asyncio.run(main())` | `await coro()` |
Explain cancellation and cleanup in asyncio.
- A running task can be cancelled using `task.cancel()`.
- The coroutine receives `asyncio.CancelledError` at its current `await` and can perform cleanup in a `try`/`except`/`finally` block. Re-raise the error after cleanup so the task is actually marked as cancelled:

```python
try:
    await asyncio.sleep(10)
except asyncio.CancelledError:
    print("Task cancelled!")
    raise
finally:
    close_connection()
```
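A self-contained sketch of the full cancellation round trip. The `events` list is only there to record the order of steps, and appending "cleanup" stands in for closing a real connection.

```python
import asyncio

events = []


async def worker() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # re-raise so the task ends up in the cancelled state
    finally:
        events.append("cleanup")  # stand-in for closing a connection


async def main():
    events.clear()
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("observed by caller")
    return list(events), task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that `task.cancel()` only requests cancellation; the error is delivered the next time the event loop runs the task, which is why the caller awaits the task afterwards.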
What is the difference between coroutines, tasks, and futures?
| Concept | Definition | Created via |
|---|---|---|
| Coroutine | Async function that can be awaited | async def |
| Task | Wrapped coroutine scheduled on event loop | asyncio.create_task() |
| Future | Low-level awaitable representing result placeholder | loop.create_future() |
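The three concepts can be seen together in a short sketch. The coroutine `compute` and the string passed to `set_result` are illustrative only.

```python
import asyncio


async def compute() -> int:
    await asyncio.sleep(0)
    return 42


async def main():
    coro = compute()                  # coroutine object: nothing runs yet
    task = asyncio.create_task(coro)  # Task: scheduled on the running loop

    loop = asyncio.get_running_loop()
    future = loop.create_future()     # Future: empty placeholder for a result
    loop.call_soon(future.set_result, "set by a callback")

    return (
        asyncio.iscoroutine(coro),
        isinstance(task, asyncio.Future),  # Task is a subclass of Future
        await task,
        await future,
    )


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, True, 42, 'set by a callback')
```

Application code rarely creates futures directly; they mostly appear at the boundary with callback-based libraries.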
Why use asyncio.gather() over sequential awaits?
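Sequential awaits run one after another, so the total time is the sum of both:

```python
await task1()
await task2()
```

With `gather()`, both run concurrently, so the total time is roughly that of the slower one:

```python
await asyncio.gather(task1(), task2())
```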
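The difference can be measured with a rough timing sketch. The coroutine `job` and the 0.1-second delays are assumptions chosen for illustration; exact timings will vary by machine.

```python
import asyncio
import time


async def job(delay: float) -> float:
    await asyncio.sleep(delay)  # stand-in for an I/O wait
    return delay


async def sequential() -> None:
    await job(0.1)
    await job(0.1)


async def concurrent() -> None:
    await asyncio.gather(job(0.1), job(0.1))


async def timed(func) -> float:
    start = time.perf_counter()
    await func()
    return time.perf_counter() - start


async def main():
    return await timed(sequential), await timed(concurrent)


if __name__ == "__main__":
    seq, conc = asyncio.run(main())
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
```

The sequential version should take about 0.2 seconds and the concurrent one about 0.1 seconds.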
What is cooperative multitasking, and how does asyncio implement it?
Cooperative multitasking means that tasks voluntarily yield control to allow others to run, instead of the operating system forcibly switching them (as in preemptive multitasking).
- In `asyncio`, this happens whenever a coroutine reaches an `await` that has to wait (for I/O, a timer, or another task).
- The event loop then resumes another task that is ready to run.
- Only one task runs at a time, but many can make progress concurrently.

If one coroutine runs CPU-bound code without an `await`, it blocks the entire event loop.
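The effect of yielding, or not yielding, shows up in the order in which two tasks make progress. In this sketch `cpu_bound` is a hypothetical coroutine, `sum(range(10_000))` stands in for a slice of CPU work, and `asyncio.sleep(0)` is used as an explicit yield point.

```python
import asyncio

order = []


async def cpu_bound(name: str, chunks: int, cooperative: bool) -> None:
    for chunk in range(chunks):
        sum(range(10_000))  # stand-in for a slice of CPU work
        order.append(f"{name}{chunk}")
        if cooperative:
            await asyncio.sleep(0)  # voluntarily yield to the event loop


async def run(cooperative: bool) -> list:
    order.clear()
    await asyncio.gather(
        cpu_bound("a", 2, cooperative),
        cpu_bound("b", 2, cooperative),
    )
    return list(order)


if __name__ == "__main__":
    print(asyncio.run(run(cooperative=False)))  # ['a0', 'a1', 'b0', 'b1']
    print(asyncio.run(run(cooperative=True)))   # ['a0', 'b0', 'a1', 'b1']
```

Without a yield point, task `a` runs to completion before `b` gets a turn. Yielding this way keeps the loop responsive but does not make the work faster; heavy computation is better moved to an executor.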
Contrast asyncio.sleep() vs time.sleep() and their impact on concurrency.
| Function | Type | Behavior | Effect in Async Code |
|---|---|---|---|
| `time.sleep()` | Blocking | Halts the OS thread entirely for the duration | Stops everything in the event loop; no other coroutines can run |
| `asyncio.sleep()` | Non-blocking | Awaits asynchronously, yielding control | Keeps event loop responsive; other tasks can execute |

```python
time.sleep(2)           # Bad: freezes the event loop
await asyncio.sleep(2)  # Good: non-blocking delay
```

One should never use blocking functions inside async code unless wrapped in `asyncio.to_thread()` or an executor.
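To make the impact concrete, the sketch below counts how often a background task gets to run during each kind of sleep. The `ticker` coroutine and the delay values are illustrative.

```python
import asyncio
import time


async def ticker(ticks: list) -> None:
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def count_ticks(blocking: bool) -> int:
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    if blocking:
        time.sleep(0.1)           # the loop cannot run the ticker at all
    else:
        await asyncio.sleep(0.1)  # the ticker runs while we wait
    task.cancel()
    return len(ticks)


async def main():
    return await count_ticks(True), await count_ticks(False)


if __name__ == "__main__":
    blocked, free = asyncio.run(main())
    print(f"ticks during time.sleep: {blocked}, during asyncio.sleep: {free}")
```

With `time.sleep()` the ticker never gets a single turn, because the coroutine that created it never hands control back to the loop.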
When should you avoid using asyncio?
Avoid asyncio when:
- The tasks are CPU-bound (for example, image processing, computation-heavy loops). In such cases, multiprocessing should be used, since threads in CPython are still limited by the GIL.
- There is a need for true parallelism on multiple CPU cores.
- Working with simple scripts or synchronous APIs, where async adds unnecessary complexity.

Asyncio is best for network-bound or I/O-heavy programs with many simultaneous connections (HTTP clients, servers, sockets, etc.).
How are async context managers (async with) implemented internally?
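An async context manager defines two special asynchronous methods:
- `__aenter__(self)`: awaited when entering the block.
- `__aexit__(self, exc_type, exc_val, exc_tb)`: awaited when exiting, even if an exception occurs.

When this is written:

```python
async with MyAsyncManager():
    ...
```

Python executes roughly this:

```python
manager = MyAsyncManager()
value = await manager.__aenter__()  # bound by `as value`, if present
try:
    ...  # block code
except BaseException as exc:
    # __aexit__ sees the exception; a truthy return value suppresses it
    if not await manager.__aexit__(type(exc), exc, exc.__traceback__):
        raise
else:
    await manager.__aexit__(None, None, None)
```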
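A minimal class-based sketch of the protocol. `Connection` is a hypothetical resource, and the `asyncio.sleep(0)` calls stand in for an asynchronous connect and close.

```python
import asyncio


class Connection:
    """Hypothetical resource used only for illustration."""

    def __init__(self, log: list) -> None:
        self.log = log

    async def __aenter__(self):
        await asyncio.sleep(0)  # stand-in for an async connect
        self.log.append("enter")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        await asyncio.sleep(0)  # stand-in for an async close
        self.log.append(f"exit ({exc_type.__name__ if exc_type else 'no error'})")
        return False  # do not suppress exceptions


async def main() -> list:
    log = []
    async with Connection(log):
        log.append("body")
    try:
        async with Connection(log):
            raise RuntimeError("failure inside the block")
    except RuntimeError:
        log.append("error propagated")
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

For simple cases, `contextlib.asynccontextmanager` builds the same protocol from an async generator function.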
What are async iterators and how are they different from normal ones?
- A normal iterator uses `__iter__()` and `__next__()`.
- An async iterator uses `__aiter__()` and `__anext__()`. `__aiter__()` is a regular method that returns the iterator, while `__anext__()` is a coroutine and can use `await`.

This allows iteration over asynchronous data streams.
Example:

```python
import asyncio


class AsyncCounter:
    def __init__(self, n):
        self.n = n
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(1)  # async wait
        self.current += 1
        return self.current


async def main():
    async for i in AsyncCounter(3):
        print(i)


asyncio.run(main())
```

Async iterators are perfect for consuming incoming data in real-time, like websockets, streams, or async file reads.
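The same behaviour is often written more compactly as an async generator, where Python supplies `__aiter__` and `__anext__` automatically. This is an alternative sketch rather than a rewrite of the class above; `countdown` is an illustrative name.

```python
import asyncio


async def countdown(n: int):
    # `async def` plus `yield` creates an async generator, which
    # implements __aiter__ and __anext__ automatically.
    while n > 0:
        await asyncio.sleep(0)  # stand-in for waiting on real data
        yield n
        n -= 1


async def main() -> list:
    # Async comprehensions consume async iterators with `async for`
    return [i async for i in countdown(3)]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [3, 2, 1]
```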
How does await asyncio.wait() differ from asyncio.gather()?
| Function | Purpose | Returns | Behavior |
|---|---|---|---|
| `asyncio.gather()` | Collect results of multiple awaitables | Ordered list of results | Raises the first exception to the caller while the other awaitables keep running (unless `return_exceptions=True`, which returns exceptions as results) |
| `asyncio.wait()` | Monitor completion of multiple tasks | Two sets: `done`, `pending` | Gives more granular control (you decide how to process results) |

Example:

```python
# gather: returns results in the order the awaitables were passed
results = await asyncio.gather(task1(), task2())
# wait: lets you handle tasks as they finish; it needs Task objects
# (passing bare coroutines was removed in Python 3.11)
tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
```
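A runnable sketch of the difference. The `job` coroutine and its delays are made up so that the "fast" job reliably finishes first.

```python
import asyncio


async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def main():
    # gather: results come back in argument order, not completion order
    ordered = await asyncio.gather(job("slow", 0.05), job("fast", 0.01))

    # wait: returns as soon as the first task finishes
    tasks = [
        asyncio.create_task(job("slow", 0.2)),
        asyncio.create_task(job("fast", 0.01)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]

    for t in pending:  # clean up whatever is still running
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return ordered, first, len(pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Unlike `gather()`, `wait()` does not raise task exceptions itself; they surface when `result()` is called on a finished task.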
Thanks for reading!
We have come a long way. Thanks for staying with my articles throughout the journey. We have covered the GIL, threading, multithreading, multiprocessing, and asyncio in detail, along with their practice problems and interview questions.
Stay tuned for more articles on some new topics.
Sushant Gaurav | Sciencx (2025-11-04T04:30:00+00:00) Asyncio: Interview Questions and Practice Problems. Retrieved from https://www.scien.cx/2025/11/04/asyncio-interview-questions-and-practice-problems/