diff --git a/lib/mongo/read_preference.ex b/lib/mongo/read_preference.ex index 3671f5e..fb3ea13 100644 --- a/lib/mongo/read_preference.ex +++ b/lib/mongo/read_preference.ex @@ -141,4 +141,7 @@ defmodule Mongo.ReadPreference do filter_nils(read_preference) end + + def to_topology_single_type({_, %{replica?: true} = _server_description}), do: %{mode: :primaryPreferred} + def to_topology_single_type(_), do: nil end diff --git a/lib/mongo/server_description.ex b/lib/mongo/server_description.ex index ece27c1..3d74969 100644 --- a/lib/mongo/server_description.ex +++ b/lib/mongo/server_description.ex @@ -41,7 +41,8 @@ defmodule Mongo.ServerDescription do compression: [compressor_types], read_only: boolean(), logical_session_timeout: non_neg_integer, - supports_retryable_writes: boolean() + supports_retryable_writes: boolean(), + replica?: boolean() } @empty %{ @@ -69,7 +70,8 @@ defmodule Mongo.ServerDescription do compression: [], read_only: false, logical_session_timeout: 30, - support_retryable_writes: false + support_retryable_writes: false, + replica?: false } def new() do @@ -115,7 +117,8 @@ defmodule Mongo.ServerDescription do compression: map_compressors(hello_response["compression"]), read_only: hello_response["readOnly"] || false, logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30, - supports_retryable_writes: supports_retryable_writes + supports_retryable_writes: supports_retryable_writes, + replica?: replica?(server_type) } end @@ -147,7 +150,8 @@ defmodule Mongo.ServerDescription do compression: map_compressors(hello_response["compression"]), read_only: hello_response["readOnly"] || false, logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30, - supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil + supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil, + replica?: replica?(server_type) } end @@ -187,4 +191,8 @@ defmodule Mongo.ServerDescription do [:zlib] end end + + defp replica?(server_type) do + server_type in [:rs_primary, :rs_secondary, :rs_arbiter, :rs_other, :rs_ghost] + end end diff --git a/lib/mongo/topology_description.ex b/lib/mongo/topology_description.ex index 180f536..a0ddc8f 100644 --- a/lib/mongo/topology_description.ex +++ b/lib/mongo/topology_description.ex @@ -123,19 +123,20 @@ defmodule Mongo.TopologyDescription do |> Keyword.get(:read_preference) |> ReadPreference.merge_defaults() - {servers, read_prefs} = + {server, read_prefs} = case topology.type do :unknown -> - {[], nil} + {nil, nil} :single -> - {topology.servers, nil} + server = pick_server(topology.servers) + {server, ReadPreference.to_topology_single_type(server)} :sharded -> - {mongos_servers(topology), ReadPreference.to_mongos(read_preference)} + {topology |> mongos_servers() |> pick_server(), ReadPreference.to_mongos(read_preference)} _other -> - {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.to_replica_set(read_preference)} + {topology |> select_replica_set_server(read_preference.mode, read_preference) |> pick_server(), ReadPreference.to_replica_set(read_preference)} end opts = @@ -147,17 +148,12 @@ defmodule Mongo.TopologyDescription do Keyword.put(opts, :read_preference, prefs) end - server = - servers - |> Enum.take_random(1) - |> Enum.map(fn {server, description} -> {server, description.compression} end) - case server do - [] -> + nil -> :empty - [{addr, compression}] -> - {:ok, {addr, merge_compression(opts, compression)}} + {addr, server_description} -> + {:ok, {addr, merge_compression(opts, server_description.compression)}} end end @@ -182,6 +178,13 @@ defmodule Mongo.TopologyDescription do end end + defp pick_server(servers) do + case Enum.take_random(servers, 1) do + [] -> nil + [server] -> server + end + end + defp mongos_servers(%{:servers => servers}) do Enum.filter(servers, fn {_, server} -> server.type == :mongos end) end diff --git a/test/mongo/topology_description_test.exs b/test/mongo/topology_description_test.exs index 2f44874..2b1d04a 100644 --- a/test/mongo/topology_description_test.exs +++ b/test/mongo/topology_description_test.exs @@ -137,4 +137,12 @@ defmodule Mongo.TopologyDescriptionTest do assert :single = TopologyDescription.get_type(opts) end + + test "Set read_preference to :primaryPreferred when topology is single and server is replica set" do + assert {:ok, {_, opts}} = TopologyDescription.select_servers(single(), :read, []) + assert nil == Keyword.get(opts, :read_preference) + + assert {:ok, {_, opts}} = TopologyDescription.select_servers(single_with_repl_set(), :read, []) + assert :primaryPreferred = Keyword.get(opts, :read_preference) |> Map.get(:mode) + end end diff --git a/test/support/topology_test_data.ex b/test/support/topology_test_data.ex index e22ef3a..58bb963 100644 --- a/test/support/topology_test_data.ex +++ b/test/support/topology_test_data.ex @@ -30,7 +30,43 @@ defmodule Mongo.TopologyTestData do set_version: nil, tag_set: %{}, type: :standalone, - compression: [] + compression: [], + replica?: false + } + } + } + + def single_with_repl_set(), + do: %{ + set_name: nil, + type: :single, + compatibility_error: nil, + compatible: true, + local_threshold_ms: 15, + max_election_id: nil, + max_set_version: nil, + servers: %{ + "localhost:27017" => %{ + address: "localhost:27017", + arbiters: [], + election_id: nil, + error: nil, + hosts: [], + last_update_time: nil, + last_write_date: nil, + max_wire_version: 4, + me: nil, + min_wire_version: 0, + op_time: nil, + passives: [], + primary: nil, + round_trip_time: 44, + set_name: nil, + set_version: nil, + tag_set: %{}, + type: :standalone, + compression: [], + replica?: true } } } @@ -64,7 +100,8 @@ defmodule Mongo.TopologyTestData do set_version: nil, tag_set: %{}, type: :mongos, - compression: [] + compression: [], + replica?: false } } } @@ -102,7 +139,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + replica?: true }, "localhost:27019" => %{ address: "localhost:27019", @@ -127,7 +165,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + replica?: true }, "localhost:27020" => %{ address: "localhost:27020", @@ -152,7 +191,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + replica?: true } } } @@ -186,7 +226,8 @@ defmodule Mongo.TopologyTestData do tag_set: %{}, type: :unknown, hosts: [], - compression: [] + compression: [], + replica?: true }, "localhost:27019" => %{ address: "localhost:27019", @@ -211,7 +252,8 @@ defmodule Mongo.TopologyTestData do "localhost:27019", "localhost:27020" ], - compression: [] + compression: [], + replica?: true }, "localhost:27020" => %{ address: "localhost:27020", @@ -236,7 +278,8 @@ defmodule Mongo.TopologyTestData do "localhost:27019", "localhost:27020" ], - compression: [] + compression: [], + replica?: true } } } @@ -270,7 +313,8 @@ defmodule Mongo.TopologyTestData do tag_set: %{}, type: :rs_primary, hosts: ["localhost:27018"], - compression: [] + compression: [], + replica?: true } } }