Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUBY-1813 Discard ServerSessions involved in network errors #2825

Merged
merged 4 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
# @sdam_flow_lock covers just the sdam flow. Note it does not apply
# to @topology replacements which are done under @update_lock.
@sdam_flow_lock = Mutex.new
Session::SessionPool.create(self)
@session_pool = Session::SessionPool.new(self)

if seeds.empty? && load_balanced?
raise ArgumentError, 'Load-balanced clusters with no seeds are prohibited'
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ def inspect
def insert_one(document, opts = {})
QueryCache.clear_namespace(namespace)

client.send(:with_session, opts) do |session|
client.with_session(opts) do |session|
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
Expand Down
70 changes: 43 additions & 27 deletions lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/error'

module Mongo
module Operation

Expand All @@ -30,40 +32,42 @@ def do_execute(connection, context, options = {})
session&.materialize_if_needed
unpin_maybe(session, connection) do
add_error_labels(connection, context) do
add_server_diagnostics(connection) do
get_result(connection, context, options).tap do |result|
if session
if session.in_transaction? &&
connection.description.load_balancer?
then
if session.pinned_connection_global_id
unless session.pinned_connection_global_id == connection.global_id
raise(
Error::InternalDriverError,
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
)
check_for_network_error do
add_server_diagnostics(connection) do
get_result(connection, context, options).tap do |result|
if session
if session.in_transaction? &&
connection.description.load_balancer?
then
if session.pinned_connection_global_id
unless session.pinned_connection_global_id == connection.global_id
raise(
Error::InternalDriverError,
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
)
end
else
session.pin_to_connection(connection.global_id)
connection.pin
end
else
session.pin_to_connection(connection.global_id)
connection.pin
end
end

if session.snapshot? && !session.snapshot_timestamp
session.snapshot_timestamp = result.snapshot_timestamp
if session.snapshot? && !session.snapshot_timestamp
session.snapshot_timestamp = result.snapshot_timestamp
end
end
end

if result.has_cursor_id? &&
connection.description.load_balancer?
then
if result.cursor_id == 0
connection.unpin
else
connection.pin
if result.has_cursor_id? &&
connection.description.load_balancer?
then
if result.cursor_id == 0
connection.unpin
else
connection.pin
end
end
process_result(result, connection)
end
process_result(result, connection)
end
end
end
Expand Down Expand Up @@ -144,6 +148,18 @@ def process_result_for_sdam(result, connection)
connection.server.scan_semaphore.signal
end
end

# Error classes that check_for_network_error rescues. When one of them
# is raised by the yielded block, the session (if any) is marked dirty
# before the error is re-raised.
NETWORK_ERRORS = [
Error::SocketError,
Error::SocketTimeoutError
].freeze

# Runs the given block and returns its value. If the block raises one
# of the NETWORK_ERRORS classes, the current session (when there is
# one) is marked dirty and the very same exception is raised again.
# Any other exception passes through untouched.
#
# @yield the operation to guard.
#
# @return [ Object ] whatever the block returns.
def check_for_network_error
  yield
rescue *NETWORK_ERRORS => network_error
  # Flag the session first, then hand the original error object back
  # to the caller.
  session&.dirty!
  raise network_error
end
end
end
end
48 changes: 23 additions & 25 deletions lib/mongo/operation/shared/response_handling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,33 @@ def validate_result(result, connection, context)
# the operation is performed.
# @param [ Mongo::Operation::Context ] context The operation context.
def add_error_labels(connection, context)
begin
yield
rescue Mongo::Error::SocketError => e
if context.in_transaction? && !context.committing_transaction?
e.add_label('TransientTransactionError')
end
if context.committing_transaction?
e.add_label('UnknownTransactionCommitResult')
end
yield
rescue Mongo::Error::SocketError => e
if context.in_transaction? && !context.committing_transaction?
e.add_label('TransientTransactionError')
end
if context.committing_transaction?
e.add_label('UnknownTransactionCommitResult')
end

maybe_add_retryable_write_error_label!(e, connection, context)

raise e
rescue Mongo::Error::SocketTimeoutError => e
maybe_add_retryable_write_error_label!(e, connection, context)
raise e
rescue Mongo::Error::OperationFailure => e
if context.committing_transaction?
if e.write_retryable? || e.wtimeout? || (e.write_concern_error? &&
!Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code)
) || e.max_time_ms_expired?
e.add_label('UnknownTransactionCommitResult')
end
maybe_add_retryable_write_error_label!(e, connection, context)

raise e
rescue Mongo::Error::SocketTimeoutError => e
maybe_add_retryable_write_error_label!(e, connection, context)
raise e
rescue Mongo::Error::OperationFailure => e
if context.committing_transaction?
if e.write_retryable? || e.wtimeout? || (e.write_concern_error? &&
!Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code)
) || e.max_time_ms_expired?
e.add_label('UnknownTransactionCommitResult')
end
end

maybe_add_retryable_write_error_label!(e, connection, context)
maybe_add_retryable_write_error_label!(e, connection, context)

raise e
end
raise e
end

# Unpins the session and/or the connection if the yielded to block
Expand Down
17 changes: 17 additions & 0 deletions lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ def snapshot?
# @since 2.5.0
attr_reader :operation_time

# Forwards the requested dirty state to the underlying server session.
# When this session has no server session, nothing happens and nil is
# returned.
#
# @param [ true | false ] mark the dirty state to pass on to the server
#   session (defaults to true).
def dirty!(mark = true)
  return nil if @server_session.nil?

  @server_session.dirty!(mark)
