Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions gemfiles/open_telemetry.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# rubocop:todo all
source "https://rubygems.org"
gemspec path: '..'

gem 'opentelemetry-sdk'

require_relative './standard'

standard_dependencies
1 change: 1 addition & 0 deletions lib/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
require 'mongo/monitoring'
require 'mongo/logger'
require 'mongo/retryable'
require 'mongo/open_telemetry'
require 'mongo/operation'
require 'mongo/error'
require 'mongo/event'
Expand Down
39 changes: 27 additions & 12 deletions lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,35 @@ def select_cursor(session)
operation_timeouts: operation_timeouts,
view: self
)

if respond_to?(:write?, true) && write?
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
op = initial_query_op(session)
operation_name = op.is_a?(Mongo::Operation::Find) ? 'find' : 'explain'
attrs = {
'db.system' => 'mongodb',
'db.namespace' => op.spec[:db_name],
'db.collection.name' => op.spec[:coll_name],
'db.operation.name' => 'find',
'db.operation.summary' => "find #{op.spec[:coll_name]}"
}
context.tracer.in_span("find #{op.spec[:coll_name]}", attrs) do |span, ctx|
context.current_span = span
context.current_context = ctx
if respond_to?(:write?, true) && write?
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
else
Cursor.new(view, result, server, session: session, context: context)
end
else
Cursor.new(view, result, server, session: session, context: context)
end
else
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
send_initial_query(server, context)
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
send_initial_query(server, context)
end
end
ensure
context.current_span = nil
context.current_context = nil
end
end

Expand Down
15 changes: 11 additions & 4 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,17 @@ def unregister

def execute_operation(op, context: nil)
op_context = context || possibly_refreshed_context
if @connection.nil?
op.execute(@server, context: op_context)
else
op.execute_with_connection(@connection, context: op_context)
op_context.tracer.in_span('getMore', {}) do |span, ctx|
op_context.current_span = span
op_context.current_context = ctx
if @connection.nil?
op.execute(@server, context: op_context)
else
op.execute_with_connection(@connection, context: op_context)
end
ensure
op_context.current_span = nil
op_context.current_context = nil
end
end

Expand Down
29 changes: 29 additions & 0 deletions lib/mongo/open_telemetry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

# Copyright (C) 2015-present MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
# Container class for OpenTelemetry functionality.
#
# @api private
class OpenTelemetry
def self.tracer
@tracer ||= Tracer.new
end
end
end

require 'mongo/open_telemetry/statement_builder'
require 'mongo/open_telemetry/tracer'
48 changes: 48 additions & 0 deletions lib/mongo/open_telemetry/statement_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

#
# Copyright (C) 2015-present MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class OpenTelemetry
# This class is used to build +db.statement+ attribute for an OpenTelemetry span
# from a MongoDB command.
class StatementBuilder
# @param [ BSON::Document ] command The message that will be
# sent to the server.
def initialize(command)
@command = command
@command_name, @collection = command.first
end

# Builds the statement.
#
# @return [ String ] The statement as a JSON string.
def build
statement.to_json.freeze unless statement.empty?
end

private

def statement
mask(@command)
end

def mask(hash)
hash.reject { |k, v| Mongo::Protocol::Msg::INTERNAL_KEYS.include?(k.to_s) }
end
end
end
end
146 changes: 146 additions & 0 deletions lib/mongo/open_telemetry/tracer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# frozen_string_literal: true

# Copyright (C) 2015-present MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class OpenTelemetry
# This is a wrapper around OpenTelemetry tracer that provides a convenient
# interface for creating spans.
class Tracer
# Environment variable that enables otel instrumentation.
ENV_VARIABLE_DISABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED'

# Environment variable that controls the db.statement attribute.
ENV_VARIABLE_QUERY_TEXT = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT'

# Name of the tracer.
OTEL_TRACER_NAME = 'mongo-ruby-driver'

# @return [ OpenTelemetry::SDK::Trace::Tracer | nil ] The otel tracer.
attr_reader :ot_tracer

def initialize
return unless defined?(::OpenTelemetry)
return if %w[ 1 yes true ].include?(ENV[ENV_VARIABLE_DISABLED])

@ot_tracer = ::OpenTelemetry.tracer_provider.tracer(
OTEL_TRACER_NAME,
Mongo::VERSION
)
end

def start_span(command, address, parent)
if enabled?
attributes = build_attributes(command, address)
@ot_tracer.start_span(
span_name(command),
with_parent: parent,
attributes: attributes
)
end
end

def in_span(name, attributes = {}, &block)
if enabled?
@ot_tracer.in_span(name, attributes: attributes, kind: :client, &block)
else
yield
end
end

def add_attributes_from_command(span, command, address)
return unless enabled?

span&.add_attributes(build_attributes(command, address))
end

def add_query_text(span, message)
return unless enabled? && query_text?

span&.add_attributes(
'db.query.text' => StatementBuilder.new(message.payload[:command]).build
)
end

# @param [ OpenTelemetry::Trace::Span | nil ] span
# @param [ Mongo::Operation::Result ] result
def add_attributes_from_result(span, result)
return unless enabled?

if result.successful?
if (cursor_id = result.cursor_id).positive?
span&.add_attributes(
'db.mongodb.cursor_id' => cursor_id
)
end
else
span&.record_exception(result.error)
end
end

def add_event(span, name, attributes = {})
end

private

# @return [ true, false ] Whether otel instrumentation is enabled.
def enabled?
@ot_tracer != nil
end

def query_text?
%w[ 1 yes true ].include?(ENV[ENV_VARIABLE_QUERY_TEXT])
end

# @return [ String ] The name of the span.
def span_name(command)
collection = collection_name(command)
command_name = command.keys.first
if collection
"#{collection}.#{command_name}"
else
command_name
end
end

# @return [ Hash ] The attributes of the span.
def build_attributes(command, address)
command_name = command.keys.first
{
'db.system' => 'mongodb',
'db.namespace' => command['$db'],
'db.command.name' => command_name,
'server.port' => address.port,
'net.peer.port' => address.port,
'server.address' => address.host,
'net.peer.address' => address.host,
'db.query.summary' => span_name(command)
}.tap do |attributes|
if (coll_name = collection_name(command))
attributes['db.collection.name'] = coll_name
end
if command_name == 'getMore'
attributes['db.mongodb.cursor_id'] = command[command_name].value
end
end
end

# @return [ String | nil] Name of collection the operation is executed on.
def collection_name(command)
command.values.first if command.values.first.is_a?(String)
end
end
end
end
5 changes: 5 additions & 0 deletions lib/mongo/operation/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def initialize(
connection_global_id: nil,
operation_timeouts: {},
view: nil,
tracer: OpenTelemetry.tracer,
options: nil
)
if options
Expand All @@ -61,6 +62,7 @@ def initialize(
@session = session
@view = view
@connection_global_id = connection_global_id
@tracer = tracer
@options = options
super(session: session, operation_timeouts: operation_timeouts)
end
Expand All @@ -69,6 +71,9 @@ def initialize(
attr_reader :session
attr_reader :view
attr_reader :options
attr_reader :tracer

attr_accessor :current_span, :current_context

# Returns a new Operation::Context with the deadline refreshed
# and relative to the current moment.
Expand Down
7 changes: 5 additions & 2 deletions lib/mongo/operation/insert/op_msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ class OpMsg < OpMsgBase
private

def get_result(connection, context, options = {})
# This is a Mongo::Operation::Insert::Result
Result.new(*dispatch_message(connection, context), @ids, context: context)
context.tracer.trace_message(command(connection), connection.address) do |span|
# This is a Mongo::Operation::Insert::Result
Result.new(*dispatch_message(connection, context, span), @ids, context: context)
context.tracer.add_attributes_from_result(span, result)
end
end

def selector(connection)
Expand Down
13 changes: 10 additions & 3 deletions lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def do_execute(connection, context, options = {})
# reasonable to refactor things so this saved reference is used instead.
@context = context

session&.materialize_if_needed
unpin_maybe(session, connection) do
add_error_labels(connection, context) do
check_for_network_error do
Expand Down Expand Up @@ -92,8 +91,15 @@ def execute(connection, context:, options: {})
end
end

do_execute(connection, context, options).tap do |result|
validate_result(result, connection, context)
session&.materialize_if_needed
begin
span = context.tracer.start_span(command(connection), connection.address, context.current_context)
do_execute(connection, context, options).tap do |result|
context.tracer.add_attributes_from_result(span, result)
validate_result(result, connection, context)
end
ensure
span&.finish
end
end

Expand All @@ -111,6 +117,7 @@ def get_result(connection, context, options = {})
def dispatch_message(connection, context, options = {})
message = build_message(connection, context)
message = message.maybe_encrypt(connection, context)
context.tracer.add_query_text(context.current_span, message)
reply = connection.dispatch([ message ], context, options)
[reply, connection.description, connection.global_id]
end
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def modern_read_with_retry(session, server_selector, context, &block)
session,
timeout: context&.remaining_timeout_sec
)
context&.tracer&.add_event(context&.current_span, 'Server Selected')
yield server
rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
e.add_notes('modern retry', 'attempt 1')
Expand Down
Loading
Loading