
WeChat

Feishu
Choose your way to join

Scan to add a consultant
In AI applications, the key to user experience is response speed. Waiting 10 seconds for a complete answer is far less friendly than displaying character by character. AskTable achieves true real-time interactive experience through the clever combination of streaming response and tool calling.
# ❌ Traditional approach: wait for complete response
response = await openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Analyze last month's sales data"}]
)
# User needs to wait 10 seconds to see results
print(response.choices[0].message.content)
Poor User Experience:
# ✅ Streaming response: return token by token
stream = await openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Analyze last month's sales data"}],
stream=True
)
async for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
print(delta, end="", flush=True) # Display in real-time
Good User Experience:
async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    """Run a single round of Agent reasoning, yielding events as they stream in."""
    self.completion_count += 1

    # Kick off a streaming completion — stream=True is the key
    stream = await self.llm_client.chat.completions.create(
        messages=self.message_builder.dump_openai(),
        model=self.model,
        tools=self.tools,
        stream=True,
    )

    # Fold each chunk into the message builder and forward whatever
    # event(s) it produces, the moment they exist.
    async for chunk in stream:
        produced = self.message_builder.append_openai_delta(chunk)
        if not produced:
            continue
        if isinstance(produced, list):
            for event in produced:
                yield event
        else:
            yield produced
Design Highlights:
@dataclass
class StreamEvent:
    """A single unit of streamed Agent output."""

    # Discriminator: "text" | "tool_call" | "tool_result"
    type: str
    # Payload associated with the event
    content: str
    # Extra, event-specific details
    metadata: dict
Event Types:
Example Flow:
# User asks: "What was last month's sales amount?"
# Event 1: text — the assistant acknowledges the request right away
{"type": "text", "content": "Let me check"}
# Event 2: tool_call — search the metadata for the relevant field
{"type": "tool_call", "name": "search_metadata", "args": {"queries": ["销售额"]}}
# Event 3: tool_result — the field lookup comes back
{"type": "tool_result", "content": "Found field: sales.amount"}
# Event 4: tool_call — run the aggregation query
{"type": "tool_call", "name": "execute_sql", "args": {"sql": "SELECT SUM(amount) FROM sales WHERE ..."}}
# Event 5: tool_result — the SQL result arrives
{"type": "tool_result", "content": "Result: 1,234,567 yuan"}
# Event 6: text — the final answer is streamed to the user
{"type": "text", "content": "Last month's sales amount was 1,234,567 yuan"}
class ChatMessageBuilder:
    """Accumulates OpenAI streaming chunks into complete messages and events."""

    def append_openai_delta(self, chunk) -> StreamEvent | list[StreamEvent] | None:
        """Process one streaming chunk and return the event(s) it completes.

        Returns a text event for content deltas, a tool_call event once a
        tool call's arguments have fully accumulated, or None when the
        chunk carries nothing reportable yet.
        """
        delta = chunk.choices[0].delta

        # Text content: append to the message under construction and
        # surface it immediately as a text event.
        if delta.content:
            self._current_message["content"] += delta.content
            # FIX: StreamEvent declares no defaults, so metadata must be passed
            return StreamEvent(type="text", content=delta.content, metadata={})

        # Tool calling: arguments arrive spread across many chunks, so
        # accumulate first and only emit once the call is complete.
        if delta.tool_calls:
            for tool_call in delta.tool_calls:
                # Accumulate tool call parameters
                self._accumulate_tool_call(tool_call)
            if self._is_tool_call_complete():
                # FIX: match the StreamEvent dataclass (type/content/metadata);
                # it has no `name`/`args` fields, so carry them via metadata.
                return StreamEvent(
                    type="tool_call",
                    content=self._current_tool_call["name"],
                    metadata={"args": self._current_tool_call["arguments"]},
                )

        return None
Key Points:
async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    """One reasoning round: stream the LLM reply, then run any requested tools."""
    # ... stream receiving LLM response ...

    # Run every tool call the LLM requested that has no result yet
    for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
        handler = self.actions[tool_use["name"]]
        arguments = json.loads(tool_use["input"])
        try:
            # Invoke the tool; async handlers hand back a coroutine to await
            outcome = handler(**arguments)
            result = await outcome if iscoroutine(outcome) else outcome
        except Exception as e:
            # Feed the failure back to the LLM instead of aborting the run
            result = json.dumps({"error": str(e)})
        self.tool_called_count += 1
        # Emit the tool-result event downstream
        yield self.message_builder.append_tool_result(tool_use["id"], result)
