Skip to content

Commit

Permalink
add publication cache support (#274)
Browse files Browse the repository at this point in the history
* add publication cache support

* do not build z_pub_cache_example if unstable api is not enabled

* docs update
  • Loading branch information
DenisBiryukov91 authored Nov 5, 2024
1 parent 602bca8 commit e060cf0
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 15 deletions.
3 changes: 2 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ API Reference
serialization_deserialization
channels
interop
shared_memory
shared_memory
ext
21 changes: 21 additions & 0 deletions docs/ext.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
..
.. Copyright (c) 2024 ZettaScale Technology
..
.. This program and the accompanying materials are made available under the
.. terms of the Eclipse Public License 2.0 which is available at
.. http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
.. which is available at https://www.apache.org/licenses/LICENSE-2.0.
..
.. SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
..
.. Contributors:
.. ZettaScale Zenoh Team, <zenoh@zettascale.tech>
..
Extensions
==========
Extra functionality, which is not a part of core Zenoh API.

.. doxygenclass:: zenoh::ext::PublicationCache
:members:
:membergroups: Constructors Operators Methods
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ function(add_examples glob mode lib)
if(${file} MATCHES ".*liveliness.*$")
continue()
endif()
if(${file} MATCHES ".*pub_cache.*$")
continue()
endif()
endif()
if("${mode}" STREQUAL "zenohpico")
if (((${file} MATCHES "^.*pub.*$") OR (${file} MATCHES "^.*delete.*$") OR (${file} MATCHES "^.*put.*$")
Expand Down
77 changes: 77 additions & 0 deletions examples/zenohc/z_pub_cache.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
#include <stdio.h>
#include <string.h>

#include <chrono>
#include <iostream>
#include <limits>
#include <sstream>
#include <thread>

#include "../getargs.h"
#include "zenoh.hxx"

using namespace zenoh;
using namespace std::chrono_literals;

const char *default_value = "Pub from C++ zenoh-c!";
const char *default_keyexpr = "demo/example/zenoh-cpp-zenoh-c-pub";
const char *default_history = "1";
const char *default_prefix = "";

int _main(int argc, char **argv) {
const char *keyexpr = default_keyexpr;
const char *value = default_value;
const char *history = default_history;
const char *prefix = default_prefix;
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}},
{{"-i", {"history", &history}}, {"-x", {"query prefix", &prefix}}});
config.insert_json5(Z_CONFIG_ADD_TIMESTAMP_KEY, "true");

std::cout << "Opening session..." << std::endl;
auto session = Session::open(std::move(config));

std::cout << "Declaring Publication cache on '" << keyexpr << "'..." << std::endl;
Session::PublicationCacheOptions opts;
opts.history = std::atoi(history);
if (!std::string(prefix).empty()) {
opts.queryable_prefix = KeyExpr(prefix);
}
auto pub_cache = session.declare_publication_cache(keyexpr, std::move(opts));

std::cout << "Publication cache on '" << keyexpr << "' declared" << std::endl;

std::cout << "Press CTRL-C to quit..." << std::endl;
for (int idx = 0; idx < std::numeric_limits<int>::max(); ++idx) {
std::this_thread::sleep_for(1s);
std::ostringstream ss;
ss << "[" << idx << "] " << value;
auto s = ss.str();
std::cout << "Putting Data ('" << keyexpr << "': '" << s << "')...\n";
Session::PutOptions put_options;
put_options.encoding = Encoding("text/plain");
session.put(keyexpr, std::move(s), std::move(put_options));
}
return 0;
}

