Skip to content

Commit

Permalink
Throw errors if topic or broker are invalid
Browse files Browse the repository at this point in the history
Signed-off-by: Nijat Khanbabayev <nijat.khanbabayev@cubistsystematic.com>
  • Loading branch information
NeejWeej committed Jan 30, 2025
1 parent a82030b commit 61c792d
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 46 deletions.
67 changes: 67 additions & 0 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary
{
m_maxThreads = properties.get<uint64_t>( "max_threads" );
m_pollTimeoutMs = properties.get<TimeDelta>( "poll_timeout" ).asMilliseconds();
m_brokerConnectTimeoutMs = properties.get<TimeDelta>( "broker_connect_timeout" ).asMilliseconds();

m_eventCb = std::make_unique<EventCb>( this );
m_producerCb = std::make_unique<DeliveryReportCb>( this );
Expand Down Expand Up @@ -151,6 +152,72 @@ void KafkaAdapterManager::forceConsumerReplayComplete()
consumer -> forceReplayCompleted();
}

void KafkaAdapterManager::fetchMetadata() {
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err;

// Try with producer first if we have one
if ( m_producer ) {
err = m_producer -> metadata(
true, // get all topics
nullptr, // Topic pointer to specific topic
&metadata, // pointer to hold metadata. It must be released by calling delete
m_brokerConnectTimeoutMs // timeout before failing
);
}

// Otherwise try with first consumer
else if (!m_consumerVector.empty()) {
err = m_consumerVector[0].get()->getMetadata(
true,
nullptr,
&metadata,
m_brokerConnectTimeoutMs
);
} else {
CSP_THROW(RuntimeException, "No producer or consumer available to fetch metadata");
}

if (err != RdKafka::ERR_NO_ERROR) {
if (metadata) {
delete metadata;
}
CSP_THROW(RuntimeException, "Failed to get metadata: " << RdKafka::err2str(err));
}

m_metadata.reset(metadata);
}


void KafkaAdapterManager::validateTopic(const std::string& topic){
// This also serves as a validation check for the broker
if (m_validated_topics.find(topic) != m_validated_topics.end()) {
return;
}
if (!m_metadata) {
fetchMetadata();
}
const std::vector<const RdKafka::TopicMetadata*>* topics_vec = m_metadata->topics();
auto it = std::find_if(
topics_vec->begin(),
topics_vec->end(),
[&topic](const RdKafka::TopicMetadata* mt) {
return mt->topic() == topic;
}
);

if (it == topics_vec->end())
CSP_THROW(RuntimeException, "Topic does not exist: " << topic);

const RdKafka::TopicMetadata* topic_metadata = *it;
if (topic_metadata->err() != RdKafka::ERR_NO_ERROR) {
std::stringstream err_msg;
err_msg << "Topic error for " << topic << ": " << RdKafka::err2str(topic_metadata->err());
CSP_THROW(RuntimeException, err_msg.str());
}
m_validated_topics.insert(topic);
}

void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
{
std::string errstr;
Expand Down
8 changes: 8 additions & 0 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Conf;
class DeliveryReportCb;
class EventCb;
class Producer;
class Metadata;
}

namespace csp::adapters::kafka
Expand Down Expand Up @@ -72,14 +73,18 @@ class KafkaAdapterManager final : public csp::AdapterManager
const Dictionary::Value & startOffsetProperty() const { return m_startOffsetProperty; }

int pollTimeoutMs() const { return m_pollTimeoutMs; }
int brokerConnectTimeoutMs() const { return m_brokerConnectTimeoutMs; }

void forceShutdown( const std::string & err );

void validateTopic(const std::string& topic);

private:

using TopicKeyPair = std::pair<std::string, std::string>;

void setConfProperties( RdKafka::Conf * conf, const Dictionary & properties );
void fetchMetadata();
void pollProducers();
void forceConsumerReplayComplete();

Expand All @@ -102,6 +107,7 @@ class KafkaAdapterManager final : public csp::AdapterManager
Subscribers m_subscribers;

int m_pollTimeoutMs;
int m_brokerConnectTimeoutMs;
size_t m_maxThreads;
size_t m_consumerIdx;

Expand All @@ -114,6 +120,8 @@ class KafkaAdapterManager final : public csp::AdapterManager
std::unique_ptr<RdKafka::Conf> m_consumerConf;
std::unique_ptr<RdKafka::Conf> m_producerConf;
Dictionary::Value m_startOffsetProperty;
std::unique_ptr<RdKafka::Metadata> m_metadata;
std::unordered_set<std::string> m_validated_topics;
};

}
Expand Down
57 changes: 26 additions & 31 deletions cpp/csp/adapters/kafka/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ class RebalanceCb : public RdKafka::RebalanceCb
for( auto * partition : partitions )
numPartitions[ partition -> topic() ] += 1;

for( auto & entry : numPartitions ){
for( auto & entry : numPartitions )
m_consumer.setNumPartitions( entry.first, entry.second );
// Flag wildcard subscribers as complete immediately after getting partition info
if(auto* wildcard = m_consumer.getWildcardSubscriber(entry.first))
wildcard->flagReplayComplete();
}

if( !m_startTime.isNone() )
{
Expand Down Expand Up @@ -93,15 +89,15 @@ KafkaConsumer::KafkaConsumer( KafkaAdapterManager * mgr, const Dictionary & prop
KafkaConsumer::~KafkaConsumer()
{
// in case destructor is called before stop()
stop();
}

KafkaSubscriber* KafkaConsumer::getWildcardSubscriber(const std::string& topic)
{
auto it = m_topics.find(topic);
if(it != m_topics.end())
return it->second.wildcardSubscriber;
return nullptr;
try
{
if( m_running )
stop();
}
catch( const Exception & err )
{
m_mgr -> rootEngine() -> shutdown( std::current_exception() );
}
}

void KafkaConsumer::addSubscriber( const std::string & topic, const std::string & key, KafkaSubscriber * subscriber )
Expand All @@ -117,6 +113,11 @@ void KafkaConsumer::addSubscriber( const std::string & topic, const std::string

void KafkaConsumer::start( DateTime starttime )
{
if( !m_consumer )
{
CSP_THROW( RuntimeException, "Consumer is null" );
}

//RebalanceCb is only used / available if we requested a start_offset
if( m_rebalanceCb )
{
Expand Down Expand Up @@ -151,8 +152,15 @@ void KafkaConsumer::start( DateTime starttime )
forceReplayCompleted();

std::vector<std::string> topics;
for( auto & entry : m_topics )
topics.emplace_back( entry.first );
for (const auto& [topic, topic_data] : m_topics)
{
topics.emplace_back( topic );
// wildcard subscription has no guarantee of being in order
// we flag replay complete as soon as we identify it.
if( topic_data.wildcardSubscriber )
topic_data.wildcardSubscriber -> flagReplayComplete();
m_mgr->validateTopic(topic);
}

RdKafka::ErrorCode err = m_consumer -> subscribe( topics );
if( err )
Expand Down Expand Up @@ -184,21 +192,8 @@ void KafkaConsumer::setNumPartitions( const std::string & topic, size_t num )

void KafkaConsumer::forceReplayCompleted()
{
for( auto & entry : m_topics )
{
auto & topicData = entry.second;
if( !topicData.flaggedReplayComplete )
{
for( auto & subscriberEntry : topicData.subscribers )
{
for( auto * subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}
// Also handle wildcard subscriber if present
if(topicData.wildcardSubscriber)
topicData.wildcardSubscriber->flagReplayComplete();
topicData.flaggedReplayComplete = true;
}
for( auto & entry : m_topics ){
entry.second.markReplayComplete();
}
}

Expand Down
26 changes: 25 additions & 1 deletion cpp/csp/adapters/kafka/KafkaConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ class KafkaConsumer
void setNumPartitions( const std::string & topic, size_t num );

void forceReplayCompleted();
KafkaSubscriber* getWildcardSubscriber(const std::string& topic);

// Thin pass-through to the underlying consumer handle's metadata() call.
// On ERR_NO_ERROR, *metadatap points to a Metadata object the caller must
// delete.  NOTE(review): assumes m_consumer is non-null — confirm callers
// only invoke this after the consumer handle has been created.
RdKafka::ErrorCode getMetadata(
bool all_topics,  // true => fetch metadata for all topics, not just subscribed ones
const RdKafka::Topic* only_rkt,  // restrict to a single topic, or nullptr for none
RdKafka::Metadata** metadatap,  // out: caller-owned metadata object on success
int timeout_ms) const  // max time to block waiting for the broker to respond
{
return m_consumer->metadata(all_topics, only_rkt, metadatap, timeout_ms);
}

private:
//should align with python side enum
Expand All @@ -47,6 +55,22 @@ class KafkaConsumer
KafkaSubscriber * wildcardSubscriber = nullptr;
std::vector<bool> partitionLive;
bool flaggedReplayComplete = false;

void markReplayComplete() {
if ( !flaggedReplayComplete ) {
// Flag all regular subscribers
for( auto& subscriberEntry : subscribers ) {
for( auto* subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}

// Handle wildcard subscriber if present
if ( wildcardSubscriber )
wildcardSubscriber -> flagReplayComplete();

flaggedReplayComplete = true;
}
}
};

std::unordered_map<std::string,TopicData> m_topics;
Expand Down
10 changes: 7 additions & 3 deletions cpp/csp/adapters/kafka/KafkaPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,14 @@ PushInputAdapter * KafkaPublisher::getStatusAdapter()
void KafkaPublisher::start( std::shared_ptr<RdKafka::Producer> producer )
{
m_producer = producer;

std::unique_ptr<RdKafka::Conf> tconf( RdKafka::Conf::create( RdKafka::Conf::CONF_TOPIC ) );

std::string errstr;
m_adapterMgr.validateTopic( m_topic ); // make sure we can access topic
// Create topic
std::unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
m_kafkaTopic = std::shared_ptr<RdKafka::Topic>(
RdKafka::Topic::create(m_producer.get(), m_topic, tconf.get(), errstr)
);

m_kafkaTopic = std::shared_ptr<RdKafka::Topic>( RdKafka::Topic::create( m_producer.get(), m_topic, tconf.get(), errstr ) );
if( !m_kafkaTopic )
CSP_THROW( RuntimeException, "Failed to create RdKafka::Topic for producer on topic " << m_topic << ":" << errstr );
Expand Down
2 changes: 2 additions & 0 deletions csp/adapters/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
rd_kafka_conf_options=None,
debug: bool = False,
poll_timeout: timedelta = timedelta(seconds=1),
broker_connect_timeout: timedelta = timedelta(seconds=5),
):
"""
:param broker - broker URL
Expand Down Expand Up @@ -100,6 +101,7 @@ def __init__(
"rd_kafka_conf_properties": conf_properties,
"rd_kafka_consumer_conf_properties": consumer_properties,
"rd_kafka_producer_conf_properties": producer_properties,
"broker_connect_timeout": broker_connect_timeout,
}

if auth:
Expand Down
12 changes: 7 additions & 5 deletions csp/tests/adapters/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ def kafkabroker():


@pytest.fixture(scope="module", autouse=True)
def kafkaadapter(kafkabroker):
group_id = "group.id123"
_kafkaadapter = KafkaAdapterManager(
broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"}
)
def kafkaadapterkwargs(kafkabroker):
return dict(broker=kafkabroker, group_id="group.id123", rd_kafka_conf_options={"allow.auto.create.topics": "true"})


@pytest.fixture(scope="module", autouse=True)
def kafkaadapter(kafkaadapterkwargs):
    """Module-scoped KafkaAdapterManager built from the shared kwargs fixture."""
    adapter = KafkaAdapterManager(**kafkaadapterkwargs)
    return adapter
51 changes: 46 additions & 5 deletions csp/tests/adapters/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,17 +372,58 @@ def graph(symbols: list, count: int):
assert [v[1] for v in sub_bytes] == [v[1] for v in pub[:count]]

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
@pytest.fixture(autouse=True)
def test_invalid_topic(self, kafkaadapter):
def test_invalid_topic(self, kafkaadapterkwargs):
class SubData(csp.Struct):
msg: str

kafkaadapter1 = KafkaAdapterManager(**kafkaadapterkwargs)

# Was a bug where engine would stall
def graph():
def graph_sub():
# csp.print('status', kafkaadapter.status())
return kafkaadapter.subscribe(
return kafkaadapter1.subscribe(
ts_type=SubData, msg_mapper=RawTextMessageMapper(), field_map={"": "msg"}, topic="foobar", key="none"
)

# With bug this would deadlock
csp.run(graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)
with pytest.raises(RuntimeError):
csp.run(graph_sub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)
kafkaadapter2 = KafkaAdapterManager(**kafkaadapterkwargs)

def graph_pub():
msg_mapper = RawTextMessageMapper()
kafkaadapter2.publish(msg_mapper, x=csp.const("heyyyy"), topic="foobar", key="test_key124")

# With bug this would deadlock
with pytest.raises(RuntimeError):
csp.run(graph_pub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
def test_invalid_broker(self, kafkaadapterkwargs):
    """Subscribe and publish graphs must raise (not deadlock) on an unreachable broker."""
    bad_broker_kwargs = dict(kafkaadapterkwargs, broker="foobar")

    sub_adapter = KafkaAdapterManager(**bad_broker_kwargs)

    class SubData(csp.Struct):
        msg: str

    # Was a bug where engine would stall
    def graph_sub():
        return sub_adapter.subscribe(
            ts_type=SubData, msg_mapper=RawTextMessageMapper(), field_map={"": "msg"}, topic="foobar", key="none"
        )

    # With bug this would deadlock
    with pytest.raises(RuntimeError):
        csp.run(graph_sub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

    pub_adapter = KafkaAdapterManager(**bad_broker_kwargs)

    def graph_pub():
        msg_mapper = RawTextMessageMapper()
        pub_adapter.publish(msg_mapper, x=csp.const("heyyyy"), topic="foobar", key="test_key124")

    # With bug this would deadlock
    with pytest.raises(RuntimeError):
        csp.run(graph_pub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)
3 changes: 2 additions & 1 deletion docs/wiki/api-references/Input-Output-Adapters-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ KafkaAdapterManager(
sasl_kerberos_service_name='kafka',
rd_kafka_conf_options=None,
debug: bool = False,
poll_timeout: timedelta = timedelta(seconds=1)
poll_timeout: timedelta = timedelta(seconds=1),
broker_connect_timeout: timedelta = timedelta(seconds=5)
):
```

Expand Down

0 comments on commit 61c792d

Please sign in to comment.