Skip to content

Java Programming Guide

Radai Rosenblatt edited this page Apr 12, 2017 · 52 revisions

Java Programming Guide

The Aeron API is designed to be as simple as possible and no simpler. In this guide, we will walk through a set of applications demonstrating specific points as we do. The entire applications can be found in the locations below.

NOTE: The javadoc is the definitive source of documentation. Please consider this guide as only a starting point.

Embedded Media Driver

The Aeron Media Driver can be run standalone and handle many applications. However, in some situations, it is desirable to run the media driver within the application.

In this case, a MediaDriver can be instantiated in the process. Only a single one is needed, but it does require some resources as discussed here.

When running an embedded media driver, it is recommended to set the following via system properties or directly via MediaDriver.Context passed into MediaDriver.launch:

  • Buffer Locations should point to a specific location as to not interfere with other Media Driver instances and
  • Threading Modes should be considered carefully as they will be spawned within the parent process.

An example of starting up an embedded Media Driver.

final MediaDriver driver = MediaDriver.launch();

Aeron

Aeron client applications need to coordinate operation with a running Media Driver. Either an embedded one or one that is standalone. This interaction handles creating Publications and Subscriptions and housekeeping. The interaction point for the application is the Aeron class.

final Aeron aeron = Aeron.connect(new Aeron.Context());

Settings for the instance may be changed via an Aeron.Context instance that is passed into the Aeron.connect method. As mentioned here.

Event Handling

Aeron instances have a set of handlers that might be called for some events. The application can specify these handlers via the Aeron.Context instance used to create the instance.

  • Aeron.Context.errorHandler lets the application specify a lambda to call when errors/exceptions occur.
  • Aeron.Context.availableImageHandler specifies a lambda to call when images are available. An image is the replication of the publication stream on the subscription side.
  • Aeron.Context.unavailableImageHandler specifies a lambda to call when an image becomes unavailable.

These handlers are called from the ClientConductor thread.

From BasicSubscriber:

final Aeron.Context ctx = new Aeron.Context()
    .availableImageHandler(SamplesUtil::printAvailableImage)
    .unavailableImageHandler(SamplesUtil::printUnavailableImage);

DirectBuffer

Accessing and modifying buffers that Aeron uses for sending and receiving of messages is done via a set of interfaces.

The methods should look familiar to anyone you uses ByteBuffer regularly. However, it extends and provides a more appropriate implementation for efficient handling of data layout.

In many cases, the use of UnsafeBuffer will allow for the most efficient operation. To be useful, a ByteBuffer, byte[], etc. must be wrapped. Once wrapped, then mutation or access of the underlying data can be done.

From BasicPublisher, putting some bytes into a buffer:

private static final UnsafeBuffer BUFFER = new UnsafeBuffer(ByteBuffer.allocateDirect(256));

...

    final String message = "Hello World!";
    BUFFER.putBytes(0, message.getBytes());

For a subscriber, grabbing some bytes from a buffer:

(buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);
	...
}

Subscription

An application that desires to listen to data needs to use a channel and stream to listen on. A Subscription aggregates zero or more Images for the same channel and stream id. Images are identified by session id from unique sources that is encoded in the opaque Image.sourceIdentity().

From BasicSubscriber, listen on channel aeron:udp?endpoint=localhost:40123 and stream 10:

final FragmentHandler fragmentHandler = printStringMessage(10);
final Aeron aeron = Aeron.connect(new Aeron.Context());
final Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:40123", 10);

NOTE: The Aeron.addSubscription method will block until the Media Driver acknowledges the request or a timeout occurs.

FragmentHandler

Messages arrive into Image instances via FragmentHandler method calls. This interface is a functional interface. The arguments are:

  • buffer holding the data
  • offset indicating the offset in the buffer that starts the message
  • length of the message
  • header holding the metadata of the message

Example of printing the contents of a message as a string along with some metadata:

(buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);

    System.out.println(
        String.format(
            "message to stream %d from session %x (%d@%d) <<%s>>",
            header.streamId(), header.sessionId(), length, offset, new String(data)));
};

Polling

Subscribing applications totally control when data is delivered to the FragmentHandler methods via the Subscription.poll or Image.poll methods, Subscriptions delegate polling to the aggregated Images. When called, this method determines if there is any messages to deliver and delivers them via the FragmentHandler interface up to the limit of the number of messages to deliver before returning.

Example of polling for new messages with a per poll limit of 10 messages and an Idle Strategy:

final IdleStrategy idleStrategy = new BackoffIdleStrategy(
    100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100));

while (...)
{
    final int fragmentsRead = subscription.poll(fragmentHandler, 10);
    idleStrategy.idle(fragmentsRead);
}

Message Reassembly

Publication instances automatically fragment large messages into data frames that Aeron sends. Subscription instances that desire these fragments to be reassembled prior to delivery to the FragmentHandler can use an instance of FragmentAssembler to do this.

final FragmentHandler fragmentHandler = new FragmentAssembler(printStringMessage(10));
final Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:40123", 10);

NOTE: Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is desired, then limiting message sizes to MTU size is a good practice.

Advanced Polling