int main(int argc, char **argv) {
try {
init_log_from_env_or("error");
_main(argc, argv);
} catch (ZException e) {
std::cout << "Received an error :" << e.what() << "\n";
}
}
5 changes: 4 additions & 1 deletion include/zenoh/api.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
#include "api/session.hxx"
#include "api/subscriber.hxx"
#include "api/timestamp.hxx"
#if defined Z_FEATURE_SHARED_MEMORY && defined Z_FEATURE_UNSTABLE_API
#if defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API)
#include "api/shm/shm.hxx"
#endif
#include "api/ext/serialization.hxx"
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
#include "api/ext/publication_cache.hxx"
#endif
4 changes: 2 additions & 2 deletions include/zenoh/api/bytes.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class Bytes : public Owned<::z_owned_bytes_t> {
int64_t tell() { return ::z_bytes_reader_tell(&this->_0); }

/// @brief Return the number of bytes that can still be read.
/// @return Number of bytes that can still be read.
/// @return number of bytes that can still be read.
size_t remaining() const { return ::z_bytes_reader_remaining(&this->_0); }

/// @brief Set the reader position indicator to the value pointed to by offset, starting from the current
Expand Down Expand Up @@ -256,7 +256,7 @@ class Bytes : public Owned<::z_owned_bytes_t> {
}

/// @brief Finalize all writes and return underlying ``Bytes`` object.
/// @return Underlying ``Bytes`` object.
/// @return underlying ``Bytes`` object.
Bytes finish() && {
Bytes b;
::z_bytes_writer_finish(interop::as_moved_c_ptr(*this), interop::as_owned_c_ptr(b));
Expand Down
52 changes: 52 additions & 0 deletions include/zenoh/api/ext/publication_cache.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>

#pragma once

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
#include "../base.hxx"
#include "../interop.hxx"

namespace zenoh {
namespace ext {

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief A Zenoh publication cache.
///
/// Used to store publications on intersecting key expressions. Can be queried later via `zenoh::Session::get` to
/// retrieve this data.
/// @note Zenoh-c only
class PublicationCache : public Owned<::ze_owned_publication_cache_t> {
PublicationCache(zenoh::detail::null_object_t) : Owned(nullptr){};
friend struct interop::detail::Converter;

public:
/// @name Methods

/// @brief Get the key expression of the publication cache.
const KeyExpr& get_keyexpr() const {
return interop::as_owned_cpp_ref<KeyExpr>(::ze_publication_cache_keyexpr(interop::as_loaned_c_ptr(*this)));
}

/// @brief Undeclare publication cache.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
void undeclare(ZResult* err = nullptr) && {
__ZENOH_RESULT_CHECK(::ze_undeclare_publication_cache(interop::as_moved_c_ptr(*this)), err,
"Failed to undeclare Publication Cache");
}
};

} // namespace ext
} // namespace zenoh
#endif
22 changes: 18 additions & 4 deletions include/zenoh/api/interop.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ const CopyableType* as_copyable_c_ptr(const Copyable<CopyableType>& cpp_obj) {
}

/// @brief Get zenoh-c representation of std::optional of trivially copyable zenoh-cpp object.
/// @return Pointer to zenoh-c representation of trivially copyable zenoh-cpp object, or NULL if cpp_obj is empty.
/// @return pointer to zenoh-c representation of trivially copyable zenoh-cpp object, or NULL if cpp_obj is empty.
template <class CopyableCppObj>
auto* as_copyable_c_ptr(std::optional<CopyableCppObj>& cpp_obj) {
return cpp_obj.has_value() ? as_copyable_c_ptr(cpp_obj.value()) : nullptr;
}

/// @brief Get zenoh-c representation of std::optional of trivially copyable zenoh-cpp object.
/// @return Pointer to zenoh-c representation of trivially copyable zenoh-cpp object, or NULL if cpp_obj is empty.
/// @return pointer to zenoh-c representation of trivially copyable zenoh-cpp object, or NULL if cpp_obj is empty.
template <class CopyableCppObj>
const auto* as_copyable_c_ptr(const std::optional<CopyableCppObj>& cpp_obj) {
return cpp_obj.has_value() ? as_copyable_c_ptr(cpp_obj.value()) : nullptr;
Expand Down Expand Up @@ -90,6 +90,13 @@ const Loaned* as_loaned_c_ptr(const Owned<OwnedType>& cpp_obj) {
return ::z_loan(*as_owned_c_ptr(cpp_obj));
}

/// @brief Get loaned zenoh-c representation of owned zenoh-cpp object.
/// @return pointer to zenoh-c representation of loaned zenoh-cpp object, or NULL if cpp_obj is empty.
template <class OwnedCppObj>
const auto* as_loaned_c_ptr(const std::optional<OwnedCppObj>& cpp_obj) {
return cpp_obj.has_value() ? as_loaned_c_ptr(*cpp_obj) : nullptr;
}

/// @brief Get loaned zenoh-c representation of owned zenoh-cpp object.
template <class OwnedType,
class Loaned = typename ::z_owned_to_loaned_type_t<OwnedType>::type, // SFINAE here if no loaned type
Expand All @@ -101,14 +108,21 @@ Loaned* as_loaned_c_ptr(Owned<OwnedType>& cpp_obj) {
return ::z_loan_mut(*as_owned_c_ptr(cpp_obj));
}

/// @brief Get loaned zenoh-c representation of owned zenoh-cpp object.
/// @return pointer to zenoh-c representation of loaned zenoh-cpp object, or NULL if cpp_obj is empty.
template <class OwnedCppObj>
auto* as_loaned_c_ptr(std::optional<OwnedCppObj>& cpp_obj) {
return cpp_obj.has_value() ? as_loaned_c_ptr(*cpp_obj) : nullptr;
}

/// @brief Get moved zenoh-c representation of owned zenoh-cpp object.
template <class OwnedType>
auto* as_moved_c_ptr(Owned<OwnedType>& cpp_obj) {
return ::z_move(*as_owned_c_ptr(cpp_obj));
}

/// @brief Get moved zenoh-c representation of std::optional of owned zenoh-cpp object.
/// @return Pointer to zenoh-c representation of moved owned zenoh-cpp object, or NULL if cpp_obj is empty.
/// @return pointer to zenoh-c representation of moved owned zenoh-cpp object, or NULL if cpp_obj is empty.
template <class OwnedCppObj>
auto* as_moved_c_ptr(std::optional<OwnedCppObj>& cpp_obj) {
return cpp_obj.has_value() ? ::z_move(*as_owned_c_ptr(cpp_obj.value())) : nullptr;
Expand Down Expand Up @@ -215,4 +229,4 @@ T into_copyable_cpp_obj(const CopyableType& copyable_c_obj) {
return detail::Converter::copyable_to_cpp<T>(copyable_c_obj);
}

} // namespace zenoh::interop
} // namespace zenoh::interop
8 changes: 4 additions & 4 deletions include/zenoh/api/reply.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ class ReplyError : public Owned<::z_owned_reply_err_t> {
/// @name Methods

/// @brief The payload of this error.
/// @return Error payload.
/// @return error payload.
const Bytes& get_payload() const {
return interop::as_owned_cpp_ref<Bytes>(::z_reply_err_payload(interop::as_loaned_c_ptr(*this)));
}

/// @brief The encoding of this error.
/// @return Error encoding.
/// @return error encoding.
const Encoding& get_encoding() const {
return interop::as_owned_cpp_ref<Encoding>(::z_reply_err_encoding(interop::as_loaned_c_ptr(*this)));
}
Expand All @@ -54,7 +54,7 @@ class Reply : public Owned<::z_owned_reply_t> {
bool is_ok() const { return ::z_reply_is_ok(interop::as_loaned_c_ptr(*this)); }

/// @brief Get the reply sample. Will throw a ZException if ``Reply::is_ok`` returns ``false``.
/// @return Reply sample.
/// @return reply sample.
const Sample& get_ok() const {
if (!::z_reply_is_ok(interop::as_loaned_c_ptr(*this))) {
throw ZException("Reply data sample was requested, but reply contains error", Z_EINVAL);
Expand All @@ -63,7 +63,7 @@ class Reply : public Owned<::z_owned_reply_t> {
}

/// @brief Get the reply error. Will throw a ZException if ``Reply::is_ok`` returns ``true``.
/// @return Reply error.
/// @return reply error.
const ReplyError& get_err() const {
if (::z_reply_is_ok(interop::as_loaned_c_ptr(*this))) {
throw ZException("Reply error was requested, but reply contains data sample", Z_EINVAL);
Expand Down
Loading

0 comments on commit e060cf0

Please # to comment.