
Streaming Responses and Tool Calling: Creating a Real-Time Interactive AI Experience

AskTable Team 2026-03-04

In AI applications, perceived response speed largely determines the user experience. Waiting 10 seconds for a complete answer feels far worse than watching the same answer appear token by token. AskTable delivers a real-time interactive experience by combining streaming responses with tool calling.

Why Do We Need Streaming Response?

Problems with Traditional Approach

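# ❌ Traditional approach: wait for the complete response
# (run inside an async function)
from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

response = await client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Analyze last month's sales data"}]
)

# Nothing is shown until the whole answer is ready (about 10 seconds here)
print(response.choices[0].message.content)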

Poor User Experience:

  • A long wait with no sign that the system is working
  • No partial results to read early
  • The system feels as if it has frozen

Advantages of Streaming Response

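# ✅ Streaming response: return token by token
# (run inside an async function)
from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

stream = await client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Analyze last month's sales data"}],
    stream=True
)

async for chunk in stream:
    if not chunk.choices:  # some chunks carry no choices
        continue
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)  # Display in real time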

Good User Experience:

  • The response starts appearing immediately
  • Text arrives token by token, like someone typing
  • Partial content can be read early
  • The system feels "alive"
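The difference can be made concrete by measuring time to first token against total time. The sketch below uses a simulated stream rather than a real LLM call; fake_token_stream, measure, and the delay value are hypothetical names and numbers chosen for illustration.

```python
import asyncio
import time


async def fake_token_stream(tokens, delay=0.01):
    """Stand-in for an LLM stream: yields one token every `delay` seconds."""
    for token in tokens:
        await asyncio.sleep(delay)
        yield token


async def measure(tokens, delay=0.01):
    """Return (time_to_first_token, total_time, text) for the simulated stream."""
    start = time.perf_counter()
    first = None
    parts = []
    async for token in fake_token_stream(tokens, delay):
        if first is None:
            # The moment the user would see the first piece of output
            first = time.perf_counter() - start
        parts.append(token)
    total = time.perf_counter() - start
    return first, total, "".join(parts)


if __name__ == "__main__":
    first, total, text = asyncio.run(measure(["Hello", ", ", "world", "!"]))
    print(f"first token after {first:.3f}s, full text after {total:.3f}s: {text}")
```

With streaming, the user starts reading at the first-token time; without it, nothing is visible until the total time has passed.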

Core Implementation

1. Basic Streaming Processing

from typing import AsyncGenerator

async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    """Execute one step of Agent reasoning"""
    self.completion_count += 1

    # Create streaming request
    completion = await self.llm_client.chat.completions.create(
        messages=self.message_builder.dump_openai(),
        model=self.model,
        tools=self.tools,
        stream=True,  # Key: enable streaming
    )

    # Process chunk by chunk
    async for chunk in completion:
        delta = self.message_builder.append_openai_delta(chunk)
        if delta:
            if isinstance(delta, list):
                for event in delta:
                    yield event  # Return event in real-time
            else:
                yield delta

Design Highlights:

1.1 Event-Driven Architecture

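from dataclasses import dataclass, field

@dataclass
class StreamEvent:
    type: str  # "text" | "tool_call" | "tool_result"
    content: str
    metadata: dict = field(default_factory=dict)  # optional extra information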

Event Types:

  • text: Text content generated by LLM
  • tool_call: LLM requests to call a tool
  • tool_result: Tool execution result

Example Flow:

# User asks: "What was last month's sales amount?"

# Event 1: text
{"type": "text", "content": "Let me check"}

# Event 2: tool_call
{"type": "tool_call", "name": "search_metadata", "args": {"queries": ["sales amount"]}}

# Event 3: tool_result
{"type": "tool_result", "content": "Found field: sales.amount"}

# Event 4: tool_call
{"type": "tool_call", "name": "execute_sql", "args": {"sql": "SELECT SUM(amount) FROM sales WHERE ..."}}

# Event 5: tool_result
{"type": "tool_result", "content": "Result: 1,234,567 yuan"}

# Event 6: text
{"type": "text", "content": "Last month's sales amount was 1,234,567 yuan"}
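Because every event carries a type, a consumer can dispatch on it with very little code. The following is a minimal sketch of such a consumer, assuming events arrive as plain dicts shaped like the ones in the flow above; the render function and its output format are hypothetical, not AskTable's front-end code.

```python
def render(event: dict) -> str:
    """Turn one event dict into a line of UI text, dispatching on its type."""
    kind = event["type"]
    if kind == "text":
        return event["content"]
    if kind == "tool_call":
        return f"[calling {event['name']}]"
    if kind == "tool_result":
        return f"[result] {event['content']}"
    # Unknown types are surfaced rather than silently dropped
    return f"[unknown event: {kind}]"


events = [
    {"type": "text", "content": "Let me check"},
    {"type": "tool_call", "name": "execute_sql", "args": {"sql": "SELECT 1"}},
    {"type": "tool_result", "content": "Result: 1"},
]
lines = [render(e) for e in events]
```

A real front end would likely append text events to the current message and show tool calls as collapsible steps, but the dispatch structure stays the same.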

1.2 Incremental Message Building

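class ChatMessageBuilder:
    def append_openai_delta(self, chunk) -> StreamEvent | list[StreamEvent] | None:
        """Process one streaming chunk and return the resulting event, if any"""
        if not chunk.choices:  # some chunks carry no choices
            return None
        delta = chunk.choices[0].delta

        # Text content: accumulate it and emit the new fragment right away
        if delta.content:
            self._current_message["content"] += delta.content
            return StreamEvent(type="text", content=delta.content, metadata={})

        # Tool calling: arguments arrive as fragments across several chunks
        if delta.tool_calls:
            for tool_call in delta.tool_calls:
                # Accumulate tool call parameters
                self._accumulate_tool_call(tool_call)

            # Emit an event only once the tool call is complete
            if self._is_tool_call_complete():
                # StreamEvent has no name/args fields, so the tool name goes in
                # content and the arguments in metadata
                return StreamEvent(
                    type="tool_call",
                    content=self._current_tool_call["name"],
                    metadata={"args": self._current_tool_call["arguments"]},
                )

        return None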

Key Points:

  • Incremental Accumulation: Accumulate content chunk by chunk
  • Completeness Detection: Determine if tool call is complete
  • Event Generation: Generate events at appropriate moments
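The accumulation and completeness steps can be sketched in isolation. In the OpenAI streaming format, each tool-call fragment carries an index, the id and function name usually arrive in the first fragment, and the JSON argument string arrives in pieces. The sketch below models fragments as flat dicts for simplicity; ToolCallAccumulator and its methods are hypothetical and are not the internals of _accumulate_tool_call or _is_tool_call_complete.

```python
import json


class ToolCallAccumulator:
    """Collects streamed tool-call fragments, keyed by their index."""

    def __init__(self):
        self.calls: dict[int, dict] = {}

    def add(self, fragment: dict) -> None:
        """Merge one fragment into the call it belongs to."""
        call = self.calls.setdefault(
            fragment["index"], {"id": None, "name": "", "arguments": ""}
        )
        if fragment.get("id"):
            call["id"] = fragment["id"]
        call["name"] += fragment.get("name") or ""
        call["arguments"] += fragment.get("arguments") or ""

    def finish(self) -> list[dict]:
        """Parse the argument strings once the stream signals completion."""
        done = []
        for index in sorted(self.calls):
            call = self.calls[index]
            done.append({
                "id": call["id"],
                "name": call["name"],
                "args": json.loads(call["arguments"] or "{}"),
            })
        self.calls = {}
        return done


acc = ToolCallAccumulator()
for frag in [
    {"index": 0, "id": "call_1", "name": "execute_sql", "arguments": ""},
    {"index": 0, "arguments": '{"sql": "SEL'},
    {"index": 0, "arguments": 'ECT 1"}'},
]:
    acc.add(frag)
completed = acc.finish()
```

The argument string is only parsed in finish, because a partial fragment such as '{"sql": "SEL' is not valid JSON on its own.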

2. Tool Calling Integration

import json
from inspect import iscoroutine

async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    # ... receive the streamed LLM response ...

    # Execute tool calls
    for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
        func = self.actions[tool_use["name"]]
        params = json.loads(tool_use["input"])

        try:
            # Execute tool (supports sync and async)
            func_handler = func(**params)
            if iscoroutine(func_handler):
                result = await func_handler
            else:
                result = func_handler
        except Exception as e:
            result = json.dumps({"error": str(e)})

        self.tool_called_count += 1

        # Return tool result event
        yield self.message_builder.append_tool_result(tool_use["id"], result)

Design Highlights:

2.1 Sync/Async Compatibility

# Sync tool
def show_table(self, table_names: list[str]) -> str:
    return json.dumps([...])

# Async tool
async def search_metadata(self, queries: list[str]) -> str:
    results = await retrieve_entities(...)
    return json.dumps(results)

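# Call the tool, then detect what it returned and handle it accordingly
func_handler = func(**params)
if iscoroutine(func_handler):
    result = await func_handler  # Async tool: await the coroutine
else:
    result = func_handler  # Sync tool: the return value is the result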
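The same dispatch can be written as a small standalone helper. This sketch uses inspect.isawaitable from the standard library, which also accepts tasks and futures in addition to coroutines; call_tool and the two toy tools are hypothetical stand-ins for the real actions.

```python
import asyncio
import inspect
import json


def show_table(table_names):
    """Toy sync tool."""
    return json.dumps({"tables": table_names})


async def search_metadata(queries):
    """Toy async tool."""
    await asyncio.sleep(0)  # pretend to do I/O
    return json.dumps({"matches": queries})


async def call_tool(func, **params):
    """Call a tool and await the result only if it is awaitable."""
    result = func(**params)
    if inspect.isawaitable(result):
        result = await result
    return result


async def demo():
    return (
        await call_tool(show_table, table_names=["sales"]),
        await call_tool(search_metadata, queries=["region"]),
    )
```

Checking the return value instead of the function keeps the helper working for callables that wrap a coroutine function, such as functools.partial objects.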

2.2 Error Handling

try:
    result = await func(**params)
except Exception as e:
    # Return error to LLM, let it try other approaches
    result = json.dumps({"error": str(e)})

Advantages:

  • A failed tool call does not interrupt the whole flow
  • The LLM can adjust its strategy based on the error message
  • The system becomes more robust
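Tool exceptions are not the only failure mode: the model may also name a tool that does not exist or produce arguments that are not valid JSON. A minimal sketch that reports all three cases back as an error payload follows; run_tool_safely, the actions dict, and the toy execute_sql tool are hypothetical, and the sketch handles sync tools only.

```python
import json


def run_tool_safely(actions: dict, name: str, raw_args: str) -> str:
    """Run a tool and always return a JSON string, even on failure."""
    if name not in actions:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        params = json.loads(raw_args)
    except json.JSONDecodeError as exc:
        return json.dumps({"error": f"invalid arguments: {exc.msg}"})
    try:
        return actions[name](**params)
    except Exception as exc:
        # The message goes back to the LLM so it can try another approach
        return json.dumps({"error": str(exc)})


def execute_sql(sql):
    """Toy tool that fails for one table to simulate a database error."""
    if "users" in sql:
        raise LookupError("Table does not exist")
    return json.dumps({"rows": []})


actions = {"execute_sql": execute_sql}
```

Separating the three checks keeps the error messages specific, which gives the model more to work with than a generic failure.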

3. Automatic Loop Execution

async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """Continuously execute until final answer is obtained"""
    self.tool_called_count = 0
    self.completion_count = 0

    # First step: process user input
    async for chunk in self.step(user_input):
        yield chunk

    # Continue executing until complete
    while True:
        if self.completion_count >= self.max_completions:
            raise CannotHandle("Completion count exceeds max")

        last_message = self.message_builder.dump_anthropic()[-1]

        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                # Got final answer, end
                break

        elif last_message["role"] == "user":
            last_block = last_message["content"][-1]
            if last_block["type"] == "tool_result":
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle("Tool called count exceeds max")

                # Continue to next step
                async for chunk in self.step():
                    yield chunk

Execution Flow:

User Input
  ↓
LLM Reasoning (Streaming)
  ↓
Tool Call?
  ├─ Yes → Execute Tool → Return Result → LLM Reasoning (Streaming) → ...
  └─ No → Return Final Answer → End

Safety Limits:

self.max_tool_calls = 10  # Maximum 10 tool calls
self.max_completions = 20  # Maximum 20 LLM requests

These limits keep a runaway loop from consuming resources indefinitely.
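How the two limits interact can be seen in a stripped-down model of the loop. The sketch below replaces the LLM with a scripted list of outcomes; run_with_limits and the "tool"/"answer" script are hypothetical simplifications, and only the CannotHandle name and the default limits come from the code above.

```python
class CannotHandle(Exception):
    """Raised when the agent gives up."""


def run_with_limits(steps, max_completions=20, max_tool_calls=10):
    """Drive a scripted agent; each step is either 'tool' or 'answer'.

    Returns (completions, tool_calls) when an answer is reached.
    """
    completions = tool_calls = 0
    for kind in steps:
        if completions >= max_completions:
            raise CannotHandle("Completion count exceeds max")
        completions += 1  # one LLM request per step
        if kind == "answer":
            return completions, tool_calls
        if tool_calls >= max_tool_calls:
            raise CannotHandle("Tool called count exceeds max")
        tool_calls += 1
    raise CannotHandle("Script ended without a final answer")
```

With the default limits, a script that keeps requesting tools is stopped by the tool-call limit first, since it is the smaller of the two.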

Practical Application Scenarios

Scenario 1: Complex Data Analysis

User Question: "Analyze last month's sales by region, find the top 3 fastest-growing regions"

Streaming Response Process:

[Text] Let me first query the relevant data tables...
[Tool Call] search_metadata(queries=["Sales", "Region"])
[Tool Result] Found tables: sales, regions
[Text] Now getting detailed table structure...
[Tool Call] show_table(table_names=["sales", "regions"])
[Tool Result] Returned field information
[Text] Querying last month's sales by region...
[Tool Call] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[Tool Result] Returned query results
[Text] Querying month-before-last data for comparison...
[Tool Call] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[Tool Result] Returned query results
[Text] Based on data analysis, the top 3 fastest-growing regions are:
1. East China: 45% growth
2. South China: 38% growth
3. Southwest: 32% growth

User Experience:

  • Users see the Agent's thinking process in real time
  • Users understand what each step is doing
  • Even if the entire flow takes 10 seconds, it never feels "stuck"

Scenario 2: Error Recovery

User Question: "Query data from users table"

Streaming Response Process:

[Text] Let me query the users table...
[Tool Call] execute_sql(sql="SELECT * FROM users")
[Tool Result] Error: Table does not exist
[Text] Sorry, users table does not exist. Let me search for related tables...
[Tool Call] search_metadata(queries=["user"])
[Tool Result] Found tables: user_info, customer
[Text] I found the user_info table. Would you like to query this table?

Advantages:

  • Errors don't interrupt the flow
  • Agent automatically tries other approaches
  • User sees the complete reasoning process

Performance Optimization

1. Event Merging

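# Avoid emitting events too frequently
import time

class EventBuffer:
    def __init__(self, flush_interval=0.1):
        self.buffer = []
        self.last_flush = time.monotonic()  # monotonic clock ignores system clock changes
        self.flush_interval = flush_interval  # minimum seconds between flushes

    def add(self, event):
        """Buffer an event; return the batch if the flush interval has elapsed."""
        self.buffer.append(event)
        if time.monotonic() - self.last_flush > self.flush_interval:
            return self.flush()
        return None

    def flush(self):
        """Return and clear the buffered events, or None if the buffer is empty."""
        if not self.buffer:
            return None
        events = self.buffer
        self.buffer = []
        self.last_flush = time.monotonic()
        return events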
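Two details are worth spelling out. First, EventBuffer only flushes from inside add, so the caller needs one final flush() when the stream ends, or the last events are never delivered. Second, a returned batch can be compacted before it is sent. The sketch below shows one possible compaction, assuming events are plain dicts; merge_text_events is a hypothetical helper, not part of AskTable.

```python
def merge_text_events(events: list[dict]) -> list[dict]:
    """Collapse runs of adjacent text events into one; keep other events in order."""
    merged = []
    for event in events:
        if event["type"] == "text" and merged and merged[-1]["type"] == "text":
            # Extend the previous text event instead of emitting a new one
            merged[-1] = {
                "type": "text",
                "content": merged[-1]["content"] + event["content"],
            }
        else:
            merged.append(dict(event))
    return merged


batch = [
    {"type": "text", "content": "Let "},
    {"type": "text", "content": "me check"},
    {"type": "tool_call", "name": "execute_sql"},
    {"type": "text", "content": "Done"},
]
compact = merge_text_events(batch)
```

Only adjacent text events are merged, so the relative order of text and tool events is preserved.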

2. Backpressure Control

# Limit the producer's pace so the consumer does not fall behind
import asyncio

class BackpressureStream:
    def __init__(self, max_buffer_size=100):
        self.queue = asyncio.Queue(maxsize=max_buffer_size)

    async def produce(self, event):
        await self.queue.put(event)  # Suspends the producer while the queue is full

    async def consume(self):
        while True:
            event = await self.queue.get()
            yield event
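The consume loop above never terminates on its own, so a real pipeline needs some way to signal the end of the stream. The sketch below uses a sentinel object for that and shows that a fast producer never gets more than maxsize items ahead of a slow consumer; producer, consumer, demo, and _DONE are hypothetical names for illustration.

```python
import asyncio

_DONE = object()  # sentinel that marks the end of the stream


async def producer(queue: asyncio.Queue, n: int) -> None:
    for i in range(n):
        await queue.put(i)  # suspends here whenever the queue is full
    await queue.put(_DONE)


async def consumer(queue: asyncio.Queue, delay: float = 0.001):
    """Drain the queue slowly; return the items and the largest backlog seen."""
    received = []
    max_seen = 0
    while True:
        max_seen = max(max_seen, queue.qsize())
        item = await queue.get()
        if item is _DONE:
            return received, max_seen
        received.append(item)
        await asyncio.sleep(delay)  # simulate a slow client


async def demo(n: int = 20, maxsize: int = 3):
    queue = asyncio.Queue(maxsize=maxsize)
    prod = asyncio.create_task(producer(queue, n))
    received, max_seen = await consumer(queue)
    await prod
    return received, max_seen
```

The bounded queue is what provides the backpressure: put only returns once there is room, so memory use stays capped no matter how fast events are produced.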

Best Practices

1. Event Design

# ✅ Good event design
@dataclass
class StreamEvent:
    type: str
    content: str
    metadata: dict  # Extra information

# ❌ Poor event design
@dataclass
class StreamEvent:
    data: str  # Type not clear

2. Error Handling

# ✅ Graceful degradation
try:
    result = await tool(**params)
except Exception as e:
    result = {"error": str(e), "suggestion": "try another approach"}
    yield StreamEvent(type="error", content=str(e))

# ❌ Throw exception directly
result = await tool(**params)  # May interrupt entire flow

3. User Feedback

# ✅ Provide progress information
yield StreamEvent(type="progress", content="Querying database...")
yield StreamEvent(type="progress", content="Analyzing results...")

# ❌ No feedback for a long time
# User doesn't know what the system is doing

Summary

AskTable achieves a smooth real-time interactive experience by combining streaming responses with tool calling:

  1. True Streaming: Tokens are returned as they are generated instead of after the complete response
  2. Event-Driven: Clear event types make front-end rendering straightforward
  3. Tool Integration: Tool calls execute as part of the same event stream
  4. Automatic Loop: Execution continues until a final answer is obtained
  5. Error Recovery: Errors are handled gracefully and do not interrupt the flow

Streaming is more than an implementation detail; it is central to the user experience. Letting users watch the AI's thinking process in real time makes AI applications more transparent and easier to trust.
