Skip to content

Commit

Permalink
Improve Consumer interface to support ZMQ based Producer/Consumer tab…
Browse files Browse the repository at this point in the history
…le. (sonic-net#2562)

**What I did**
Improve Consumer interface to support ZMQ based Producer/Consumer table.

**Why I did it**
To improve route create performance, swsscommon lib will add ZMQ based Producer/Consumer table.
Because currently Consumer interface only support Redis based Producer/Consumer table, so improve this interface to support ZMQ based Producer/Consumer table.

ZMQ based Producer/Consumer table can find in this PR: sonic-net/sonic-swss-common#715

**How I verified it**
Pass all UT.

**Details if related**
  • Loading branch information
liuh-80 authored May 12, 2023
1 parent 867e355 commit a215441
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 54 deletions.
61 changes: 30 additions & 31 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ vector<Selectable *> Orch::getSelectables()
return selectables;
}

void Consumer::addToSync(const KeyOpFieldsValuesTuple &entry)
void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -157,7 +157,7 @@ void Consumer::addToSync(const KeyOpFieldsValuesTuple &entry)

}

size_t Consumer::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries)
size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -194,9 +194,7 @@ size_t Consumer::refillToSync(Table* table)

size_t Consumer::refillToSync()
{
ConsumerTableBase *consumerTable = getConsumerTable();

auto subTable = dynamic_cast<SubscriberStateTable *>(consumerTable);
auto subTable = dynamic_cast<SubscriberStateTable *>(getSelectable());
if (subTable != NULL)
{
size_t update_size = 0;
Expand All @@ -213,35 +211,14 @@ size_t Consumer::refillToSync()
else
{
// consumerTable is either ConsumerStateTable or ConsumerTable
auto db = consumerTable->getDbConnector();
string tableName = consumerTable->getTableName();
auto db = getDbConnector();
string tableName = getTableName();
auto table = Table(db, tableName);
return refillToSync(&table);
}
}

void Consumer::execute()
{
SWSS_LOG_ENTER();

size_t update_size = 0;
do
{
std::deque<KeyOpFieldsValuesTuple> entries;
getConsumerTable()->pops(entries);
update_size = addToSync(entries);
} while (update_size != 0);

drain();
}

void Consumer::drain()
{
if (!m_toSync.empty())
m_orch->doTask(*this);
}

string Consumer::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
string ConsumerBase::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
{
string s = getTableName() + getConsumerTable()->getTableNameSeparator() + kfvKey(tuple)
+ "|" + kfvOp(tuple);
Expand All @@ -253,7 +230,7 @@ string Consumer::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
return s;
}

void Consumer::dumpPendingTasks(vector<string> &ts)
void ConsumerBase::dumpPendingTasks(vector<string> &ts)
{
for (auto &tm : m_toSync)
{
Expand All @@ -265,6 +242,28 @@ void Consumer::dumpPendingTasks(vector<string> &ts)
}
}

void Consumer::execute()
{
SWSS_LOG_ENTER();

size_t update_size = 0;
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
do
{
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
update_size = addToSync(entries);
} while (update_size != 0);

drain();
}

void Consumer::drain()
{
if (!m_toSync.empty())
m_orch->doTask(*this);
}

size_t Orch::addExistingData(const string& tableName)
{
auto consumer = dynamic_cast<Consumer *>(getExecutor(tableName));
Expand Down Expand Up @@ -586,7 +585,7 @@ void Orch::logfileReopen()
}
}

void Orch::recordTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple)
void Orch::recordTuple(ConsumerBase &consumer, const KeyOpFieldsValuesTuple &tuple)
{
string s = consumer.dumpTuple(tuple);

Expand Down
63 changes: 40 additions & 23 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,49 +132,66 @@ class Executor : public swss::Selectable
swss::Selectable *getSelectable() const { return m_selectable; }
};

class Consumer : public Executor {
class ConsumerBase : public Executor {
public:
ConsumerBase(swss::Selectable *selectable, Orch *orch, const std::string &name)
: Executor(selectable, orch, name)
{
}

virtual swss::TableBase *getConsumerTable() const = 0;

std::string getTableName() const
{
return getConsumerTable()->getTableName();
}

std::string dumpTuple(const swss::KeyOpFieldsValuesTuple &tuple);
void dumpPendingTasks(std::vector<std::string> &ts);

/* Store the latest 'golden' status */
// TODO: hide?
SyncMap m_toSync;

void addToSync(const swss::KeyOpFieldsValuesTuple &entry);

// Returns: the number of entries added to m_toSync
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);
};

class Consumer : public ConsumerBase {
public:
Consumer(swss::ConsumerTableBase *select, Orch *orch, const std::string &name)
: Executor(select, orch, name)
: ConsumerBase(select, orch, name)
{
}

swss::ConsumerTableBase *getConsumerTable() const
swss::TableBase *getConsumerTable() const override
{
// ConsumerTableBase is a subclass of TableBase
return static_cast<swss::ConsumerTableBase *>(getSelectable());
}

std::string getTableName() const
const swss::DBConnector* getDbConnector() const
{
return getConsumerTable()->getTableName();
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
return table->getDbConnector();
}

int getDbId() const
{
return getConsumerTable()->getDbConnector()->getDbId();
return getDbConnector()->getDbId();
}

std::string getDbName() const
{
return getConsumerTable()->getDbConnector()->getDbName();
return getDbConnector()->getDbName();
}

std::string dumpTuple(const swss::KeyOpFieldsValuesTuple &tuple);
void dumpPendingTasks(std::vector<std::string> &ts);

size_t refillToSync();
size_t refillToSync(swss::Table* table);
void execute();
void drain();

/* Store the latest 'golden' status */
// TODO: hide?
SyncMap m_toSync;

void addToSync(const swss::KeyOpFieldsValuesTuple &entry);

// Returns: the number of entries added to m_toSync
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);
void execute() override;
void drain() override;
};

typedef std::map<std::string, std::shared_ptr<Executor>> ConsumerMap;
Expand Down Expand Up @@ -215,12 +232,12 @@ class Orch
virtual void doTask();

/* Run doTask against a specific executor */
virtual void doTask(Consumer &consumer) = 0;
virtual void doTask(Consumer &consumer) { };
virtual void doTask(swss::NotificationConsumer &consumer) { }
virtual void doTask(swss::SelectableTimer &timer) { }

/* TODO: refactor recording */
static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);
static void recordTuple(ConsumerBase &consumer, const swss::KeyOpFieldsValuesTuple &tuple);

void dumpPendingTasks(std::vector<std::string> &ts);

Expand Down

0 comments on commit a215441

Please # to comment.