Design Highlights:
# Sync tool: computes and returns its JSON payload directly
def show_table(self, table_names: list[str]) -> str:
return json.dumps([...])
# Async tool: awaits retrieval I/O before returning JSON
async def search_metadata(self, queries: list[str]) -> str:
results = await retrieve_entities(...)
return json.dumps(results)
# Auto-detect and correctly call: iscoroutine() tells the two apart at runtime
if iscoroutine(func_handler):
result = await func_handler # Async: the handler returned a coroutine to await
else:
result = func_handler # Sync: the return value is already the result
try:
result = await func(**params)
except Exception as e:
# Return error to LLM, let it try other approaches
result = json.dumps({"error": str(e)})
Advantages:
async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """Drive the Agent loop until a final answer is produced."""
    self.tool_called_count = 0
    self.completion_count = 0

    # First step consumes the user's input
    async for event in self.step(user_input):
        yield event

    # Keep stepping until the assistant's turn ends in plain text
    while True:
        if self.completion_count >= self.max_completions:
            raise CannotHandle("Completion count exceeds max")

        last_message = self.message_builder.dump_anthropic()[-1]
        role = last_message["role"]
        if role == "assistant":
            # An assistant turn ending in text is the final answer — stop
            if last_message["content"][-1]["type"] == "text":
                break
        elif role == "user":
            # A user turn ending in a tool result means more work is queued
            if last_message["content"][-1]["type"] == "tool_result":
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle("Tool called count exceeds max")

        # Take another step (no fresh user input this time)
        async for event in self.step():
            yield event
Execution Flow:
User Input
↓
LLM Reasoning (Streaming)
↓
Tool Call?
├─ Yes → Execute Tool → Return Result → LLM Reasoning (Streaming) → ...
└─ No → Return Final Answer → End
Safety Limits:
self.max_tool_calls = 10  # Hard cap: at most 10 tool executions per run
self.max_completions = 20  # Hard cap: at most 20 LLM requests per run
Prevents infinite loops from consuming resources.
User Question: "Analyze last month's sales by region, find the top 3 fastest-growing regions"
Streaming Response Process:
[Text] Let me first query the relevant data tables...
[Tool Call] search_metadata(queries=["Sales", "Region"])
[Tool Result] Found tables: sales, regions
[Text] Now getting detailed table structure...
[Tool Call] show_table(table_names=["sales", "regions"])
[Tool Result] Returned field information
[Text] Querying last month's sales by region...
[Tool Call] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[Tool Result] Returned query results
[Text] Querying month-before-last data for comparison...
[Tool Call] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[Tool Result] Returned query results
[Text] Based on data analysis, the top 3 fastest-growing regions are:
1. East China: 45% growth
2. South China: 38% growth
3. Southwest: 32% growth
User Experience:
User Question: "Query data from users table"
Streaming Response Process:
[Text] Let me query the users table...
[Tool Call] execute_sql(sql="SELECT * FROM users")
[Tool Result] Error: Table does not exist
[Text] Sorry, users table does not exist. Let me search for related tables...
[Tool Call] search_metadata(queries=["用户"])
[Tool Result] Found tables: user_info, customer
[Text] I found the user_info table. Would you like to query this table?
Advantages:
# Avoid too frequent events
class EventBuffer:
    """Batches stream events so consumers are not woken on every single one.

    Events accumulate in an internal buffer; add() hands the whole batch
    back once `flush_interval` seconds have elapsed since the last flush.
    """

    def __init__(self, flush_interval=0.1):
        self.buffer = []
        # FIX: use a monotonic clock — wall-clock time (time.time) can jump
        # backwards on NTP adjustment and break the interval comparison.
        self.last_flush = time.monotonic()
        self.flush_interval = flush_interval

    def add(self, event):
        """Buffer one event; return the pending batch if it is time to flush, else None."""
        self.buffer.append(event)
        if time.monotonic() - self.last_flush > self.flush_interval:
            return self.flush()
        return None

    def flush(self):
        """Return and clear the buffered events, or None if nothing is pending."""
        if not self.buffer:
            return None
        events = self.buffer
        self.buffer = []
        self.last_flush = time.monotonic()
        return events
# Control production speed, avoid consumer falling behind
class BackpressureStream:
    """Bounded producer/consumer channel for stream events.

    A full queue makes produce() block, so a fast producer can never
    outrun a slow consumer by more than `max_buffer_size` events.
    """

    def __init__(self, max_buffer_size=100):
        self.queue = asyncio.Queue(maxsize=max_buffer_size)

    async def produce(self, event):
        # Blocks while the queue is full — this is the backpressure
        await self.queue.put(event)

    async def consume(self):
        # Endless async generator: hand events over one at a time
        while True:
            yield await self.queue.get()
# ✅ Good event design: explicit type, payload, and room to grow
@dataclass
class StreamEvent:
type: str
content: str
metadata: dict # Extra information
# ❌ Poor event design: consumers must parse/guess what `data` holds
@dataclass
class StreamEvent:
data: str # Type not clear
# ✅ Graceful degradation: report the failure as an event and keep streaming
try:
result = await tool(**params)
except Exception as e:
result = {"error": str(e), "suggestion": "try another approach"}
yield StreamEvent(type="error", content=str(e))
# ❌ Throw exception directly
result = await tool(**params) # May interrupt entire flow
# ✅ Provide progress information so the UI never looks stalled
yield StreamEvent(type="progress", content="Querying database...")
yield StreamEvent(type="progress", content="Analyzing results...")
# ❌ No feedback for a long time
# User doesn't know what the system is doing
AskTable achieves smooth real-time interactive experience through the combination of streaming response and tool calling:
Streaming response is not just a technical implementation, but the key to user experience. By letting users see AI's thinking process in real-time, we build more transparent and trustworthy AI applications.
No programming needed
Start free trial