Skip to content

Commit 9ee629c

Browse files
committed
RDBC-833 Implement session.advanced.get_current_session_node
1 parent 0ea9979 commit 9ee629c

File tree

4 files changed

+60
-27
lines changed

4 files changed

+60
-27
lines changed

ravendb/documents/session/document_session.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
from ravendb.documents.operations.lazy.definition import LazyOperation
103103
from ravendb.http.request_executor import RequestExecutor
104104
from ravendb.documents.store.definition import DocumentStore
105+
from ravendb.http.server_node import ServerNode
105106

106107
_T = TypeVar("_T")
107108
_TIndex = TypeVar("_TIndex", bound=AbstractCommonApiForIndexes)
@@ -759,6 +760,9 @@ def refresh(self, entity: object) -> object:
759760
def raw_query(self, query: str, object_type: Optional[Type[_T]] = None) -> RawDocumentQuery[_T]:
760761
return RawDocumentQuery(object_type, self._session, query)
761762

763+
def get_current_session_node(self) -> ServerNode:
764+
return self.session_info.get_current_session_node(self.request_executor)
765+
762766
@property
763767
def lazily(self) -> LazySessionOperations:
764768
return self._session._lazily

ravendb/documents/session/misc.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@
66
from enum import Enum
77
from typing import Union, Optional, TYPE_CHECKING, List, Dict, Generic, TypeVar
88

9-
from ravendb.http.misc import LoadBalanceBehavior
9+
from ravendb.http.misc import LoadBalanceBehavior, ReadBalanceBehavior
1010

1111
if TYPE_CHECKING:
1212
from ravendb.http.request_executor import RequestExecutor
13-
from ravendb.documents.session.query import Query
13+
from ravendb.documents.queries.misc import Query
1414
from ravendb.documents.session.operations.query import QueryOperation
1515
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
1616
InMemoryDocumentSessionOperations,
1717
)
18-
from ravendb.documents import DocumentStore
18+
from ravendb.documents.store.definition import DocumentStore
19+
from ravendb.http.server_node import ServerNode
1920

2021
_T_Key = TypeVar("_T_Key")
2122
_T_Value = TypeVar("_T_Value")
@@ -111,6 +112,25 @@ def _set_context_internal(self, session_id: str) -> None:
111112
def increment_request_count(self) -> None:
112113
self._session.increment_requests_count()
113114

115+
def get_current_session_node(self, request_executor: RequestExecutor) -> ServerNode:
116+
if request_executor.conventions.load_balance_behavior == LoadBalanceBehavior.USE_SESSION_CONTEXT:
117+
if self._can_use_load_balance_behavior:
118+
result = request_executor.get_node_by_session_id(self.session_id)
119+
return result.current_node
120+
121+
read_balance_behavior = request_executor.conventions.read_balance_behavior
122+
123+
if read_balance_behavior == ReadBalanceBehavior.NONE:
124+
result = request_executor.preferred_node
125+
elif read_balance_behavior == ReadBalanceBehavior.ROUND_ROBIN:
126+
result = request_executor.get_node_by_session_id(self.session_id)
127+
elif read_balance_behavior == ReadBalanceBehavior.FASTEST_NODE:
128+
result = request_executor.get_fastest_node()
129+
else:
130+
raise ValueError(f"Unsupported read balance behavior '{str(read_balance_behavior)}'")
131+
132+
return result.current_node
133+
114134

