
Added proper handling of streaming error responses across both Faraday V1 and V2 #273

Open · wants to merge 5 commits into main

14 changes: 14 additions & 0 deletions lib/ruby_llm/providers/openai/streaming.rb
@@ -21,6 +21,20 @@ def build_chunk(data)
output_tokens: data.dig('usage', 'completion_tokens')
)
end

def parse_streaming_error(data)
error_data = JSON.parse(data)
return unless error_data['error']

case error_data.dig('error', 'type')
when 'server_error'
[500, error_data['error']['message']]
when 'rate_limit_exceeded', 'insufficient_quota'
[429, error_data['error']['message']]
else
[400, error_data['error']['message']]
end
end
end
end
end
28 changes: 23 additions & 5 deletions lib/ruby_llm/streaming.rb
@@ -55,13 +55,13 @@ def create_stream_processor(parser, buffer, &)
end
end

def process_stream_chunk(chunk, parser, _env, &)
def process_stream_chunk(chunk, parser, env, &)
RubyLLM.logger.debug "Received chunk: #{chunk}"

if error_chunk?(chunk)
handle_error_chunk(chunk, nil)
handle_error_chunk(chunk, env)
else
yield handle_sse(chunk, parser, nil, &)
yield handle_sse(chunk, parser, env, &)
end
end

@@ -88,7 +88,16 @@ def error_chunk?(chunk)
def handle_error_chunk(chunk, env)
error_data = chunk.split("\n")[1].delete_prefix('data: ')
status, _message = parse_streaming_error(error_data)
error_response = env.merge(body: JSON.parse(error_data), status: status)
parsed_data = JSON.parse(error_data)

# Create a response-like object that works for both Faraday v1 and v2
error_response = if env
env.merge(body: parsed_data, status: status)
else
# For Faraday v1, create a simple object that responds to .status and .body
Struct.new(:body, :status).new(parsed_data, status)
end

ErrorMiddleware.parse_error(provider: self, response: error_response)
rescue JSON::ParserError => e
RubyLLM.logger.debug "Failed to parse error chunk: #{e.message}"
@@ -122,7 +131,16 @@ def handle_data(data)

def handle_error_event(data, env)
status, _message = parse_streaming_error(data)
error_response = env.merge(body: JSON.parse(data), status: status)
parsed_data = JSON.parse(data)

# Create a response-like object that works for both Faraday v1 and v2
error_response = if env
env.merge(body: parsed_data, status: status)
else
# For Faraday v1, create a simple object that responds to .status and .body
Struct.new(:body, :status).new(parsed_data, status)
end

ErrorMiddleware.parse_error(provider: self, response: error_response)
rescue JSON::ParserError => e
RubyLLM.logger.debug "Failed to parse error event: #{e.message}"
80 changes: 80 additions & 0 deletions spec/ruby_llm/chat_streaming_spec.rb

Owner:
These changes to the spec file are quite different from our current style for spec files. We also assume that cassettes can be removed; in fact, rake vcr:record[anthropic] would remove yours. I'd suggest mocking the messages coming back from the API instead.

It would also be great to test with other providers.
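
A minimal sketch of that kind of stubbing with WebMock's stub_request, assuming webmock/rspec is loaded as in spec_helper (the endpoint URL and error payload here are purely illustrative, not the project's actual fixtures):

  # Illustrative only: stub the provider endpoint to return an error body
  # instead of replaying a VCR cassette.
  stub_request(:post, 'https://api.openai.com/v1/chat/completions')
    .to_return(
      status: 500,
      body: { error: { type: 'server_error', message: 'The server had an error' } }.to_json,
      headers: { 'Content-Type' => 'text/event-stream' }
    )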

Author:
OK, I've removed the VCR cassettes and replaced them with stubbed requests.

I've also added support for other providers (all except Bedrock).

I've verified the error format used in the mocks against the Anthropic and OpenAI APIs, so I'm reasonably confident in that mocking. I don't have access to Bedrock right now, and its error handling seems somewhat different as far as I can tell, so I haven't added coverage for it in the tests yet. That would ideally be done by someone with access to Bedrock.
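
For reference, the two stubbed failure shapes the specs exercise differ roughly as follows; this mirrors the stub_error_response helper below, with abbreviated example payloads:

  require 'json'

  # :chunk — the request fails outright: a non-200 HTTP status with a bare JSON error body
  server_error = { error: { type: 'server_error', message: 'Overloaded' } }
  chunk_body   = "#{server_error.to_json}\n\n"

  # :event — the response is 200, but the stream carries an SSE "error" event
  overloaded = { type: 'error', error: { type: 'overloaded_error', message: 'Overloaded' } }
  event_body = "event: error\ndata: #{overloaded.to_json}\n\n"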

@@ -4,6 +4,7 @@

RSpec.describe RubyLLM::Chat do
include_context 'with configured RubyLLM'
include StreamingErrorHelpers

describe 'streaming responses' do
CHAT_MODELS.each do |model_info|
@@ -41,4 +42,83 @@
end
end
end

describe 'Error handling' do
CHAT_MODELS.each do |model_info|
model = model_info[:model]
provider = model_info[:provider]

context "with #{provider}/#{model}" do
let(:chat) { RubyLLM.chat(model: model, provider: provider) }

describe 'Faraday version 1' do # rubocop:disable RSpec/NestedGroups
before do
stub_const('Faraday::VERSION', '1.10.0')
end

it "#{provider}/#{model} supports handling streaming error chunks" do # rubocop:disable RSpec/ExampleLength
skip('Error handling not implemented yet') unless error_handling_supported?(provider)

stub_error_response(provider, :chunk)

chunks = []

expect do
chat.ask('Count from 1 to 3') do |chunk|
chunks << chunk
end
end.to raise_error(expected_error_for(provider))
end

it "#{provider}/#{model} supports handling streaming error events" do # rubocop:disable RSpec/ExampleLength
skip('Error handling not implemented yet') unless error_handling_supported?(provider)

stub_error_response(provider, :event)

chunks = []

expect do
chat.ask('Count from 1 to 3') do |chunk|
chunks << chunk
end
end.to raise_error(expected_error_for(provider))
end
end

describe 'Faraday version 2' do # rubocop:disable RSpec/NestedGroups
before do
stub_const('Faraday::VERSION', '2.0.0')
end

it "#{provider}/#{model} supports handling streaming error chunks" do # rubocop:disable RSpec/ExampleLength
skip('Error handling not implemented yet') unless error_handling_supported?(provider)

stub_error_response(provider, :chunk)

chunks = []

expect do
chat.ask('Count from 1 to 3') do |chunk|
chunks << chunk
end
end.to raise_error(expected_error_for(provider))
end

it "#{provider}/#{model} supports handling streaming error events" do # rubocop:disable RSpec/ExampleLength
skip('Error handling not implemented yet') unless error_handling_supported?(provider)

stub_error_response(provider, :event)

chunks = []

expect do
chat.ask('Count from 1 to 3') do |chunk|
chunks << chunk
end
end.to raise_error(expected_error_for(provider))
end
end
end
end
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
@@ -42,6 +42,7 @@
require 'fileutils'
require 'ruby_llm'
require 'webmock/rspec'
require_relative 'support/streaming_error_helpers'

# VCR Configuration
VCR.configure do |config|
111 changes: 111 additions & 0 deletions spec/support/streaming_error_helpers.rb
@@ -0,0 +1,111 @@
# frozen_string_literal: true

module StreamingErrorHelpers
ERROR_HANDLING_CONFIGS = {
anthropic: {
url: 'https://api.anthropic.com/v1/messages',
error_response: {
type: 'error',
error: {
type: 'overloaded_error',
message: 'Overloaded'
}
},
chunk_status: 529,
expected_error: RubyLLM::OverloadedError
},
openai: {
url: 'https://api.openai.com/v1/chat/completions',
error_response: {
error: {
message: 'The server is temporarily overloaded. Please try again later.',
type: 'server_error',
param: nil,
code: nil
}
},
chunk_status: 500,
expected_error: RubyLLM::ServerError
},
gemini: {
url: 'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent?alt=sse',
error_response: {
error: {
code: 529,
message: 'Service overloaded - please try again later',
status: 'RESOURCE_EXHAUSTED'
}
},
chunk_status: 529,
expected_error: RubyLLM::OverloadedError
},
deepseek: {
url: 'https://api.deepseek.com/chat/completions',
error_response: {
error: {
message: 'Service overloaded - please try again later',
type: 'server_error',
param: nil,
code: nil
}
},
chunk_status: 500,
expected_error: RubyLLM::ServerError
},
openrouter: {
url: 'https://openrouter.ai/api/v1/chat/completions',
error_response: {
error: {
message: 'Service overloaded - please try again later',
type: 'server_error',
param: nil,
code: nil
}
},
chunk_status: 500,
expected_error: RubyLLM::ServerError
},
ollama: {
url: 'http://localhost:11434/v1/chat/completions',
error_response: {
error: {
message: 'Service overloaded - please try again later',
type: 'server_error',
param: nil,
code: nil
}
},
chunk_status: 500,
expected_error: RubyLLM::ServerError
}
}.freeze

def error_handling_supported?(provider)
ERROR_HANDLING_CONFIGS.key?(provider)
end

def expected_error_for(provider)
ERROR_HANDLING_CONFIGS[provider][:expected_error]
end

def stub_error_response(provider, type)
config = ERROR_HANDLING_CONFIGS[provider]
return unless config

body = case type
when :chunk
"#{config[:error_response].to_json}\n\n"
when :event
"event: error\ndata: #{config[:error_response].to_json}\n\n"
end

status = type == :chunk ? config[:chunk_status] : 200

stub_request(:post, config[:url])
.to_return(
status: status,
body: body,
headers: { 'Content-Type' => 'text/event-stream' }
)
end
end