ARTICLES · 2026-04-30 · BY EFFLOOW CONTENT FACTORY

Claude Streaming + Tool Use: Build Real-Time Agentic Pipelines

Stream tool calls in real time with the Anthropic Python SDK. Learn SSE events, input_json_delta accumulation, and multi-turn patterns for responsive AI agents.
anthropic claude streaming tool-use python

If you have built a Claude-powered app and added tool use, you already know the problem: the UI appears frozen while the model reasons and calls tools. Streaming alone fixes text output latency, but streaming with tool use is a different beast. The events are different, the accumulation logic is different, and the multi-turn flow adds another layer.

Effloow Lab ran a sandbox validation using anthropic SDK 0.97.0, confirmed all SSE event types, and traced the full event sequence for a streaming tool call. This guide translates those findings into production-ready patterns.

What Changes When You Add Tool Use to Streaming

Without tools, streaming is straightforward: receive text_delta events, append to a buffer, render incrementally. With tools, the sequence has three distinct phases:

  1. Text phase — the model may stream reasoning text before calling a tool
  2. Tool input phase — the model streams the tool input as partial JSON fragments
  3. Result turn — you call the tool, send the result back, and stream the final response

The key difference from plain streaming: the tool input arrives as input_json_delta events whose partial_json field is an incomplete JSON string. You must accumulate fragments across all content_block_delta events for a given block index, then parse the complete JSON at content_block_stop.
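The accumulation step can be sketched with hypothetical fragments standing in for the partial_json payloads (the fragment values below are made up for illustration):

```python
import json

# Hypothetical fragments, in the order they would arrive as
# input_json_delta events for a single content block index
fragments = ['{"ci', 'ty": "', 'Tokyo"}']

buf = ""
for frag in fragments:
    buf += frag  # accumulate across content_block_delta events

# Only at content_block_stop is the buffer complete JSON
tool_input = json.loads(buf)
print(tool_input)  # {'city': 'Tokyo'}
```

Parsing `buf` after only the first or second fragment would raise a JSONDecodeError, which is why the parse waits for the stop event.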

SSE Event Types in the Anthropic SDK

Effloow Lab confirmed the following types in anthropic.types (SDK 0.97.0):

from anthropic.types import (
    RawContentBlockStartEvent,   # alias: ContentBlockStartEvent
    RawContentBlockDeltaEvent,   # alias: ContentBlockDeltaEvent
    RawContentBlockStopEvent,    # alias: ContentBlockStopEvent
    MessageStartEvent,
    MessageDeltaEvent,
    MessageStopEvent,
    InputJSONDelta,
    TextDelta,
)

The event type aliases (ContentBlockStartEvent, ContentBlockDeltaEvent, ContentBlockStopEvent) resolve to the Raw* variants at runtime. Both names work in type annotations.

The Complete SSE Event Sequence

For a streaming message where the model calls one tool before generating a final text response:

message_start
  └── usage.input_tokens, model id

content_block_start  (index=0, type="text")
content_block_delta  (index=0, type="text_delta", text="Let me check...")
content_block_stop   (index=0)

content_block_start  (index=1, type="tool_use", id="toolu_...", name="get_weather")
content_block_delta  (index=1, type="input_json_delta", partial_json='{"ci')
content_block_delta  (index=1, type="input_json_delta", partial_json='ty": "')
content_block_delta  (index=1, type="input_json_delta", partial_json='Tokyo"}')
content_block_stop   (index=1)   ← parse accumulated JSON here

message_delta  (stop_reason="tool_use", usage.output_tokens)
message_stop

After the stream ends (message_stop) with stop_reason="tool_use" — reported in the preceding message_delta — send the tool result and resume streaming for the model's final answer.

Minimal Working Implementation

import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["city"]
        }
    }
]

def run_tool(name: str, tool_input: dict) -> str:
    """Fake tool executor — replace with real implementation."""
    if name == "get_weather":
        return json.dumps({"city": tool_input["city"], "temp": 22, "condition": "sunny"})
    return json.dumps({"error": "unknown tool"})


def stream_with_tools(user_message: str):
    messages = [{"role": "user", "content": user_message}]

    while True:
        # Phase 1: stream model response
        tool_calls = {}     # index → {name, id, json_buf}
        stop_reason = None

        with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            tools=tools,
            messages=messages,
        ) as stream:
            for event in stream:
                if event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        tool_calls[event.index] = {
                            "name": event.content_block.name,
                            "id": event.content_block.id,
                            "json_buf": "",
                        }

                elif event.type == "content_block_delta":
                    if event.delta.type == "input_json_delta":
                        tool_calls[event.index]["json_buf"] += event.delta.partial_json
                    elif event.delta.type == "text_delta":
                        print(event.delta.text, end="", flush=True)

                elif event.type == "message_delta":
                    stop_reason = event.delta.stop_reason

            # Capture the accumulated message while the stream is still open
            final_message = stream.get_final_message()

        if stop_reason != "tool_use":
            break  # model finished without tools

        # Phase 2: execute tools, build tool_result turn
        assistant_content = final_message.content
        tool_results = []

        for idx, tc in tool_calls.items():
            tool_input = json.loads(tc["json_buf"])
            result = run_tool(tc["name"], tool_input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc["id"],
                "content": result,
            })

        # Append assistant turn and tool results, then loop
        messages.append({"role": "assistant", "content": assistant_content})
        messages.append({"role": "user", "content": tool_results})


stream_with_tools("What is the weather in Tokyo right now?")

Handling Multiple Tools Per Turn

The model can call multiple tools in a single turn. Each tool call occupies its own content block index. The accumulation dict tool_calls handles this naturally since each index maps to an independent buffer. After message_stop, iterate all entries in tool_calls to build one tool_results list.
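As a concrete sketch, with made-up ids and pre-filled buffers standing in for real streamed state, two completed entries in tool_calls become one tool_result list:

```python
import json

# Hypothetical state after content_block_stop for two tool_use blocks
tool_calls = {
    1: {"name": "get_weather", "id": "toolu_AAA", "json_buf": '{"city": "Tokyo"}'},
    2: {"name": "get_weather", "id": "toolu_BBB", "json_buf": '{"city": "Osaka"}'},
}

tool_results = [
    {
        "type": "tool_result",
        "tool_use_id": tc["id"],
        # run_tool(...) would go here; a canned result for illustration
        "content": json.dumps({"input": json.loads(tc["json_buf"])}),
    }
    for tc in tool_calls.values()
]
print(len(tool_results))  # 2
```

The whole list is sent back as the content of a single user turn, one tool_result per tool_use id.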

Progressive UI Updates During Tool Streaming

One advantage of streaming tool input is that you can update a UI element before the tool executes. For example, show a "Checking weather for Tokyo..." placeholder as the JSON fragments arrive:

elif event.type == "content_block_delta":
    if event.delta.type == "input_json_delta":
        tool_calls[event.index]["json_buf"] += event.delta.partial_json
        buf = tool_calls[event.index]["json_buf"]
        # Best-effort partial parse: first try closing an open string
        # plus the object, then just the object
        for closer in ('"}', "}"):
            try:
                partial = json.loads(buf + closer)
                print(f"\r[calling {tool_calls[event.index]['name']} "
                      f"for {partial.get('city', '')}...]", end="")
                break
            except json.JSONDecodeError:
                continue

For production, the jiter library (installed as a dependency of the SDK) provides proper incremental JSON parsing via its partial mode, without the closing hack above.

Async Streaming

For web servers (FastAPI, Django async views), use the async variant:

aclient = anthropic.AsyncAnthropic()  # async streaming requires the async client

async def stream_with_tools_async(user_message: str):
    messages = [{"role": "user", "content": user_message}]

    async with aclient.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        tools=tools,
        messages=messages,
    ) as stream:
        async for event in stream:
            # same event handling as the sync version
            ...

The async version uses anthropic.AsyncAnthropic, async with, and async for. Everything else is identical.

Common Mistakes to Avoid

1. Parsing JSON before content_block_stop. input_json_delta fragments are incomplete JSON strings; parsing mid-stream throws a JSONDecodeError. Always wait for content_block_stop before calling json.loads().

2. Ignoring stop_reason. If stop_reason is "end_turn", the model did not call a tool. Sending a tool_result turn in that case causes an API error. Check stop_reason before building the next turn.

3. Using get_final_text() when tools are involved. The .get_final_text() helper on the stream object returns only text content. When tools are involved, use .get_final_message().content to get the full content list, including tool_use blocks.

4. Forgetting to append the assistant turn. Before sending tool results, append the full assistant response to the messages list. Omitting it breaks the conversation context, and the model will repeat tool calls.

5. Forcing a tool that is not defined. If tool_choice forces a specific tool by name (tool_choice={"type": "tool", "name": ...}) and that tool is not in your tools list, the API returns a 400 error. Validate tool names at startup.

6. Single-turn architecture with tool use. Tool use requires multiple HTTP requests. A single-request pipeline hits a wall early. Design for the while True: ... if stop_reason != "tool_use": break loop pattern from the start.
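Mistakes 2 and 6 both come down to dispatching on stop_reason. A minimal sketch of that control flow (the function name and return values are our own convention):

```python
def next_action(stop_reason: str) -> str:
    """Map a stop_reason to the next step in the agent loop (sketch)."""
    if stop_reason == "tool_use":
        return "send_tool_results"   # execute tools, append turns, re-stream
    if stop_reason == "max_tokens":
        return "raise_or_continue"   # the response was truncated
    return "done"                    # "end_turn" or "stop_sequence"

print(next_action("tool_use"))  # send_tool_results
```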

Comparing Streaming Modes

Mode                                  When to use
client.messages.stream()              Standard: SSE events plus helper methods
client.messages.create(stream=True)   Raw SSE: full control, no helpers
client.messages.create()              Non-streaming: simpler code, higher latency

For tool use, client.messages.stream() is the right default. The .get_final_message() helper handles event accumulation so you only need to track input_json_delta fragments manually.

Measuring Streaming Latency

To measure time-to-first-token (TTFT) and time-to-first-tool-call (TTFTC):

import time

start = time.perf_counter()
first_token_logged = False
first_tool_logged = False

with client.messages.stream(...) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if not first_token_logged and event.delta.type == "text_delta":
                print(f"TTFT: {(time.perf_counter() - start) * 1000:.0f}ms")
                first_token_logged = True
            if not first_tool_logged and event.delta.type == "input_json_delta":
                print(f"TTFTC: {(time.perf_counter() - start) * 1000:.0f}ms")
                first_tool_logged = True

FAQ

Q: Can I stream tool results back to the user? Tool results are sent as user-turn messages — they are not streamed. Only the model's response is streamed. If you want to show tool output incrementally, render it from your tool executor as it runs.
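One way to render tool output incrementally is a generator-based executor — our own convention, not an SDK API — that yields chunks you can push to the UI while the tool runs, with the final chunk becoming the tool_result content:

```python
import json

def run_tool_streaming(name: str, tool_input: dict):
    """Yield incremental output chunks while the tool executes (sketch)."""
    if name == "get_weather":
        yield f"Fetching weather for {tool_input['city']}...\n"
        # ...real work (API call, DB query) would happen here...
        yield json.dumps({"city": tool_input["city"], "temp": 22})

chunks = list(run_tool_streaming("get_weather", {"city": "Tokyo"}))
result_for_api = chunks[-1]  # final chunk becomes the tool_result content
```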

Q: Does streaming work with extended thinking? Yes. Extended thinking content blocks stream as thinking type in content_block_start, with thinking_delta events. Tool use and thinking can coexist in the same stream.

Q: How do I cancel a stream mid-way? Break out of the event loop; exiting the with block closes the HTTP connection. Note that tokens generated before cancellation may still be billed — check the usage fields in message_delta for the counts you actually received.

Q: Does tool input streaming work with structured output / JSON mode? Tool input is always streamed as partial JSON (via input_json_delta). If you use betas=["structured-output"], the model's text output may be constrained but tool input streaming is unchanged.

Q: What is the maximum number of tools in one request? There is no documented hard limit on tool count. In practice, performance degrades with more than ~20 tool definitions due to increased context overhead. Use tool groups or dynamic tool selection for large toolsets.
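A naive sketch of dynamic tool selection, ranking tool definitions by keyword overlap with the user query (a stand-in for real embedding search; all names below are ours):

```python
def select_tools(query: str, registry: list[dict], limit: int = 20) -> list[dict]:
    """Pick the tool definitions most lexically relevant to the query (sketch)."""
    query_words = set(query.lower().split())

    def score(tool: dict) -> int:
        # Count query words appearing in the tool's name or description
        text = (tool["name"] + " " + tool["description"]).lower().replace("_", " ")
        return len(query_words & set(text.split()))

    return sorted(registry, key=score, reverse=True)[:limit]

registry = [
    {"name": "get_weather", "description": "Get current weather for a city"},
    {"name": "send_email", "description": "Send an email to a recipient"},
]
picked = select_tools("what is the weather in Tokyo", registry, limit=1)
print(picked[0]["name"])  # get_weather
```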

Verdict: Stream Tool Use From Day One

Streaming tool use adds ~30 lines of state management to a basic streaming implementation but changes the UX from "frozen for 5 seconds" to "visibly working in real time." The patterns here — event accumulation, multi-turn loop, async mode — cover the full production surface. Start with the stream_with_tools() pattern and add the UI update layer once the baseline is solid.

For related patterns: OpenAI Agents SDK multi-agent guide and Claude Opus 4.7 developer guide cover complementary agentic patterns.
