-
Notifications
You must be signed in to change notification settings - Fork 899
Java Programming Guide
The Aeron API is designed to be as simple as possible and no simpler. In this guide, we will walk through a set of applications demonstrating specific points as we do. The entire applications can be found in the locations below.
NOTE: The javadoc is the definitive source of documentation. Please consider this guide as only a starting point.
The Aeron Media Driver can be run standalone and handle many applications. However, in some situations, it is desirable to run the media driver within the application.
In this case, a MediaDriver
can be instantiated in the process. Only a single one is needed, but it does require some resources as discussed here.
When running an embedded media driver, it is recommended to set the following via system properties or directly via MediaDriver.Context
passed into MediaDriver.launch
:
- Buffer Locations should point to a specific location as to not interfere with other Media Driver instances and
- Threading Modes should be considered carefully as they will be spawned within the parent process.
An example of starting up an embedded Media Driver.
final MediaDriver driver = MediaDriver.launch();
Aeron client applications need to coordinate operation with a running Media Driver. Either an embedded one or one that is standalone. This interaction handles creating Publication
s and Subscription
s and housekeeping. The interaction point for the application
is the Aeron
class.
final Aeron aeron = Aeron.connect(new Aeron.Context());
Settings for the instance may be changed via an Aeron.Context
instance that is passed into the Aeron.connect
method. As mentioned here.
Aeron
instances have a set of handlers that might be called for some events. The application can specify these handlers via the Aeron.Context
instance used to create the instance.
-
Aeron.Context.errorHandler
lets the application specify a lambda to call when errors/exceptions occur. -
Aeron.Context.availableImageHandler
specifies a lambda to call when images are available. An image is the replication of the publication stream on the subscription side. -
Aeron.Context.unavailableImageHandler
specifies a lambda to call when a image becomes unavailable.
These handlers are called from the ClientConductor
thread.
From BasicSubscriber
:
final Aeron.Context ctx = new Aeron.Context()
.availableImageHandler(SamplesUtil::printAvailableImage)
.unavailableImageHandler(SamplesUtil::printUnavailableImage);
Accessing and modifying buffers that Aeron uses for sending and receiving of messages is done via a set of interfaces.
The methods should look familiar to anyone you uses ByteBuffer
regularly. However, it extends and provides a more appropriate implementation for efficient handling of data layout.
In many cases, the use of UnsafeBuffer
will allow for the most efficient operation. To be useful, a ByteBuffer
, byte[]
, etc. must be wrapped. Once wrapped, then mutation or access of the underlying data can be done.
From BasicPublisher
, putting some bytes into a buffer:
private static final UnsafeBuffer BUFFER = new UnsafeBuffer(ByteBuffer.allocateDirect(256));
...
final String message = "Hello World!";
BUFFER.putBytes(0, message.getBytes());
For a subscriber, grabbing some bytes from a buffer:
(buffer, offset, length, header) ->
{
final byte[] data = new byte[length];
buffer.getBytes(offset, data);
...
}
An application that desires to listen to data needs to use a channel and stream to listen on. A Subscription
aggregates zero or more Image
s for the same channel and stream id. Image
s are identified by session id from unique sources that is encoded in the opaque Image.sourceIdentity()
.
From BasicSubscriber
, listen on channel aeron:udp?endpoint=localhost:40123
and stream 10:
final FragmentHandler fragmentHandler = printStringMessage(10);
final Aeron aeron = Aeron.connect(new Aeron.Context());
final Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:40123", 10);
NOTE: The Aeron.addSubscription
method will block until the Media Driver acknowledges the request or a timeout occurs.
Messages arrive into Image
instances via FragmentHandler
method calls. This interface is a functional interface.
The arguments are:
- buffer holding the data
- offset indicating the offset in the buffer that starts the message
- length of the message
- header holding the metadata of the message
Example of printing the contents of a message as a string along with some metadata:
(buffer, offset, length, header) ->
{
final byte[] data = new byte[length];
buffer.getBytes(offset, data);
System.out.println(
String.format(
"message to stream %d from session %x (%d@%d) <<%s>>",
header.streamId(), header.sessionId(), length, offset, new String(data)));
};
Subscribing applications totally control when data is delivered to the FragmentHandler methods via the Subscription.poll
or Image.poll
methods, Subscription
s delegate polling to the aggregated Image
s. When called, this method determines if there is any messages to deliver and delivers them via the FragmentHandler
interface up to the limit of the number of messages to deliver before returning.
Example of polling for new messages with a per poll limit of 10 messages and an Idle Strategy:
final IdleStrategy idleStrategy = new BackoffIdleStrategy(
100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100));
while (...)
{
final int fragmentsRead = subscription.poll(fragmentHandler, 10);
idleStrategy.idle(fragmentsRead);
}
Publication
instances automatically fragment large messages into data frames that Aeron sends. Subscription
instances that desire
these fragments to be reassembled prior to delivery to the FragmentHandler
can use an instance of FragmentAssembler
to do this.
final FragmentHandler fragmentHandler = new FragmentAssembler(printStringMessage(10));
final Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:40123", 10);
NOTE: Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is desired, then limiting message sizes to MTU size is a good practice.
At times you may wish to take more control in how a Subscription/Image is polled. For example, if you wish to archive a stream of messages in parallel then the Image.blockPoll
or Image.filePoll
can be used to efficiently copy available ranges of messages in a stream to another location.
It is also possible to control the polling action with the Image
or Subscription.controlledPoll
method. This method takes a ControlledFragmentHandler
that returns the action which should be taken after the message fragment is handled.
When handling a fragment with the ControlledFragmentHandler
the following return codes can be used to control the polling action:
-
ABORT
the current polling operation and do not advance the position for this fragment. -
BREAK
from the current polling operation and commit the position as of the end of the current fragment being handled. -
COMMIT
Continue processing but commit the position as of the end of the current fragment so that flow control is applied to this point. -
CONTINUE
Continue processing taking the same approach as the in the standardFragmentHandler
An application that desires to send data needs to specify a channel and stream to send to.
From Basicpublisher
, send to channel aeron:udp?endpoint=localhost:40123
and stream 10:
final Aeron aeron = Aeron.connect(new Aeron.Context());
final Publication publication = aeron.addPublication("aeron:udp?endpoint=localhost:40123", 10);
NOTE: The Aeron.addPublication
method will block until the Media Driver acknowledges the request or a timeout occurs.
Afterward, the application is free to send data via the Publication.offer
method.
private static final UnsafeBuffer BUFFER = new UnsafeBuffer(ByteBuffer.allocateDirect(256));
...
final String message = "Hello World!";
BUFFER.putBytes(0, message.getBytes());
final long resultingPosition = publication.offer(BUFFER, 0, message.getBytes().length);
Aeron has built in back pressure for a publisher. It will not allow a publisher to send data that exceeds proscribed flow control limits.
When calling Publication.offer
a return value greater than 0 indicates the message was sent. Negative values indicate that the messages has not be enqueued for sending. Constants for negative values are as follows:
-
NOT_CONNECTED
means no subscriber has yet connected to the publication. -
BACK_PRESSURED
indicates the message was not sent due to back pressure from Subscribers, but can be retried if desired. -
ADMIN_ACTION
indicates the message was not sent due to an administration action, such as log rotation, but can be retried if desired.
The ways that an application may handle back pressure are, by necessity, dependent on the application semantics. Here are a few options. This is not an exhaustive list.
- Retry until success. Keep calling
Publication.offer
until it succeeds. This may spin or have some sort of idle strategy. Many examples do this. - Ignore failure and continue. Ignore that the data didn't send and move on. This is usually appropriate for situations where the data being sent has some lifetime and it would be better to not send stale data.
- Retry until success or timeout. As normal retry with or without some sort of idle strategy but with a timeout attached.
- Retry asynchronously. Retry periodically, but instead of idling, do some other work.
The needs of an application, or system, are quite complex. The common use case is one of non-blocking offer, though. Out of this more complex scenarios may be developed.
The Aeron Media Driver and the status of various buffers may be monitored outside of the driver via the counter files in use by the driver. Below is an example application that reads this data and prints it periodically.
public class AeronStat
{
public static void main(final String[] args) throws Exception
{
final File cncFile = CommonContext.newDefaultCncFile();
System.out.println("Command `n Control file " + cncFile);
final MappedByteBuffer cncByteBuffer = IoUtil.mapExistingFile(cncFile, "cnc");
final DirectBuffer metaDataBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer);
final int cncVersion = metaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0));
final long clientLiveness = metaDataBuffer.getLong(CncFileDescriptor.clientLivenessTimeoutOffset(0));
if (CncFileDescriptor.CNC_VERSION != cncVersion)
{
throw new IllegalStateException("CNC version not supported: version=" + cncVersion);
}
final AtomicBuffer labelsBuffer = CncFileDescriptor.createCounterLabelsBuffer(cncByteBuffer, metaDataBuffer);
final AtomicBuffer valuesBuffer = CncFileDescriptor.createCounterValuesBuffer(cncByteBuffer, metaDataBuffer);
final CountersManager countersManager = new CountersManager(labelsBuffer, valuesBuffer);
// Setup the SIGINT handler for graceful shutdown
final AtomicBoolean running = new AtomicBoolean(true);
SigInt.register(() -> running.set(false));
while (running.get())
{
System.out.print("\033[H\033[2J");
System.out.format("%1$tH:%1$tM:%1$tS - Aeron Stat", new Date());
System.out.format(" (CnC v%d), client liveness %,d ns\n", cncVersion, clientLiveness);
System.out.println("=========================");
countersManager.forEach(
(id, label) ->
{
final long value = countersManager.getCounterValue(id);
System.out.format("%3d: %,20d - %s\n", id, value, label);
});
Thread.sleep(1000);
}
}
}
The AeronStat
application above does the following:
- Find labels and values files in the file system
- Map the files into
MappedByteBuffer
instances - Use an
UnsafeBuffer
to read the values - Use a CountersManager` to grab context for the values and labels.
- Set up a
SigInt
to handle control-C out of the application - While running, in a loop do the following:
- Grab the time
- For each counter, grab its value and print out a line with the timestamp, label, and value.