diff --git a/examples/chat-pubsub-regex.cpp b/examples/chat-pubsub-regex.cpp new file mode 100644 index 0000000..6013eda --- /dev/null +++ b/examples/chat-pubsub-regex.cpp @@ -0,0 +1,167 @@ +/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2012-2023 University of California, Los Angeles + * + * This file is part of ndn-svs, synchronization library for distributed realtime + * applications for NDN. + * + * ndn-svs library is free software: you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free Software + * Foundation, in version 2.1 of the License. + * + * ndn-svs library is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + */ + +#include +#include +#include +#include +#include +#include + +#include + +using namespace ndn::svs; + +struct Options +{ + std::string prefix; + std::string m_id; +}; + +class Program +{ +public: + Program(const Options& options) + : m_options(options) + { + // Use HMAC signing for Sync Interests + // Note: this is not generally recommended, but is used here for simplicity + SecurityOptions secOpts(m_keyChain); + secOpts.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl"); + + // Sign data packets using SHA256 (for simplicity) + secOpts.dataSigner->signingInfo.setSha256Signing(); + + // Do not fetch publications older than 10 seconds + SVSPubSubOptions opts; + opts.useTimestamp = true; + opts.maxPubAge = ndn::time::seconds(10); + + // Create the Pub/Sub instance + m_svsps = std::make_shared( + ndn::Name(m_options.prefix), + ndn::Name(m_options.m_id), + face, + std::bind(&Program::onMissingData, this, _1), + opts, + secOpts); + + std::cout << "SVS client starting: " << m_options.m_id << std::endl; + + // Subscribe to all data packets with prefix /chat (the "topic") + m_svsps->subscribeWithRegex(ndn::Regex("^"), [] (const auto& subData) + { + std::string content(reinterpret_cast(subData.data.data()), subData.data.size()); + std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " << + subData.name << " : "; + if (content.length() > 200) { + std::cout << "[LONG] " << content.length() << " bytes" + << " [" << std::hash{}(content) << "]"; + } else { + std::cout << content; + } + std::cout << std::endl; + }); + } + + void + run() + { + // Begin processing face events in a separate thread. + std::thread svsThread([this] { face.processEvents(); }); + + // Announce our presence. + // Note that the SVS-PS instance is thread-safe. + publishMsg("User " + m_options.m_id + " has joined the groupchat"); + + // Read from stdin and publish messages. + std::string userInput; + while (true) { + std::getline(std::cin, userInput); + publishMsg(userInput); + } + + // Wait for the SVS-PS thread to finish. + svsThread.join(); + } + +protected: + /** + * Callback on receving a new State Vector from another node. + * This will be called regardless of whether the missing data contains any topics + * or producers that we are subscribed to. + */ + void + onMissingData(const std::vector&) + { + // Ignore any other missing data for this example + } + + /** + * Publish a string message to the group + */ + void + publishMsg(const std::string& msg) + { + // Message to send + std::string content = msg; + + // If the message starts with "SEND " generate a new message + // with random content with length after send + if (msg.length() > 5 && msg.substr(0, 5) == "SEND ") { + auto len = std::stoi(msg.substr(5)); + + content = std::string(len, 'a'); + std::srand(std::time(nullptr)); + for (auto& c : content) + c = std::rand() % 26 + 'a'; + + std::cout << "> Sending random message with hash [" << std::hash{}(content) << "]" << std::endl; + } + + // Note that unlike SVSync, names can be arbitrary, + // and need not be prefixed with the producer prefix. + ndn::Name name("chat"); // topic of publication + name.append(m_options.m_id); // who sent this + name.appendTimestamp(); // and when + + m_svsps->publish(name, ndn::make_span(reinterpret_cast(content.data()), content.size())); + } + +private: + const Options m_options; + ndn::Face face; + std::shared_ptr m_svsps; + ndn::KeyChain m_keyChain; +}; + +int +main(int argc, char** argv) +{ + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + return 1; + } + + Options opt; + opt.prefix = "/ndn/svs"; + opt.m_id = argv[1]; + + Program program(opt); + program.run(); + + return 0; +} diff --git a/ndn-svs/svspubsub.cpp b/ndn-svs/svspubsub.cpp index 723c890..a3eaadf 100644 --- a/ndn-svs/svspubsub.cpp +++ b/ndn-svs/svspubsub.cpp @@ -15,11 +15,15 @@ */ #include "svspubsub.hpp" +#include "tlv.hpp" #include #include + + + namespace ndn::svs { SVSPubSub::SVSPubSub(const Name& syncPrefix, @@ -86,10 +90,30 @@ SVSPubSub::publish(const Name& name, span value, data.setContent(value); data.setFreshnessPeriod(freshnessPeriod); m_securityOptions.dataSigner->sign(data); + // if the data size is smaller than MAX_SIZE_OF_PIGGYDATA, add it to the piggyback queue + if (data.wireEncode().size() <= MAX_SIZE_OF_PIGGYDATA) + m_piggyDataQueue.push(data); return publishPacket(data, nodePrefix); } } + +SeqNo +SVSPubSub::publish(const Name& name, + const Name& nodePrefix, time::milliseconds freshnessPeriod, + std::vector mappingBlocks) +{ + // Segment the data if larger than MAX_DATA_SIZE + NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; + SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1; + + // Insert mapping and manually update the sequence number + insertMapping(nid, seqNo, name, mappingBlocks); + m_svsync.getCore().updateSeqNo(seqNo, nid); + + return seqNo; +} + SeqNo SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix, std::vector mappingBlocks) @@ -112,7 +136,7 @@ SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, unsigned long now = std::chrono::duration_cast (std::chrono::system_clock::now().time_since_epoch()).count(); - auto timestamp = Name::Component::fromNumber(now, tlv::TimestampNameComponent); + auto timestamp = Name::Component::fromNumber(now, ndn::tlv::TimestampNameComponent); additional.push_back(timestamp); } @@ -139,6 +163,15 @@ SVSPubSub::subscribe(const Name& prefix, const SubscriptionCallback& callback, b return handle; } +uint32_t +SVSPubSub::subscribeWithRegex(const Regex ®ex, const SubscriptionCallback &callback,bool autofetch, bool packets) +{ + uint32_t handle = ++m_subscriptionCount; + Subscription sub = { handle, ndn::Name(), callback, packets, false, autofetch, std::make_shared(regex)}; + m_regexSubscriptions.push_back(sub); + return handle; +} + uint32_t SVSPubSub::subscribeToProducer(const Name& nodePrefix, const SubscriptionCallback& callback, bool prefetch, bool packets) @@ -190,8 +223,8 @@ SVSPubSub::updateCallbackInternal(const std::vector& info) } } - // Fetch all mappings if we have prefix subscription(s) - if (!m_prefixSubscriptions.empty()) + // Fetch all mappings if we have prefix subscription(s) or regex subscription(s) + if (!m_prefixSubscriptions.empty() or !m_regexSubscriptions.empty()) { MissingDataInfo remainingInfo = stream; @@ -256,7 +289,7 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) // if no timestamp block is present, we just skip this step for (const auto& block : mapping.second) { - if (block.type() != tlv::TimestampNameComponent) + if (block.type() != ndn::tlv::TimestampNameComponent) continue; unsigned long now = @@ -277,8 +310,76 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) { if (sub.prefix.isPrefixOf(mapping.first)) { - m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); - queued = true; + if (sub.autofetch) + { + // try to find in the piggyDataCache + auto data = m_piggyDataCache.find(mapping.first); + if(data != nullptr){ + // return data to subscription + SubscriptionData subData = { + mapping.first, + data->getContent().value_bytes(), + nodeId, + seqNo, + ndn::Data()}; + sub.callback(subData); + } + else + { + // try to fetch from network + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } + } + else + { + SubscriptionData subData = { + mapping.first, + ndn::span{}, + nodeId, + seqNo, + ndn::Data() + }; + sub.callback(subData); + } + } + } + for (auto &sub : m_regexSubscriptions) + { + if (sub.regex->match(mapping.first)) + { + if (sub.autofetch) + { + // try to fetch from network + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } + else + { + // try to find in the piggyDataCache + auto data = m_piggyDataCache.find(mapping.first); + if (data != nullptr) + { + // return data to subscription + SubscriptionData subData = { + mapping.first, + data->getContent().value_bytes(), + nodeId, + seqNo, + ndn::Data()}; + sub.callback(subData); + } + else + { + SubscriptionData subData = { + mapping.first, + ndn::span{}, + nodeId, + seqNo, + ndn::Data()}; + sub.callback(subData); + } + } } } @@ -458,9 +559,32 @@ SVSPubSub::cleanUpFetch(const std::pair& publication) Block SVSPubSub::onGetExtraData(const VersionVector&) { + // Create a block and it's type is tlv::ApplicationParameters + // This block will be sent to the other node as extra data in the Sync Interest + // It contains the notification mapping list and one or a list of piggybacked data packets + ndn::Block block(ndn::tlv::Content); MappingList copy = m_notificationMappingList; + auto mappingBlock = copy.encode(); + block.push_back(mappingBlock); + + size_t size = mappingBlock.size(); + + while (!m_piggyDataQueue.empty()) + { + const auto &data = m_piggyDataQueue.front(); // Access the front element + // If the size of the block is greater than the maximum size of the application parameters, then do not add any more data packets + auto dataBlock = data.wireEncode(); + size = size + dataBlock.size(); + if (size > MAX_SIZE_OF_APPLICATION_PARAMETERS) + break; + block.push_back(dataBlock); + m_piggyDataQueue.pop(); // Remove the front element + } + block.encode(); + m_notificationMappingList = MappingList(); - return copy.encode(); + + return block; } void @@ -468,10 +592,25 @@ SVSPubSub::onRecvExtraData(const Block& block) { try { - MappingList list(block); - for (const auto& p : list.pairs) + block.parse(); + for (const auto &childBlock : block.elements()) { - m_mappingProvider.insertMapping(list.nodeId, p.first, p.second); + // if block is tlv::MappingData, then it's mapping data + if (childBlock.type() == ndn::svs::tlv::MappingData) + { + MappingList list(childBlock); + for (const auto &p : list.pairs) + { + m_mappingProvider.insertMapping(list.nodeId, p.first, p.second); + } + } + // if block is ndn::svs::tlv::PiggybackData, then it's a piggybacked data packet + if (childBlock.type() == ndn::tlv::Data) + { + // Add it to the piggyback data cache + auto dataPtr = std::make_shared(ndn::Data(childBlock)); + m_piggyDataCache.insert(*dataPtr); + } } } catch (const std::exception&) {} diff --git a/ndn-svs/svspubsub.hpp b/ndn-svs/svspubsub.hpp index 941d4da..0cfcb38 100644 --- a/ndn-svs/svspubsub.hpp +++ b/ndn-svs/svspubsub.hpp @@ -24,6 +24,8 @@ #include "svsync.hpp" #include +#include +#include namespace ndn::svs { @@ -116,6 +118,20 @@ class SVSPubSub : noncopyable time::milliseconds freshnessPeriod = FRESH_FOREVER, std::vector mappingBlocks = {}); + /** + * @brief Publish data names only on the pub/sub group. + * + * @param name name for the publication + * @param nodePrefix Name to publish the data under + * @param freshnessPeriod freshness period for the data + * @param mappingBlocks Additional blocks to be published with the mapping (use sparingly) + */ + SeqNo + publish(const Name& name, + const Name& nodePrefix = EMPTY_NAME, + time::milliseconds freshnessPeriod = FRESH_FOREVER, + std::vector mappingBlocks = {}); + /** * @brief Subscribe to a application name prefix. * @@ -128,6 +144,18 @@ class SVSPubSub : noncopyable uint32_t subscribe(const Name& prefix, const SubscriptionCallback& callback, bool packets = false); + /** + * @brief Subscribe with a regex to name. + * + * @param regex regex of the application data + * @param callback Callback when new data is received + * @param packets Subscribe to the raw Data packets instead of BLOBs + * + * @returns Handle to the subscription + */ + uint32_t + subscribeWithRegex(const Regex& regex, const SubscriptionCallback& callback, bool autofetch = true, bool packets = false); + /** * @brief Subscribe to a data producer * @@ -181,6 +209,9 @@ class SVSPubSub : noncopyable SubscriptionCallback callback; bool isPacketSubscription; bool prefetch; + bool autofetch = true; + std::shared_ptr regex; + }; void @@ -241,10 +272,20 @@ class SVSPubSub : noncopyable uint32_t m_subscriptionCount; std::vector m_producerSubscriptions; std::vector m_prefixSubscriptions; + std::vector m_regexSubscriptions; // Queue of publications to fetch std::map, std::vector> m_fetchMap; std::map, bool> m_fetchingMap; + + + size_t MAX_SIZE_OF_APPLICATION_PARAMETERS = 1024; + size_t MAX_SIZE_OF_PIGGYDATA = 800; + bool Enable_PiggyData = true; + // Queue of Pending Piggy Data (to be sent in the next update with sync interest) : First in first out + std::queue m_piggyDataQueue; + // A cache for received piggy data + ndn::InMemoryStorageLru m_piggyDataCache; }; } // namespace ndn::svs