At times you may wish to take more control in how a Subscription/Image is polled. For example, if you wish to archive a stream of messages in parallel then the Image.blockPoll or Image.filePoll can be used to efficiently copy available ranges of messages in a stream to another location.

It is also possible to control the polling action with the Image or Subscription.controlledPoll method. This method takes a ControlledFragmentHandler that returns the action which should be taken after the message fragment is handled.

When handling a fragment with the ControlledFragmentHandler the following return codes can be used to control the polling action:

  • ABORT the current polling operation and do not advance the position for this fragment.
  • BREAK from the current polling operation and commit the position as of the end of the current fragment being handled.
  • COMMIT Continue processing but commit the position as of the end of the current fragment so that flow control is applied to this point.
  • CONTINUE Continue processing taking the same approach as the in the standard FragmentHandler

Publication

An application that desires to send data needs to specify a channel and stream to send to.

From Basicpublisher, send to channel aeron:udp?endpoint=localhost:40123 and stream 10:

final Aeron aeron = Aeron.connect(new Aeron.Context());
final Publication publication = aeron.addPublication("aeron:udp?endpoint=localhost:40123", 10);

NOTE: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.

Afterward, the application is free to send data via the Publication.offer method.

private static final UnsafeBuffer BUFFER = new UnsafeBuffer(ByteBuffer.allocateDirect(256));

...
final String message = "Hello World!";
BUFFER.putBytes(0, message.getBytes());

final long resultingPosition = publication.offer(BUFFER, 0, message.getBytes().length);

Handling Back Pressure

Aeron has built in back pressure for a publisher. It will not allow a publisher to send data that exceeds proscribed flow control limits.

When calling Publication.offer a return value greater than 0 indicates the message was sent. Negative values indicate that the messages has not be enqueued for sending. Constants for negative values are as follows:

  • NOT_CONNECTED means no subscriber has yet connected to the publication.
  • BACK_PRESSURED indicates the message was not sent due to back pressure from Subscribers, but can be retried if desired.
  • ADMIN_ACTION indicates the message was not sent due to an administration action, such as log rotation, but can be retried if desired.

The ways that an application may handle back pressure are, by necessity, dependent on the application semantics. Here are a few options. This is not an exhaustive list.

  • Retry until success. Keep calling Publication.offer until it succeeds. This may spin or have some sort of idle strategy. Many examples do this.
  • Ignore failure and continue. Ignore that the data didn't send and move on. This is usually appropriate for situations where the data being sent has some lifetime and it would be better to not send stale data.
  • Retry until success or timeout. As normal retry with or without some sort of idle strategy but with a timeout attached.
  • Retry asynchronously. Retry periodically, but instead of idling, do some other work.

The needs of an application, or system, are quite complex. The common use case is one of non-blocking offer, though. Out of this more complex scenarios may be developed.

Monitoring

The Aeron Media Driver and the status of various buffers may be monitored outside of the driver via the counter files in use by the driver. Below is an example application that reads this data and prints it periodically.

public class AeronStat
{
    public static void main(final String[] args) throws Exception
    {
        final File cncFile = CommonContext.newDefaultCncFile();
        System.out.println("Command `n Control file " + cncFile);

        final MappedByteBuffer cncByteBuffer = IoUtil.mapExistingFile(cncFile, "cnc");
        final DirectBuffer metaDataBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer);
        final int cncVersion = metaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0));
        final long clientLiveness = metaDataBuffer.getLong(CncFileDescriptor.clientLivenessTimeoutOffset(0));

        if (CncFileDescriptor.CNC_VERSION != cncVersion)
        {
            throw new IllegalStateException("CNC version not supported: version=" + cncVersion);
        }

        final AtomicBuffer labelsBuffer = CncFileDescriptor.createCounterLabelsBuffer(cncByteBuffer, metaDataBuffer);
        final AtomicBuffer valuesBuffer = CncFileDescriptor.createCounterValuesBuffer(cncByteBuffer, metaDataBuffer);
        final CountersManager countersManager = new CountersManager(labelsBuffer, valuesBuffer);

        // Setup the SIGINT handler for graceful shutdown
        final AtomicBoolean running = new AtomicBoolean(true);
        SigInt.register(() -> running.set(false));

        while (running.get())
        {
            System.out.print("\033[H\033[2J");
            System.out.format("%1$tH:%1$tM:%1$tS - Aeron Stat", new Date());
            System.out.format(" (CnC v%d), client liveness %,d ns\n", cncVersion, clientLiveness);
            System.out.println("=========================");

            countersManager.forEach(
                (id, label) ->
                {
                    final long value = countersManager.getCounterValue(id);
                    System.out.format("%3d: %,20d - %s\n", id, value, label);
                });

            Thread.sleep(1000);
        }
    }
}

The AeronStat application above does the following:

  1. Find labels and values files in the file system
  2. Map the files into MappedByteBuffer instances
  3. Use an UnsafeBuffer to read the values
  4. Use a CountersManager` to grab context for the values and labels.
  5. Set up a SigInt to handle control-C out of the application
  6. While running, in a loop do the following:
    1. Grab the time
    2. For each counter, grab its value and print out a line with the timestamp, label, and value.