diff --git a/aeron-client/src/main/cpp/Image.h b/aeron-client/src/main/cpp/Image.h index ff7ca15f4f..1b56d27330 100644 --- a/aeron-client/src/main/cpp/Image.h +++ b/aeron-client/src/main/cpp/Image.h @@ -419,6 +419,83 @@ class Image return outcome.fragmentsRead; } + /** + * Poll for new messages in a stream. If new messages are found beyond the last consumed position then they + * will be delivered via the fragment_handler_t up to a limited number of fragments as specified or the + * maximum position specified. + * + * @param fragmentHandler to which messages are delivered. + * @param limitPosition to consume messages up to. + * @param fragmentLimit for the number of fragments to be consumed during one polling operation. + * @return the number of fragments that have been consumed. + * + * @see fragment_handler_t + */ + template + inline int boundedPoll(F&& fragmentHandler, std::int64_t limitPosition, int fragmentLimit) + { + if (isClosed()) + { + return 0; + } + + int fragmentsRead = 0; + const std::int64_t initialPosition = m_subscriberPosition.get(); + const std::int32_t initialOffset = static_cast(initialPosition & m_termLengthMask); + const int index = LogBufferDescriptor::indexByPosition(initialPosition, m_positionBitsToShift); + assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT); + AtomicBuffer &termBuffer = m_termBuffers[index]; + std::int32_t offset = initialOffset; + const std::int64_t capacity = termBuffer.capacity(); + const std::int32_t limitOffset = + static_cast(std::min(capacity, limitPosition - initialPosition + offset)); + + m_header.buffer(termBuffer); + + try + { + while (fragmentsRead < fragmentLimit && offset < limitOffset) + { + const std::int32_t length = FrameDescriptor::frameLengthVolatile(termBuffer, offset); + if (length <= 0) + { + break; + } + + const std::int32_t frameOffset = offset; + const std::int32_t alignedLength = util::BitUtil::align(length, FrameDescriptor::FRAME_ALIGNMENT); + offset += alignedLength; + + if (FrameDescriptor::isPaddingFrame(termBuffer, frameOffset)) + { + continue; + } + + m_header.offset(frameOffset); + + fragmentHandler( + termBuffer, + frameOffset + DataFrameHeader::LENGTH, + length - DataFrameHeader::LENGTH, + m_header); + + ++fragmentsRead; + } + } + catch (const std::exception& ex) + { + m_exceptionHandler(ex); + } + + const std::int64_t resultingPosition = initialPosition + (offset - initialOffset); + if (resultingPosition > initialPosition) + { + m_subscriberPosition.setOrdered(resultingPosition); + } + + return fragmentsRead; + } + /** * Poll for new messages in a stream. If new messages are found beyond the last consumed position then they * will be delivered to the controlled_poll_fragment_handler_t up to a limited number of fragments as specified. diff --git a/aeron-client/src/test/cpp/ImageTest.cpp b/aeron-client/src/test/cpp/ImageTest.cpp index c6d4bd0fdb..9375d58c35 100644 --- a/aeron-client/src/test/cpp/ImageTest.cpp +++ b/aeron-client/src/test/cpp/ImageTest.cpp @@ -295,6 +295,144 @@ TEST_F(ImageTest, shouldEnsureImageIsOpenBeforePoll) EXPECT_EQ(image.poll(m_handler, INT_MAX), 0); } +TEST_F(ImageTest, shouldPollNoFragmentsToBoundedFragmentHandlerWithMaxPositionBeforeInitialPosition) +{ + const std::int32_t messageIndex = 0; + const std::int32_t initialTermOffset = offsetOfFrame(messageIndex); + const std::int64_t initialPosition = LogBufferDescriptor::computePosition( + INITIAL_TERM_ID, initialTermOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + const std::int64_t maxPosition = initialPosition - DataFrameHeader::LENGTH; + + m_subscriberPosition.set(initialPosition); + Image image( + SESSION_ID, CORRELATION_ID, SUBSCRIPTION_REGISTRATION_ID, + SOURCE_IDENTITY, m_subscriberPosition, m_logBuffers, exceptionHandler); + + EXPECT_EQ(m_subscriberPosition.get(), initialPosition); + EXPECT_EQ(image.position(), initialPosition); + + insertDataFrame(INITIAL_TERM_ID, offsetOfFrame(messageIndex)); + insertDataFrame(INITIAL_TERM_ID, offsetOfFrame(messageIndex + 1)); + + EXPECT_CALL(m_fragmentHandler, onFragment(testing::_, testing::_, static_cast(DATA.size()), testing::_)) + .Times(0); + + const int fragments = image.boundedPoll(m_handler, maxPosition, INT_MAX); + EXPECT_EQ(fragments, 0); + EXPECT_EQ(m_subscriberPosition.get(), initialPosition); + EXPECT_EQ(image.position(), initialPosition); +} + +TEST_F(ImageTest, shouldPollFragmentsToBoundedFragmentHandlerWithInitialOffsetNotZero) +{ + const std::int32_t messageIndex = 1; + const std::int32_t initialTermOffset = offsetOfFrame(messageIndex); + const std::int64_t initialPosition = LogBufferDescriptor::computePosition( + INITIAL_TERM_ID, initialTermOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + const std::int64_t maxPosition = initialPosition + ALIGNED_FRAME_LENGTH; + + m_subscriberPosition.set(initialPosition); + Image image( + SESSION_ID, CORRELATION_ID, SUBSCRIPTION_REGISTRATION_ID, + SOURCE_IDENTITY, m_subscriberPosition, m_logBuffers, exceptionHandler); + + EXPECT_EQ(m_subscriberPosition.get(), initialPosition); + EXPECT_EQ(image.position(), initialPosition); + + insertDataFrame(INITIAL_TERM_ID, offsetOfFrame(messageIndex)); + insertDataFrame(INITIAL_TERM_ID, offsetOfFrame(messageIndex + 1)); + + EXPECT_CALL(m_fragmentHandler, onFragment(testing::_, testing::_, static_cast(DATA.size()), testing::_)) + .Times(1); + + const int fragments = image.boundedPoll(m_handler, maxPosition, INT_MAX); + EXPECT_EQ(fragments, 1); + EXPECT_EQ(m_subscriberPosition.get(), maxPosition); + EXPECT_EQ(image.position(), maxPosition); +} + +TEST_F(ImageTest, shouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionBeforeNextMessage) +{ + const std::int32_t messageIndex = 0; + const std::int32_t initialTermOffset = offsetOfFrame(messageIndex); + const std::int64_t initialPosition = LogBufferDescriptor::computePosition( + INITIAL_TERM_ID, initialTermOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + const std::int64_t maxPosition = initialPosition + ALIGNED_FRAME_LENGTH; + + m_subscriberPosition.set(initialPosition); + Image image( + SESSION_ID, CORRELATION_ID, SUBSCRIPTION_REGISTRATION_ID, + SOURCE_IDENTITY, m_subscriberPosition, m_logBuffers, exceptionHandler); + + EXPECT_EQ(m_subscriberPosition.get(), initialPosition); + EXPECT_EQ(image.position(), initialPosition); + + insertDataFrame(INITIAL_TERM_ID, offsetOfFrame(messageIndex)); + insertDataFrame(INITIAL_TERM_ID, offsetOfFrame(messageIndex + 1)); + + EXPECT_CALL(m_fragmentHandler, onFragment(testing::_, testing::_, static_cast(DATA.size()), testing::_)) + .Times(1); + + const int fragments = image.boundedPoll(m_handler, maxPosition, INT_MAX); + EXPECT_EQ(fragments, 1); + EXPECT_EQ(m_subscriberPosition.get(), maxPosition); + EXPECT_EQ(image.position(), maxPosition); +} + +TEST_F(ImageTest, shouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAfterEndOfTerm) +{ + const std::int32_t initialOffset = TERM_LENGTH - (ALIGNED_FRAME_LENGTH * 2); + const std::int64_t initialPosition = LogBufferDescriptor::computePosition( + INITIAL_TERM_ID, initialOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + const std::int64_t maxPosition = initialPosition + TERM_LENGTH; + + m_subscriberPosition.set(initialPosition); + Image image( + SESSION_ID, CORRELATION_ID, SUBSCRIPTION_REGISTRATION_ID, + SOURCE_IDENTITY, m_subscriberPosition, m_logBuffers, exceptionHandler); + + EXPECT_EQ(m_subscriberPosition.get(), initialPosition); + EXPECT_EQ(image.position(), initialPosition); + + insertDataFrame(INITIAL_TERM_ID, initialOffset); + insertPaddingFrame(INITIAL_TERM_ID, initialOffset + ALIGNED_FRAME_LENGTH); + + EXPECT_CALL(m_fragmentHandler, onFragment(testing::_, testing::_, static_cast(DATA.size()), testing::_)) + .Times(1); + + const int fragments = image.boundedPoll(m_handler, maxPosition, INT_MAX); + EXPECT_EQ(fragments, 1); + EXPECT_EQ(m_subscriberPosition.get(), TERM_LENGTH); + EXPECT_EQ(image.position(), TERM_LENGTH); +} + +TEST_F(ImageTest, shouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAboveIntMaxValue) +{ + const std::int32_t initialOffset = TERM_LENGTH - (ALIGNED_FRAME_LENGTH * 2); + const std::int64_t initialPosition = LogBufferDescriptor::computePosition( + INITIAL_TERM_ID, initialOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + const std::int64_t maxPosition = static_cast(INT32_MAX) + 1000; + + m_subscriberPosition.set(initialPosition); + Image image( + SESSION_ID, CORRELATION_ID, SUBSCRIPTION_REGISTRATION_ID, + SOURCE_IDENTITY, m_subscriberPosition, m_logBuffers, exceptionHandler); + + EXPECT_EQ(m_subscriberPosition.get(), initialPosition); + EXPECT_EQ(image.position(), initialPosition); + + insertDataFrame(INITIAL_TERM_ID, initialOffset); + insertPaddingFrame(INITIAL_TERM_ID, initialOffset + ALIGNED_FRAME_LENGTH); + + EXPECT_CALL(m_fragmentHandler, onFragment(testing::_, testing::_, static_cast(DATA.size()), testing::_)) + .Times(1); + + const int fragments = image.boundedPoll(m_handler, maxPosition, INT_MAX); + EXPECT_EQ(fragments, 1); + EXPECT_EQ(m_subscriberPosition.get(), TERM_LENGTH); + EXPECT_EQ(image.position(), TERM_LENGTH); +} + TEST_F(ImageTest, shouldPollNoFragmentsToControlledFragmentHandler) { const std::int32_t messageIndex = 0;