end

# Reports the dirty state of the underlying server session.
#
# @return [ true | false | nil ] whatever the server session's #dirty?
#   returns, or nil when this session has no server session.
#
# @api private
def dirty?
  return nil if @server_session.nil?

  @server_session.dirty?
end

# @return [ Hash ] The options for the transaction currently being executed
# on this session.
#
Expand Down
3 changes: 3 additions & 0 deletions lib/mongo/session/server_session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/session/server_session/dirtyable'

module Mongo

class Session
Expand All @@ -25,6 +27,7 @@ class Session
#
# @since 2.5.0
class ServerSession
include Dirtyable

# Regex for removing dashes from the UUID string.
#
Expand Down
53 changes: 53 additions & 0 deletions lib/mongo/session/server_session/dirtyable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

# Copyright (C) 2024 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Session
    class ServerSession
      # Functionality for manipulating and querying a session's
      # "dirty" state, per the last paragraph at
      # https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#server-session-pool
      #
      #   If a driver has a server session pool and a network error is
      #   encountered when executing any command with a ClientSession, the
      #   driver MUST mark the associated ServerSession as dirty. Dirty server
      #   sessions are discarded when returned to the server session pool. It is
      #   valid for a dirty session to be used for subsequent commands (e.g. an
      #   implicit retry attempt, a later command in a bulk write, or a later
      #   operation on an explicit session), however, it MUST remain dirty for
      #   the remainder of its lifetime regardless if later commands succeed.
      #
      # @api private
      module Dirtyable
        # Query whether the server session has been marked dirty or not.
        #
        # A session that has never been marked reports false rather than
        # nil, so the result is always a strict boolean as documented.
        #
        # @return [ true | false ] the server session's dirty state
        def dirty?
          @dirty ? true : false
        end

        # Mark the server session as dirty (the default) or clean.
        #
        # @param [ true | false ] mark whether to mark the server session
        #   dirty or not.
        #
        # @return [ true | false ] the value that was stored.
        def dirty!(mark = true)
          @dirty = mark
        end
      end
    end
  end
end
30 changes: 12 additions & 18 deletions lib/mongo/session/session_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,6 @@ class Session
#
# @since 2.5.0
class SessionPool

# Create a SessionPool.
#
# @example
# SessionPool.create(cluster)
#
# @param [ Mongo::Cluster ] cluster The cluster that will be associated with this
# session pool.
#
# @since 2.5.0
def self.create(cluster)
pool = new(cluster)
cluster.instance_variable_set(:@session_pool, pool)
end

# Initialize a SessionPool.
#
# @example
Expand Down Expand Up @@ -105,9 +90,7 @@ def checkin(session)

@mutex.synchronize do
prune!
unless about_to_expire?(session)
@queue.unshift(session)
end
@queue.unshift(session) if return_to_queue?(session)
end
end

Expand Down Expand Up @@ -136,6 +119,17 @@ def end_sessions

private

# Decide whether a checked-in session may go back onto the pool's
# queue. A session is rejected when it reports itself dirty or when
# about_to_expire? says it is close to expiring; the expiry check is
# skipped for dirty sessions.
#
# @param [ Session::ServerSession ] session the session to query
#
# @return [ true | false ] whether to return the session to the
#   queue.
def return_to_queue?(session)
  !(session.dirty? || about_to_expire?(session))
end

def about_to_expire?(session)
if session.nil?
raise ArgumentError, 'session cannot be nil'
Expand Down
17 changes: 1 addition & 16 deletions spec/mongo/session/session_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@
end
end

describe '.create' do

let!(:pool) do
described_class.create(cluster)
end

it 'creates a session pool' do
expect(pool).to be_a(Mongo::Session::SessionPool)
end

it 'adds the pool as an instance variable on the cluster' do
expect(cluster.session_pool).to eq(pool)
end
end

describe '#initialize' do

let(:pool) do
Expand Down Expand Up @@ -181,7 +166,7 @@
describe '#end_sessions' do

let(:pool) do
described_class.create(client.cluster)
client.cluster.session_pool
end

let!(:session_a) do
Expand Down
8 changes: 3 additions & 5 deletions spec/runners/unified/support_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,15 @@ def assert_session_dirty(op)
consume_test_runner(op)
use_arguments(op) do |args|
session = entities.get(:session, args.use!('session'))
# https://jira.mongodb.org/browse/RUBY-1813
true
session.dirty? || raise(Error::ResultMismatch, 'expected session to be dirty')
end
end

def assert_session_not_dirty(op)
consume_test_runner(op)
use_arguments(op) do |args|
session = entities.get(:session, args.use!('session'))
# https://jira.mongodb.org/browse/RUBY-1813
true
session.dirty? && raise(Error::ResultMismatch, 'expected session to be not dirty')
end
end

Expand All @@ -92,7 +90,7 @@ def assert_same_lsid_on_last_two_commands(op, expected: true)
unless subscriber.started_events.length >= 2
raise Error::ResultMismatch, "Must have at least 2 events, have #{subscriber.started_events.length}"
end
lsids = subscriber.started_events[-2...-1].map do |cmd|
lsids = subscriber.started_events[-2..-1].map do |cmd|
cmd.command.fetch('lsid')
end
if expected
Expand Down
Loading
Loading