115135
class SessionOptions:
116136
def __init__(

ravendb/http/request_executor.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,11 @@ def default_timeout(self, value: datetime.timedelta) -> None:
197197

198198
@property
199199
def preferred_node(self) -> CurrentIndexAndNode:
200-
self.__ensure_node_selector()
200+
self._ensure_node_selector()
201201
return self._node_selector.get_preferred_node()
202202

203203
def get_requested_node(self, node_tag: str, throw_if_contains_failures: bool = False) -> CurrentIndexAndNode:
204-
self.__ensure_node_selector()
204+
self._ensure_node_selector()
205205
current_index_and_node = self._node_selector.get_requested_node(node_tag)
206206

207207
if throw_if_contains_failures and not self._node_selector.node_is_available(
@@ -213,6 +213,16 @@ def get_requested_node(self, node_tag: str, throw_if_contains_failures: bool = F
213213

214214
return current_index_and_node
215215

216+
def get_fastest_node(self) -> CurrentIndexAndNode:
217+
self._ensure_node_selector()
218+
219+
return self._node_selector.get_fastest_node()
220+
221+
def get_node_by_session_id(self, session_id: int = None) -> CurrentIndexAndNode:
222+
self._ensure_node_selector()
223+
224+
return self._node_selector.get_node_by_session_id(session_id)
225+
216226
def __on_failed_request_invoke(self, url: str, e: Exception):
217227
for event in self.__on_failed_request:
218228
event(FailedRequestEventArgs(self._database_name, url, e, None, None))
@@ -1229,7 +1239,7 @@ def __init__(self, index: int, response: requests.Response):
12291239
def client_configuration_etag(self, value):
12301240
self._client_configuration_etag = value
12311241

1232-
def __ensure_node_selector(self) -> None:
1242+
def _ensure_node_selector(self) -> None:
12331243
if not self._disable_topology_updates:
12341244
self.__wait_for_topology_update(self._first_topology_update_task)
12351245

ravendb/http/topology.py

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,19 +96,19 @@ def __exit__(self, exc_type, exc_val, exc_tb):
9696
self.__update_fastest_node_timer.__exit__()
9797

9898
def __init__(self, topology: Topology, thread_pool: ThreadPoolExecutor):
99-
self.__state = self.__NodeSelectorState(topology)
99+
self._state = self.__NodeSelectorState(topology)
100100
self.__update_fastest_node_timer: Union[None, time] = None
101101
self.__thread_pool_executor = thread_pool
102102

103103
@property
104104
def topology(self) -> Topology:
105-
return self.__state.topology
105+
return self._state.topology
106106

107107
def node_is_available(self, index: int) -> bool:
108-
return self.__state.failures[index] == 0
108+
return self._state.failures[index] == 0
109109

110110
def on_failed_request(self, node_index: int) -> None:
111-
state = self.__state
111+
state = self._state
112112
if node_index < 0 or node_index >= len(state.failures):
113113
return
114114
state.failures[node_index] += 1
@@ -117,19 +117,19 @@ def on_update_topology(self, topology: Topology, force_update: bool = False) ->
117117
if topology is None:
118118
return False
119119

120-
state_etag = self.__state.topology.etag if self.__state.topology.etag else 0
120+
state_etag = self._state.topology.etag if self._state.topology.etag else 0
121121
topology_etag = topology.etag if topology.etag else 0
122122

123123
if state_etag >= topology_etag and not force_update:
124124
return False
125125

126126
state = NodeSelector.__NodeSelectorState(topology)
127-
self.__state = state
127+
self._state = state
128128

129129
return True
130130

131131
def get_requested_node(self, node_tag: str) -> CurrentIndexAndNode:
132-
state = self.__state
132+
state = self._state
133133
server_nodes = state.nodes
134134
for i in range(len(server_nodes)):
135135
if server_nodes[i].cluster_tag == node_tag:
@@ -140,7 +140,7 @@ def get_requested_node(self, node_tag: str) -> CurrentIndexAndNode:
140140
raise RequestedNodeUnavailableException(f"Could not find requested node {node_tag}")
141141

142142
def get_preferred_node(self) -> CurrentIndexAndNode:
143-
state = self.__state
143+
state = self._state
144144
return self.get_preferred_node_internal(state)
145145

146146
@classmethod
@@ -154,7 +154,7 @@ def get_preferred_node_internal(cls, state: NodeSelector.__NodeSelectorState) ->
154154
return cls.unlikely_everyone_faulted_choice(state)
155155

156156
def get_preferred_node_with_topology(self) -> CurrentIndexAndNodeAndEtag:
157-
state = self.__state
157+
state = self._state
158158
preferred_node = self.get_preferred_node_internal(state)
159159
etag = (state.topology.etag if state.topology.etag else -2) if state.topology else -2
160160
return CurrentIndexAndNodeAndEtag(preferred_node.current_index, preferred_node.current_node, etag)
@@ -168,7 +168,7 @@ def unlikely_everyone_faulted_choice(state: NodeSelector.__NodeSelectorState) ->
168168
return state.node_when_everyone_marked_as_faulted
169169

170170
def get_node_by_session_id(self, session_id: int) -> CurrentIndexAndNode:
171-
state = self.__state
171+
state = self._state
172172
if len(state.topology.nodes) == 0:
173173
raise AllTopologyNodesDownException("There are no nodes in the topology at all")
174174
index = abs(session_id % len(state.topology.nodes))
@@ -183,19 +183,19 @@ def get_node_by_session_id(self, session_id: int) -> CurrentIndexAndNode:
183183
return self.get_preferred_node()
184184

185185
def get_fastest_node(self) -> CurrentIndexAndNode:
186-
state = self.__state
186+
state = self._state
187187
if state.failures[state.fastest] == 0 and state.nodes[state.fastest].server_role == ServerNode.Role.MEMBER:
188188
return CurrentIndexAndNode(state.fastest, state.nodes[state.fastest])
189189

190190
# if the fastest node has failures, we'll immediately schedule
191191
# another run of finding who the fastest node is, in the mantime
192192
# we'll just use the server preferred node or failover as usual
193193

194-
self.__switch_to_speed_test_phase()
194+
self._switch_to_speed_test_phase()
195195
return self.get_preferred_node()
196196

197197
def restore_node_index(self, node_index: int) -> None:
198-
state = self.__state
198+
state = self._state
199199
if len(state.failures) <= node_index:
200200
return
201201

@@ -205,24 +205,23 @@ def restore_node_index(self, node_index: int) -> None:
205205
def _throw_empty_topology() -> None:
206206
raise RuntimeError("Empty database topology, this shouldn't happen.")
207207

208-
def __switch_to_speed_test_phase(self) -> None:
209-
state = self.__state
208+
def _switch_to_speed_test_phase(self) -> None:
209+
state = self._state
210210

211211
if not state.speed_test_mode == 0:
212212
state.speed_test_mode = 1
213213
return
214214

215-
for i in state.fastest_records:
216-
i = 0
215+
state.fastest_records = len(state.fastest_records) * [0]
217216

218217
state.speed_test_mode += 1
219218

220219
@property
221220
def in_speed_test_phase(self) -> bool:
222-
return self.__state.speed_test_mode > 1
221+
return self._state.speed_test_mode > 1
223222

224223
def record_fastest(self, index: int, node: ServerNode) -> None:
225-
state = self.__state
224+
state = self._state
226225
state_fastest = state.fastest_records
227226

228227
# the following two checks are to verify that things didn't move
@@ -273,11 +272,11 @@ def __select_fastest(self, state: NodeSelector.__NodeSelectorState, index: int)
273272

274273
else:
275274
self.__update_fastest_node_timer = threading.Timer(
276-
datetime.timedelta(minutes=1), self.__switch_to_speed_test_phase
275+
datetime.timedelta(minutes=1), self._switch_to_speed_test_phase
277276
)
278277

279278
def schedule_speed_test(self) -> None:
280-
self.__switch_to_speed_test_phase()
279+
self._switch_to_speed_test_phase()
281280

282281

283282
class CurrentIndexAndNode:

0 commit comments

Comments
 (0)