From 8232d7e6e0c8ef6f6a1cb13dadabcae34facbef2 Mon Sep 17 00:00:00 2001
From: girolamo-giordano <girolamo.giordano1998@gmail.com>
Date: Thu, 5 Dec 2024 23:35:00 +0100
Subject: [PATCH] add custom sleep parameter to async_bulk and
 async_streaming_bulk

---
 elasticsearch/_async/helpers.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py
index 1bc339917..94f03e0f2 100644
--- a/elasticsearch/_async/helpers.py
+++ b/elasticsearch/_async/helpers.py
@@ -21,6 +21,7 @@
     Any,
     AsyncIterable,
     AsyncIterator,
+    Awaitable,
     Callable,
     Collection,
     Dict,
@@ -167,6 +168,7 @@ async def async_streaming_bulk(
     expand_action_callback: Callable[
         [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
     ] = expand_action,
+    sleep: Callable[[float],Awaitable[None]] = asyncio.sleep,
     raise_on_exception: bool = True,
     max_retries: int = 0,
     initial_backoff: float = 2,
@@ -202,6 +204,7 @@ async def async_streaming_bulk(
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
+    :arg sleep: custom callable defined for custom action on cancelling
     :arg retry_on_status: HTTP status code that will trigger a retry.
         (if `None` is specified only status 429 will retry).
     :arg max_retries: maximum number of times a document will be retried when
@@ -246,7 +249,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                 ]
             ] = []
             if attempt:
-                await asyncio.sleep(
+                await sleep(
                     min(max_backoff, initial_backoff * 2 ** (attempt - 1))
                 )
 
@@ -304,6 +307,7 @@ async def async_bulk(
     client: AsyncElasticsearch,
     actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
     stats_only: bool = False,
+    sleep: Callable[[float],Awaitable[None]] = asyncio.sleep,
     ignore_status: Union[int, Collection[int]] = (),
     *args: Any,
     **kwargs: Any,
@@ -329,6 +333,7 @@ async def async_bulk(
     :arg actions: iterator containing the actions
     :arg stats_only: if `True` only report number of successful/failed
         operations instead of just number of successful and a list of error responses
+    :arg sleep: custom callable defined for custom action on cancelling
     :arg ignore_status: list of HTTP status code that you want to ignore
 
     Any additional keyword arguments will be passed to
@@ -344,7 +349,7 @@ async def async_bulk(
     # make streaming_bulk yield successful results so we can count them
     kwargs["yield_ok"] = True
     async for ok, item in async_streaming_bulk(
-        client, actions, ignore_status=ignore_status, *args, **kwargs  # type: ignore[misc]
+        client, actions, sleep=sleep, ignore_status=ignore_status, *args, **kwargs  # type: ignore[misc]
     ):
         # go through request-response pairs and detect failures
         if not ok: