To get user input in your Python AI assistant file, you have several approaches depending on your setup. Here are the most common methods:
# Basic console input
user_input = input("Enter your message: ")
print(f"You said: {user_input}")

import asyncio
import uuid

from agents import Agent, Runner
async def main():
    """Interactive console loop: read messages and run them through the agents.

    Relies on `triage_agent`, `AirlineAgentContext`, `trace`, and `Runner`
    defined/imported elsewhere in the agent module (`trace` is not imported
    in this snippet — TODO confirm its origin).
    """
    current_agent = triage_agent            # agent that handles the next turn
    input_items = []                        # running conversation history
    context = AirlineAgentContext()
    conversation_id = uuid.uuid4().hex[:16]  # short id to group traces

    while True:
        print(f"Result from {current_agent.name}:")
        # Get user input
        user_input = input("Enter your message (or 'quit' to exit): ")
        # Exit condition
        if user_input.lower() in ['quit', 'exit', 'bye']:
            print("Goodbye!")
            break
        # Process the input
        with trace("Customer service", group_id=conversation_id):
            input_items.append({"content": user_input, "role": "user"})
            result = await Runner.run(current_agent, input_items, context=context)
            # Handle responses
            for new_item in result.new_items:
                # ... your existing response handling code
                pass
            # Carry the produced history and the (possibly handed-off)
            # agent into the next turn.
            input_items = result.to_input_list()
            current_agent = result.last_agent
if __name__ == "__main__":
    asyncio.run(main())


def get_user_input(prompt="Enter your message: "):
    """Get user input with validation and error handling.

    Returns the stripped message, or None when the user wants to quit
    (an exit command, Ctrl+C, or end-of-input).
    """
    while True:
        try:
            user_input = input(prompt).strip()
            # Check for empty input
            if not user_input:
                print("Please enter a message.")
                continue
            # Check for exit commands
            if user_input.lower() in ['quit', 'exit', 'bye', 'q']:
                return None
            return user_input
        except KeyboardInterrupt:
            print("\nGoodbye!")
            return None
        except EOFError:
            print("\nInput ended.")
            return None
# Usage in your main loop
async def main():
    # ... initialization code ...
    while True:
        user_input = get_user_input("You: ")
        if user_input is None:  # User wants to quit
            break
        # Process the input
        # ... your agent processing code ...


from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel

console = Console()
def get_styled_input():
    """Get user input with rich styling.

    Returns the message string, or None when the user enters an exit
    command. Re-prompts on empty input.
    """
    console.print(Panel("AI Assistant", style="bold blue"))
    while True:
        user_input = Prompt.ask(
            "[bold green]You[/bold green]",
            default="",
            show_default=False
        )
        if user_input.lower() in ['quit', 'exit']:
            console.print("[bold red]Goodbye![/bold red]")
            return None
        if user_input.strip():
            return user_input
        console.print("[yellow]Please enter a message.[/yellow]")
# Usage
async def main():
    """Drive the rich-styled prompt until the user quits."""
    while True:
        user_input = get_styled_input()
        if user_input is None:
            break
        # Process input
        console.print(f"[bold cyan]Processing:[/bold cyan] {user_input}")
        # ... your agent code ...
# ... your agent code ...

def read_from_file(filename="input.txt"):
    """Read user inputs from a file.

    Returns a list of non-empty, stripped lines; an empty list when the
    file does not exist (a message is printed in that case).
    """
    try:
        with open(filename, 'r') as file:
            lines = file.readlines()
            return [line.strip() for line in lines if line.strip()]
    except FileNotFoundError:
        # Interpolate the actual filename (the original f-string had a
        # garbled placeholder).
        print(f"File {filename} not found.")
        return []
# Usage
async def main():
    """Process messages from a file when present, else fall back to console."""
    # Try file input first, fallback to console
    file_inputs = read_from_file("user_inputs.txt")
    if file_inputs:
        for user_input in file_inputs:
            print(f"Processing: {user_input}")
            # ... process each input ...
    else:
        # Fallback to console input
        while True:
            user_input = input("Enter your message: ")
            # ... process input ...


from flask import Flask, request, jsonify
import asyncio
import threading

app = Flask(__name__)


