Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

ZMQ based state tables #715

Merged
merged 49 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
c734d59
Make shmproducerstatetable update redis in thread
liuh-80 Nov 11, 2022
3811db1
Save code change
liuh-80 Nov 11, 2022
9aa1286
Save code change
liuh-80 Nov 14, 2022
b5355ea
Save code
liuh-80 Nov 14, 2022
974bfbf
Fix UT and code issue found by UT
liuh-80 Nov 15, 2022
1286018
Save code change
liuh-80 Nov 16, 2022
67d894a
Hide shm from header file
liuh-80 Nov 17, 2022
86eeb3f
Merge remote-tracking branch 'origin' into dev/liuh/mq_state_table
liuh-80 Nov 17, 2022
3801611
Fix issue
liuh-80 Nov 17, 2022
8b7499c
ZMQ based state tables
liuh-80 Nov 18, 2022
cd10e9a
Save code change
liuh-80 Dec 5, 2022
2b91ba5
Save code
liuh-80 Dec 5, 2022
7ca638f
Fix code bug
liuh-80 Dec 6, 2022
77c300c
Improve code
liuh-80 Dec 6, 2022
a88fc01
improve code
liuh-80 Dec 6, 2022
eae1131
Fix race condition
liuh-80 Dec 7, 2022
30d94e9
Merge remote-tracking branch 'origin' into dev/liuh/zmq_state_table
liuh-80 Dec 7, 2022
bbe2f9d
Fix code
liuh-80 Dec 7, 2022
496392b
Fix UT
liuh-80 Dec 7, 2022
8eb6c38
Fix code
liuh-80 Dec 7, 2022
78e4670
Remove write to redis code from Zmq tables
liuh-80 Dec 9, 2022
2bf5795
Add write redis code to consumer table
liuh-80 Dec 9, 2022
576cbfc
Improve code and UT
liuh-80 Dec 9, 2022
c7d1041
Improve performance by use zmq_poll
liuh-80 Dec 9, 2022
103ef22
Add binary serializer
liuh-80 Jan 9, 2023
9eb3e16
Fix PR comments
liuh-80 Jan 10, 2023
e85a2f2
Fix UT
liuh-80 Jan 10, 2023
53afb3d
Increase poll timeout
liuh-80 Jan 10, 2023
124ab73
Improve UT
liuh-80 Jan 10, 2023
42728f5
Improve UT
liuh-80 Jan 10, 2023
75d55f1
Fix build issue
liuh-80 Jan 13, 2023
2aafa33
Merge remote-tracking branch 'origin' into dev/liuh/zmq_state_table
liuh-80 Feb 17, 2023
08625ee
Merge remote-tracking branch 'origin' into dev/liuh/zmq_state_table
liuh-80 Feb 28, 2023
defe641
Improve code
liuh-80 Feb 28, 2023
a8c8406
update header
liuh-80 Mar 2, 2023
cb1d47b
Fix build issue
liuh-80 Mar 2, 2023
7d88285
Fix code bug
liuh-80 Mar 7, 2023
7461aea
Fix PR comments
liuh-80 Mar 13, 2023
6ccac19
Improve performance by change background use low priority
liuh-80 Mar 17, 2023
2b55423
Improve code by PR comments
liuh-80 Apr 4, 2023
b2bd2e7
Share connection between multiple tables
liuh-80 Apr 10, 2023
465316f
Fix PR comments
liuh-80 Apr 25, 2023
750c2de
Fix code bug
liuh-80 Apr 25, 2023
3458949
Fix code bug
liuh-80 Apr 25, 2023
ac893f2
Fix PR comments
liuh-80 Apr 26, 2023
56b5b03
Delete entry from redis before set.
liuh-80 Apr 26, 2023
1ae2ae6
Improve exception and UT
liuh-80 Apr 26, 2023
4a2274a
Fix PR comments
liuh-80 May 11, 2023
8e12b9c
Fix class inheritance
liuh-80 May 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ common_libswsscommon_la_SOURCES = \
common/json.cpp \
common/producertable.cpp \
common/producerstatetable.cpp \
common/zmqproducerstatetable.cpp \
common/rediscommand.cpp \
common/redistran.cpp \
common/redisselect.cpp \
Expand All @@ -42,6 +43,7 @@ common_libswsscommon_la_SOURCES = \
common/consumertable.cpp \
common/consumertablebase.cpp \
common/consumerstatetable.cpp \
common/zmqconsumerstatetable.cpp \
common/ipaddress.cpp \
common/ipprefix.cpp \
common/ipaddresses.cpp \
Expand All @@ -66,6 +68,8 @@ common_libswsscommon_la_SOURCES = \
common/redisutility.cpp \
common/restart_waiter.cpp \
common/profileprovider.cpp \
common/zmqclient.cpp \
common/zmqserver.cpp \
common/redis_table_waiter.cpp

