Java Programming Guide
The Aeron API is designed to be as simple as possible and no simpler. In this guide, we walk through a set of sample applications, highlighting specific points along the way. The complete applications can be found in the locations below.
NOTE: The javadoc is the definitive source of documentation. Please consider this guide as only a starting point.
The Aeron Media Driver can be run standalone and handle many applications. However, in some situations, it is desirable to run the media driver within the application.
In this case, a MediaDriver can be instantiated in the process. Only a single one is needed, but it does require some resources as discussed here.
When running an embedded Media Driver, it is recommended to set the following via system properties or directly via the MediaDriver.Context passed into MediaDriver.launch:
- Log Buffer Locations, specified by MediaDriver.Context.aeronDirectoryName(), should point to a specific location so as not to interfere with other Media Driver instances, and
- Threading Modes should be considered carefully as the threads will be spawned within the parent process.
An example of starting up an embedded Media Driver:
final MediaDriver driver = MediaDriver.launch();
To guarantee that an embedded Media Driver does not interfere with other Media Drivers, one can use the following launch method:
final MediaDriver driver = MediaDriver.launchEmbedded();
The difference is that the latter launches a Media Driver with a randomly generated aeronDirectoryName if it detects that the default value has not been changed. This is enough to isolate it from other instances of a Media Driver.
Aeron client applications need to coordinate operation with a running Media Driver, either an embedded one or a standalone one. This interaction handles creating Publications and Subscriptions and housekeeping. The interaction point for the application is the Aeron class.
final Aeron aeron = Aeron.connect(new Aeron.Context());
Settings for the instance may be changed via an Aeron.Context instance that is passed into the Aeron.connect method, as mentioned here.
To be able to establish a connection with a Media Driver, Aeron must know the Aeron directory name used by the Media Driver. This can be left unspecified (the default value is then used), passed as a system property, or set manually. When the Media Driver is launched in embedded mode and the directory is randomly generated, one can use the convenience method MediaDriver.aeronDirectoryName(), which provides the directory name of the Media Driver. It can then be used to set Aeron.Context.aeronDirectoryName() before passing this context to the Aeron.connect method, as shown below.
final MediaDriver driver = MediaDriver.launchEmbedded();
final Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(driver.aeronDirectoryName()));
Aeron instances have a set of handlers that might be called for some events. The application can specify these handlers via the Aeron.Context instance used to create the instance.
- Aeron.Context.errorHandler lets the application specify a lambda to call when errors/exceptions occur.
- Aeron.Context.availableImageHandler specifies a lambda to call when images are available. An image is the replication of the publication stream on the subscription side.
- Aeron.Context.unavailableImageHandler specifies a lambda to call when an image becomes unavailable.
These handlers are called from the ClientConductor thread.
From BasicSubscriber:
final Aeron.Context ctx = new Aeron.Context()
    .availableImageHandler(SamplesUtil::printAvailableImage)
    .unavailableImageHandler(SamplesUtil::printUnavailableImage);
Accessing and modifying the buffers that Aeron uses for sending and receiving messages is done via a set of interfaces. The methods should look familiar to anyone who uses ByteBuffer regularly. However, the interfaces go further and provide a more appropriate implementation for efficient handling of data layout.
In many cases, the use of UnsafeBuffer will allow for the most efficient operation. To be useful, a ByteBuffer, byte[], etc. must be wrapped. Once wrapped, the underlying data can be accessed or mutated.
From BasicPublisher, putting some bytes into a buffer:
private static final UnsafeBuffer BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
...
final String message = "Hello World!";
BUFFER.putBytes(0, message.getBytes());
For a subscriber, grabbing some bytes from a buffer:
(buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);
    ...
}
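The wrap-then-access pattern can be tried without Aeron on the classpath. The following is a minimal sketch that uses the JDK's java.nio.ByteBuffer as a stand-in for UnsafeBuffer, purely to stay dependency-free; the class WrappedBufferSketch and its methods putMessage and getMessage are hypothetical names, not part of Aeron or Agrona. It shows index-based writes and reads that leave the buffer's position untouched, which is the style of access used in the snippets above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only: ByteBuffer stands in for UnsafeBuffer.
final class WrappedBufferSketch
{
    // Writes the UTF-8 bytes of message starting at the absolute index and returns the encoded length.
    static int putMessage(final ByteBuffer buffer, final int index, final String message)
    {
        final byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < bytes.length; i++)
        {
            buffer.put(index + i, bytes[i]); // absolute put: the buffer position is not modified
        }
        return bytes.length;
    }

    // Copies length bytes starting at the absolute offset and decodes them as UTF-8.
    static String getMessage(final ByteBuffer buffer, final int offset, final int length)
    {
        final byte[] data = new byte[length];
        for (int i = 0; i < length; i++)
        {
            data[i] = buffer.get(offset + i); // absolute get
        }
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args)
    {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(256);
        final int length = putMessage(buffer, 16, "Hello World!");
        System.out.println(getMessage(buffer, 16, length));
    }
}
```

Encoding with an explicit charset avoids depending on the platform default, which matters when publisher and subscriber run on different machines.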
An application that desires to listen to data needs to use a channel and stream to listen on. A Subscription aggregates zero or more Images for the same channel and stream id. Images are identified by session id from unique sources; the source is encoded in the opaque Image.sourceIdentity().
From BasicSubscriber, listen on a channel and a stream:
final Aeron aeron = Aeron.connect(new Aeron.Context());
final Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);
NOTE: The Aeron.addSubscription method will block until the Media Driver acknowledges the request or a timeout occurs.
Subscribing applications totally control when data is delivered to the FragmentHandler methods via the Subscription.poll or Image.poll methods. Subscriptions delegate polling to the matching Images. When called, this method determines if there are any messages to deliver and delivers them via the FragmentHandler interface, up to the limit of the number of fragments to deliver, before returning.
Example of polling for new messages with a per poll limit of 10 fragments and an Idle Strategy.
final FragmentHandler fragmentHandler = ... // defined below
final IdleStrategy idleStrategy = new BackoffIdleStrategy(
    100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100));
while (...)
{
    final int fragmentsRead = subscription.poll(fragmentHandler, 10);
    idleStrategy.idle(fragmentsRead);
}
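To see how the fragment limit bounds the work done per poll, here is a small simulation. It is not Aeron's implementation: PollLimitSketch, enqueue, and the in-memory queue of strings are assumptions made for illustration, with Consumer<String> standing in for FragmentHandler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for a subscription; it only models the fragment limit.
final class PollLimitSketch
{
    private final Queue<String> pending = new ArrayDeque<>();

    void enqueue(final String fragment)
    {
        pending.add(fragment);
    }

    // Delivers at most fragmentLimit pending fragments to the handler and returns how many were delivered.
    int poll(final Consumer<String> handler, final int fragmentLimit)
    {
        int fragmentsRead = 0;
        while (fragmentsRead < fragmentLimit && !pending.isEmpty())
        {
            handler.accept(pending.remove());
            fragmentsRead++;
        }
        return fragmentsRead;
    }

    public static void main(final String[] args)
    {
        final PollLimitSketch subscription = new PollLimitSketch();
        for (int i = 0; i < 25; i++)
        {
            subscription.enqueue("fragment-" + i);
        }

        final List<String> received = new ArrayList<>();
        int fragmentsRead;
        do
        {
            fragmentsRead = subscription.poll(received::add, 10);
            System.out.println("fragmentsRead=" + fragmentsRead);
        }
        while (fragmentsRead > 0);
    }
}
```

With 25 pending fragments and a limit of 10, successive polls deliver 10, 10, 5 and then 0 fragments. The zero return is the signal an idle strategy can use to back off.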
Messages are read from Image instances via FragmentHandler callbacks. This interface is a functional interface.
The arguments are:
- buffer holding the data
- offset indicating the offset in the buffer that starts the message
- length of the message
- header holding the metadata of the message
Example of printing the contents of a message as a string along with some metadata:
final FragmentHandler fragmentHandler = (buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);
    System.out.println(String.format(
        "Message to stream %d from session %d (%d@%d) <<%s>>",
        streamId, header.sessionId(), length, offset, new String(data)));
};
Publication instances automatically fragment large messages into data frames that Aeron sends. Subscription instances that desire these fragments to be reassembled prior to delivery to the FragmentHandler can chain an instance of FragmentAssembler to do this by composition.
FragmentHandler reassemblingFragmentHandler = new FragmentAssembler(fragmentHandler);
final int fragmentsRead = subscription.poll(reassemblingFragmentHandler, 10);
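The idea of reassembly by composition can be sketched without Aeron. The class below is a simplified illustration, not the FragmentAssembler source: ReassemblySketch, onFragment, the single-stream buffering, and the BEGIN_FLAG and END_FLAG values are all assumptions chosen for the example. A wrapper accumulates fragment payloads and hands the delegate one complete message when the final fragment arrives.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

// Hypothetical illustration: wraps a delegate and delivers whole messages only.
final class ReassemblySketch
{
    static final int BEGIN_FLAG = 0x80; // assumed marker for the first fragment of a message
    static final int END_FLAG = 0x40;   // assumed marker for the last fragment of a message

    private final Consumer<byte[]> delegate;
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream();
    private boolean inProgress;

    ReassemblySketch(final Consumer<byte[]> delegate)
    {
        this.delegate = delegate;
    }

    void onFragment(final byte[] payload, final int flags)
    {
        if ((flags & BEGIN_FLAG) != 0)
        {
            partial.reset(); // start a new message, discarding any incomplete one
            inProgress = true;
        }
        else if (!inProgress)
        {
            return; // a middle or end fragment without a beginning cannot be assembled
        }

        partial.write(payload, 0, payload.length);

        if ((flags & END_FLAG) != 0)
        {
            inProgress = false;
            delegate.accept(partial.toByteArray()); // deliver the complete message
            partial.reset();
        }
    }
}
```

A fragment carrying both flags is an unfragmented message and passes straight through to the delegate. A production assembler would also need to keep separate state per session, which this sketch omits.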
NOTE: Reassembly has been shown to have minimal impact on latency, though it is not totally negligible. If the lowest latency is desired, then limiting message sizes to the MTU size is a good practice.
NOTE: There is a maximum length allowed for messages, which is the minimum of 1/8th of the term length and 16MB. Messages larger than this should be chunked using an application-level chunking protocol. Chunking has better recovery properties from failure and streams with mechanical sympathy.
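The limit in the note above is simple arithmetic, sketched here for concreteness. MaxMessageLengthSketch and its methods are hypothetical helpers written for this guide, not Aeron API, and chunkCount assumes the application-level chunks carry no extra header of their own.

```java
// Hypothetical helper that restates the rule: min(termLength / 8, 16MB).
final class MaxMessageLengthSketch
{
    static final int SIXTEEN_MB = 16 * 1024 * 1024;

    // Largest message, in bytes, allowed for the given term length.
    static int maxMessageLength(final int termLength)
    {
        return Math.min(termLength / 8, SIXTEEN_MB);
    }

    // Number of application-level chunks needed to carry payloadLength bytes (rounding up).
    static int chunkCount(final long payloadLength, final int termLength)
    {
        final int maxLength = maxMessageLength(termLength);
        return (int)((payloadLength + maxLength - 1) / maxLength);
    }

    public static void main(final String[] args)
    {
        final int termLength = 64 * 1024;
        System.out.println("max message length: " + maxMessageLength(termLength));
        System.out.println("chunks for 20000 bytes: " + chunkCount(20_000, termLength));
    }
}
```

For a 64KB term the limit is 8KB, so a 20,000 byte payload needs three chunks. For a 1GB term the 16MB cap applies instead.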
At times you may wish to take more control over how a Subscription/Image is polled. For example, if you wish to archive a stream of messages in parallel, then Image.blockPoll or Image.rawPoll can be used to efficiently copy available ranges of messages in a stream to another location.
It is also possible to control the polling action with the Image.controlledPoll or Subscription.controlledPoll method. This method takes a ControlledFragmentHandler that returns the action which should be taken after the message fragment is handled.
When handling a fragment with the ControlledFragmentHandler, the following return codes can be used to control the polling action:
- ABORT: abort the current polling operation and do not advance the position for this fragment.
- BREAK: break from the current polling operation and commit the position as of the end of the current fragment being handled.
- COMMIT: continue processing but commit the position as of the end of the current fragment so that flow control is applied to this point.
- CONTINUE: continue processing, taking the same approach as in the standard FragmentHandler.
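The effect of each action on the position can be illustrated with a small simulation. This is a sketch under stated assumptions rather than Aeron's implementation: ControlledPollSketch, its local Action enum, and the list-of-strings stream are hypothetical, and the position is modelled as an index into the list instead of a byte offset.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical model of controlled polling over an in-memory list of fragments.
final class ControlledPollSketch
{
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    private int position; // index of the next fragment to deliver

    int position()
    {
        return position;
    }

    int controlledPoll(
        final List<String> fragments, final Function<String, Action> handler, final int fragmentLimit)
    {
        int fragmentsRead = 0;
        int index = position;

        while (fragmentsRead < fragmentLimit && index < fragments.size())
        {
            final Action action = handler.apply(fragments.get(index));
            if (Action.ABORT == action)
            {
                break; // do not advance past this fragment; it is delivered again on the next poll
            }

            index++;
            fragmentsRead++;

            if (Action.BREAK == action)
            {
                break; // stop polling; the position is committed below
            }
            if (Action.COMMIT == action)
            {
                position = index; // make progress visible immediately, then keep going
            }
        }

        position = index; // commit whatever was consumed by this poll
        return fragmentsRead;
    }
}
```

In this model a handler that returns ABORT on the third of four fragments leaves the position at 2, so the next poll starts again from that third fragment.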
An application that desires to send data needs to specify a channel and stream to send to.
From BasicPublisher, send to a channel and a stream:
final Aeron aeron = Aeron.connect(new Aeron.Context());
final Publication publication = aeron.addPublication(CHANNEL, STREAM_ID);
NOTE: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
Afterwards, the application is free to send data via the Publication.offer method.
private static final UnsafeBuffer BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
...
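final String message = "Hello World!";
// Encode once so the length offered matches the bytes written to the buffer.
final byte[] messageBytes = message.getBytes();
BUFFER.putBytes(0, messageBytes);
final long resultingPosition = publication.offer(BUFFER, 0, messageBytes.length);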
Aeron has built-in back pressure for a publisher. It will not allow a publisher to send data that exceeds prescribed flow control limits.
When calling Publication.offer, a return value greater than 0 indicates the message was sent. Negative values indicate that the message has not been enqueued for sending. Constants for negative values are as follows:
- NOT_CONNECTED means no subscriber is connected to the publication; this can be a transient state as subscribers come and go.
- BACK_PRESSURED indicates the message was not sent due to back pressure from Subscribers, but can be retried if desired.
- ADMIN_ACTION indicates the message was not sent due to an administration action, such as log rotation, but can be retried if desired.
- CLOSED indicates the Publication has been closed, either by another client thread, because the channel is invalid, or because the client has timed out.
- MAX_POSITION_EXCEEDED indicates that the Publication has reached the maximum possible position given the term-length. This is possible with a small term-length. Max position is 2^31 * term-length for a Publication.
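As a rough illustration of how these results might be classified, and of how large the maximum position is in practice, consider the sketch below. OfferResultSketch is a hypothetical helper; the numeric values are assumed to mirror the constants on Publication and should be checked against the javadoc, and application code would normally reference the Publication constants directly.

```java
// Hypothetical helper for interpreting the value returned by an offer.
final class OfferResultSketch
{
    // Assumed to mirror the Publication constants; the javadoc is authoritative.
    static final long NOT_CONNECTED = -1;
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;
    static final long CLOSED = -4;
    static final long MAX_POSITION_EXCEEDED = -5;

    // A value greater than 0 is the new stream position after a successful offer.
    static boolean isSent(final long result)
    {
        return result > 0;
    }

    // The two results described above as safe to retry.
    static boolean isRetryable(final long result)
    {
        return BACK_PRESSURED == result || ADMIN_ACTION == result;
    }

    // Maximum position for a publication: 2^31 * term-length.
    static long maxPosition(final int termLength)
    {
        return (1L << 31) * termLength;
    }

    public static void main(final String[] args)
    {
        System.out.println("max position for a 64KB term: " + maxPosition(64 * 1024));
    }
}
```

With a 64KB term the maximum position is 2^47 bytes (128TB), which is why this limit is only likely to be reached with small term lengths. Whether NOT_CONNECTED should be retried is left to the application, since it may or may not be transient.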
The ways that an application may handle back pressure are, by necessity, dependent on the application semantics. Here are a few options. This is not an exhaustive list.
- Retry until success. Keep calling Publication.offer until it succeeds. This may spin or have some sort of idle strategy. Many examples do this.
- Ignore failure and continue. Ignore that the data didn't send and move on. This is usually appropriate for situations where the data being sent has some lifetime and it would be better to not send stale data.
- Retry until success or timeout. As normal retry, with or without some sort of idle strategy, but with a timeout attached.
- Retry asynchronously. Retry periodically, but instead of idling, do some other work.
The needs of an application, or system, can be quite complex. The common use case is one of non-blocking offer, though. Out of this, more complex scenarios may be developed.
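The "retry until success or timeout" option can be sketched as follows. To keep the example free of dependencies, the offer is represented by a LongSupplier, which in real code would be something like a lambda wrapping the call to Publication.offer; OfferRetrySketch and offerWithTimeout are hypothetical names, and Thread.yield() is only a placeholder for whatever idle strategy the application prefers.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: retries an offer until it succeeds or a deadline passes.
final class OfferRetrySketch
{
    // Returns the positive position on success, or the last non-positive result once the timeout expires.
    static long offerWithTimeout(final LongSupplier offer, final long timeoutNs)
    {
        final long deadlineNs = System.nanoTime() + timeoutNs;
        long result;

        do
        {
            result = offer.getAsLong();
            if (result > 0)
            {
                return result; // the message was sent
            }

            Thread.yield(); // placeholder idle step between attempts
        }
        while (System.nanoTime() - deadlineNs < 0); // overflow-safe comparison of nanoTime values

        return result;
    }
}
```

A fuller version would stop immediately on results that cannot succeed on retry, such as a closed publication, instead of waiting for the timeout.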
The Aeron Media Driver and the status of various buffers may be monitored outside of the driver via the counter files in use by the driver. Below is an example application that reads this data and prints it periodically. Full source can be found here.
/**
 * Tool for printing out Aeron counters. A command-and-control (CnC) file is maintained by the media driver
 * in shared memory. This application reads the cnc file and prints the counters. Layout of the cnc file is
 * described in {@link CncFileDescriptor}.
 * <p>
 * This tool accepts filters on the command line, e.g. for connections only see example below:
 * <p>
 * <code>
 * java -cp aeron-samples/build/libs/samples.jar io.aeron.samples.AeronStat type=[1-9] identity=12345
 * </code>
 */
public class AeronStat
{
    private static final String ANSI_CLS = "\u001b[2J";
    private static final String ANSI_HOME = "\u001b[H";

    /**
     * The delay in seconds between each update.
     */
    private static final String DELAY = "delay";

    /**
     * Whether to watch for updates or run once.
     */
    private static final String WATCH = "watch";

    /**
     * Types of the counters.
     * <ul>
     * <li>0: System Counters</li>
     * <li>1 - 5, 9, 10, 11: Stream Positions and Indicators</li>
     * <li>6 - 7: Channel Endpoint Status</li>
     * </ul>
     */
    private static final String COUNTER_TYPE_ID = "type";

    /**
     * The identity of each counter that can either be the system counter id or registration id for positions.
     */
    private static final String COUNTER_IDENTITY = "identity";

    /**
     * Session id filter to be used for position counters.
     */
    private static final String COUNTER_SESSION_ID = "session";

    /**
     * Stream id filter to be used for position counters.
     */
    private static final String COUNTER_STREAM_ID = "stream";

    /**
     * Channel filter to be used for position counters.
     */
    private static final String COUNTER_CHANNEL = "channel";

    public static void main(final String[] args) throws Exception
    {
        long delayMs = 1000L;
        boolean watch = true;
        Pattern typeFilter = null;
        Pattern identityFilter = null;
        Pattern sessionFilter = null;
        Pattern streamFilter = null;
        Pattern channelFilter = null;

        if (0 != args.length)
        {
            checkForHelp(args);

            for (final String arg : args)
            {
                final int equalsIndex = arg.indexOf('=');
                if (-1 == equalsIndex)
                {
                    System.out.println("Arguments must be in name=pattern format: Invalid '" + arg + "'");
                    return;
                }

                final String argName = arg.substring(0, equalsIndex);
                final String argValue = arg.substring(equalsIndex + 1);

                switch (argName)
                {
                    case WATCH:
                        watch = Boolean.parseBoolean(argValue);
                        break;

                    case DELAY:
                        delayMs = Long.parseLong(argValue) * 1000L;
                        break;

                    case COUNTER_TYPE_ID:
                        typeFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_IDENTITY:
                        identityFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_SESSION_ID:
                        sessionFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_STREAM_ID:
                        streamFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_CHANNEL:
                        channelFilter = Pattern.compile(argValue);
                        break;

                    default:
                        System.out.println("Unrecognised argument: '" + arg + "'");
                        return;
                }
            }
        }

        final CncFileReader cncFileReader = CncFileReader.map();
        final CounterFilter counterFilter = new CounterFilter(
            typeFilter, identityFilter, sessionFilter, streamFilter, channelFilter);

        if (watch)
        {
            workLoop(delayMs, () -> printOutput(cncFileReader, counterFilter));
        }
        else
        {
            printOutput(cncFileReader, counterFilter);
        }
    }

    private static void workLoop(final long delayMs, final Runnable outputPrinter) throws Exception
    {
        final AtomicBoolean running = new AtomicBoolean(true);
        SigInt.register(() -> running.set(false));

        do
        {
            clearScreen();
            outputPrinter.run();
            Thread.sleep(delayMs);
        }
        while (running.get());
    }

    private static void printOutput(final CncFileReader cncFileReader, final CounterFilter counterFilter)
    {
        final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

        System.out.print(dateFormat.format(new Date()));
        System.out.println(
            " - Aeron Stat (CnC v" + cncFileReader.semanticVersion() + ")" +
            ", pid " + SystemUtil.getPid() +
            ", heartbeat age " + cncFileReader.driverHeartbeatAgeMs() + "ms");
        System.out.println("======================================================================");

        final CountersReader counters = cncFileReader.countersReader();

        counters.forEach(
            (counterId, typeId, keyBuffer, label) ->
            {
                if (counterFilter.filter(typeId, keyBuffer))
                {
                    final long value = counters.getCounterValue(counterId);
                    System.out.format("%3d: %,20d - %s%n", counterId, value, label);
                }
            }
        );

        System.out.println("--");
    }

    private static void checkForHelp(final String[] args)
    {
        for (final String arg : args)
        {
            if ("-?".equals(arg) || "-h".equals(arg) || "-help".equals(arg))
            {
                // Filter names listed here match the argument names parsed in main().
                System.out.format(
                    "Usage: [-Daeron.dir=<directory containing CnC file>] AeronStat%n" +
                    "\t[delay=<seconds between updates>]%n" +
                    "\t[watch=<true|false>]%n" +
                    "filter by optional regex patterns:%n" +
                    "\t[type=<pattern>]%n" +
                    "\t[identity=<pattern>]%n" +
                    "\t[session=<pattern>]%n" +
                    "\t[stream=<pattern>]%n" +
                    "\t[channel=<pattern>]%n");

                System.exit(0);
            }
        }
    }

    private static void clearScreen() throws Exception
    {
        if (SystemUtil.osName().contains("win"))
        {
            new ProcessBuilder("cmd", "/c", "cls").inheritIO().start().waitFor();
        }
        else
        {
            System.out.print(ANSI_CLS + ANSI_HOME);
        }
    }

    static class CounterFilter
    {
        private final Pattern typeFilter;
        private final Pattern identityFilter;
        private final Pattern sessionFilter;
        private final Pattern streamFilter;
        private final Pattern channelFilter;

        CounterFilter(
            final Pattern typeFilter,
            final Pattern identityFilter,
            final Pattern sessionFilter,
            final Pattern streamFilter,
            final Pattern channelFilter)
        {
            this.typeFilter = typeFilter;
            this.identityFilter = identityFilter;
            this.sessionFilter = sessionFilter;
            this.streamFilter = streamFilter;
            this.channelFilter = channelFilter;
        }

        private static boolean match(final Pattern pattern, final Supplier<String> supplier)
        {
            return null == pattern || pattern.matcher(supplier.get()).find();
        }

        boolean filter(final int typeId, final DirectBuffer keyBuffer)
        {
            if (!match(typeFilter, () -> Integer.toString(typeId)))
            {
                return false;
            }

            if (SYSTEM_COUNTER_TYPE_ID == typeId && !match(identityFilter, () -> Integer.toString(keyBuffer.getInt(0))))
            {
                return false;
            }
            else if ((typeId >= PUBLISHER_LIMIT_TYPE_ID && typeId <= RECEIVER_POS_TYPE_ID) ||
                typeId == SENDER_LIMIT_TYPE_ID || typeId == PER_IMAGE_TYPE_ID || typeId == PUBLISHER_POS_TYPE_ID)
            {
                return
                    match(identityFilter, () -> Long.toString(keyBuffer.getLong(REGISTRATION_ID_OFFSET))) &&
                    match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) &&
                    match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) &&
                    match(channelFilter, () -> keyBuffer.getStringAscii(CHANNEL_OFFSET));
            }
            else if (typeId >= SEND_CHANNEL_STATUS_TYPE_ID && typeId <= RECEIVE_CHANNEL_STATUS_TYPE_ID)
            {
                return match(channelFilter, () -> keyBuffer.getStringAscii(ChannelEndpointStatus.CHANNEL_OFFSET));
            }

            return true;
        }
    }
}
The AeronStat application above does the following:
- Find labels and values files in the file system
- Map the files into MappedByteBuffer instances
- Use an UnsafeBuffer to read the values
- Use a CountersReader to grab context for the values and labels.
- Set up a SigInt to handle control-C out of the application
- While running, in a loop do the following:
  - Grab the time
  - For each counter, grab its value and print out a line with the timestamp, label, and value.
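One detail of the filtering in AeronStat is worth trying in isolation: the match helper uses Matcher.find(), so a pattern matches if it occurs anywhere in the value, and a null pattern means "no filter". The sketch below copies that one-line helper into a standalone class; CounterFilterSketch and the sample values in main are hypothetical and only the JDK regex classes are used.

```java
import java.util.function.Supplier;
import java.util.regex.Pattern;

// Standalone copy of the match logic for experimenting with filter patterns.
final class CounterFilterSketch
{
    // A null pattern accepts everything; otherwise the pattern may match any part of the value.
    static boolean match(final Pattern pattern, final Supplier<String> supplier)
    {
        return null == pattern || pattern.matcher(supplier.get()).find();
    }

    public static void main(final String[] args)
    {
        final Pattern typeFilter = Pattern.compile("[1-9]");
        for (final int typeId : new int[]{ 0, 4, 10 })
        {
            System.out.println("type " + typeId + " matches: " + match(typeFilter, () -> Integer.toString(typeId)));
        }
    }
}
```

Because find() matches substrings, type=[1-9] accepts type 10 as well as types 1 to 9, while rejecting type 0. Anchoring the pattern, for example ^[1-9]$, restricts it to single-digit type ids.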