Stream Events
When streaming responses, Riffer emits typed events that represent incremental updates from the LLM.
Using Streaming
Use stream instead of generate to receive events as they arrive:
    agent = MyAgent.new

    agent.stream("Tell me a story").each do |event|
      case event
      when Riffer::StreamEvents::TextDelta
        print event.content
      when Riffer::StreamEvents::TextDone
        puts "\n[Complete]"
      when Riffer::StreamEvents::ToolCallDelta
        # Tool call being built
      when Riffer::StreamEvents::ToolCallDone
        puts "[Tool: #{event.name}]"
      when Riffer::StreamEvents::WebSearchStatus
        puts "[Searching: #{event.status}]"
      when Riffer::StreamEvents::WebSearchDone
        puts "[Search complete: #{event.query}]"
      end
    end
Event Types
TextDelta
Emitted when incremental text content is received:
    event = Riffer::StreamEvents::TextDelta.new("Hello ")
    event.role     # => "assistant"
    event.content  # => "Hello "
    event.to_h     # => {role: "assistant", content: "Hello "}

Use this to display text in real time as it streams.
TextDone
Emitted when text generation is complete:
    event = Riffer::StreamEvents::TextDone.new("Hello, how can I help you?")
    event.role     # => "assistant"
    event.content  # => "Hello, how can I help you?"
    event.to_h     # => {role: "assistant", content: "Hello, how can I help you?"}
Contains the complete final text.
ToolCallDelta
Emitted when tool call arguments are being streamed:
    event = Riffer::StreamEvents::ToolCallDelta.new(
      item_id: "item_123",
      name: "weather_tool",
      arguments_delta: '{"city":'
    )
    event.role             # => "assistant"
    event.item_id          # => "item_123"
    event.name             # => "weather_tool"
    event.arguments_delta  # => '{"city":'
The name may only be present in the first delta. Accumulate arguments_delta to build the complete arguments.
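As a minimal sketch of that accumulation, the snippet below collects fragments per item_id and keeps the first name it sees. ToolCallDelta here is a hypothetical Struct stand-in so the example runs without the gem, and ToolCallAccumulator is an illustrative helper, not part of Riffer.

```ruby
require "json"

# Hypothetical stand-in for Riffer::StreamEvents::ToolCallDelta; only the
# attributes documented above are modeled.
ToolCallDelta = Struct.new(:item_id, :name, :arguments_delta, keyword_init: true)

# Illustrative helper (not part of Riffer): collects argument fragments per
# item_id and remembers the first non-nil name.
class ToolCallAccumulator
  def initialize
    @calls = Hash.new { |hash, id| hash[id] = {name: nil, arguments: +""} }
  end

  def add(event)
    call = @calls[event.item_id]
    call[:name] ||= event.name # the name may only arrive on the first delta
    call[:arguments] << event.arguments_delta.to_s
    self
  end

  def name_for(item_id)
    @calls.fetch(item_id)[:name]
  end

  # Parse only after the last fragment has arrived; partial JSON will not parse.
  def arguments_for(item_id)
    JSON.parse(@calls.fetch(item_id)[:arguments])
  end
end

accumulator = ToolCallAccumulator.new
accumulator.add(ToolCallDelta.new(item_id: "item_123", name: "weather_tool", arguments_delta: '{"city":'))
accumulator.add(ToolCallDelta.new(item_id: "item_123", name: nil, arguments_delta: '"Tokyo"}'))

puts accumulator.name_for("item_123")
p accumulator.arguments_for("item_123")
```

Since ToolCallDone already carries the complete arguments, accumulating deltas is mainly useful for showing progress while a call is still being built.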
ToolCallDone
Emitted when a tool call is complete:
    event = Riffer::StreamEvents::ToolCallDone.new(
      item_id: "item_123",
      call_id: "call_456",
      name: "weather_tool",
      arguments: '{"city":"Tokyo"}'
    )
    event.role       # => "assistant"
    event.item_id    # => "item_123"
    event.call_id    # => "call_456"
    event.name       # => "weather_tool"
    event.arguments  # => '{"city":"Tokyo"}'
Contains the complete tool call information.
ReasoningDelta
Emitted when reasoning/thinking content is streamed (OpenAI with reasoning enabled):
    event = Riffer::StreamEvents::ReasoningDelta.new("Let me think about ")
    event.role     # => "assistant"
    event.content  # => "Let me think about "
ReasoningDone
Emitted when reasoning is complete:
    event = Riffer::StreamEvents::ReasoningDone.new("Let me think about this step by step...")
    event.role     # => "assistant"
    event.content  # => "Let me think about this step by step..."
WebSearchStatus
Emitted during web search progress with status updates:
    event = Riffer::StreamEvents::WebSearchStatus.new("searching", query: "Ruby language")
    event.role    # => :assistant
    event.status  # => "searching"
    event.query   # => "Ruby language"
    event.url     # => nil
    event.to_h    # => {role: :assistant, status: "searching", query: "Ruby language"}
The url and query attributes are optional and only included in to_h when present. Status values include "in_progress", "searching", "completed", and "open_page".
    # "open_page" status includes a url
    event = Riffer::StreamEvents::WebSearchStatus.new("open_page", url: "https://example.com")
    event.url   # => "https://example.com"
    event.to_h  # => {role: :assistant, status: "open_page", url: "https://example.com"}
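One way to get the "only included when present" behavior is to build the full hash and drop nil values with Hash#compact. The class below is a hypothetical stand-in written for illustration; how Riffer implements to_h internally is not shown in this document.

```ruby
# Hypothetical stand-in, not Riffer's actual class: shows one way a to_h
# could omit optional keys whose values are nil.
class SearchStatusSketch
  attr_reader :role, :status, :query, :url

  def initialize(status, query: nil, url: nil)
    @role = :assistant
    @status = status
    @query = query
    @url = url
  end

  def to_h
    # compact removes every key whose value is nil
    {role: role, status: status, query: query, url: url}.compact
  end
end

p SearchStatusSketch.new("searching", query: "Ruby language").to_h
p SearchStatusSketch.new("open_page", url: "https://example.com").to_h
```

Consumers of the hash should therefore check for the key (for example with key? or fetch with a default) rather than assume query and url are always there.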
WebSearchDone
Emitted when web search completes:
    event = Riffer::StreamEvents::WebSearchDone.new(
      "Ruby language",
      sources: [{title: "Ruby Programming", url: "https://ruby-lang.org"}]
    )
    event.role     # => :assistant
    event.query    # => "Ruby language"
    event.sources  # => [{title: "Ruby Programming", url: "https://ruby-lang.org"}]
    event.to_h     # => {role: :assistant, query: "Ruby language", sources: [...]}
Contains the search query and an array of source hashes with title and url keys.
GuardrailTripwire
Emitted when a guardrail blocks execution during streaming:
    agent.stream("Hello").each do |event|
      case event
      when Riffer::StreamEvents::GuardrailTripwire
        puts "Blocked by: #{event.guardrail_id}"
        puts "Reason: #{event.reason}"
        puts "Phase: #{event.phase}"  # :before or :after
      end
    end
See Guardrails for more information.
GuardrailModification
Emitted when a guardrail transforms data during streaming:
    agent.stream("Hello").each do |event|
      case event
      when Riffer::StreamEvents::GuardrailModification
        puts "Modified by: #{event.guardrail_id}"
        puts "Phase: #{event.phase}"  # :before or :after
        puts "Changed: #{event.message_indices}"  # Array of affected indices
      end
    end
See Guardrails for more information.
Interrupt
Emitted when the agent loop is interrupted. This can happen in two ways:
- An on_message callback calls throw :riffer_interrupt (reason is a String or nil).
- The max_steps limit is reached (reason is the Symbol :max_steps).
This is the streaming equivalent of Response#interrupted? in generate mode.
    # Callback interrupt with a string reason
    event = Riffer::StreamEvents::Interrupt.new(reason: "needs approval")
    event.role    # => :system
    event.reason  # => "needs approval"
    event.to_h    # => {role: :system, interrupt: true, reason: "needs approval"}

    # Max steps interrupt with a symbol reason
    event = Riffer::StreamEvents::Interrupt.new(reason: :max_steps)
    event.reason  # => :max_steps
The reason is nil when throw :riffer_interrupt is called without a second argument.
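The nil case matches how Ruby's own catch and throw behave: catch returns the value passed as the second argument to throw, or nil when only the tag is given. The snippet below is plain Ruby with no Riffer involved; that the agent loop wraps callbacks in a catch(:riffer_interrupt) block is an assumption made for illustration.

```ruby
# Plain Ruby: catch returns the second argument given to throw,
# or nil when throw receives only the tag.
with_reason = catch(:riffer_interrupt) do
  throw :riffer_interrupt, "needs approval"
end

without_reason = catch(:riffer_interrupt) do
  throw :riffer_interrupt
end

p with_reason
p without_reason
```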
    agent.stream("Hello").each do |event|
      case event
      when Riffer::StreamEvents::Interrupt
        puts "Loop was interrupted: #{event.reason}"
      end
    end
After an interrupt, use resume_stream to continue the loop. See Agents - Interrupting the Agent Loop for details.
TokenUsageDone
Emitted when token usage data is available at the end of a response:
    event = Riffer::StreamEvents::TokenUsageDone.new(token_usage: token_usage)
    event.role                       # => :assistant
    event.token_usage                # => Riffer::TokenUsage
    event.token_usage.input_tokens   # => 100
    event.token_usage.output_tokens  # => 50
    event.token_usage.total_tokens   # => 150
    event.to_h                       # => {role: :assistant, token_usage: {input_tokens: 100, output_tokens: 50}}

Use this to track token consumption in real time during streaming.
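A running tally might look like the sketch below. TokenUsage and TokenUsageDone are hypothetical Struct stand-ins that model only the attributes shown above, TokenTally is an illustrative helper rather than part of Riffer, and the idea that several TokenUsageDone events can arrive in one stream (for example, one per LLM call in a tool loop) is an assumption.

```ruby
# Hypothetical stand-ins for Riffer::TokenUsage and
# Riffer::StreamEvents::TokenUsageDone.
TokenUsage = Struct.new(:input_tokens, :output_tokens, keyword_init: true) do
  def total_tokens
    input_tokens + output_tokens
  end
end
TokenUsageDone = Struct.new(:token_usage, keyword_init: true)

# Illustrative helper (not part of Riffer): keeps running totals.
class TokenTally
  attr_reader :input_tokens, :output_tokens

  def initialize
    @input_tokens = 0
    @output_tokens = 0
  end

  def record(event)
    @input_tokens += event.token_usage.input_tokens
    @output_tokens += event.token_usage.output_tokens
    self
  end

  def total_tokens
    input_tokens + output_tokens
  end
end

tally = TokenTally.new
events = [
  TokenUsageDone.new(token_usage: TokenUsage.new(input_tokens: 100, output_tokens: 50)),
  TokenUsageDone.new(token_usage: TokenUsage.new(input_tokens: 120, output_tokens: 30))
]
events.each { |event| tally.record(event) }

puts "input=#{tally.input_tokens} output=#{tally.output_tokens} total=#{tally.total_tokens}"
```

In a real stream, the call to record would sit in the TokenUsageDone branch of the case statement.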
Streaming with Tools
When an agent uses tools during streaming, the flow is:
- Text events stream in (TextDelta, TextDone)
- If tool calls are present: ToolCallDelta events, then ToolCallDone
- Agent executes tools internally
- Agent sends results back to LLM
- More text events stream in
- Repeat until no more tool calls

Note: When web_search is enabled, WebSearchStatus and WebSearchDone events may appear before text events as the provider performs a server-side search.
    agent.stream("What's the weather in Tokyo?").each do |event|
      case event
      when Riffer::StreamEvents::TextDelta
        print event.content
      when Riffer::StreamEvents::ToolCallDone
        puts "\n[Calling #{event.name}...]"
      when Riffer::StreamEvents::TextDone
        puts "\n"
      end
    end
Complete Example
    class WeatherAgent < Riffer::Agent
      model 'openai/gpt-4o'
      instructions 'You are a weather assistant.'
      uses_tools [WeatherTool]
    end

    agent = WeatherAgent.new
    text_buffer = ""

    agent.stream("What's the weather in Tokyo and New York?").each do |event|
      case event
      when Riffer::StreamEvents::TextDelta
        print event.content
        text_buffer += event.content
      when Riffer::StreamEvents::TextDone
        # Final text available
        puts "\n---"
        puts "Complete response: #{event.content}"
      when Riffer::StreamEvents::ToolCallDelta
        # Could show "typing..." indicator
      when Riffer::StreamEvents::ToolCallDone
        puts "\n[Tool: #{event.name}(#{event.arguments})]"
      when Riffer::StreamEvents::ReasoningDelta
        # Show thinking process if desired
        print "[thinking] #{event.content}"
      when Riffer::StreamEvents::ReasoningDone
        puts "\n[reasoning complete]"
      when Riffer::StreamEvents::WebSearchStatus
        puts "[search: #{event.status}]"
      when Riffer::StreamEvents::WebSearchDone
        puts "[search complete: #{event.query}]"
        event.sources.each { |s| puts "  - #{s[:title]}: #{s[:url]}" }
      when Riffer::StreamEvents::Interrupt
        puts "\n[interrupted]"
      end
    end
Base Class
All events inherit from Riffer::StreamEvents::Base:
    class Riffer::StreamEvents::Base
      attr_reader :role

      def initialize(role: "assistant")
        @role = role
      end

      def to_h
        raise NotImplementedError
      end
    end
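To show how a subclass fits this contract, the sketch below copies the base class into a hypothetical Sketch module (so it does not collide with the real Riffer constants) and adds an invented Note event that sets its role through super and implements to_h. Neither Sketch nor Note exists in Riffer.

```ruby
module Sketch
  # Local copy of the base class shown above.
  class Base
    attr_reader :role

    def initialize(role: "assistant")
      @role = role
    end

    def to_h
      raise NotImplementedError
    end
  end

  # Hypothetical event type, not part of Riffer.
  class Note < Base
    attr_reader :content

    def initialize(content, role: "assistant")
      super(role: role)
      @content = content
    end

    def to_h
      {role: role, content: content}
    end
  end
end

note = Sketch::Note.new("Hello ")
p note.to_h

# NotImplementedError is a ScriptError, not a StandardError, so it must be
# rescued by name; a bare rescue would not catch it.
begin
  Sketch::Base.new.to_h
rescue NotImplementedError => e
  puts "Base#to_h raised #{e.class}"
end
```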