class AIAssistant:
    """Holds conversation state and runs user messages through the agents.

    NOTE(review): `triage_agent`, `AirlineAgentContext`, `Runner`,
    `MessageOutputItem`, and `ItemHelpers` come from the agent module and
    are not imported in this snippet — confirm the imports in your file.
    """

    def __init__(self):
        self.current_agent = triage_agent       # agent handling the next turn
        self.context = AirlineAgentContext()
        self.input_items = []                   # running conversation history

    async def process_message(self, user_input):
        """Process user message and return the assistant's text response."""
        self.input_items.append({"content": user_input, "role": "user"})
        result = await Runner.run(
            self.current_agent,
            self.input_items,
            context=self.context
        )
        # Extract response text from any message items the run produced.
        response = ""
        for new_item in result.new_items:
            if isinstance(new_item, MessageOutputItem):
                response += ItemHelpers.text_message_output(new_item)
        # Persist history and possible agent handoff for the next call.
        self.input_items = result.to_input_list()
        self.current_agent = result.last_agent
        return response


assistant = AIAssistant()
@app.route('/chat', methods=['POST'])
def chat():
    """Flask endpoint: run one chat message through the async assistant."""
    user_input = request.json.get('message', '')
    # Run async function in sync context.
    # NOTE(review): asyncio.run(...) would also work here and handles
    # loop cleanup itself; kept explicit to match the original flow.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    response = loop.run_until_complete(assistant.process_message(user_input))
    loop.close()
    return jsonify({'response': response})


if __name__ == "__main__":
    app.run(debug=True)


import argparse
import sys
def parse_arguments():
    """Build and evaluate the command-line interface for the assistant.

    Supports a one-shot message, a message file, or interactive mode.
    """
    arg_parser = argparse.ArgumentParser(description='AI Assistant')
    arg_parser.add_argument('--message', '-m', type=str, help='Single message to process')
    arg_parser.add_argument('--file', '-f', type=str, help='File containing messages')
    arg_parser.add_argument('--interactive', '-i', action='store_true', help='Interactive mode')
    return arg_parser.parse_args()
async def main():
    """Dispatch on CLI arguments: single message, file, or interactive mode.

    NOTE(review): `process_single_message` and `process_file_messages`
    are referenced but not defined in this snippet — implement them or
    adjust the dispatch.
    """
    args = parse_arguments()
    if args.message:
        # Single message mode
        await process_single_message(args.message)
    elif args.file:
        # File input mode
        await process_file_messages(args.file)
    else:
        # Interactive mode (default)
        await interactive_mode()
async def interactive_mode():
    """Your existing interactive loop"""
    while True:
        user_input = input("Enter your message: ")
        # ... process input ...

# Here's how to enhance your existing code:
async def main():
    """Enhanced interactive loop with special commands and error handling."""
    import os  # needed for the 'clear' command; not imported at file level

    current_agent: Agent[AirlineAgentContext] = triage_agent
    input_items: list[TResponseInputItem] = []
    context = AirlineAgentContext()
    conversation_id = uuid.uuid4().hex[:16]

    print("🛫 Welcome to SkyWings AI Assistant!")
    print("Type 'help' for commands, 'quit' to exit")
    print("-" * 50)

    while True:
        try:
            # Enhanced input prompt
            user_input = input(f"\n[{current_agent.name}] You: ").strip()
            # Handle special commands
            if user_input.lower() == 'quit':
                print("Thank you for using SkyWings! Safe travels! ✈️")
                break
            elif user_input.lower() == 'help':
                print_help()
                continue
            elif user_input.lower() == 'clear':
                os.system('cls' if os.name == 'nt' else 'clear')
                continue
            elif not user_input:
                print("Please enter a message.")
                continue
            # Process with your existing agent system
            with trace("Customer service", group_id=conversation_id):
                input_items.append({"content": user_input, "role": "user"})
                result = await Runner.run(current_agent, input_items, context=context)
                print(f"\n🤖 Assistant:")
                for new_item in result.new_items:
                    # (dropped an unused `agent_name` local from the original)
                    if isinstance(new_item, MessageOutputItem):
                        print(f" {ItemHelpers.text_message_output(new_item)}")
                    elif isinstance(new_item, HandoffOutputItem):
                        print(f" → Transferring to {new_item.target_agent.name}")
                    elif isinstance(new_item, ToolCallOutputItem):
                        print(f" ✅ {new_item.output}")
                input_items = result.to_input_list()
                current_agent = result.last_agent
        except KeyboardInterrupt:
            print("\n\nGoodbye! 👋")
            break
        except Exception as e:
            # Keep the session alive on unexpected errors.
            print(f"❌ Error: {e}")
            continue
def print_help():
    """Print available commands"""
    print("""
📋 Available Commands:
• help - Show this help message
• quit - Exit the assistant
• clear - Clear the screen
• status - Check flight status
• book - Book a new flight
• seats - Check available seats
• faq - Ask frequently asked questions
""")