common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
Expand All @@ -76,6 +80,7 @@ common_swssloglevel_SOURCES = \
common/loglevel.cpp \
common/loglevel_util.cpp


common_swssloglevel_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CODE_COVERAGE_CXXFLAGS)
common_swssloglevel_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CODE_COVERAGE_CPPFLAGS)
common_swssloglevel_LDADD = common/libswsscommon.la $(CODE_COVERAGE_LIBS)
15 changes: 15 additions & 0 deletions common/armhelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef __ARM_HELPER__
#define __ARM_HELPER__

#if defined(__arm__) || defined(__aarch64__)
#define WARNINGS_NO_CAST_ALIGN \
_Pragma ("GCC diagnostic push") \
_Pragma ("GCC diagnostic ignored \"-Wcast-align\"")
#define WARNINGS_RESET \
_Pragma ("GCC diagnostic pop")
#else
#define WARNINGS_NO_CAST_ALIGN
#define WARNINGS_RESET
#endif

#endif
154 changes: 154 additions & 0 deletions common/binaryserializer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#ifndef __BINARY_SERIALIZER__
#define __BINARY_SERIALIZER__

#include "common/armhelper.h"

namespace swss {

class BinarySerializer {
public:
static size_t serializeBuffer(
const char* buffer,
const size_t size,
const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
const std::string& tableName)
{
auto tmpSerializer = BinarySerializer(buffer, size);

tmpSerializer.setKeyAndValue(
dbName.c_str(), dbName.length(),
tableName.c_str(), tableName.length());
tmpSerializer.setKeyAndValue(
key.c_str(), key.length(),
command.c_str(), command.length());
for (auto& kvp : values)
{
auto& field = fvField(kvp);
auto& value = fvValue(kvp);
tmpSerializer.setKeyAndValue(
field.c_str(), field.length(),
value.c_str(), value.length());
}

return tmpSerializer.finalize();
}

static void deserializeBuffer(
const char* buffer,
const size_t size,
std::vector<swss::FieldValueTuple>& values)
{
WARNINGS_NO_CAST_ALIGN;
auto pkvp_count = (const size_t*)buffer;
WARNINGS_RESET;

size_t kvp_count = *pkvp_count;
auto tmp_buffer = buffer + sizeof(size_t);
while (kvp_count > 0)
{
kvp_count--;

// read key and value from buffer
WARNINGS_NO_CAST_ALIGN;
auto pkeylen = (const size_t*)tmp_buffer;
WARNINGS_RESET;

tmp_buffer += sizeof(size_t);
if ((size_t)(tmp_buffer - buffer + *pkeylen) > size)
{
SWSS_LOG_THROW("serialized key data was truncated, key length: %zu, increase buffer size: %zu",
*pkeylen,
size);
}

auto pkey = tmp_buffer;
tmp_buffer += *pkeylen;

WARNINGS_NO_CAST_ALIGN;
auto pvallen = (const size_t*)tmp_buffer;
WARNINGS_RESET;

tmp_buffer += sizeof(size_t);
if ((size_t)(tmp_buffer - buffer + *pvallen) > size)
{
SWSS_LOG_THROW("serialized value data was truncated,, value length: %zu increase buffer size: %zu",
*pvallen,
size);
}

auto pval = tmp_buffer;
tmp_buffer += *pvallen;

values.push_back(std::make_pair(pkey, pval));
}
}

private:
const char* m_buffer;
const size_t m_buffer_size;
char* m_current_position;
size_t m_kvp_count;

BinarySerializer(const char* buffer, const size_t size)
: m_buffer(buffer), m_buffer_size(size)
{
resetSerializer();
}

void resetSerializer()
{
m_current_position = const_cast<char*>(m_buffer) + sizeof(size_t);
m_kvp_count = 0;
}

void setKeyAndValue(const char* key, size_t klen,
const char* value, size_t vlen)
{
// to improve deserialize performance, copy null-terminated string.
setData(key, klen + 1);
setData(value, vlen + 1);

m_kvp_count++;
}

size_t finalize()
{
// set key value pair count to message
WARNINGS_NO_CAST_ALIGN;
size_t* pkvp_count = (size_t*)const_cast<char*>(m_buffer);
WARNINGS_RESET;

*pkvp_count = m_kvp_count;

// return size
return m_current_position - m_buffer;
}

void setData(const char* data, size_t datalen)
{
if ((size_t)(m_current_position - m_buffer + datalen + sizeof(size_t)) > m_buffer_size)
{
SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\
key count: %zu, data length %zu, buffer size: %zu",
m_kvp_count,
datalen,
m_buffer_size);
}

WARNINGS_NO_CAST_ALIGN;
size_t* pdatalen = (size_t*)m_current_position;
WARNINGS_RESET;

*pdatalen = datalen;
m_current_position += sizeof(size_t);

memcpy(m_current_position, data, datalen);
m_current_position += datalen;
}
};

}
#endif
12 changes: 1 addition & 11 deletions common/defaultvalueprovider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,7 @@
#include "logger.h"
#include "table.h"
#include "json.h"

