Stream Events

When streaming responses, Riffer emits typed events that represent incremental updates from the LLM.

Using Streaming

Use stream instead of generate to receive events as they arrive:

agent = MyAgent.new

agent.stream("Tell me a story").each do |event|
  case event
  when Riffer::StreamEvents::TextDelta
    print event.content
  when Riffer::StreamEvents::TextDone
    puts "\n[Complete]"
  when Riffer::StreamEvents::ToolCallDelta
    # Tool call being built
  when Riffer::StreamEvents::ToolCallDone
    puts "[Tool: #{event.name}]"
  when Riffer::StreamEvents::WebSearchStatus
    puts "[Searching: #{event.status}]"
  when Riffer::StreamEvents::WebSearchDone
    puts "[Search complete: #{event.query}]"
  end
end

Event Types

TextDelta

Emitted when incremental text content is received:

event = Riffer::StreamEvents::TextDelta.new("Hello ")
event.role     # => "assistant"
event.content  # => "Hello "
event.to_h     # => {role: "assistant", content: "Hello "}

Use this to display text in real-time as it streams.

TextDone

Emitted when text generation is complete:

event = Riffer::StreamEvents::TextDone.new("Hello, how can I help you?")
event.role     # => "assistant"
event.content  # => "Hello, how can I help you?"
event.to_h     # => {role: "assistant", content: "Hello, how can I help you?"}

Contains the complete final text.

ToolCallDelta

Emitted when tool call arguments are being streamed:

event = Riffer::StreamEvents::ToolCallDelta.new(
  item_id: "item_123",
  name: "weather_tool",
  arguments_delta: '{"city":'
)
event.role             # => "assistant"
event.item_id          # => "item_123"
event.name             # => "weather_tool"
event.arguments_delta  # => '{"city":'

The name may only be present in the first delta. Accumulate arguments_delta to build the complete arguments.

ToolCallDone

Emitted when a tool call is complete:

event = Riffer::StreamEvents::ToolCallDone.new(
  item_id: "item_123",
  call_id: "call_456",
  name: "weather_tool",
  arguments: '{"city":"Tokyo"}'
)
event.role       # => "assistant"
event.item_id    # => "item_123"
event.call_id    # => "call_456"
event.name       # => "weather_tool"
event.arguments  # => '{"city":"Tokyo"}'

Contains the complete tool call information.

ReasoningDelta

Emitted when reasoning/thinking content is streamed (OpenAI with reasoning enabled):

event = Riffer::StreamEvents::ReasoningDelta.new("Let me think about ")
event.role     # => "assistant"
event.content  # => "Let me think about "

ReasoningDone

Emitted when reasoning is complete:

event = Riffer::StreamEvents::ReasoningDone.new("Let me think about this step by step...")
event.role     # => "assistant"
event.content  # => "Let me think about this step by step..."

WebSearchStatus

Emitted during web search progress with status updates:

event = Riffer::StreamEvents::WebSearchStatus.new("searching", query: "Ruby language")
event.role    # => :assistant
event.status  # => "searching"
event.query   # => "Ruby language"
event.url     # => nil
event.to_h    # => {role: :assistant, status: "searching", query: "Ruby language"}

The url and query attributes are optional and only included in to_h when present. Status values include "in_progress", "searching", "completed", and "open_page".

# "open_page" status includes a url
event = Riffer::StreamEvents::WebSearchStatus.new("open_page", url: "https://example.com")
event.url   # => "https://example.com"
event.to_h  # => {role: :assistant, status: "open_page", url: "https://example.com"}

WebSearchDone

Emitted when web search completes:

event = Riffer::StreamEvents::WebSearchDone.new(
  "Ruby language",
  sources: [{title: "Ruby Programming", url: "https://ruby-lang.org"}]
)
event.role     # => :assistant
event.query    # => "Ruby language"
event.sources  # => [{title: "Ruby Programming", url: "https://ruby-lang.org"}]
event.to_h     # => {role: :assistant, query: "Ruby language", sources: [...]}

Contains the search query and an array of source hashes with title and url keys.

GuardrailTripwire

Emitted when a guardrail blocks execution during streaming:

agent.stream("Hello").each do |event|
  case event
  when Riffer::StreamEvents::GuardrailTripwire
    puts "Blocked by: #{event.guardrail_id}"
    puts "Reason: #{event.reason}"
    puts "Phase: #{event.phase}"  # :before or :after
  end