# The key is that the input() function blocks and waits for user input,
# making it perfect for interactive AI assistants. Choose the approach
# that best fits your needs! 🚀
To get the user message from your Next.js frontend (the agent page) into your Python AI assistant, you can use one of several approaches:
// In your agent page component
const processUserMessage = async (userInput: string) => {
try {
// Send to Python backend
const response = await fetch('http://localhost:8000/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: userInput,
conversation_id: conversationId,
context: agentContext
}),
})
const data = await response.json()
// Handle response
setMessages(prev => [...prev, {
role: 'assistant',
content: data.response,
timestamp: new Date()
}])
} catch (error) {
console.error('Error:', error)
}
}from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import asyncio
from your_agent_file import triage_agent, AirlineAgentContext, Runner

app = FastAPI()

# Enable CORS for Next.js frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # Your Next.js URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class ChatMessage(BaseModel):
    # Request body for POST /chat.
    message: str
    conversation_id: str
    context: dict = {}


class ChatResponse(BaseModel):
    # Response body for POST /chat.
    response: str
    actions_executed: list = []
    context: dict = {}


# Conversation states keyed by conversation_id.
# NOTE(review): in-memory only — lost on restart and not shared across
# worker processes.
conversations = {}
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(chat_message: ChatMessage):
    """Run one user message through the agent system and return its reply."""
    try:
        conversation_id = chat_message.conversation_id
        # Get or create conversation state
        if conversation_id not in conversations:
            conversations[conversation_id] = {
                "current_agent": triage_agent,
                "input_items": [],
                "context": AirlineAgentContext()
            }
        conv_state = conversations[conversation_id]

        # Add user message to conversation
        conv_state["input_items"].append({
            "content": chat_message.message,
            "role": "user"
        })

        # Process with your agent system
        result = await Runner.run(
            conv_state["current_agent"],
            conv_state["input_items"],
            context=conv_state["context"]
        )

        # Extract response text and any tool-call outputs.
        response_text = ""
        actions_executed = []
        for new_item in result.new_items:
            if isinstance(new_item, MessageOutputItem):
                response_text += ItemHelpers.text_message_output(new_item)
            elif isinstance(new_item, ToolCallOutputItem):
                actions_executed.append({
                    "action": "tool_call",
                    "output": new_item.output,
                    "success": True
                })

        # Update conversation state (history + possible agent handoff).
        conv_state["input_items"] = result.to_input_list()
        conv_state["current_agent"] = result.last_agent

        return ChatResponse(
            response=response_text,
            actions_executed=actions_executed,
            context=conv_state["context"].__dict__
        )
    except Exception as e:
        # Surface any failure as a 500 with its message.
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
async def root():
return {"message": "AI Assistant API is running"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)// WebSocket connection in your agent page
useEffect(() => {
const ws = new WebSocket('ws://localhost:8000/ws')
ws.onopen = () => {
console.log('Connected to Python AI Assistant')
}
ws.onmessage = (event) => {
const data = JSON.parse(event.data)
setMessages(prev => [...prev, {
role: 'assistant',
content: data.response,
timestamp: new Date()
}])
}
// Store WebSocket reference
wsRef.current = ws
return () => ws.close()
}, [])
const sendMessage = (userInput: string) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify({
message: userInput,
conversation_id: conversationId
}))
}
}from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
import asyncio
app = FastAPI()
class ConnectionManager:
    """Tracks open WebSocket connections and per-conversation agent state."""

    def __init__(self):
        # conversation_id -> live WebSocket (forward-ref annotation so the
        # class also loads where fastapi isn't imported yet)
        self.active_connections: dict[str, "WebSocket"] = {}
        # conversation_id -> {"current_agent", "input_items", "context"}
        self.conversations = {}

    async def connect(self, websocket: "WebSocket", conversation_id: str):
        """Accept a socket and register it, initializing conversation state."""
        await websocket.accept()
        self.active_connections[conversation_id] = websocket
        # Initialize conversation
        if conversation_id not in self.conversations:
            self.conversations[conversation_id] = {
                "current_agent": triage_agent,
                "input_items": [],
                "context": AirlineAgentContext()
            }

    def disconnect(self, conversation_id: str):
        """Forget a closed connection; safe to call for unknown ids."""
        if conversation_id in self.active_connections:
            del self.active_connections[conversation_id]

    async def send_message(self, message: str, conversation_id: str):
        """Send text to the given conversation's socket, if still connected."""
        if conversation_id in self.active_connections:
            await self.active_connections[conversation_id].send_text(message)


manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
conversation_id = None
try:
await websocket.accept()
while True:
# Receive message from frontend
data = await websocket.receive_text()
message_data = json.loads(data)
conversation_id = message_data.get("conversation_id")
user_message = message_data.get("message")
# Initialize conversation if needed
if conversation_id not in manager.conversations:
manager.conversations[conversation_id] = {
"current_agent": triage_agent,
"input_items": [],
"context": AirlineAgentContext()
}
conv_state = manager.conversations[conversation_id]
# Process with agent
conv_state["input_items"].append({
"content": user_message,
"role": "user"
})
result = await Runner.run(
conv_state["current_agent"],
conv_state["input_items"],
context=conv_state["context"]
)
# Send response back
response_text = ""
for new_item in result.new_items:
if isinstance(new_item, MessageOutputItem):
response_text += ItemHelpers.text_message_output(new_item)
await websocket.send_text(json.dumps({
"response": response_text,
"conversation_id": conversation_id
}))
# Update state
conv_state["input_items"] = result.to_input_list()
conv_state["current_agent"] = result.last_agent
except WebSocketDisconnect:
if conversation_id:
manager.disconnect(conversation_id)const sendToQueue = async (message: string) => {
await fetch('/api/queue/publish', {
method: 'POST',
body: JSON.stringify({
message,
conversation_id: conversationId
})
})
}import redis
import json
import asyncio
redis_client = redis.Redis(host='localhost', port=6379, db=0)
async def process_queue_messages():
"""Process messages from Redis queue"""
while True:
try:
# Get message from queue
message = redis_client.blpop('chat_queue', timeout=1)
if message:
data = json.loads(message[1])
user_message = data['message']
conversation_id = data['conversation_id']
# Process with your agent
response = await process_with_agent(user_message, conversation_id)
# Send response back to queue
redis_client.rpush(f'response_{conversation_id}', json.dumps({
'response': response,
'conversation_id': conversation_id
}))
except Exception as e:
print(f"Error processing queue message: {e}")
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(process_queue_messages())const sendMessage = async (message: string) => {
await fetch('/api/messages', {
method: 'POST',
body: JSON.stringify({
message,
conversation_id: conversationId,
status: 'pending'
})
})
// Poll for response
pollForResponse(conversationId)
}import asyncio
import os                          # needed for os.getenv below (was missing)
from datetime import datetime      # needed for utcnow below (was missing)

from pymongo import MongoClient


async def poll_for_messages():
    """Poll database for new messages and process each pending one."""
    client = MongoClient(os.getenv("MONGODB_URI"))
    db = client["airline_db"]
    messages_collection = db["chat_messages"]
    while True:
        try:
            # Find pending messages
            pending_messages = messages_collection.find({
                "status": "pending"
            })
            for message_doc in pending_messages:
                # Process message
                response = await process_with_agent(
                    message_doc["message"],
                    message_doc["conversation_id"]
                )
                # Update with response
                messages_collection.update_one(
                    {"_id": message_doc["_id"]},
                    {
                        "$set": {
                            "response": response,
                            "status": "completed",
                            # NOTE(review): utcnow() is naive UTC and
                            # deprecated in 3.12 — consider
                            # datetime.now(timezone.utc).
                            "processed_at": datetime.utcnow()
                        }
                    }
                )
        except Exception as e:
            print(f"Error polling messages: {e}")
        await asyncio.sleep(2)  # Poll every 2 seconds


if __name__ == "__main__":
    asyncio.run(poll_for_messages())

# Terminal 1: Start Python AI server
cd python-ai-assistant
python main.py

# Terminal 2: Start Next.js frontend
cd airline-booking-website
npm run dev

// app/api/agent/route.ts
// NOTE(review): NextRequest / NextResponse must be imported from
// 'next/server' in the real route file.
export async function POST(request: NextRequest) {
  try {
    const { message, context, conversationId } = await request.json()
    // Forward to Python AI assistant
    const pythonResponse = await fetch('http://localhost:8000/chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        message,
        conversation_id: conversationId,
        context
      }),
    })
    const data = await pythonResponse.json()
    return NextResponse.json({
      content: data.response,
      actionsExecuted: data.actions_executed,
      context: data.context,
    })
  } catch (error) {
    console.error("Error connecting to Python AI:", error)
    return NextResponse.json({
      content: "AI assistant is temporarily unavailable",
      error: true
    })
  }
}

Recommended approach: Use FastAPI with HTTP endpoints (#1) as it's the most straightforward and reliable method for getting messages from your Next.js frontend to your Python AI assistant! 🚀