diff --git a/cpp/csp/adapters/kafka/KafkaConsumer.cpp b/cpp/csp/adapters/kafka/KafkaConsumer.cpp index c42fb7717..d4bf987ad 100644 --- a/cpp/csp/adapters/kafka/KafkaConsumer.cpp +++ b/cpp/csp/adapters/kafka/KafkaConsumer.cpp @@ -111,12 +111,16 @@ void KafkaConsumer::start( DateTime starttime ) auto & startOffsetProperty = m_mgr -> startOffsetProperty(); if( std::holds_alternative( startOffsetProperty ) ) { - KafkaStartOffset sOffset = ( KafkaStartOffset ) std::get( startOffsetProperty ); - switch( sOffset ) + ReplayMode replayMode = ( ReplayMode ) std::get( startOffsetProperty ); + switch( replayMode ) { - case KafkaStartOffset::EARLIEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_BEGINNING ); break; - case KafkaStartOffset::LATEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_END ); break; - case KafkaStartOffset::START_TIME: m_rebalanceCb -> setStartTime( starttime ); break; + case ReplayMode::EARLIEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_BEGINNING ); break; + case ReplayMode::LATEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_END ); break; + case ReplayMode::START_TIME: m_rebalanceCb -> setStartTime( starttime ); break; + + case ReplayMode::NUM_TYPES: + case ReplayMode::UNKNOWN: + CSP_THROW( ValueError, "start_offset is unset" ); } } else if( std::holds_alternative( startOffsetProperty ) ) diff --git a/cpp/csp/adapters/kafka/KafkaConsumer.h b/cpp/csp/adapters/kafka/KafkaConsumer.h index 398783418..50cdf4e37 100644 --- a/cpp/csp/adapters/kafka/KafkaConsumer.h +++ b/cpp/csp/adapters/kafka/KafkaConsumer.h @@ -30,13 +30,6 @@ class KafkaConsumer void forceReplayCompleted(); private: - //should align with python side enum - enum class KafkaStartOffset - { - EARLIEST = 1, - LATEST = 2, - START_TIME = 3, - }; struct TopicData { diff --git a/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp b/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp index 5d95176ea..03c95a326 100644 --- a/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp 
+++ b/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp @@ -82,10 +82,6 @@ void KafkaInputAdapter::processMessage( RdKafka::Message* message, bool live, cs if( ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE ) { msgTime = DateTime::fromMilliseconds( ts.timestamp ); - - //If user requested kafka data earlier than engine start, we will force it as live so it makes it into the engine - if( msgTime < rootEngine() -> startTime() ) - pushLive = true; } else { diff --git a/cpp/csp/engine/Enums.h b/cpp/csp/engine/Enums.h index 9581ba0e0..f7cb13804 100644 --- a/cpp/csp/engine/Enums.h +++ b/cpp/csp/engine/Enums.h @@ -6,7 +6,7 @@ namespace csp { -// NOTE this must align with the python side Enum definition /// +// NOTE these must align with the python side Enum definition /// struct PushModeTraits { enum _enum : unsigned char @@ -25,6 +25,25 @@ struct PushModeTraits using PushMode = Enum; +//ReplayMode is used by PushPull adapters +struct ReplayModeTraits +{ + enum _enum : unsigned char + { + UNKNOWN = 0, + EARLIEST = 1, //Replay all available data + LATEST = 2, //no replay at all, start from latest + START_TIME = 3, //replay from engine start time + + NUM_TYPES + }; + +protected: + _enum m_value; +}; + +using ReplayMode = Enum; + } #endif diff --git a/cpp/csp/engine/PushPullInputAdapter.cpp b/cpp/csp/engine/PushPullInputAdapter.cpp index 051b0a204..5dd1c86aa 100644 --- a/cpp/csp/engine/PushPullInputAdapter.cpp +++ b/cpp/csp/engine/PushPullInputAdapter.cpp @@ -81,8 +81,17 @@ PushPullInputAdapter::PullDataEvent * PushPullInputAdapter::nextPullEvent() auto * event = m_poppedPullEvents.front(); m_poppedPullEvents.pop(); - if( m_adjustOutOfOrderTime && event ) - event -> time = std::max( event -> time, rootEngine() -> now() ); + if( event ) + { + //Always force time before start to start. 
There are two possibilities: + //- User asked to replay from EARLIEST, so they should get all ticks replayed and we can't replay before starttime + //- User asked to replay from START_TIME in which case, if the adapter is written correctly, we shouldn't get ticks before starttime + if( unlikely( event -> time < rootEngine() -> startTime() ) ) + event -> time = rootEngine() -> startTime(); + + if( m_adjustOutOfOrderTime ) + event -> time = std::max( event -> time, rootEngine() -> now() ); + } return event; } diff --git a/csp/adapters/kafka.py b/csp/adapters/kafka.py index 16054be06..e4f7f6cf0 100644 --- a/csp/adapters/kafka.py +++ b/csp/adapters/kafka.py @@ -17,7 +17,7 @@ RawTextMessageMapper, hash_mutable, ) -from csp.impl.wiring import input_adapter_def, output_adapter_def, status_adapter_def +from csp.impl.wiring import ReplayMode, input_adapter_def, output_adapter_def, status_adapter_def from csp.lib import _kafkaadapterimpl _ = BytesMessageProtoMapper, DateTimeType, JSONTextMessageMapper, RawBytesMessageMapper, RawTextMessageMapper @@ -32,10 +32,8 @@ class KafkaStatusMessageType(IntEnum): GENERIC_ERROR = 4 -class KafkaStartOffset(csp.Enum): - EARLIEST = 1 # Replay all of history - LATEST = 2 # Start from new msgs - START_TIME = 3 # Start from csp run starttime +# Backward compatible +KafkaStartOffset = ReplayMode class KafkaAdapterManager: diff --git a/csp/impl/types/common_definitions.py b/csp/impl/types/common_definitions.py index c08163fbc..ce94a8dd4 100644 --- a/csp/impl/types/common_definitions.py +++ b/csp/impl/types/common_definitions.py @@ -289,6 +289,18 @@ class PushMode(IntEnum): BURST = 3 +class ReplayMode(IntEnum): + """PushPull adapters can take a replay_mode option to specify how to replay data + EARLIEST will replay all available data (Note that data with timestamps before engine start will be forced to playback at starttime ) + LATEST only run from latest data ( effectively, no replay ) + START_TIME playback all data from engine starttime + """ + + 
EARLIEST = 1 + LATEST = 2 + START_TIME = 3 + + class DuplicatePolicy(IntEnum): """An 'enum' that specifies the policy for handling the last value in functions like value_at.""" diff --git a/csp/impl/wiring/adapters.py b/csp/impl/wiring/adapters.py index 794779e80..47f5e54bb 100644 --- a/csp/impl/wiring/adapters.py +++ b/csp/impl/wiring/adapters.py @@ -7,11 +7,13 @@ from csp.impl.mem_cache import csp_memoized_graph_object from csp.impl.outputadapter import OutputAdapter # noqa: F401 from csp.impl.types import tstype -from csp.impl.types.common_definitions import ArgKind, InputDef, OutputDef, PushMode +from csp.impl.types.common_definitions import ArgKind, InputDef, OutputDef, PushMode, ReplayMode from csp.impl.types.container_type_normalizer import ContainerTypeNormalizer from csp.impl.types.tstype import ts from csp.impl.wiring.signature import Signature +_ = ReplayMode + # Every AdapterDefMeta instance represents an input or output adapter *definition* type class AdapterDefMeta(type):