end

See Guardrails for more information.

GuardrailModification

Emitted when a guardrail transforms data during streaming:

agent.stream("Hello").each do |event|
  case event
  when Riffer::StreamEvents::GuardrailModification
    puts "Modified by: #{event.guardrail_id}"
    puts "Phase: #{event.phase}"              # :before or :after
    puts "Changed: #{event.message_indices}"  # Array of affected indices
  end
end

See Guardrails for more information.

Interrupt

Emitted when the agent loop is interrupted. This can happen in two ways:

This is the streaming equivalent of Response#interrupted? in generate mode.

# Callback interrupt with a string reason
event = Riffer::StreamEvents::Interrupt.new(reason: "needs approval")
event.role    # => :system
event.reason  # => "needs approval"
event.to_h    # => {role: :system, interrupt: true, reason: "needs approval"}

# Max steps interrupt with a symbol reason
event = Riffer::StreamEvents::Interrupt.new(reason: :max_steps)
event.reason  # => :max_steps

The reason is nil when throw :riffer_interrupt is called without a second argument.

agent.stream("Hello").each do |event|
  case event
  when Riffer::StreamEvents::Interrupt
    puts "Loop was interrupted: #{event.reason}"
  end
end

After an interrupt, use resume_stream to continue the loop. See Agents - Interrupting the Agent Loop for details.

TokenUsageDone

Emitted when token usage data is available at the end of a response:

event = Riffer::StreamEvents::TokenUsageDone.new(token_usage: token_usage)
event.role                          # => :assistant
event.token_usage                   # => Riffer::TokenUsage
event.token_usage.input_tokens      # => 100
event.token_usage.output_tokens     # => 50
event.token_usage.total_tokens      # => 150
event.to_h                          # => {role: :assistant, token_usage: {input_tokens: 100, output_tokens: 50}}

Use this to track token consumption in real-time during streaming.

Streaming with Tools

When an agent uses tools during streaming, the flow is:

  1. Text events stream in (TextDelta, TextDone)

  2. If tool calls are present: ToolCallDelta events, then ToolCallDone

  3. Agent executes tools internally

  4. Agent sends results back to LLM

  5. More text events stream in

  6. Repeat until no more tool calls

Note: When web_search is enabled, WebSearchStatus and WebSearchDone events may appear before text events as the provider performs a server-side search.

agent.stream("What's the weather in Tokyo?").each do |event|
  case event
  when Riffer::StreamEvents::TextDelta
    print event.content
  when Riffer::StreamEvents::ToolCallDone
    puts "\n[Calling #{event.name}...]"
  when Riffer::StreamEvents::TextDone
    puts "\n"
  end
end

Complete Example

class WeatherAgent < Riffer::Agent
  model 'openai/gpt-4o'
  instructions 'You are a weather assistant.'
  uses_tools [WeatherTool]
end

agent = WeatherAgent.new
text_buffer = ""

agent.stream("What's the weather in Tokyo and New York?").each do |event|
  case event
  when Riffer::StreamEvents::TextDelta
    print event.content
    text_buffer += event.content

  when Riffer::StreamEvents::TextDone
    # Final text available
    puts "\n---"
    puts "Complete response: #{event.content}"

  when Riffer::StreamEvents::ToolCallDelta
    # Could show "typing..." indicator

  when Riffer::StreamEvents::ToolCallDone
    puts "\n[Tool: #{event.name}(#{event.arguments})]"

  when Riffer::StreamEvents::ReasoningDelta
    # Show thinking process if desired
    print "[thinking] #{event.content}"

  when Riffer::StreamEvents::ReasoningDone
    puts "\n[reasoning complete]"

  when Riffer::StreamEvents::WebSearchStatus
    puts "[search: #{event.status}]"

  when Riffer::StreamEvents::WebSearchDone
    puts "[search complete: #{event.query}]"
    event.sources.each { |s| puts "  - #{s[:title]}: #{s[:url]}" }

  when Riffer::StreamEvents::Interrupt
    puts "\n[interrupted]"
  end
end

Base Class

All events inherit from Riffer::StreamEvents::Base:

class Riffer::StreamEvents::Base
  attr_reader :role

  def initialize(role: "assistant")
    @role = role
  end

  def to_h
    raise NotImplementedError
  end
end