#if defined(__arm__) || defined(__aarch64__)
#define WARNINGS_NO_CAST_ALIGN \
_Pragma ("GCC diagnostic push") \
_Pragma ("GCC diagnostic ignored \"-Wcast-align\"")
#define WARNINGS_RESET \
_Pragma ("GCC diagnostic pop")
#else
#define WARNINGS_NO_CAST_ALIGN
#define WARNINGS_RESET
#endif
#include "armhelper.h"

using namespace std;
using namespace swss;
Expand Down
25 changes: 15 additions & 10 deletions common/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,34 @@ class TableBase {
#endif
TableBase(int dbId, const std::string &tableName)
: m_tableName(tableName)
{
m_tableSeparator = getTableSeparator(dbId);
}

TableBase(const std::string &tableName, const std::string &tableSeparator)
: m_tableName(tableName), m_tableSeparator(tableSeparator)
{
static const std::string legalSeparators = ":|";
if (legalSeparators.find(tableSeparator) == std::string::npos)
throw std::invalid_argument("Invalid table name separator");
}

static std::string getTableSeparator(int dbId)
{
/* Look up table separator for the provided DB */
auto it = tableNameSeparatorMap.find(dbId);

if (it != tableNameSeparatorMap.end())
{
m_tableSeparator = it->second;
return it->second;
}
else
{
SWSS_LOG_NOTICE("Unrecognized database ID. Using default table name separator ('%s')", TABLE_NAME_SEPARATOR_VBAR.c_str());
m_tableSeparator = TABLE_NAME_SEPARATOR_VBAR;
return TABLE_NAME_SEPARATOR_VBAR;
}
}

TableBase(const std::string &tableName, const std::string &tableSeparator)
: m_tableName(tableName), m_tableSeparator(tableSeparator)
{
static const std::string legalSeparators = ":|";
if (legalSeparators.find(tableSeparator) == std::string::npos)
throw std::invalid_argument("Invalid table name separator");
}

std::string getTableName() const { return m_tableName; }

/* Return the actual key name as a combination of tableName<table_separator>key */
Expand Down
Loading