Cpp Programming Guide
The Aeron API is designed to be as simple as possible and no simpler. In this guide, we will walk through a set of applications, demonstrating specific points as we go. The complete applications can be found in the locations below. Note that the C++ implementation is client only; the Java Media Driver process must be running to support both the C++ and Java clients.
NOTE: The Doxygen doc is the definitive source of documentation. Please consider this guide as only a starting point.
Aeron client applications need to coordinate operation with a running Media Driver. This interaction handles creating Publications and Subscriptions and housekeeping. The interaction point for the application is the Aeron class.
Context context;
Aeron aeron(context);
Alternatively, and preferably, use an Aeron instance wrapped in a std::shared_ptr. A static method, Aeron::connect, handles this easily and mimics the semantics of the Java API. It uses std::make_shared as well.
Context context;
std::shared_ptr<Aeron> aeron = Aeron::connect(context);
Settings for the instance may be changed via a Context instance that is passed into the Aeron::connect method or into the Aeron constructor.
Similar to the Java API, the C++11 API runs an internal thread for each Aeron instance. So, the material here applies equally.
Aeron instances have a set of handlers that might be called for some events. The application can specify these handlers via the Context instance used to create the Aeron instance.
- Context::errorHandler lets the application specify a lambda to call when errors/exceptions occur.
- Context::availableImageHandler specifies a lambda to call when new Images are encountered.
- Context::unavailableImageHandler specifies a lambda to call when an Image becomes inactive. An Image is the replication of the publication stream on the subscription side.
- Context::newPublicationHandler specifies a lambda to call when the media driver successfully adds a Publication.
- Context::newSubscriptionHandler specifies a lambda to call when the media driver successfully adds a Subscription.
These handlers are called from the ClientConductor thread.
From BasicSubscriber:
Context context;
context.newSubscriptionHandler(
    [](const std::string& channel, std::int32_t streamId, std::int64_t correlationId)
    {
        std::cout << "Subscription: " << channel << " " << correlationId << ":" << streamId << std::endl;
    });
context.availableImageHandler([](Image &image)
    {
        std::cout << "Available image correlationId=" << image.correlationId() << " sessionId=" << image.sessionId();
        std::cout << " at position=" << image.position() << " from " << image.sourceIdentity() << std::endl;
    });
context.unavailableImageHandler([](Image &image)
    {
        std::cout << "Unavailable image on correlationId=" << image.correlationId() << " sessionId=" << image.sessionId();
        std::cout << " at position=" << image.position() << " from " << image.sourceIdentity() << std::endl;
    });
std::shared_ptr<Aeron> aeron = Aeron::connect(context);
Accessing and modifying buffers that Aeron uses for sending and receiving messages is done via an instance of AtomicBuffer.
The methods should look familiar to anyone who uses Java's ByteBuffer regularly. However, it can easily be bypassed by those more comfortable handling pointers to memory.
An AtomicBuffer wraps a buffer. This can be done in the constructor or via AtomicBuffer::wrap. The underlying buffer can be accessed via AtomicBuffer::buffer().
From BasicPublisher, putting some bytes into a buffer:
AERON_DECL_ALIGNED(buffer_t buffer, 16);
concurrent::AtomicBuffer srcBuffer(&buffer[0], buffer.size());
char message[256];
...
const int messageLen = ::snprintf(message, sizeof(message), "Hello World! %ld", i);
srcBuffer.putBytes(0, reinterpret_cast<std::uint8_t *>(message), messageLen);
For a subscriber, grabbing some bytes from a buffer:
[](const AtomicBuffer& buffer, util::index_t offset, util::index_t length, const Header& header)
{
    ...
    std::cout << std::string(reinterpret_cast<const char *>(buffer.buffer()) + offset, static_cast<std::size_t>(length)) << ">>" << std::endl;
};
Using a struct to overlay a buffer at an offset:
struct DataLayout
{
    std::uint16_t type;
    std::uint16_t version;
    // ...
};
...
DataLayout& data = buffer.overlayStruct<DataLayout>(0);
data.type = 5;
data.version = 1;
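To make the overlay idea concrete without depending on Aeron, here is a minimal sketch of a bounds-checked struct overlay on a raw byte buffer. The free function `overlayAt` is a hypothetical stand-in, not the Aeron implementation of `overlayStruct`, and it assumes a plain data struct and a suitably aligned offset.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <type_traits>

struct DataLayout
{
    std::uint16_t type;
    std::uint16_t version;
};

// Hypothetical helper (not part of Aeron): returns a reference to a T laid
// over buffer[offset, offset + sizeof(T)), after checking bounds and alignment.
template<typename T>
T &overlayAt(std::uint8_t *buffer, std::size_t capacity, std::size_t offset)
{
    static_assert(
        std::is_standard_layout<T>::value && std::is_trivially_copyable<T>::value,
        "overlay is only meaningful for plain data structs");

    if (offset > capacity || sizeof(T) > capacity - offset)
    {
        throw std::out_of_range("struct does not fit in buffer at offset");
    }

    if (reinterpret_cast<std::uintptr_t>(buffer + offset) % alignof(T) != 0)
    {
        throw std::invalid_argument("offset is not aligned for the struct type");
    }

    // No copy is made: writes through the reference land in the buffer.
    return *reinterpret_cast<T *>(buffer + offset);
}
```

Because the reference aliases the buffer, setting `data.type = 5` changes the underlying bytes directly, which is what makes overlays attractive for fixed-layout message headers.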
An application that subscribes to data streams needs to use a channel and stream to listen on.
From BasicSubscriber, listen on a channel and a stream:
std::int64_t id = aeron->addSubscription(settings.channel, settings.streamId);
The Aeron::addSubscription method is non-blocking in the C++11 API. It returns an id, called a registrationId, that can be used to determine whether the Media Driver has acknowledged the add command successfully, a timeout has occurred, or an error has been returned.
It is the application's responsibility to check the status. This can be done via Aeron::findSubscription.
From BasicSubscriber, wait until the Media Driver has responded or an error/timeout has occurred:
std::shared_ptr<Subscription> subscription = aeron->findSubscription(id);
while (!subscription)
{
    std::this_thread::yield();
    subscription = aeron->findSubscription(id);
}
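The loop above spins until the resource appears. Where an application prefers to bound that wait itself, the same pattern can be wrapped with a deadline. The sketch below is independent of Aeron: `awaitResource` is a hypothetical helper, and the `find` callable stands in for a call such as `aeron->findSubscription(id)`. How the real call reports errors is left untouched, since any exception it throws simply propagates.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <thread>

// Hypothetical helper (not part of Aeron): polls `find` until it returns a
// non-null pointer or `timeout` elapses. Returns nullptr on timeout.
template<typename T>
std::shared_ptr<T> awaitResource(
    const std::function<std::shared_ptr<T>()> &find,
    std::chrono::milliseconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;

    std::shared_ptr<T> resource = find();
    while (!resource && std::chrono::steady_clock::now() < deadline)
    {
        std::this_thread::yield();
        resource = find();
    }

    return resource;
}
```

A caller would name the resource type explicitly, for example `awaitResource<Subscription>(...)`, and treat a null result as a timeout.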
Messages arrive into Subscription instances via fragment_handler_t method calls. The arguments are:
- buffer holding the data
- offset indicating the offset in the buffer that starts the message
- length of the message
- header holding the metadata of the message
Example of printing the contents of a message as a string along with some metadata:
[](const AtomicBuffer& buffer, util::index_t offset, util::index_t length, const Header& header)
{
    std::cout << "Message to stream " << header.streamId() << " from session " << header.sessionId();
    std::cout << "(" << length << "@" << offset << ") <<";
    std::cout << std::string(reinterpret_cast<const char *>(buffer.buffer()) + offset, static_cast<std::size_t>(length)) << ">>" << std::endl;
};
Subscribing applications fully control when data is delivered to the fragment_handler_t methods via the Subscription::poll method. When called, this method determines whether there are any fragments to deliver and delivers them via the passed-in fragment_handler_t, up to the given fragment limit, before returning.
Example of polling for new messages with a per poll limit of 10 fragments and an Idle Strategy:
fragment_handler_t handler = printStringMessage();
SleepingIdleStrategy idleStrategy(IDLE_SLEEP_MS);
while (running)
{
    const int fragmentsRead = subscription->poll(handler, 10);
    idleStrategy.idle(fragmentsRead);
}
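The fragment limit semantics can be illustrated with a small stand-in that needs no Media Driver. In this sketch a `std::deque` plays the role of the pending fragments, and `pollQueue` and `message_handler_t` are hypothetical names, not Aeron API. It only shows the contract described above: deliver up to the limit, then return the number delivered.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>

// Hypothetical handler type standing in for fragment_handler_t.
using message_handler_t = std::function<void(const std::string &)>;

// Delivers at most fragmentLimit pending messages to handler and returns
// how many were delivered. Zero means there was nothing to do.
inline int pollQueue(
    std::deque<std::string> &pending,
    const message_handler_t &handler,
    int fragmentLimit)
{
    int delivered = 0;

    while (delivered < fragmentLimit && !pending.empty())
    {
        handler(pending.front());
        pending.pop_front();
        ++delivered;
    }

    return delivered;
}
```

With 25 pending messages and a limit of 10, successive calls return 10, 10, 5 and then 0. The zero return is the case in which an idle strategy would back off rather than spin.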
Publication instances automatically fragment large messages into data frames that Aeron sends. Subscription instances that need these fragments reassembled before delivery to the fragment_handler_t can chain an instance of FragmentAssembler to do this.
FragmentAssembler fragmentAssembler(printStringMessage());
fragment_handler_t handler = fragmentAssembler.handler();
SleepingIdleStrategy idleStrategy(IDLE_SLEEP_MS);
while (running)
{
    const int fragmentsRead = subscription->poll(handler, 10);
    idleStrategy.idle(fragmentsRead);
}
NOTE: Reassembly has been shown to have minimal impact on latency, but it is not totally negligible. If the lowest latency is desired, limiting message sizes to the MTU size is good practice.
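To give a feel for what reassembly involves, the following is a deliberately simplified sketch and not the actual FragmentAssembler. It assumes each fragment carries begin and end flags (the values 0x80 and 0x40 are assumed to match Aeron's frame flags) and buffers fragments per session until the end flag arrives. `MiniAssembler` and `assembled_handler_t` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Flag values are assumptions for this sketch.
constexpr std::uint8_t BEGIN_FRAG = 0x80;
constexpr std::uint8_t END_FRAG = 0x40;
constexpr std::uint8_t UNFRAGMENTED = 0xC0;

using assembled_handler_t =
    std::function<void(std::int32_t sessionId, const std::string &message)>;

// Hypothetical, simplified assembler: buffers fragments per session and
// calls the delegate once a whole message has been collected.
class MiniAssembler
{
public:
    explicit MiniAssembler(assembled_handler_t delegate) : m_delegate(std::move(delegate))
    {
    }

    void onFragment(std::int32_t sessionId, std::uint8_t flags, const std::string &payload)
    {
        const bool begin = (flags & BEGIN_FRAG) != 0;
        const bool end = (flags & END_FRAG) != 0;

        if (begin && end)
        {
            m_delegate(sessionId, payload); // unfragmented: pass straight through
            return;
        }

        if (begin)
        {
            m_partial[sessionId] = payload; // start (or restart) a message
            return;
        }

        auto it = m_partial.find(sessionId);
        if (it == m_partial.end())
        {
            return; // middle or end fragment without a begin: drop it
        }

        it->second += payload;
        if (end)
        {
            m_delegate(sessionId, it->second);
            m_partial.erase(it);
        }
    }

private:
    assembled_handler_t m_delegate;
    std::map<std::int32_t, std::string> m_partial;
};
```

The extra copy into the per-session buffer is the kind of cost the note above refers to; unfragmented messages skip it entirely.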
An application that desires to send data needs to specify a channel and stream to send to.
From BasicPublisher, send to a channel and a stream:
std::int64_t id = aeron->addPublication(settings.channel, settings.streamId);
The Aeron::addPublication method is non-blocking in the C++11 API. It returns an id, called a registrationId, that can be used to determine whether the Media Driver has acknowledged the add command successfully, a timeout has occurred, or an error has been returned.
It is the application's responsibility to check the status. This can be done via Aeron::findPublication.
From BasicPublisher, wait until the Media Driver has responded or an error/timeout has occurred:
std::shared_ptr<Publication> publication = aeron->findPublication(id);
while (!publication)
{
    std::this_thread::yield();
    publication = aeron->findPublication(id);
}
After successful acknowledgement, the application is free to send data via the Publication::offer method.
AERON_DECL_ALIGNED(buffer_t buffer, 16);
concurrent::AtomicBuffer srcBuffer(&buffer[0], buffer.size());
char message[256];
...
const int messageLen = ::snprintf(message, sizeof(message), "Hello World! %ld", i);
srcBuffer.putBytes(0, reinterpret_cast<std::uint8_t *>(message), messageLen);
const std::int64_t result = publication->offer(srcBuffer, 0, messageLen);
Aeron has built-in back pressure for a publisher. It will not allow a publisher to send data that exceeds the prescribed flow control limits.
When calling Publication::offer, a return value greater than 0 indicates the message was sent. Negative values indicate that the message has not been enqueued for sending. Constants for negative values are as follows:
- NOT_CONNECTED means no subscriber is connected to the publication. This can be a transient state as subscribers come and go.
- BACK_PRESSURED indicates the message was not sent due to back pressure from Subscribers, but can be retried if desired.
- ADMIN_ACTION indicates the message was not sent due to an administration action, such as log rotation, but can be retried if desired.
- PUBLICATION_CLOSED indicates the message was not sent due to the Publication being closed. This is a permanent error.
- MAX_POSITION_EXCEEDED indicates that the Publication has reached the maximum possible position given the term-length. This is possible with a small term-length. Max position is 2^31 * term-length for a Publication.
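The list above can be turned into a small decision helper. This is only a sketch: the numeric values below are assumptions chosen so the example compiles on its own, and real code should use the named constants from the Aeron headers. `OfferOutcome`, `classifyOffer` and `maxPosition` are hypothetical names, and treating MAX_POSITION_EXCEEDED as non-retryable is an assumption based on the position being unable to advance further.

```cpp
#include <cassert>
#include <cstdint>

// Assumed numeric values; prefer the constants from the Aeron headers.
constexpr std::int64_t NOT_CONNECTED = -1;
constexpr std::int64_t BACK_PRESSURED = -2;
constexpr std::int64_t ADMIN_ACTION = -3;
constexpr std::int64_t PUBLICATION_CLOSED = -4;
constexpr std::int64_t MAX_POSITION_EXCEEDED = -5;

enum class OfferOutcome { Sent, Retry, Fatal, Unknown };

// Maps an offer result onto what the caller can do next.
inline OfferOutcome classifyOffer(std::int64_t result)
{
    if (result > 0)
    {
        return OfferOutcome::Sent;
    }

    switch (result)
    {
        case NOT_CONNECTED:
        case BACK_PRESSURED:
        case ADMIN_ACTION:
            return OfferOutcome::Retry; // transient: may succeed later

        case PUBLICATION_CLOSED:
        case MAX_POSITION_EXCEEDED:
            return OfferOutcome::Fatal; // this Publication cannot send any more

        default:
            return OfferOutcome::Unknown; // not covered by the list above
    }
}

// Max position as stated above: 2^31 * term-length.
constexpr std::int64_t maxPosition(std::int64_t termLength)
{
    return termLength * (static_cast<std::int64_t>(1) << 31);
}
```

As a worked example of the formula, a 64 KiB term length gives 2^31 * 2^16 = 2^47 bytes, which is 140737488355328.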
The ways that an application may handle back pressure are, by necessity, dependent on the application semantics. Here are a few options. This is not an exhaustive list.
- Retry until success. Keep calling Publication::offer until it succeeds. This may spin or have some sort of idle strategy. Many examples do this.
- Ignore failure and continue. Ignore that the data didn't send and move on. This is usually appropriate for situations where the data being sent has some lifetime and it would be better to not send stale data.
- Retry until success or timeout. As normal retry with or without some sort of idle strategy but with a timeout attached.
- Retry asynchronously. Retry periodically, but instead of idling, do some other work.
The needs of an application, or system, can be quite complex. The common use case is a non-blocking offer, though, and more complex scenarios can be built on top of it.
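As one illustration of the "retry until success or timeout" option, here is a minimal sketch that does not depend on Aeron. The `offer` callable stands in for a call such as `publication->offer(srcBuffer, 0, messageLen)`, and `retryable` lets the caller stop early on results it considers permanent. `offerWithTimeout` and both parameter names are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical helper (not part of Aeron): calls `offer` until it returns a
// positive value, `retryable` rejects the result, or `timeout` elapses.
// Returns the last result so the caller can see why it gave up.
inline std::int64_t offerWithTimeout(
    const std::function<std::int64_t()> &offer,
    const std::function<bool(std::int64_t)> &retryable,
    std::chrono::milliseconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;

    std::int64_t result = offer();
    while (result <= 0 && retryable(result) && std::chrono::steady_clock::now() < deadline)
    {
        std::this_thread::yield(); // an idle strategy could be used here instead
        result = offer();
    }

    return result;
}
```

Swapping the `yield` for other useful work turns the same loop into the asynchronous variant, and a `retryable` that always returns false gives the "ignore failure and continue" behaviour.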
The Aeron Media Driver and the status of various buffers may be monitored outside of the driver via the counter files in use by the driver. Below are the major parts of an example application, AeronStat, that reads this data and prints it periodically.
Settings settings = parseCmdLine(cp, argc, argv);
MemoryMappedFile::ptr_t cncFile = MemoryMappedFile::mapExistingReadOnly(
    (settings.basePath + "/" + CncFileDescriptor::CNC_FILE).c_str());
const std::int32_t cncVersion = CncFileDescriptor::cncVersionVolatile(cncFile);
if (semanticVersionMajor(cncVersion) != semanticVersionMajor(CncFileDescriptor::CNC_VERSION))
{
    std::cerr << "CNC version not supported: "
        << " file=" << semanticVersionToString(cncVersion)
        << " app=" << semanticVersionToString(CncFileDescriptor::CNC_VERSION) << std::endl;
    return EXIT_FAILURE;
}
const std::int64_t clientLivenessTimeoutNs = CncFileDescriptor::clientLivenessTimeout(cncFile);
const std::int64_t pid = CncFileDescriptor::pid(cncFile);
AtomicBuffer metadataBuffer = CncFileDescriptor::createCounterMetadataBuffer(cncFile);
AtomicBuffer valuesBuffer = CncFileDescriptor::createCounterValuesBuffer(cncFile);
CountersReader counters(metadataBuffer, valuesBuffer);
while (running)
{
    time_t rawtime;
    char currentTime[80];
    ::time(&rawtime);
    struct tm localTm;
    ::localtime_r(&rawtime, &localTm);
    ::strftime(currentTime, sizeof(currentTime) - 1, "%H:%M:%S", &localTm);
    std::printf("\033[H\033[2J");
    std::printf(
        "%s - Aeron Stat (CnC v%s), pid %" PRId64 ", client liveness %s ns\n",
        currentTime,
        semanticVersionToString(cncVersion).c_str(),
        pid,
        toStringWithCommas(clientLivenessTimeoutNs).c_str());
    std::printf("===========================\n");
    counters.forEach([&](std::int32_t counterId, std::int32_t, const AtomicBuffer&, const std::string& l)
        {
            std::int64_t value = counters.getCounterValue(counterId);
            std::printf("%3d: %20s - %s\n", counterId, toStringWithCommas(value).c_str(), l.c_str());
        });
    std::this_thread::sleep_for(std::chrono::milliseconds(settings.updateIntervalMs));
}
std::cout << "Exiting..." << std::endl;
The AeronStat application above does the following:
- Find the CnC file in the file system.
- Map the file and return AtomicBuffer instances that expose the counters metadata and values.
- Use a CountersReader to access the metadata and values.
- While running, in a loop do the following:
  - Grab the time and print it out along with the version and client liveness value.
  - For each counter, print out a line with the counter id, value, and label.
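The version check near the top of AeronStat compares only the major component of two packed version numbers. The sketch below shows how such a check could work. The bit layout (major in bits 16 to 23, minor in bits 8 to 15, patch in bits 0 to 7) is an assumption for illustration, and the function names are hypothetical stand-ins rather than the Aeron helpers used above.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Assumed packing: (major << 16) | (minor << 8) | patch, each in 0..255.
inline std::int32_t composeVersion(int major, int minor, int patch)
{
    assert(major >= 0 && major <= 255 && minor >= 0 && minor <= 255 && patch >= 0 && patch <= 255);
    return (major << 16) | (minor << 8) | patch;
}

inline int versionMajor(std::int32_t version) { return (version >> 16) & 0xFF; }
inline int versionMinor(std::int32_t version) { return (version >> 8) & 0xFF; }
inline int versionPatch(std::int32_t version) { return version & 0xFF; }

// Formats a packed version as "major.minor.patch".
inline std::string versionToString(std::int32_t version)
{
    return std::to_string(versionMajor(version)) + "." +
        std::to_string(versionMinor(version)) + "." +
        std::to_string(versionPatch(version));
}

// Mirrors the check in the listing: only the major component has to match.
inline bool isCompatible(std::int32_t fileVersion, std::int32_t appVersion)
{
    return versionMajor(fileVersion) == versionMajor(appVersion);
}
```

Comparing only the major component lets a monitoring tool keep reading a file written by a driver that differs in minor or patch level.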