Asyncio: Interview Questions and Practice Problems


In this article, we will go through some of the most commonly asked practice problems and interview questions related to asyncio.

Practice Problems

Write a simple async program that fetches multiple URLs in parallel.

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://example.com',
        'https://httpbin.org/get',
        'https://python.org'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for i, content in enumerate(results):
            print(f"URL {urls[i]} returned {len(content)} characters")

asyncio.run(main())

Compare synchronous requests vs asyncio (time difference).

import time
import requests
import asyncio
import aiohttp

urls = [
    'https://example.com',
    'https://httpbin.org/get',
    'https://python.org'
]

# Synchronous version

def sync_fetch():
    for url in urls:
        response = requests.get(url)
        print(f"{url} fetched with {len(response.text)} characters")

# Async version

async def async_fetch():
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for i, resp in enumerate(responses):
            text = await resp.text()
            print(f"{urls[i]} fetched with {len(text)} characters")

if __name__ == "__main__":
    print("Running synchronous fetch")
    start = time.perf_counter()
    sync_fetch()
    duration_sync = time.perf_counter() - start
    print(f"Synchronous fetching took {duration_sync:.2f} seconds\n")

    print("Running asynchronous fetch")
    start = time.perf_counter()
    asyncio.run(async_fetch())
    duration_async = time.perf_counter() - start
    print(f"Asynchronous fetching took {duration_async:.2f} seconds")

Create a toy chat server with asyncio streams.

import asyncio

clients = set()

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"New connection from {addr}")
    clients.add(writer)
    try:
        while True:
            data = await reader.readline()
            if not data:
                print(f"{addr} disconnected")
                break
            message = data.decode().strip()
            print(f"Received from {addr}: {message}")
            broadcast = f"{addr}: {message}\n"
            # Iterate over a snapshot: other handlers may remove clients while we await drain()
            for client in list(clients):
                if client != writer:
                    client.write(broadcast.encode())
                    await client.drain()
    except ConnectionResetError:
        print(f"Connection reset by {addr}")
    finally:
        clients.remove(writer)
        writer.close()
        await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    addr = server.sockets[0].getsockname()
    print(f"Serving on {addr}")
    async with server:
        await server.serve_forever()

asyncio.run(main())

Create an async producer-consumer pipeline using asyncio.Queue.

import asyncio
import random

async def producer(queue: asyncio.Queue, n: int):
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    await queue.put(None)  # Sentinel for consumer to stop

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))

async def main():
    queue = asyncio.Queue()
    n_items = 10
    prod_task = asyncio.create_task(producer(queue, n_items))
    cons_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(prod_task, cons_task)

asyncio.run(main())

Write a UDP echo server with asyncio.

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport
        print("UDP server is up")

    def datagram_received(self, data, addr):
        message = data.decode()
        print(f"Received {message} from {addr}")
        self.transport.sendto(data, addr)  # Echo back

async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))
    try:
        await asyncio.sleep(3600)  # Run for 1 hour
    finally:
        transport.close()

asyncio.run(main())

Write an async program that reads from multiple files concurrently using aiofiles.

import asyncio
import aiofiles

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
    print(f"{filename}: {len(contents)} characters")
    return contents

async def main():
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = [read_file(f) for f in files]
    await asyncio.gather(*tasks)

asyncio.run(main())

Build an async retry mechanism for flaky network calls with exponential backoff.

import asyncio
import random

async def flaky_network_call():
    if random.random() < 0.7:
        raise Exception("Network failure!")
    return "Success!"

async def retry(coro_func, retries=5, base_delay=1):
    # coro_func is a coroutine *function*: calling it creates a fresh coroutine
    # for every attempt (a coroutine object can only be awaited once).
    for attempt in range(1, retries + 1):
        try:
            result = await coro_func()
            return result
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == retries:
                raise
            delay = base_delay * 2 ** (attempt - 1)  # Exponential backoff
            print(f"Retrying in {delay} seconds...")
            await asyncio.sleep(delay)

async def main():
    try:
        result = await retry(flaky_network_call)
        print("Result:", result)
    except Exception as e:
        print("All attempts failed:", e)

asyncio.run(main())

Interview Questions

Explain the event loop clearly.

The event loop is the heart of asyncio: a single-threaded scheduler that manages and executes all asynchronous work (coroutines, tasks, futures, and callbacks). Conceptually, it is a continuous loop that:

  1. Picks up ready tasks.
  2. Executes them until they hit an await (pause point).
  3. Switches to other ready tasks while waiting (I/O, timers, etc.).
  4. Resumes paused tasks when their awaited I/O completes.
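To make this concrete, here is a tiny sketch (the job coroutine is an illustrative placeholder): two tasks are started, each pauses at await, and the loop interleaves them.

import asyncio

async def job(name, delay):
    print(f"{name}: started")
    await asyncio.sleep(delay)   # pause point: the loop switches to other ready tasks
    print(f"{name}: resumed after {delay}s")

async def main():
    # Both jobs are scheduled; neither blocks the other while sleeping.
    await asyncio.gather(job("A", 1), job("B", 0.5))

asyncio.run(main())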

Differences between asyncio and threading.

  • asyncio achieves concurrency within a single thread through cooperative multitasking.
  • Threads are scheduled preemptively by the OS, but in CPython the GIL prevents them from executing Python bytecode in parallel.

| Feature | asyncio (coroutines) | Threading |
| --- | --- | --- |
| Type of multitasking | Cooperative | Preemptive |
| Scheduler | Event loop | OS scheduler |
| Switch control | Voluntary (await) | OS-forced |
| I/O behavior | Non-blocking (single thread) | A blocking call occupies its whole thread |
| GIL effect | Not an issue for I/O tasks | GIL bottleneck for CPU tasks |
| Use case | High I/O workload (network, DB) | Overlapping blocking I/O or legacy blocking libraries |

How does asyncio avoid GIL problems for I/O?

The Global Interpreter Lock (GIL) prevents multiple Python threads from executing Python bytecode simultaneously. However, asyncio bypasses this constraint for I/O-bound operations because it uses single-threaded non-blocking I/O.

  • While waiting for I/O (like network/file read), coroutines yield control using await, letting other coroutines run.
  • There is no need for multiple threads to achieve concurrency, so GIL is not a limiting factor.
  • For CPU-heavy tasks, work can still be offloaded to a thread or process pool:
  result = await asyncio.to_thread(blocking_function)
  # or
  result = await loop.run_in_executor(None, blocking_function, *args)

What is structured concurrency (Trio/Curio as alternatives)?

Structured Concurrency is a design principle where the lifetime of child tasks is managed within the scope of their parent task. When the parent exits, all children are automatically cancelled or waited on. It makes sure that no background task leaks after the function returns, making code more predictable and reliable.

  • In asyncio, Python 3.11 introduced TaskGroup to support structured concurrency.
  • A TaskGroup waits for all of its child tasks; if one fails, the remaining tasks are cancelled and the errors are re-raised together as an ExceptionGroup:
  async with asyncio.TaskGroup() as tg:
      tg.create_task(fetch_data())
      tg.create_task(process_data())
  # all tasks complete or errors are grouped

Note: Other frameworks like Trio and Curio were early adopters of this model. They offer stricter and safer task hierarchies with automatic cleanup. Trio’s approach influenced the TaskGroup design in Python 3.11.
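A minimal, runnable sketch of this behaviour (Python 3.11+; fetch_data and process_data are toy stand-ins):

import asyncio

async def fetch_data():
    await asyncio.sleep(0.1)
    return "data"

async def process_data():
    await asyncio.sleep(0.05)
    raise ValueError("processing failed")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_data())
            tg.create_task(process_data())
    except* ValueError as eg:           # except* unpacks the ExceptionGroup
        for exc in eg.exceptions:
            print("Caught:", exc)

asyncio.run(main())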

Why can't await be used outside of an async function, but can be used inside one?

Because await is only valid syntax inside async def functions (and async generators/comprehensions). At module level or inside a plain def, there is no coroutine to suspend and no event loop driving it, so Python raises SyntaxError: 'await' outside async function.

  • The correct way to run top-level async code:
  async def main():
      await asyncio.sleep(1)

  asyncio.run(main())

Here, asyncio.run() starts and stops the event loop for you, allowing await to work inside.

How does asyncio.gather() differ from asyncio.create_task()?

  • create_task() wraps a coroutine in a Task and schedules it on the event loop right away; you can await the Task later to collect its result.
  • gather() runs multiple awaitables concurrently and waits for all of them, returning results as a list in the same order the awaitables were passed.
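A small sketch of the difference (the work coroutine is an illustrative placeholder):

import asyncio

async def work(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # create_task: starts running immediately; await it later
    t = asyncio.create_task(work("background", 0.1))
    # gather: run several awaitables concurrently, collect ordered results
    results = await asyncio.gather(work("a", 0.2), work("b", 0.1))
    print(results)    # ['a', 'b'] — order matches the arguments
    print(await t)    # 'background'

asyncio.run(main())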

How is exception handling done in asyncio tasks?

Exceptions raised inside async tasks are stored in the Task object.

  • Awaiting the task re-raises the exception at the await site; alternatively, task.exception() or add_done_callback() can be used to inspect it.
  • With TaskGroup, Python 3.11+ allows handling multiple exceptions as an ExceptionGroup.
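A brief illustration (the failing coroutine is a made-up placeholder):

import asyncio

async def might_fail():
    await asyncio.sleep(0.1)
    raise RuntimeError("boom")

async def main():
    task = asyncio.create_task(might_fail())
    try:
        await task                 # re-raises the stored exception here
    except RuntimeError as e:
        print("Handled:", e)
    print(task.exception())        # the exception is still stored on the Task

asyncio.run(main())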

What is the difference between blocking, non-blocking, and asynchronous calls?

| Type | Description | Example |
| --- | --- | --- |
| Blocking | Waits until the task completes | time.sleep(2) |
| Non-blocking | Returns immediately; requires polling | Setting a socket to non-blocking mode |
| Asynchronous | Schedules work and frees the loop without blocking | await asyncio.sleep(2) |

How do you run blocking code inside asyncio?

Using thread or process executors:

result = await asyncio.to_thread(blocking_function)

Or using run_in_executor:

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function)

This prevents blocking the single-threaded asyncio event loop.
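A complete, runnable sketch (blocking_io here is a stand-in for any blocking call, such as a legacy library): the blocking work runs in a worker thread while the loop keeps serving other coroutines.

import asyncio
import time

def blocking_io():
    time.sleep(1)          # stands in for a blocking library call
    return "done"

async def heartbeat():
    for _ in range(3):
        print("event loop is still responsive")
        await asyncio.sleep(0.4)

async def main():
    # The blocking call runs in a worker thread; heartbeat() keeps running concurrently.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), heartbeat())
    print(result)

asyncio.run(main())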

What changes were made from Python 3.11 onward in asyncio?

  • New TaskGroup introduced for structured concurrency.
  • A new asyncio.timeout() context manager was added as a cleaner alternative to asyncio.wait_for().
  • Performance improvements to task scheduling and exception tracebacks.
  • Removal of deprecated APIs, for example, old generator-based coroutines.
  • Improvements for asyncio.StreamWriter, connection handlers, and debug-level tracing.

What is the purpose of asyncio.run() and how is it different from await?

| Aspect | asyncio.run() | await |
| --- | --- | --- |
| Purpose | Starts and manages the event loop | Waits for an awaitable inside a running loop |
| Where used | Top-level (synchronous) code | Inside async functions |
| Creates a loop | Yes | No |
| Typical use | asyncio.run(main()) | await coro() |

Explain cancellation and cleanup in asyncio.

  • A running task can be cancelled with task.cancel().
  • The coroutine then receives asyncio.CancelledError at its current await point and can perform cleanup in a try/except/finally block:

try:
    await asyncio.sleep(10)
except asyncio.CancelledError:
    print("Task cancelled!")
    raise  # re-raise so the task is actually marked as cancelled
finally:
    close_connection()
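Putting it together with task.cancel() in a runnable sketch (names are illustrative):

import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("worker: cleaning up before exit")
        raise                               # let the cancellation propagate

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)                # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main: task was cancelled")
    print("cancelled?", task.cancelled())   # True

asyncio.run(main())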

What is the difference between coroutines, tasks, and futures?

| Concept | Definition | Created via |
| --- | --- | --- |
| Coroutine | The awaitable object returned by calling an async def function | async def |
| Task | A coroutine wrapped and scheduled on the event loop | asyncio.create_task() |
| Future | Low-level awaitable acting as a placeholder for a result | loop.create_future() |
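A tiny sketch showing all three side by side:

import asyncio

async def add(a, b):
    await asyncio.sleep(0.1)
    return a + b

async def main():
    coro = add(1, 2)                        # coroutine object (nothing runs yet)
    task = asyncio.create_task(add(3, 4))   # Task: scheduled immediately
    fut = asyncio.get_running_loop().create_future()  # low-level Future
    fut.set_result(99)                      # something must resolve it manually
    print(await coro, await task, await fut)  # 3 7 99

asyncio.run(main())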

Why use asyncio.gather() over sequential awaits?

Sequential awaits run one coroutine after the other, so the total time is the sum of their durations:

await task1()
await task2()

asyncio.gather() schedules both concurrently, so the total time is roughly that of the slowest task:

await asyncio.gather(task1(), task2())

What is cooperative multitasking, and how does asyncio implement it?

Cooperative multitasking means that tasks voluntarily yield control to allow others to run, instead of the operating system forcibly switching them (as in preemptive multitasking).

  • In asyncio, this happens whenever a coroutine uses await on a non-blocking operation.
  • The event loop then resumes another task that was waiting.
  • Only one task runs at a time, but many can make progress concurrently.

If one coroutine runs CPU-bound code without await, it blocks the entire event loop.
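A short sketch demonstrating that last point (the busy loop stands in for any CPU-bound work): ticker() stops printing while hog() monopolizes the loop.

import asyncio
import time

async def ticker():
    for _ in range(5):
        print("tick", time.strftime("%X"))
        await asyncio.sleep(0.5)

async def hog():
    # No await inside: this blocks the whole event loop for ~2 seconds,
    # so ticker() cannot resume until it finishes.
    end = time.monotonic() + 2
    while time.monotonic() < end:
        pass

async def main():
    await asyncio.gather(ticker(), hog())

asyncio.run(main())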

Contrast asyncio.sleep() vs time.sleep() and their impact on concurrency.

| Function | Type | Behavior | Effect in async code |
| --- | --- | --- | --- |
| time.sleep() | Blocking | Halts the entire OS thread for the duration | Freezes the event loop; no other coroutine can run |
| asyncio.sleep() | Non-blocking | Suspends only the calling coroutine, yielding control | Event loop stays responsive; other tasks keep running |

time.sleep(2)           # Bad: freezes the event loop

await asyncio.sleep(2)  # Good: non-blocking delay

One should never use blocking functions inside async code unless wrapped in asyncio.to_thread() or an executor.

When should you avoid using asyncio?

Avoid asyncio when:

  • The tasks are CPU-bound (for example, image processing or computation-heavy loops). In such cases, multiprocessing or a ProcessPoolExecutor is a better fit, since threads are still limited by the GIL for CPU work.
  • There is a need for true parallelism across multiple CPU cores.
  • The program is a simple script or built around synchronous APIs, where async only adds unnecessary complexity.

Asyncio is best for network-bound or I/O-heavy programs with many simultaneous connections (HTTP clients, servers, sockets, etc.).
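A hedged sketch of offloading CPU-bound work from an async program to a process pool (fib is a toy stand-in for real CPU-heavy work):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def fib(n: int) -> int:
    # Deliberately slow, CPU-bound recursion
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Each call runs in a separate process, so it bypasses the GIL
        results = await asyncio.gather(
            loop.run_in_executor(pool, fib, 30),
            loop.run_in_executor(pool, fib, 31),
        )
    print(results)

if __name__ == "__main__":   # required on platforms that spawn worker processes
    asyncio.run(main())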

How are async context managers (async with) implemented internally?

An async context manager defines two special asynchronous methods:

  • __aenter__(self) — awaited when entering the block.
  • __aexit__(self, exc_type, exc_val, exc_tb) — awaited when exiting, even if an exception occurs.

When this is written:

async with MyAsyncManager():
    ...

Python executes roughly this:

manager = MyAsyncManager()
await manager.__aenter__()
try:
    ...  # block code
finally:
    await manager.__aexit__(None, None, None)
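A minimal concrete example (the class and its "resource" are hypothetical):

import asyncio

class AsyncResource:
    async def __aenter__(self):
        await asyncio.sleep(0.1)   # e.g. open a connection
        print("resource acquired")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.1)   # e.g. close the connection
        print("resource released")
        return False               # do not suppress exceptions

async def main():
    async with AsyncResource():
        print("using the resource")

asyncio.run(main())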

What are async iterators and how are they different from normal ones?

  • A normal iterator uses __iter__() and __next__().
  • An async iterator uses __aiter__() and __anext__(), both of which can be asynchronous (use await).

This allows iteration over asynchronous data streams.

Example:

import asyncio

class AsyncCounter:
    def __init__(self, n):
        self.n = n
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(1)  # simulate an async wait
        self.current += 1
        return self.current

async def main():
    async for i in AsyncCounter(3):
        print(i)

asyncio.run(main())

Async iterators are perfect for consuming incoming data in real-time, like websockets, streams, or async file reads.

How does await asyncio.wait() differ from asyncio.gather()?

| Function | Purpose | Returns | Behavior on failure |
| --- | --- | --- | --- |
| asyncio.gather() | Collect results of multiple awaitables | Ordered list of results | Propagates the first exception immediately; the other tasks keep running (unless return_exceptions=True, which returns exceptions in the result list) |
| asyncio.wait() | Monitor completion of a set of tasks | Two sets: done, pending | Never raises from the tasks themselves; you inspect each done task and decide how to handle results and errors |

Example:

# gather — returns results in argument order
results = await asyncio.gather(task1(), task2())

# wait — handle tasks as they finish (since Python 3.11 it requires Task objects, not bare coroutines)
tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

Thanks for reading!

We have come a long way. Thanks for staying with this series. We have covered the GIL, threading, multithreading, multiprocessing, and asyncio in detail, along with practice problems and interview questions for each.

Stay tuned for more articles on some new topics.

