Skip to content

Commit

Permalink
Prototype of fragmented message support.
Browse files Browse the repository at this point in the history
  • Loading branch information
smoriemb authored and smoriemb committed Dec 31, 2022
1 parent 1410a87 commit 9ac80c3
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 18 deletions.
1 change: 1 addition & 0 deletions include/rtps/common/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace rtps {

typedef uint16_t Ip4Port_t;
typedef uint16_t DataSize_t;
typedef uint32_t FragDataSize_t;
typedef int8_t ParticipantId_t; // With UDP only 120 possible

enum class EntityKind_t : uint8_t {
Expand Down
2 changes: 2 additions & 0 deletions include/rtps/entities/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ template <class NetworkDriver> class StatefulWriterT final : public Writer {
void progress() override;
const CacheChange *newChange(ChangeKind_t kind, const uint8_t *data,
DataSize_t size) override;
const CacheChange *newChangeCallback(ChangeKind_t kind,
CacheChange::SerializerCallback func, FragDataSize_t size) override;
void setAllChangesToUnsent() override;
void onNewAckNack(const SubmessageAckNack &msg,
const GuidPrefix_t &sourceGuidPrefix) override;
Expand Down
7 changes: 7 additions & 0 deletions include/rtps/entities/StatefulWriter.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ void StatefulWriterT<NetworkDriver>::removeReaderOfParticipant(
resetSendOptions();
}

template <class NetworkDriver>
const rtps::CacheChange *StatefulWriterT<NetworkDriver>::newChangeCallback(
ChangeKind_t kind, CacheChange::SerializerCallback func, FragDataSize_t size){
// Not supported
return nullptr;
}

template <class NetworkDriver>
const rtps::CacheChange *StatefulWriterT<NetworkDriver>::newChange(
ChangeKind_t kind, const uint8_t *data, DataSize_t size) {
Expand Down
2 changes: 2 additions & 0 deletions include/rtps/entities/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ template <typename NetworkDriver> class StatelessWriterT : public Writer {
void progress() override;
const CacheChange *newChange(ChangeKind_t kind, const uint8_t *data,
DataSize_t size) override;
const CacheChange *newChangeCallback(ChangeKind_t kind,
CacheChange::SerializerCallback func, FragDataSize_t size) override;
void setAllChangesToUnsent() override;
void onNewAckNack(const SubmessageAckNack &msg,
const GuidPrefix_t &sourceGuidPrefix) override;
Expand Down
100 changes: 82 additions & 18 deletions include/rtps/entities/StatelessWriter.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,32 @@ void StatelessWriterT<NetworkDriver>::removeReaderOfParticipant(
resetSendOptions();
}

template <typename NetworkDriver>
const CacheChange *StatelessWriterT<NetworkDriver>::newChangeCallback(
rtps::ChangeKind_t kind, CacheChange::SerializerCallback func,
FragDataSize_t size) {
if (isIrrelevant(kind)) {
return nullptr;
}
Lock lock(m_mutex);

if (m_history.isFull()) {
SequenceNumber_t newMin = ++SequenceNumber_t(m_history.getSeqNumMin());
if (m_nextSequenceNumberToSend < newMin) {
m_nextSequenceNumberToSend =
newMin; // Make sure we have the correct sn to send
}
}

auto *result = m_history.addChange(func, size);
if (mp_threadPool != nullptr) {
mp_threadPool->addWorkload(this);
}

SLW_LOG("Adding new data as a callback function.\n");
return result;
}

template <typename NetworkDriver>
const CacheChange *StatelessWriterT<NetworkDriver>::newChange(
rtps::ChangeKind_t kind, const uint8_t *data, DataSize_t size) {
Expand Down Expand Up @@ -234,15 +260,9 @@ void StatelessWriterT<NetworkDriver>::progress() {
}

for (const auto &proxy : m_proxies) {

SLW_LOG("Progess.\n");
// Do nothing, if someone else sends for me... (Multicast)
if (proxy.useMulticast || !proxy.suppressUnicast || m_enforceUnicast) {
PacketInfo info;
info.srcPort = m_packetInfo.srcPort;

MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix);
MessageFactory::addSubMessageTimeStamp(info.buffer);

{
Lock lock(m_mutex);
Expand All @@ -269,24 +289,68 @@ void StatelessWriterT<NetworkDriver>::progress() {
} else {
reid = proxy.remoteReaderGuid.entityId;
}
MessageFactory::addSubMessageData(info.buffer, next->data, false,

if (next->serializerCallback) {
int fragment_start_number = 1;
FragDataSize_t sampleSize = next->sizeToBeSerialized;
CacheChange::SerializedBuf serialized_buf;
while(1) {
serialized_buf = next->serializerCallback();
if (0 != serialized_buf.second) {
PBufWrapper data;
PacketInfo info;
info.srcPort = m_packetInfo.srcPort;

MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix);
MessageFactory::addSubMessageTimeStamp(info.buffer);
data.reserve(serialized_buf.second);
data.append(serialized_buf.first,
serialized_buf.second);
MessageFactory::addSubMessageDataFrag(info.buffer, data, false,
next->sequenceNumber,
fragment_start_number ++,
serialized_buf.second,
sampleSize,
m_attributes.endpointGuid.entityId,
reid); // TODO
}
// Just usable for IPv4
// Decide which locator to be used unicast/multicast
if (proxy.useMulticast && !m_enforceUnicast) {
info.destAddr = proxy.remoteMulticastLocator.getIp4Address();
info.destPort = (Ip4Port_t)proxy.remoteMulticastLocator.port;
} else {
info.destAddr = proxy.remoteLocator.getIp4Address();
info.destPort = (Ip4Port_t)proxy.remoteLocator.port;
}

m_transport->sendPacket(info);
} else {
break;
}
}
} else {
PacketInfo info;
info.srcPort = m_packetInfo.srcPort;

// Just usable for IPv4
// Decide which locator to be used unicast/multicast
MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix);
MessageFactory::addSubMessageTimeStamp(info.buffer);
MessageFactory::addSubMessageData(info.buffer, next->data, false,
next->sequenceNumber,
m_attributes.endpointGuid.entityId,
reid); // TODO
// Just usable for IPv4
// Decide which locator to be used unicast/multicast
if (proxy.useMulticast && !m_enforceUnicast) {
info.destAddr = proxy.remoteMulticastLocator.getIp4Address();
info.destPort = (Ip4Port_t)proxy.remoteMulticastLocator.port;
} else {
info.destAddr = proxy.remoteLocator.getIp4Address();
info.destPort = (Ip4Port_t)proxy.remoteLocator.port;
}

if (proxy.useMulticast && !m_enforceUnicast) {
info.destAddr = proxy.remoteMulticastLocator.getIp4Address();
info.destPort = (Ip4Port_t)proxy.remoteMulticastLocator.port;
} else {
info.destAddr = proxy.remoteLocator.getIp4Address();
info.destPort = (Ip4Port_t)proxy.remoteLocator.port;
m_transport->sendPacket(info);
}
}

m_transport->sendPacket(info);
}
}

Expand Down
2 changes: 2 additions & 0 deletions include/rtps/entities/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Writer {
virtual void progress() = 0;
virtual const CacheChange *newChange(ChangeKind_t kind, const uint8_t *data,
DataSize_t size) = 0;
virtual const CacheChange *newChangeCallback(ChangeKind_t kind,
CacheChange::SerializerCallback func, FragDataSize_t) = 0;
virtual void setAllChangesToUnsent() = 0;
virtual void onNewAckNack(const SubmessageAckNack &msg,
const GuidPrefix_t &sourceGuidPrefix) = 0;
Expand Down
43 changes: 43 additions & 0 deletions include/rtps/messages/MessageFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,49 @@ void addSubMessageTimeStamp(Buffer &buffer, bool setInvalid = false) {
}
}

template <class Buffer>
void addSubMessageDataFrag(Buffer &buffer, const Buffer &filledPayload,
bool containsInlineQos, const SequenceNumber_t &SN,
uint32_t fragStartingNumber,
uint16_t fragSize,
uint32_t sampleSize,
const EntityId_t &writerID, const EntityId_t &readerID) {
SubmessageDataFrag msg;
msg.header.submessageId = SubmessageKind::DATA_FRAG;
#if IS_LITTLE_ENDIAN
msg.header.flags = FLAG_LITTLE_ENDIAN;
#else
msg.header.flags = FLAG_BIG_ENDIAN;
#endif

msg.header.octetsToNextHeader = 0;

if (containsInlineQos) {
msg.header.flags |= FLAG_INLINE_QOS;
}

msg.writerSN = SN;
msg.extraFlags = 0;
msg.readerId = readerID;
msg.writerId = writerID;
msg.fragStartingNumber = fragStartingNumber;
msg.fragmentsInSubmessage = 1;
msg.fragmentSize = fragSize;
msg.sampleSize = sampleSize;

constexpr uint16_t octetsToInlineQoS =
4 + 4 + 8 + 4 + 2 + 2 + 4;
// EntityIds + SequenceNumber + FragmentNumber + fragmentsInSubmessage + fragmentSize + sampleSize
msg.octetsToInlineQos = octetsToInlineQoS;

serializeMessage(buffer, msg);

if (filledPayload.isValid()) {
Buffer shallowCopy = filledPayload;
buffer.append(std::move(shallowCopy));
}
}

template <class Buffer>
void addSubMessageData(Buffer &buffer, const Buffer &filledPayload,
bool containsInlineQos, const SequenceNumber_t &SN,
Expand Down
56 changes: 56 additions & 0 deletions include/rtps/messages/MessageTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,29 @@ struct SubmessageHeader {
}
};

struct SubmessageDataFrag {
SubmessageHeader header;
uint16_t extraFlags;
uint16_t octetsToInlineQos;
EntityId_t readerId;
EntityId_t writerId;
SequenceNumber_t writerSN;
uint32_t fragStartingNumber;
uint16_t fragmentsInSubmessage;
uint16_t fragmentSize;
uint32_t sampleSize;
static constexpr uint16_t getRawSize() {
return SubmessageHeader::getRawSize()
+ sizeof(uint16_t) + sizeof(uint16_t)
+ (2 * 3 + 2 * 1) // EntityID
+ sizeof(SequenceNumber_t)
+ sizeof(uint32_t)
+ sizeof(uint16_t)
+ sizeof(uint16_t)
+ sizeof(uint32_t);
}
};

struct SubmessageData {
SubmessageHeader header;
uint16_t extraFlags;
Expand Down Expand Up @@ -291,6 +314,39 @@ bool serializeMessage(Buffer &buffer, SubmessageData &msg) {
return true;
}

template <typename Buffer>
bool serializeMessage(Buffer &buffer, SubmessageDataFrag &msg) {
if (!buffer.reserve(SubmessageDataFrag::getRawSize())) {
return false;
}

serializeMessage(buffer, msg.header);

buffer.append(reinterpret_cast<uint8_t *>(&msg.extraFlags), sizeof(uint16_t));
buffer.append(reinterpret_cast<uint8_t *>(&msg.octetsToInlineQos),
sizeof(uint16_t));
buffer.append(msg.readerId.entityKey.data(), msg.readerId.entityKey.size());
buffer.append(reinterpret_cast<uint8_t *>(&msg.readerId.entityKind),
sizeof(EntityKind_t));
buffer.append(msg.writerId.entityKey.data(), msg.writerId.entityKey.size());
buffer.append(reinterpret_cast<uint8_t *>(&msg.writerId.entityKind),
sizeof(EntityKind_t));
buffer.append(reinterpret_cast<uint8_t *>(&msg.writerSN.high),
sizeof(msg.writerSN.high));
buffer.append(reinterpret_cast<uint8_t *>(&msg.writerSN.low),
sizeof(msg.writerSN.low));
buffer.append(reinterpret_cast<uint8_t *>(&msg.fragStartingNumber),
sizeof(uint32_t));
buffer.append(reinterpret_cast<uint8_t *>(&msg.fragmentsInSubmessage),
sizeof(uint16_t));
buffer.append(reinterpret_cast<uint8_t *>(&msg.fragmentSize),
sizeof(uint16_t));
buffer.append(reinterpret_cast<uint8_t *>(&msg.sampleSize),
sizeof(uint32_t));

return true;
}

template <typename Buffer>
bool serializeMessage(Buffer &buffer, SubmessageHeartbeat &msg) {
if (!buffer.reserve(SubmessageHeartbeat::getRawSize())) {
Expand Down
5 changes: 5 additions & 0 deletions include/rtps/storages/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Author: i11 - Embedded Software, RWTH Aachen University
#ifndef PROJECT_CACHECHANGE_H
#define PROJECT_CACHECHANGE_H

#include <functional>
#include "rtps/common/types.h"
#include "rtps/storages/PBufWrapper.h"

Expand All @@ -33,6 +34,10 @@ struct CacheChange {
ChangeKind_t kind = ChangeKind_t::INVALID;
SequenceNumber_t sequenceNumber = SEQUENCENUMBER_UNKNOWN;
PBufWrapper data{};
typedef std::pair<uint8_t *, DataSize_t> SerializedBuf;
typedef std::function<SerializedBuf()> SerializerCallback;
SerializerCallback serializerCallback = nullptr;
FragDataSize_t sizeToBeSerialized;

CacheChange() = default;
CacheChange(ChangeKind_t kind, SequenceNumber_t sequenceNumber)
Expand Down
15 changes: 15 additions & 0 deletions include/rtps/storages/SimpleHistoryCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ template <uint16_t SIZE> class SimpleHistoryCache {
return it == m_tail;
}

const CacheChange *addChange(CacheChange::SerializerCallback func,
FragDataSize_t size) {
CacheChange change;
change.kind = ChangeKind_t::ALIVE;
change.sizeToBeSerialized = size;
change.serializerCallback = func;
change.sequenceNumber = ++m_lastUsedSequenceNumber;

CacheChange *place = &m_buffer[m_head];
incrementHead();

*place = std::move(change);
return place;
}

const CacheChange *addChange(const uint8_t *data, DataSize_t size) {
CacheChange change;
change.kind = ChangeKind_t::ALIVE;
Expand Down

0 comments on commit 9ac80c3

Please # to comment.