This content originally appeared on DEV Community and was authored by Seenivasa Ramadurai
In this blog, we’ll explore how to build a Human-in-the-Loop (HITL) workflow using the Microsoft Agent Framework. This example demonstrates a user search workflow where AI and humans collaborate interactively: the AI performs the initial search, while a human reviews, selects, and guides the process.
What is Human-in-the-Loop?
Human-in-the-Loop (HITL) is a powerful design approach where AI systems work hand-in-hand with humans to make critical decisions, verify results, and guide workflows.
Instead of relying solely on automated decisions, HITL ensures that human judgment and AI intelligence complement each other.
Here’s how it works in this example:
- AI performs an initial search to find potential users.
- Human selects the most relevant result.
- AI performs a detailed search on the selected user.
- Human reviews the final, formatted output.
This interaction loop combines human oversight with AI efficiency to produce high-quality results, which makes it well suited to use cases like recruitment, research, or business intelligence.
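The four steps above can be sketched in plain Python before bringing in any framework. This is a minimal illustration, not the Agent Framework API: `run_hitl_search` and its four callables (`initial_search`, `choose`, `detailed_search`, `approve`) are hypothetical names chosen for this sketch, and the AI and human steps are passed in as ordinary functions so they can be stubbed.

```python
from typing import Callable, Optional

Candidate = dict  # e.g. {"name": ..., "title": ...}; shape assumed for this sketch


def run_hitl_search(
    name: str,
    initial_search: Callable[[str], list],
    choose: Callable[[list], int],
    detailed_search: Callable[[Candidate], str],
    approve: Callable[[str], bool],
) -> Optional[str]:
    """Run one pass of the AI -> human -> AI -> human loop.

    Returns the approved report, or None if any step ends the loop early.
    """
    candidates = initial_search(name)        # AI: broad initial search
    if not candidates:
        return None
    index = choose(candidates)               # Human: pick a result (0-based)
    if not 0 <= index < len(candidates):
        return None
    report = detailed_search(candidates[index])  # AI: focused search
    return report if approve(report) else None   # Human: final review


if __name__ == "__main__":
    # Stubbed AI and human steps, for demonstration only.
    fake_hits = [{"name": "Ada", "title": "Engineer"},
                 {"name": "Ada", "title": "Designer"}]
    result = run_hitl_search(
        "Ada",
        initial_search=lambda n: fake_hits,
        choose=lambda hits: 1,
        detailed_search=lambda c: f"{c['name']} / {c['title']}",
        approve=lambda report: True,
    )
    print(result)
```

Keeping the human steps as injected callables makes the loop easy to test, and the same shape maps onto the request/response executors used later in this post.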
Key Components
Component - Description
RequestInfoMessage - Defines typed request structures for human input.
RequestInfoExecutor - Coordinates between the workflow and human input requests.
RequestInfoEvent - Handles asynchronous, event-driven responses from humans.
WorkflowBuilder - Constructs the workflow graph and connects executors.
Executor - Core processing units that handle messages and execute business logic.
By combining these, the workflow achieves type safety, pause-resume functionality, and full event traceability.
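To make the idea of typed message handling concrete, here is a toy sketch of routing messages to handlers by their type, using only the standard library. It is an analogy for how an executor can pick a handler from the message type, not a description of how the Agent Framework implements `@handler`. The names `ToyExecutor`, `SearchRequest`, and `SearchResult` are invented for this example.

```python
from dataclasses import dataclass
from functools import singledispatchmethod


@dataclass
class SearchRequest:
    """Hypothetical message: ask for a search by name."""
    name: str


@dataclass
class SearchResult:
    """Hypothetical message: a finished search."""
    summary: str


class ToyExecutor:
    """Dispatches each incoming message to the handler registered for its type."""

    @singledispatchmethod
    def handle(self, message) -> str:
        # Fallback: no handler was registered for this message type.
        raise TypeError(f"No handler for {type(message).__name__}")

    @handle.register(SearchRequest)
    def _(self, message: SearchRequest) -> str:
        return f"searching for {message.name}"

    @handle.register(SearchResult)
    def _(self, message: SearchResult) -> str:
        return f"got results: {message.summary}"


if __name__ == "__main__":
    executor = ToyExecutor()
    print(executor.handle(SearchRequest("Ada")))
    print(executor.handle(SearchResult("3 hits")))
```

Because the handler is selected from the message type, sending an unexpected type fails loudly instead of being silently mishandled.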
Request-Response Flow Explained
Let’s walk through how a Human-in-the-Loop flow operates in the Microsoft Agent Framework.
1. Workflow Initialization: The system starts by requesting a user’s name.
2. Request Generation: A RequestInfoExecutor emits a RequestInfoEvent asking for human input.
3. Workflow Pause: The process waits indefinitely until the human responds.
4. Human Response: The user provides input via send_responses_streaming().
5. Workflow Resumption: The system continues from the paused state using the response.
6. Detailed Search: AI executes a focused search for the selected profile.
7. Output Generation: The workflow yields formatted, detailed results.
This model allows interactive, event-driven collaboration between humans and agents.
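The pause-and-resume behavior in the steps above can be imitated with a plain Python generator: each `yield` is a point where the workflow stops and waits for a human answer, and `send()` resumes it with that answer. This is only a conceptual sketch. `HumanRequest`, `search_workflow`, and `drive` are hypothetical names, the "AI search" is a stand-in list, and the real framework uses `RequestInfoEvent` and `send_responses_streaming()` rather than generators.

```python
from dataclasses import dataclass
from typing import Generator


@dataclass
class HumanRequest:
    """A request for human input, identified by an id (assumed shape)."""
    request_id: str
    prompt: str


def search_workflow() -> Generator[HumanRequest, str, str]:
    # Pause 1: ask for a name and wait until a response is sent in.
    name = yield HumanRequest("req-1", "Enter the name of the person to search for:")
    # Stand-in for the AI's initial search.
    candidates = [f"{name} (result {i})" for i in range(1, 4)]
    # Pause 2: ask the human to pick one of the candidates.
    choice = yield HumanRequest("req-2", f"Pick a result (1-{len(candidates)}):")
    selected = candidates[int(choice) - 1]
    # Final output once no more human input is needed.
    return f"Detailed search completed for {selected}"


def drive(workflow: Generator[HumanRequest, str, str], answers: dict) -> str:
    """Resume the workflow with the answer matching each pending request id."""
    request = next(workflow)  # run until the first pause
    while True:
        try:
            request = workflow.send(answers[request.request_id])
        except StopIteration as done:
            return done.value  # the workflow's final output


if __name__ == "__main__":
    print(drive(search_workflow(), {"req-1": "Ada", "req-2": "2"}))
```

Matching answers to requests by id mirrors the dictionary of responses keyed by `request_id` that the full example passes back to the workflow.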
Implementation Details
import asyncio
import os
from dotenv import load_dotenv
from agent_framework import (
    Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent,
    RequestInfoEvent, WorkflowStatusEvent, WorkflowRunState, handler,
    RequestInfoExecutor, RequestInfoMessage, RequestResponse
)
from typing_extensions import Never
import requests
from pydantic import BaseModel

# Load environment variables
load_dotenv()

# Data classes following Microsoft Agent Framework patterns exactly
class UserSearchRequest(RequestInfoMessage):
    """Request for user search input."""
    def __init__(self, prompt: str):
        super().__init__()
        self.prompt = prompt


class UserSelectionRequest(RequestInfoMessage):
    """Request for user selection from discovered users."""
    def __init__(self, prompt: str, users: list):
        super().__init__()
        self.prompt = prompt
        self.users = users


class SearchData(BaseModel):
    """Search data model."""
    user_name: str
    title_place: str
    search_type: str


class SearchResults(BaseModel):
    """Search results model."""
    summary: str
    users: list


class TurnManager(Executor):
    """Coordinates the workflow between AI search and human input following Microsoft Agent Framework."""

    def __init__(self, id: str = None):
        super().__init__(id=id or "turn_manager")
        self.current_search_data = None

    @handler
    async def start(self, _: str, ctx: WorkflowContext[UserSearchRequest]) -> None:
        """Start the workflow by requesting user name."""
        print("🔍 User Search Workflow - Human in the Loop")
        print("=" * 60)
        print("-" * 60)
        prompt = "Enter the name of the person to search for:"
        await ctx.send_message(UserSearchRequest(prompt=prompt))

    @handler
    async def on_user_name_response(self, user_name: RequestResponse[UserSearchRequest, str], ctx: WorkflowContext[SearchData]) -> None:
        """Handle user name input and start search."""
        name = user_name.data.strip() if user_name.data else ""
        if not name:
            await ctx.yield_output("❌ No name provided. Exiting.")
            return
        print(f"\n✅ User name: {name}")

        # Get title/place and search type from user input
        print("\n🏢 Additional information (optional but recommended for better results):")
        title_place = input("👤 Enter title/designation and place/city (e.g., 'Software Engineer, San Francisco'): ").strip()
        print("\n🔍 Search type options:")
        print("1. professional - LinkedIn, executive profiles (DEFAULT)")
        print("2. social - Twitter, Instagram, Facebook")
        print("3. comprehensive - All sources")
        search_type = input("👤 Enter search type (professional/social/comprehensive) [default: professional]: ").strip().lower()
        if not search_type or search_type not in ["professional", "social", "comprehensive"]:
            search_type = "professional"
            print(f"🔄 Using default search type: {search_type} (LinkedIn focus)")
        print(f"✅ Title/Place: {title_place}")
        print(f"✅ Search type: {search_type}")

        # Store search data for later use
        self.current_search_data = SearchData(
            user_name=name,
            title_place=title_place,
            search_type=search_type
        )
        await ctx.send_message(self.current_search_data)

    @handler
    async def on_search_results(self, search_results: SearchResults, ctx: WorkflowContext[UserSelectionRequest]) -> None:
        """Handle search completion and request user selection."""
        print(f"\n🔍 Search completed: {search_results.summary}")
        users = search_results.users
        print(f"👥 Found {len(users)} potential matches:")
        print("=" * 70)
        for i, user in enumerate(users, 1):
            print(f"\n{i}. 👤 {user['name']}")
            print(f" 📋 {user['title']}")
            print(f" 🔗 {user['url']}")
            print(f" 📝 {user['content']}")
            print(f" ⭐ Score: {user['score']}")
            print("-" * 50)

        # Create user selection request
        prompt = f"Which user would you like to search for in detail? Enter a number (1-{len(users)}) or 'all' to search all users:"
        user_selection_request = UserSelectionRequest(prompt=prompt, users=users)
        await ctx.send_message(user_selection_request)

    @handler
    async def on_user_selection_response(self, selection_data: RequestResponse[UserSelectionRequest, str], ctx: WorkflowContext[WorkflowOutputEvent]) -> None:
        """Handle user selection and perform detailed search with formatted output."""
        user_input = selection_data.data.strip().lower() if selection_data.data else ""
        users = selection_data.original_request.users
        print(f"\n🔍 Processing selection: {user_input}")
        if user_input == "all":
            # Search all users
            print("🔄 Searching for all users...")
            selected_users = users
        else:
            try:
                selection = int(user_input)
                if 1 <= selection <= len(users):
                    selected_user = users[selection - 1]
                    print(f"✅ Selected: {selected_user['name']}")
                    selected_users = [selected_user]
                else:
                    await ctx.yield_output(f"❌ Invalid selection: {user_input}")
                    return
            except ValueError:
                await ctx.yield_output(f"❌ Invalid input: {user_input}")
                return

        # Perform detailed search for selected users
        for user in selected_users:
            await self._perform_detailed_search(user)
        result_text = f"✅ Detailed search completed for {len(selected_users)} user(s)."
        await ctx.yield_output(result_text)

    async def _perform_detailed_search(self, selected_user):
        """Perform detailed search for the selected user and display formatted results."""
        print(f"\n🔍 Performing detailed search for: {selected_user['name']}")
        try:
            api_key = os.getenv("TAVILY_API_KEY")
            if not api_key:
                raise ValueError("TAVILY_API_KEY not found")

            # Create detailed search query focused on the selected user
            user_name = selected_user['name']
            search_type = self.current_search_data.search_type if self.current_search_data else 'professional'
            if search_type == "professional":
                search_query = f'"{user_name}" site:linkedin.com'
            elif search_type == "social":
                search_query = f'"{user_name}" site:twitter.com OR site:instagram.com OR site:facebook.com'
            else:  # comprehensive
                search_query = f'"{user_name}"'
            print(f"🔍 Detailed search query: {search_query}")

            # Make API request
            response = requests.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": search_query,
                    "search_depth": "basic",
                    "include_answer": False,
                    "max_results": 5
                },
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            results = data.get("results", [])

            # Display detailed results with formatting
            print(f"\n📋 Detailed Search Results for {user_name}")
            print("=" * 70)
            print(f"📊 Found {len(results)} detailed results")
            print("-" * 70)
            if results:
                for i, result in enumerate(results, 1):
                    title = result.get('title', 'No title')
                    url = result.get('url', 'No URL')
                    content = result.get('content', 'No content')
                    score = result.get('score', 'N/A')
                    # Clean up content
                    content = content.replace('\n', ' ').strip()
                    if len(content) > 250:
                        content = content[:250] + "..."
                    print(f"\n{i}. {title}")
                    print(f" 🔗 {url}")
                    print(f" 📝 {content}")
                    print(f" ⭐ Score: {score}")
                    print("-" * 50)
            else:
                print("❌ No detailed results found")
            print("\n" + "=" * 70)
        except Exception as e:
            print(f"❌ Error performing detailed search: {e}")


class UserSearchExecutor(Executor):
    """Performs web search for a user using Tavily API."""

    def __init__(self, id: str = None):
        super().__init__(id=id or "user_search")

    @handler
    async def handle(self, search_data: SearchData, ctx: WorkflowContext[SearchResults]) -> None:
        """Handle user search request."""
        print(f"🔍 Searching for: {search_data.user_name}")
        if search_data.title_place:
            print(f"🏢 Title/Place: {search_data.title_place}")
        print(f"📊 Search type: {search_data.search_type}")
        try:
            api_key = os.getenv("TAVILY_API_KEY")
            if not api_key:
                raise ValueError("TAVILY_API_KEY not found in environment variables")

            # Create search query using name and title/place
            if search_data.title_place:
                if search_data.search_type == "professional":
                    search_query = f'"{search_data.user_name}" "{search_data.title_place}" site:linkedin.com'
                elif search_data.search_type == "social":
                    search_query = f'"{search_data.user_name}" "{search_data.title_place}" site:twitter.com OR site:instagram.com OR site:facebook.com'
                else:  # comprehensive
                    search_query = f'"{search_data.user_name}" "{search_data.title_place}"'
            else:
                if search_data.search_type == "professional":
                    search_query = f'"{search_data.user_name}" site:linkedin.com'
                elif search_data.search_type == "social":
                    search_query = f'"{search_data.user_name}" site:twitter.com OR site:instagram.com OR site:facebook.com'
                else:  # comprehensive
                    search_query = f'"{search_data.user_name}"'
            print(f"🔍 Search query: {search_query}")

            # Make API request
            response = requests.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": search_query,
                    "search_depth": "basic",
                    "include_answer": False,
                    "max_results": 10
                },
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            results = data.get("results", [])

            # Process results and extract users
            users = []
            for result in results:
                users.append({
                    'name': search_data.user_name,
                    'title': result.get('title', 'No title'),
                    'url': result.get('url', 'No URL'),
                    'content': result.get('content', 'No content')[:200],
                    'score': result.get('score', 'N/A')
                })

            # Create summary
            summary = f"Found {len(results)} results for {search_data.user_name}"
            print(f"✅ {summary}")

            # Create search results
            search_results = SearchResults(
                summary=summary,
                users=users
            )
            await ctx.send_message(search_results)
        except Exception as e:
            print(f"❌ Error performing search: {e}")
            error_msg = f"Error searching for {search_data.user_name}: {str(e)}"
            search_results = SearchResults(
                summary=error_msg,
                users=[]
            )
            await ctx.send_message(search_results)


async def run_interactive_workflow(workflow):
    """Run the workflow, handling RequestInfoEvent in an event-driven loop."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None
    print("🎯 User Search Game")
    print("I'll help you search for users and let you select which one to explore!")
    print("-" * 50)
    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )
        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion.
        # Named pending_requests so it does not shadow the imported requests module.
        pending_requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Handle different types of requests
                if hasattr(event.data, 'prompt'):
                    pending_requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )
        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️ State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if pending_requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in pending_requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter your response: ").strip()
                if answer.lower() == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    if workflow_output:
        print(f"\n🎉 {workflow_output}")


async def main():
    print("=" * 60)
    print("🤖 MICROSOFT AGENT FRAMEWORK WITH DETAILED OUTPUT")
    print("=" * 60)
    print("=" * 60)

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    user_search = UserSearchExecutor(id="user_search")
    request_info_executor = RequestInfoExecutor(id="request_info")

    # Build the workflow graph following Microsoft Agent Framework pattern exactly
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, request_info_executor)  # Request user input
        .add_edge(request_info_executor, turn_manager)  # User input back to turn manager
        .add_edge(turn_manager, user_search)  # Start search
        .add_edge(user_search, turn_manager)  # Search results back to turn manager
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)


if __name__ == "__main__":
    asyncio.run(main())
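
The listing above builds the search query in two places (the initial search and the detailed search) with nearly identical branches. One possible refactoring, shown here as a sketch rather than as part of the original code, is a small pure helper. `build_search_query` and `SITE_FILTERS` are hypothetical names, and falling back to "professional" for an unknown search type is an assumption that mirrors the default chosen in `on_user_name_response`.

```python
# Site filters per search type, copied from the query strings used in the listing.
SITE_FILTERS = {
    "professional": "site:linkedin.com",
    "social": "site:twitter.com OR site:instagram.com OR site:facebook.com",
    "comprehensive": "",  # no site restriction
}


def build_search_query(user_name: str, title_place: str = "",
                       search_type: str = "professional") -> str:
    """Build the quoted search query for a name, optional title/place, and search type."""
    # Assumption: unknown search types fall back to the professional default.
    if search_type not in SITE_FILTERS:
        search_type = "professional"
    parts = [f'"{user_name}"']
    if title_place:
        parts.append(f'"{title_place}"')
    site_filter = SITE_FILTERS[search_type]
    if site_filter:
        parts.append(site_filter)
    return " ".join(parts)


if __name__ == "__main__":
    print(build_search_query("Ada Lovelace"))
    print(build_search_query("Ada Lovelace", "Engineer, London", "comprehensive"))
```

Because the helper has no network or framework dependencies, it can be unit tested on its own, and both executors could call it instead of repeating the branches.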
Output
Conclusion
This Human-in-the-Loop User Search workflow demonstrates how AI agents and humans can work together using Microsoft’s official Agent Framework design patterns.
By leveraging:
- RequestInfoMessage / RequestInfoEvent
- Typed, event-driven workflows
- State preservation with pause-resume
- AI-human collaboration
you can build enterprise-grade, compliant, and intelligent workflows that balance automation and human reasoning, a good fit for next-generation agentic AI applications.
Thanks
Sreeni Ramadorai

Seenivasa Ramadurai | Sciencx (2025-10-13T00:45:15+00:00) Microsoft Agent Framework Building a Human-in-the-Loop User Search Workflow : Part-V. Retrieved from https://www.scien.cx/2025/10/13/microsoft-agent-framework-building-a-human-in-the-loop-user-search-workflow-part-v/